The following issues were found
src/test/java/io/reactivex/rxjava3/schedulers/CachedThreadSchedulerTest.java
32 issues
Line: 57
@Override
public void accept(String t) {
System.out.println("t: " + t);
}
});
}
@Test
Reported by PMD.
Line: 48
@Override
public String apply(Integer t) {
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.subscribeOn(Schedulers.io()).blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 48
@Override
public String apply(Integer t) {
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.subscribeOn(Schedulers.io()).blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 48
@Override
public String apply(Integer t) {
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.subscribeOn(Schedulers.io()).blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 49
@Override
public String apply(Integer t) {
assertTrue(Thread.currentThread().getName().startsWith("RxCachedThreadScheduler"));
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.subscribeOn(Schedulers.io()).blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 63
}
@Test
public final void handledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test
public void cancelledTaskRetention() throws InterruptedException {
Reported by PMD.
Line: 68
}
@Test
public void cancelledTaskRetention() throws InterruptedException {
Worker w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, false);
} finally {
w.dispose();
Reported by PMD.
Line: 69
@Test
public void cancelledTaskRetention() throws InterruptedException {
Worker w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, false);
} finally {
w.dispose();
}
Reported by PMD.
Line: 75
} finally {
w.dispose();
}
w = Schedulers.io().createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, true);
} finally {
w.dispose();
}
Reported by PMD.
Line: 84
}
@Test
public void workerDisposed() {
Worker w = Schedulers.io().createWorker();
assertFalse(((Disposable)w).isDisposed());
w.dispose();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDetachTest.java
32 issues
Line: 83
to.dispose();
System.gc();
Thread.sleep(200);
to.assertEmpty();
assertNull(wr.get());
Reported by PMD.
Line: 109
d = null;
System.gc();
Thread.sleep(200);
to.assertResult();
assertNull(wr.get());
Reported by PMD.
Line: 135
d = null;
System.gc();
Thread.sleep(200);
to.assertFailure(TestException.class);
assertNull(wr.get());
Reported by PMD.
Line: 34
public class CompletableDetachTest extends RxJavaTest {
@Test
public void doubleSubscribe() {
TestHelper.checkDoubleOnSubscribeCompletable(new Function<Completable, CompletableSource>() {
@Override
public CompletableSource apply(Completable m) throws Exception {
return m.onTerminateDetach();
Reported by PMD.
Line: 45
}
@Test
public void dispose() {
TestHelper.checkDisposed(PublishProcessor.create().ignoreElements().onTerminateDetach());
}
@Test
public void onError() {
Reported by PMD.
Line: 46
@Test
public void dispose() {
TestHelper.checkDisposed(PublishProcessor.create().ignoreElements().onTerminateDetach());
}
@Test
public void onError() {
Completable.error(new TestException())
Reported by PMD.
Line: 46
@Test
public void dispose() {
TestHelper.checkDisposed(PublishProcessor.create().ignoreElements().onTerminateDetach());
}
@Test
public void onError() {
Completable.error(new TestException())
Reported by PMD.
Line: 50
}
@Test
public void onError() {
Completable.error(new TestException())
.onTerminateDetach()
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 58
}
@Test
public void onComplete() {
Completable.complete()
.onTerminateDetach()
.test()
.assertResult();
}
Reported by PMD.
Line: 59
@Test
public void onComplete() {
Completable.complete()
.onTerminateDetach()
.test()
.assertResult();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/CompletableAndThenPublisherTest.java
32 issues
Line: 31
public class CompletableAndThenPublisherTest extends RxJavaTest {
@Test
public void cancelMain() {
CompletableSubject cs = CompletableSubject.create();
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = cs.andThen(pp)
.test();
Reported by PMD.
Line: 35
CompletableSubject cs = CompletableSubject.create();
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = cs.andThen(pp)
.test();
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 35
CompletableSubject cs = CompletableSubject.create();
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = cs.andThen(pp)
.test();
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 38
TestSubscriber<Integer> ts = cs.andThen(pp)
.test();
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
ts.cancel();
assertFalse(cs.hasObservers());
Reported by PMD.
Line: 38
TestSubscriber<Integer> ts = cs.andThen(pp)
.test();
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
ts.cancel();
assertFalse(cs.hasObservers());
Reported by PMD.
Line: 39
.test();
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
ts.cancel();
assertFalse(cs.hasObservers());
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 39
.test();
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
ts.cancel();
assertFalse(cs.hasObservers());
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 41
assertTrue(cs.hasObservers());
assertFalse(pp.hasSubscribers());
ts.cancel();
assertFalse(cs.hasObservers());
assertFalse(pp.hasSubscribers());
}
Reported by PMD.
Line: 43
ts.cancel();
assertFalse(cs.hasObservers());
assertFalse(pp.hasSubscribers());
}
@Test
public void cancelOther() {
Reported by PMD.
Line: 43
ts.cancel();
assertFalse(cs.hasObservers());
assertFalse(pp.hasSubscribers());
}
@Test
public void cancelOther() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycleTest.java
32 issues
Line: 33
@Test
public void success() throws Throwable {
@SuppressWarnings("unchecked")
Consumer<? super Disposable> onSubscribe = mock(Consumer.class);
Action onDispose = mock(Action.class);
Single.just(1)
.doOnLifecycle(onSubscribe, onDispose)
Reported by PMD.
Line: 37
Consumer<? super Disposable> onSubscribe = mock(Consumer.class);
Action onDispose = mock(Action.class);
Single.just(1)
.doOnLifecycle(onSubscribe, onDispose)
.test()
.assertResult(1);
verify(onSubscribe).accept(any());
Reported by PMD.
Line: 37
Consumer<? super Disposable> onSubscribe = mock(Consumer.class);
Action onDispose = mock(Action.class);
Single.just(1)
.doOnLifecycle(onSubscribe, onDispose)
.test()
.assertResult(1);
verify(onSubscribe).accept(any());
Reported by PMD.
Line: 37
Consumer<? super Disposable> onSubscribe = mock(Consumer.class);
Action onDispose = mock(Action.class);
Single.just(1)
.doOnLifecycle(onSubscribe, onDispose)
.test()
.assertResult(1);
verify(onSubscribe).accept(any());
Reported by PMD.
Line: 42
.test()
.assertResult(1);
verify(onSubscribe).accept(any());
verify(onDispose, never()).run();
}
@Test
public void error() throws Throwable {
Reported by PMD.
Line: 43
.assertResult(1);
verify(onSubscribe).accept(any());
verify(onDispose, never()).run();
}
@Test
public void error() throws Throwable {
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 57
.test()
.assertFailure(TestException.class);
verify(onSubscribe).accept(any());
verify(onDispose, never()).run();
}
@Test
public void onSubscribeCrash() throws Throwable {
Reported by PMD.
Line: 58
.assertFailure(TestException.class);
verify(onSubscribe).accept(any());
verify(onDispose, never()).run();
}
@Test
public void onSubscribeCrash() throws Throwable {
TestHelper.withErrorTracking(errors -> {
Reported by PMD.
Line: 62
}
@Test
public void onSubscribeCrash() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
Consumer<? super Disposable> onSubscribe = mock(Consumer.class);
Action onDispose = mock(Action.class);
Reported by PMD.
Line: 68
Consumer<? super Disposable> onSubscribe = mock(Consumer.class);
Action onDispose = mock(Action.class);
doThrow(new TestException("First")).when(onSubscribe).accept(any());
Disposable bs = Disposable.empty();
new Single<Integer>() {
@Override
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/AtomicThrowableTest.java
32 issues
Line: 31
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class AtomicThrowableTest extends RxJavaTest {
@Test
public void isTerminated() {
AtomicThrowable ex = new AtomicThrowable();
Reported by PMD.
Line: 34
public class AtomicThrowableTest extends RxJavaTest {
@Test
public void isTerminated() {
AtomicThrowable ex = new AtomicThrowable();
assertFalse(ex.isTerminated());
assertNull(ex.terminate());
Reported by PMD.
Line: 37
public void isTerminated() {
AtomicThrowable ex = new AtomicThrowable();
assertFalse(ex.isTerminated());
assertNull(ex.terminate());
assertTrue(ex.isTerminated());
}
Reported by PMD.
Line: 39
assertFalse(ex.isTerminated());
assertNull(ex.terminate());
assertTrue(ex.isTerminated());
}
@Test
Reported by PMD.
Line: 41
assertNull(ex.terminate());
assertTrue(ex.isTerminated());
}
@Test
public void tryTerminateAndReportNull() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 52
AtomicThrowable ex = new AtomicThrowable();
ex.tryTerminateAndReport();
assertTrue("" + errors, errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 68
ex.tryTerminateAndReport();
assertTrue("" + errors, errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 86
TestHelper.assertUndeliverable(errors, 0, TestException.class);
assertEquals(1, errors.size());
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 93
}
@Test
public void tryTerminateConsumerSubscriberNoError() {
TestSubscriber<Object> ts = new TestSubscriber<>();
ts.onSubscribe(new BooleanSubscription());
AtomicThrowable ex = new AtomicThrowable();
ex.tryTerminateConsumer(ts);
Reported by PMD.
Line: 103
}
@Test
public void tryTerminateConsumerSubscriberError() {
TestSubscriber<Object> ts = new TestSubscriber<>();
ts.onSubscribe(new BooleanSubscription());
AtomicThrowable ex = new AtomicThrowable();
ex.set(new TestException());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/DisposableObserverTest.java
32 issues
Line: 33
static final class TestDisposableObserver<T> extends DisposableObserver<T> {
int start;
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
Reported by PMD.
Line: 35
int start;
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
int completions;
Reported by PMD.
Line: 37
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
int completions;
@Override
protected void onStart() {
Reported by PMD.
Line: 39
final List<Throwable> errors = new ArrayList<>();
int completions;
@Override
protected void onStart() {
super.onStart();
Reported by PMD.
Line: 65
}
@Test
public void normal() {
TestDisposableObserver<Integer> tc = new TestDisposableObserver<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
Reported by PMD.
Line: 68
public void normal() {
TestDisposableObserver<Integer> tc = new TestDisposableObserver<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Observable.just(1).subscribe(tc);
Reported by PMD.
Line: 69
TestDisposableObserver<Integer> tc = new TestDisposableObserver<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Observable.just(1).subscribe(tc);
Reported by PMD.
Line: 70
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Observable.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
Reported by PMD.
Line: 70
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Observable.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
Reported by PMD.
Line: 71
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Observable.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
assertEquals(1, tc.start);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapIterableObservableTest.java
32 issues
Line: 33
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.*;
public class SingleFlatMapIterableObservableTest extends RxJavaTest {
@Test
public void normal() {
Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
Reported by PMD.
Line: 36
public class SingleFlatMapIterableObservableTest extends RxJavaTest {
@Test
public void normal() {
Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return Arrays.asList(v, v + 1);
Reported by PMD.
Line: 49
}
@Test
public void emptyIterable() {
Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return Collections.<Integer>emptyList();
Reported by PMD.
Line: 62
}
@Test
public void error() {
Single.<Integer>error(new TestException()).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return Arrays.asList(v, v + 1);
Reported by PMD.
Line: 75
}
@Test
public void take() {
Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
return Arrays.asList(v, v + 1);
}
Reported by PMD.
Line: 88
}
@Test
public void fused() {
TestObserverEx<Integer> to = new TestObserverEx<>(QueueFuseable.ANY);
Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
Reported by PMD.
Line: 99
})
.subscribe(to);
to.assertFuseable()
.assertFusionMode(QueueFuseable.ASYNC)
.assertResult(1, 2);
;
}
Reported by PMD.
Line: 99
})
.subscribe(to);
to.assertFuseable()
.assertFusionMode(QueueFuseable.ASYNC)
.assertResult(1, 2);
;
}
Reported by PMD.
Line: 102
to.assertFuseable()
.assertFusionMode(QueueFuseable.ASYNC)
.assertResult(1, 2);
;
}
@Test
public void fusedNoSync() {
TestObserverEx<Integer> to = new TestObserverEx<>(QueueFuseable.SYNC);
Reported by PMD.
Line: 106
}
@Test
public void fusedNoSync() {
TestObserverEx<Integer> to = new TestObserverEx<>(QueueFuseable.SYNC);
Single.just(1).flattenAsObservable(new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) throws Exception {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnLifecycleTest.java
31 issues
Line: 34
public class FlowableDoOnLifecycleTest extends RxJavaTest {
@Test
public void onSubscribeCrashed() {
Flowable.just(1)
.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
throw new TestException();
Reported by PMD.
Line: 47
}
@Test
public void doubleOnSubscribe() {
final int[] calls = { 0, 0 };
TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Object> f) throws Exception {
Reported by PMD.
Line: 68
}
});
assertEquals(2, calls[0]);
assertEquals(0, calls[1]);
}
@Test
public void dispose() {
Reported by PMD.
Line: 69
});
assertEquals(2, calls[0]);
assertEquals(0, calls[1]);
}
@Test
public void dispose() {
final int[] calls = { 0, 0 };
Reported by PMD.
Line: 73
}
@Test
public void dispose() {
final int[] calls = { 0, 0 };
TestHelper.checkDisposed(Flowable.just(1)
.doOnLifecycle(new Consumer<Subscription>() {
@Override
Reported by PMD.
Line: 90
})
);
assertEquals(1, calls[0]);
assertEquals(1, calls[1]);
}
@Test
public void requestCrashed() {
Reported by PMD.
Line: 91
);
assertEquals(1, calls[0]);
assertEquals(1, calls[1]);
}
@Test
public void requestCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 95
}
@Test
public void requestCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.doOnLifecycle(Functions.emptyConsumer(),
new LongConsumer() {
Reported by PMD.
Line: 117
}
@Test
public void cancelCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.doOnLifecycle(Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Reported by PMD.
Line: 162
.to(TestHelper.<Integer>testConsumer())
.assertFailureAndMessage(TestException.class, "First");
assertTrue(bs.isCancelled());
TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorCompleteTest.java
31 issues
Line: 31
public class ObservableOnErrorCompleteTest {
@Test
public void normal() {
Observable.range(1, 10)
.onErrorComplete()
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Reported by PMD.
Line: 32
@Test
public void normal() {
Observable.range(1, 10)
.onErrorComplete()
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Reported by PMD.
Line: 32
@Test
public void normal() {
Observable.range(1, 10)
.onErrorComplete()
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Reported by PMD.
Line: 32
@Test
public void normal() {
Observable.range(1, 10)
.onErrorComplete()
.test()
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}
Reported by PMD.
Line: 39
}
@Test
public void empty() {
Observable.empty()
.onErrorComplete()
.test()
.assertResult();
}
Reported by PMD.
Line: 40
@Test
public void empty() {
Observable.empty()
.onErrorComplete()
.test()
.assertResult();
}
Reported by PMD.
Line: 40
@Test
public void empty() {
Observable.empty()
.onErrorComplete()
.test()
.assertResult();
}
Reported by PMD.
Line: 40
@Test
public void empty() {
Observable.empty()
.onErrorComplete()
.test()
.assertResult();
}
Reported by PMD.
Line: 47
}
@Test
public void error() throws Throwable {
TestHelper.withErrorTracking(errors -> {
Observable.error(new TestException())
.onErrorComplete()
.test()
.assertResult();
Reported by PMD.
Line: 54
.test()
.assertResult();
assertTrue("" + errors, errors.isEmpty());
});
}
@Test
public void errorMatches() throws Throwable {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableFuseableTest.java
31 issues
Line: 27
public class FlowableFuseableTest extends RxJavaTest {
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 38
}
@Test
public void syncArray() {
Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 49
}
@Test
public void syncIterable() {
Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 51
@Test
public void syncIterable() {
Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 51
@Test
public void syncIterable() {
Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.