The following issues were found
src/test/java/io/reactivex/rxjava3/flowable/FlowableConversionTest.java
48 issues
Line: 154
@Override
public void onComplete() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error: " + e.getMessage());
Reported by PMD.
Line: 159
@Override
public void onError(Throwable e) {
System.out.println("error: " + e.getMessage());
e.printStackTrace();
}
@Override
public void onNext(String t) {
Reported by PMD.
Line: 165
@Override
public void onNext(String t) {
System.out.println(t);
}
});
List<Object> crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()});
Reported by PMD.
Line: 175
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object pv) {
System.out.println(pv);
}
})
.to(new ConvertToCylonDetector<>())
.beep(new Predicate<Object>() {
@Override
Reported by PMD.
Line: 256
Integer i = queue.poll();
if (i != null) {
x++;
System.out.println(x + " item: " + i);
}
}
Assert.assertNull(thrown.get());
}
}
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
public class FlowableConversionTest extends RxJavaTest {
public static class Cylon { }
public static class Jail {
Object cylon;
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
public class FlowableConversionTest extends RxJavaTest {
public static class Cylon { }
public static class Jail {
Object cylon;
Reported by PMD.
Line: 36
public static class Cylon { }
public static class Jail {
Object cylon;
Jail(Object cylon) {
this.cylon = cylon;
}
}
Reported by PMD.
Line: 44
}
public static class CylonDetectorObservable<T> {
protected Publisher<T> onSubscribe;
public static <T> CylonDetectorObservable<T> create(Publisher<T> onSubscribe) {
return new CylonDetectorObservable<>(onSubscribe);
}
Reported by PMD.
Line: 65
public <O> O x(Function<Publisher<T>, O> operator) {
try {
return operator.apply(onSubscribe);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
public <R> CylonDetectorObservable<? extends R> compose(Function<CylonDetectorObservable<? super T>, CylonDetectorObservable<? extends R>> transformer) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromArray.java
47 issues
Line: 143
}
@Override
void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
Subscriber<? super T> a = downstream;
Reported by PMD.
Line: 143
}
@Override
void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
Subscriber<? super T> a = downstream;
Reported by PMD.
Line: 226
}
@Override
void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
ConditionalSubscriber<? super T> a = downstream;
Reported by PMD.
Line: 226
}
@Override
void slowPath(long r) {
long e = 0;
T[] arr = array;
int f = arr.length;
int i = index;
ConditionalSubscriber<? super T> a = downstream;
Reported by PMD.
Line: 27
import java.util.Objects;
public final class FlowableFromArray<T> extends Flowable<T> {
final T[] array;
public FlowableFromArray(T[] array) {
this.array = array;
}
Reported by PMD.
Line: 29
public final class FlowableFromArray<T> extends Flowable<T> {
final T[] array;
public FlowableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Subscriber<? super T> s) {
Reported by PMD.
Line: 46
abstract static class BaseArraySubscription<T> extends BasicQueueSubscription<T> {
private static final long serialVersionUID = -2252972430506210021L;
final T[] array;
int index;
volatile boolean cancelled;
Reported by PMD.
Line: 48
final T[] array;
int index;
volatile boolean cancelled;
BaseArraySubscription(T[] array) {
this.array = array;
Reported by PMD.
Line: 50
int index;
volatile boolean cancelled;
BaseArraySubscription(T[] array) {
this.array = array;
}
Reported by PMD.
Line: 52
volatile boolean cancelled;
BaseArraySubscription(T[] array) {
this.array = array;
}
@Override
public final int requestFusion(int mode) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToSortedListTest.java
47 issues
Line: 31
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableToSortedListTest extends RxJavaTest {
@Test
public void sortedListObservable() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<List<Integer>> observable = w.toSortedList().toObservable();
Reported by PMD.
Line: 36
@Test
public void sortedListObservable() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<List<Integer>> observable = w.toSortedList().toObservable();
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
Reported by PMD.
Line: 36
@Test
public void sortedListObservable() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Observable<List<Integer>> observable = w.toSortedList().toObservable();
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
Reported by PMD.
Line: 39
Observable<List<Integer>> observable = w.toSortedList().toObservable();
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 40
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 41
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void sortedListWithCustomFunctionFlowable() {
Reported by PMD.
Line: 42
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(1, 2, 3, 4, 5));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void sortedListWithCustomFunctionFlowable() {
Observable<Integer> w = Observable.just(1, 3, 2, 5, 4);
Reported by PMD.
Line: 59
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 60
Observer<List<Integer>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void withFollowingFirstObservable() {
Reported by PMD.
Line: 61
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList(5, 4, 3, 2, 1));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void withFollowingFirstObservable() {
Observable<Integer> o = Observable.just(1, 3, 2, 5, 4);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeUntilPredicateTest.java
47 issues
Line: 31
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
;
public class ObservableTakeUntilPredicateTest extends RxJavaTest {
@Test
public void takeEmpty() {
Observer<Object> o = TestHelper.mockObserver();
Reported by PMD.
Line: 45
}
}).subscribe(o);
verify(o, never()).onNext(any());
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
Reported by PMD.
Line: 46
}).subscribe(o);
verify(o, never()).onNext(any());
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
public void takeAll() {
Reported by PMD.
Line: 47
verify(o, never()).onNext(any());
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
public void takeAll() {
Observer<Object> o = TestHelper.mockObserver();
Reported by PMD.
Line: 61
}
}).subscribe(o);
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
Reported by PMD.
Line: 62
}).subscribe(o);
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
Reported by PMD.
Line: 63
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
public void takeFirst() {
Reported by PMD.
Line: 64
verify(o).onNext(1);
verify(o).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
public void takeFirst() {
Observer<Object> o = TestHelper.mockObserver();
Reported by PMD.
Line: 78
}
}).subscribe(o);
verify(o).onNext(1);
verify(o, never()).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
Reported by PMD.
Line: 79
}).subscribe(o);
verify(o).onNext(1);
verify(o, never()).onNext(2);
verify(o, never()).onError(any(Throwable.class));
verify(o).onComplete();
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/observers/MaybeConsumersTest.java
47 issues
Line: 49
import io.reactivex.rxjava3.subjects.MaybeSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class MaybeConsumersTest implements Consumer<Object>, Action {
final CompositeDisposable composite = new CompositeDisposable();
final MaybeSubject<Integer> processor = MaybeSubject.create();
Reported by PMD.
Line: 51
public class MaybeConsumersTest implements Consumer<Object>, Action {
final CompositeDisposable composite = new CompositeDisposable();
final MaybeSubject<Integer> processor = MaybeSubject.create();
final List<Object> events = new ArrayList<>();
Reported by PMD.
Line: 53
final CompositeDisposable composite = new CompositeDisposable();
final MaybeSubject<Integer> processor = MaybeSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void run() throws Exception {
Reported by PMD.
Line: 55
final MaybeSubject<Integer> processor = MaybeSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void run() throws Exception {
events.add("OnComplete");
}
Reported by PMD.
Line: 73
}
@Test
public void onSuccessNormal() {
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
Reported by PMD.
Line: 77
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 77
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 79
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
processor.onSuccess(1);
Reported by PMD.
Line: 85
processor.onSuccess(1);
assertEquals(0, composite.size());
assertEquals(Arrays.<Object>asList(1), events);
}
Reported by PMD.
Line: 87
assertEquals(0, composite.size());
assertEquals(Arrays.<Object>asList(1), events);
}
@Test
public void onErrorNormal() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromCallableTest.java
47 issues
Line: 38
public class FlowableFromCallableTest extends RxJavaTest {
@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Reported by PMD.
Line: 40
@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
Reported by PMD.
Line: 43
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Flowable<Object> fromCallableFlowable = Flowable.fromCallable(func);
verifyNoInteractions(func);
Reported by PMD.
Line: 49
verifyNoInteractions(func);
fromCallableFlowable.subscribe();
verify(func).call();
}
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 51
fromCallableFlowable.subscribe();
verify(func).call();
}
@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Reported by PMD.
Line: 56
@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Flowable<String> fromCallableFlowable = Flowable.fromCallable(func);
Reported by PMD.
Line: 59
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Flowable<String> fromCallableFlowable = Flowable.fromCallable(func);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 59
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Flowable<String> fromCallableFlowable = Flowable.fromCallable(func);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 65
Subscriber<String> subscriber = TestHelper.mockSubscriber();
fromCallableFlowable.subscribe(subscriber);
verify(subscriber).onNext("test_value");
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
Reported by PMD.
Line: 67
fromCallableFlowable.subscribe(subscriber);
verify(subscriber).onNext("test_value");
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@SuppressWarnings("unchecked")
Reported by PMD.
src/test/java/io/reactivex/rxjava3/single/SingleTest.java
47 issues
Line: 587
new Single<Integer>() {
@Override
protected void subscribeActual(SingleObserver<? super Integer> observer) {
throw new NullPointerException();
}
}.test();
}
}
Reported by PMD.
Line: 167
.map(new Function<String, String>() {
@Override
public String apply(String v) {
System.out.println("SubscribeOn Thread: " + Thread.currentThread());
return v;
}
})
.observeOn(Schedulers.computation())
.map(new Function<String, String>() {
Reported by PMD.
Line: 175
.map(new Function<String, String>() {
@Override
public String apply(String v) {
System.out.println("ObserveOn Thread: " + Thread.currentThread());
return v;
}
})
.toFlowable().subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.single;
import static org.junit.Assert.*;
import java.util.*;
import java.util.concurrent.*;
Reported by PMD.
Line: 34
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class SingleTest extends RxJavaTest {
@Test
public void helloWorld() {
TestSubscriber<String> ts = new TestSubscriber<>();
Single.just("Hello World!").toFlowable().subscribe(ts);
Reported by PMD.
Line: 39
@Test
public void helloWorld() {
TestSubscriber<String> ts = new TestSubscriber<>();
Single.just("Hello World!").toFlowable().subscribe(ts);
ts.assertValueSequence(Arrays.asList("Hello World!"));
}
@Test
public void helloWorld2() {
Reported by PMD.
Line: 39
@Test
public void helloWorld() {
TestSubscriber<String> ts = new TestSubscriber<>();
Single.just("Hello World!").toFlowable().subscribe(ts);
ts.assertValueSequence(Arrays.asList("Hello World!"));
}
@Test
public void helloWorld2() {
Reported by PMD.
Line: 39
@Test
public void helloWorld() {
TestSubscriber<String> ts = new TestSubscriber<>();
Single.just("Hello World!").toFlowable().subscribe(ts);
ts.assertValueSequence(Arrays.asList("Hello World!"));
}
@Test
public void helloWorld2() {
Reported by PMD.
Line: 117
Single<String> a = Single.just("A");
Single<String> b = Single.just("B");
Single.merge(a, b).subscribe(ts);
ts.assertValueSequence(Arrays.asList("A", "B"));
}
@Test
public void mergeWith() {
Reported by PMD.
Line: 125
public void mergeWith() {
TestSubscriber<String> ts = new TestSubscriber<>();
Single.just("A").mergeWith(Single.just("B")).subscribe(ts);
ts.assertValueSequence(Arrays.asList("A", "B"));
}
@Test
public void createSuccess() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnEach.java
46 issues
Line: 28
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class FlowableDoOnEach<T> extends AbstractFlowableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public FlowableDoOnEach(Flowable<T> source, Consumer<? super T> onNext,
Reported by PMD.
Line: 29
public final class FlowableDoOnEach<T> extends AbstractFlowableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public FlowableDoOnEach(Flowable<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Reported by PMD.
Line: 30
public final class FlowableDoOnEach<T> extends AbstractFlowableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public FlowableDoOnEach(Flowable<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Reported by PMD.
Line: 31
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public FlowableDoOnEach(Flowable<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
Reported by PMD.
Line: 56
}
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
DoOnEachSubscriber(
Reported by PMD.
Line: 56
}
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
DoOnEachSubscriber(
Reported by PMD.
Line: 57
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
DoOnEachSubscriber(
Subscriber<? super T> actual,
Reported by PMD.
Line: 57
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
DoOnEachSubscriber(
Subscriber<? super T> actual,
Reported by PMD.
Line: 58
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
DoOnEachSubscriber(
Subscriber<? super T> actual,
Consumer<? super T> onNext,
Reported by PMD.
Line: 58
static final class DoOnEachSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
DoOnEachSubscriber(
Subscriber<? super T> actual,
Consumer<? super T> onNext,
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTimeoutTest.java
46 issues
Line: 39
public class CompletableTimeoutTest extends RxJavaTest {
@Test
public void timeoutException() throws Exception {
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
.to(TestHelper.<Void>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
Reported by PMD.
Line: 39
public class CompletableTimeoutTest extends RxJavaTest {
@Test
public void timeoutException() throws Exception {
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
.to(TestHelper.<Void>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
Reported by PMD.
Line: 41
@Test
public void timeoutException() throws Exception {
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
.to(TestHelper.<Void>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
.assertFailureAndMessage(TimeoutException.class, timeoutMessage(100, TimeUnit.MILLISECONDS));
}
Reported by PMD.
Line: 41
@Test
public void timeoutException() throws Exception {
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
.to(TestHelper.<Void>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
.assertFailureAndMessage(TimeoutException.class, timeoutMessage(100, TimeUnit.MILLISECONDS));
}
Reported by PMD.
Line: 41
@Test
public void timeoutException() throws Exception {
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
.to(TestHelper.<Void>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
.assertFailureAndMessage(TimeoutException.class, timeoutMessage(100, TimeUnit.MILLISECONDS));
}
Reported by PMD.
Line: 41
@Test
public void timeoutException() throws Exception {
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io())
.to(TestHelper.<Void>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
.assertFailureAndMessage(TimeoutException.class, timeoutMessage(100, TimeUnit.MILLISECONDS));
}
Reported by PMD.
Line: 49
}
@Test
public void timeoutContinueOther() throws Exception {
final int[] call = { 0 };
Completable other = Completable.fromAction(new Action() {
@Override
Reported by PMD.
Line: 60
}
});
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io(), other)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
Reported by PMD.
Line: 60
}
});
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io(), other)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
Reported by PMD.
Line: 60
}
});
Completable.never()
.timeout(100, TimeUnit.MILLISECONDS, Schedulers.io(), other)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUnsubscribeOnTest.java
46 issues
Line: 188
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("failed to initialize and get inner thread");
}
}
@NonNull
@Override
Reported by PMD.
Line: 75
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread.toString(), unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
Reported by PMD.
Line: 76
// True for Schedulers.newThread()
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread.toString(), unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
} finally {
Reported by PMD.
Line: 125
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
Reported by PMD.
Line: 126
// True for Schedulers.newThread()
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
Reported by PMD.
Line: 127
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
} finally {
Reported by PMD.
Line: 148
@Override
public void dispose() {
set(true);
System.out.println("unsubscribe invoked: " + Thread.currentThread());
thread = Thread.currentThread();
latch.countDown();
}
@Override public boolean isDisposed() {
Reported by PMD.
Line: 37
public class ObservableUnsubscribeOnTest extends RxJavaTest {
@Test
public void unsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnSameThread() throws InterruptedException {
UIEventLoopScheduler uiEventLoop = new UIEventLoopScheduler();
try {
final ThreadSubscription subscription = new ThreadSubscription();
final AtomicReference<Thread> subscribeThread = new AtomicReference<>();
Observable<Integer> w = Observable.unsafeCreate(new ObservableSource<Integer>() {
Reported by PMD.
Line: 59
TestObserverEx<Integer> observer = new TestObserverEx<>();
w.subscribeOn(uiEventLoop).observeOn(Schedulers.computation())
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);
observer.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.
Line: 59
TestObserverEx<Integer> observer = new TestObserverEx<>();
w.subscribeOn(uiEventLoop).observeOn(Schedulers.computation())
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);
observer.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.