The following issues were found:
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDetachTest.java
30 issues
Line: 48
o = null;
System.gc();
Thread.sleep(200);
Assert.assertNull("Object retained!", wr.get());
}
Reported by PMD.
Line: 108
o = null;
System.gc();
Thread.sleep(200);
Assert.assertNull("Object retained!", wr.get());
}
Reported by PMD.
Line: 127
ts.cancel();
o = null;
System.gc();
Thread.sleep(200);
Assert.assertNull("Object retained!", wr.get());
}
Reported by PMD.
Line: 30
public class FlowableDetachTest extends RxJavaTest {
Object o;
@Test
public void just() throws Exception {
o = new Object();
Reported by PMD.
Line: 33
Object o;
@Test
public void just() throws Exception {
o = new Object();
WeakReference<Object> wr = new WeakReference<>(o);
TestSubscriber<Object> ts = new TestSubscriber<>();
Reported by PMD.
Line: 40
TestSubscriber<Object> ts = new TestSubscriber<>();
Flowable.just(o).count().toFlowable().onTerminateDetach().subscribe(ts);
ts.assertValue(1L);
ts.assertComplete();
ts.assertNoErrors();
Reported by PMD.
Line: 40
TestSubscriber<Object> ts = new TestSubscriber<>();
Flowable.just(o).count().toFlowable().onTerminateDetach().subscribe(ts);
ts.assertValue(1L);
ts.assertComplete();
ts.assertNoErrors();
Reported by PMD.
Line: 40
TestSubscriber<Object> ts = new TestSubscriber<>();
Flowable.just(o).count().toFlowable().onTerminateDetach().subscribe(ts);
ts.assertValue(1L);
ts.assertComplete();
ts.assertNoErrors();
Reported by PMD.
Line: 40
TestSubscriber<Object> ts = new TestSubscriber<>();
Flowable.just(o).count().toFlowable().onTerminateDetach().subscribe(ts);
ts.assertValue(1L);
ts.assertComplete();
ts.assertNoErrors();
Reported by PMD.
Line: 46
ts.assertComplete();
ts.assertNoErrors();
o = null;
System.gc();
Thread.sleep(200);
Assert.assertNull("Object retained!", wr.get());
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservablePublish.java
30 issues
Line: 69
}
doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true);
break;
}
try {
connection.accept(conn);
} catch (Throwable ex) {
Reported by PMD.
Line: 98
}
conn = fresh;
}
break;
}
InnerDisposable<T> inner = new InnerDisposable<>(observer, conn);
observer.onSubscribe(inner);
if (conn.add(inner)) {
Reported by PMD.
Line: 43
public final class ObservablePublish<T> extends ConnectableObservable<T>
implements HasUpstreamObservableSource<T> {
final ObservableSource<T> source;
final AtomicReference<PublishConnection<T>> current;
public ObservablePublish(ObservableSource<T> source) {
this.source = source;
Reported by PMD.
Line: 43
public final class ObservablePublish<T> extends ConnectableObservable<T>
implements HasUpstreamObservableSource<T> {
final ObservableSource<T> source;
final AtomicReference<PublishConnection<T>> current;
public ObservablePublish(ObservableSource<T> source) {
this.source = source;
Reported by PMD.
Line: 45
final ObservableSource<T> source;
final AtomicReference<PublishConnection<T>> current;
public ObservablePublish(ObservableSource<T> source) {
this.source = source;
this.current = new AtomicReference<>();
}
Reported by PMD.
Line: 61
conn = current.get();
if (conn == null || conn.isDisposed()) {
PublishConnection<T> fresh = new PublishConnection<>(current);
if (!current.compareAndSet(conn, fresh)) {
continue;
}
conn = fresh;
}
Reported by PMD.
Line: 68
conn = fresh;
}
doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true);
break;
}
try {
connection.accept(conn);
Reported by PMD.
Line: 68
conn = fresh;
}
doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true);
break;
}
try {
connection.accept(conn);
Reported by PMD.
Line: 74
try {
connection.accept(conn);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
if (doConnect) {
Reported by PMD.
Line: 92
conn = current.get();
// we don't create a fresh connection if the current is terminated
if (conn == null) {
PublishConnection<T> fresh = new PublishConnection<>(current);
if (!current.compareAndSet(conn, fresh)) {
continue;
}
conn = fresh;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMergeWithSingle.java
30 issues
Line: 34
*/
public final class ObservableMergeWithSingle<T> extends AbstractObservableWithUpstream<T, T> {
final SingleSource<? extends T> other;
public ObservableMergeWithSingle(Observable<T> source, SingleSource<? extends T> other) {
super(source);
this.other = other;
}
Reported by PMD.
Line: 49
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = -4592979584110982903L;
final Observer<? super T> downstream;
Reported by PMD.
Line: 49
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = -4592979584110982903L;
final Observer<? super T> downstream;
Reported by PMD.
Line: 54
private static final long serialVersionUID = -4592979584110982903L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> mainDisposable;
final OtherObserver<T> otherObserver;
Reported by PMD.
Line: 56
final Observer<? super T> downstream;
final AtomicReference<Disposable> mainDisposable;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
Reported by PMD.
Line: 58
final AtomicReference<Disposable> mainDisposable;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
volatile SimplePlainQueue<T> queue;
Reported by PMD.
Line: 60
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
volatile SimplePlainQueue<T> queue;
T singleItem;
Reported by PMD.
Line: 62
final AtomicThrowable errors;
volatile SimplePlainQueue<T> queue;
T singleItem;
volatile boolean disposed;
Reported by PMD.
Line: 64
volatile SimplePlainQueue<T> queue;
T singleItem;
volatile boolean disposed;
volatile boolean mainDone;
Reported by PMD.
Line: 66
T singleItem;
volatile boolean disposed;
volatile boolean mainDone;
volatile int otherState;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMergeWithMaybe.java
30 issues
Line: 34
*/
public final class ObservableMergeWithMaybe<T> extends AbstractObservableWithUpstream<T, T> {
final MaybeSource<? extends T> other;
public ObservableMergeWithMaybe(Observable<T> source, MaybeSource<? extends T> other) {
super(source);
this.other = other;
}
Reported by PMD.
Line: 49
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = -4592979584110982903L;
final Observer<? super T> downstream;
Reported by PMD.
Line: 49
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = -4592979584110982903L;
final Observer<? super T> downstream;
Reported by PMD.
Line: 54
private static final long serialVersionUID = -4592979584110982903L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> mainDisposable;
final OtherObserver<T> otherObserver;
Reported by PMD.
Line: 56
final Observer<? super T> downstream;
final AtomicReference<Disposable> mainDisposable;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
Reported by PMD.
Line: 58
final AtomicReference<Disposable> mainDisposable;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
volatile SimplePlainQueue<T> queue;
Reported by PMD.
Line: 60
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
volatile SimplePlainQueue<T> queue;
T singleItem;
Reported by PMD.
Line: 62
final AtomicThrowable errors;
volatile SimplePlainQueue<T> queue;
T singleItem;
volatile boolean disposed;
Reported by PMD.
Line: 64
volatile SimplePlainQueue<T> queue;
T singleItem;
volatile boolean disposed;
volatile boolean mainDone;
Reported by PMD.
Line: 66
T singleItem;
volatile boolean disposed;
volatile boolean mainDone;
volatile int otherState;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/schedulers/TrampolineSchedulerTest.java
30 issues
Line: 59
@Override
public void accept(String t) {
System.out.println("t: " + t);
}
});
}
@Test
Reported by PMD.
Line: 141
public void run() {
String msg = key + ".1";
workDone.add(msg);
System.out.println(msg);
Worker worker3 = Schedulers.trampoline().createWorker();
worker3.schedule(createPrintAction(key + ".B.1", workDone));
worker3.schedule(createPrintAction(key + ".B.2", workDone));
}
Reported by PMD.
Line: 156
@Override
public void run() {
System.out.println(message);
workDone.add(message);
}
};
}
Reported by PMD.
Line: 42
@Test
public final void mergeWithCurrentThreadScheduler1() {
final String currentThreadName = Thread.currentThread().getName();
Flowable<Integer> f1 = Flowable.<Integer> just(1, 2, 3, 4, 5);
Flowable<Integer> f2 = Flowable.<Integer> just(6, 7, 8, 9, 10);
Flowable<String> f = Flowable.<Integer> merge(f1, f2).subscribeOn(Schedulers.trampoline()).map(new Function<Integer, String>() {
Reported by PMD.
Line: 50
@Override
public String apply(Integer t) {
assertEquals(Thread.currentThread().getName(), currentThreadName);
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 50
@Override
public String apply(Integer t) {
assertEquals(Thread.currentThread().getName(), currentThreadName);
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 51
@Override
public String apply(Integer t) {
assertEquals(Thread.currentThread().getName(), currentThreadName);
return "Value_" + t + "_Thread_" + Thread.currentThread().getName();
}
});
f.blockingForEach(new Consumer<String>() {
Reported by PMD.
Line: 65
}
@Test
public void nestedTrampolineWithUnsubscribe() {
final ArrayList<String> workDone = new ArrayList<>();
final CompositeDisposable workers = new CompositeDisposable();
Worker worker = Schedulers.trampoline().createWorker();
try {
workers.add(worker);
Reported by PMD.
Line: 68
public void nestedTrampolineWithUnsubscribe() {
final ArrayList<String> workDone = new ArrayList<>();
final CompositeDisposable workers = new CompositeDisposable();
Worker worker = Schedulers.trampoline().createWorker();
try {
workers.add(worker);
worker.schedule(new Runnable() {
@Override
Reported by PMD.
Line: 80
});
final Worker worker2 = Schedulers.trampoline().createWorker();
workers.add(worker2);
worker2.schedule(new Runnable() {
@Override
public void run() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableInternalHelper.java
30 issues
Line: 27
/**
* Helper utility class to support Observable with inner classes.
*/
public final class ObservableInternalHelper {
private ObservableInternalHelper() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 34
}
static final class SimpleGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
final Consumer<Emitter<T>> consumer;
SimpleGenerator(Consumer<Emitter<T>> consumer) {
this.consumer = consumer;
}
Reported by PMD.
Line: 52
}
static final class SimpleBiGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
final BiConsumer<S, Emitter<T>> consumer;
SimpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
this.consumer = consumer;
}
Reported by PMD.
Line: 70
}
static final class ItemDelayFunction<T, U> implements Function<T, ObservableSource<T>> {
final Function<? super T, ? extends ObservableSource<U>> itemDelay;
ItemDelayFunction(Function<? super T, ? extends ObservableSource<U>> itemDelay) {
this.itemDelay = itemDelay;
}
Reported by PMD.
Line: 88
}
static final class ObserverOnNext<T> implements Consumer<T> {
final Observer<T> observer;
ObserverOnNext(Observer<T> observer) {
this.observer = observer;
}
Reported by PMD.
Line: 101
}
static final class ObserverOnError<T> implements Consumer<Throwable> {
final Observer<T> observer;
ObserverOnError(Observer<T> observer) {
this.observer = observer;
}
Reported by PMD.
Line: 114
}
static final class ObserverOnComplete<T> implements Action {
final Observer<T> observer;
ObserverOnComplete(Observer<T> observer) {
this.observer = observer;
}
Reported by PMD.
Line: 139
}
static final class FlatMapWithCombinerInner<U, R, T> implements Function<U, R> {
private final BiFunction<? super T, ? super U, ? extends R> combiner;
private final T t;
FlatMapWithCombinerInner(BiFunction<? super T, ? super U, ? extends R> combiner, T t) {
this.combiner = combiner;
this.t = t;
Reported by PMD.
Line: 140
static final class FlatMapWithCombinerInner<U, R, T> implements Function<U, R> {
private final BiFunction<? super T, ? super U, ? extends R> combiner;
private final T t;
FlatMapWithCombinerInner(BiFunction<? super T, ? super U, ? extends R> combiner, T t) {
this.combiner = combiner;
this.t = t;
}
Reported by PMD.
Line: 154
}
static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, ObservableSource<R>> {
private final BiFunction<? super T, ? super U, ? extends R> combiner;
private final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
FlatMapWithCombinerOuter(BiFunction<? super T, ? super U, ? extends R> combiner,
Function<? super T, ? extends ObservableSource<? extends U>> mapper) {
this.combiner = combiner;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFutureTest.java
30 issues
Line: 31
public class MaybeFromFutureTest extends RxJavaTest {
@Test
public void cancelImmediately() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft).test(true)
.assertEmpty();
}
Reported by PMD.
Line: 34
public void cancelImmediately() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft).test(true)
.assertEmpty();
}
@Test
public void timeout() {
Reported by PMD.
Line: 34
public void cancelImmediately() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft).test(true)
.assertEmpty();
}
@Test
public void timeout() {
Reported by PMD.
Line: 39
}
@Test
public void timeout() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Reported by PMD.
Line: 42
public void timeout() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
}
@Test
Reported by PMD.
Line: 42
public void timeout() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
}
@Test
Reported by PMD.
Line: 42
public void timeout() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
}
@Test
Reported by PMD.
Line: 48
}
@Test
public void timedWait() {
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
ft.run();
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
Reported by PMD.
Line: 52
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
ft.run();
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
@Test
Reported by PMD.
Line: 52
FutureTask<Integer> ft = new FutureTask<>(Functions.justCallable(1));
ft.run();
Maybe.fromFuture(ft, 1, TimeUnit.MILLISECONDS).test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java
30 issues
Line: 35
public class BlockingObservableToIteratorTest extends RxJavaTest {
@Test
public void toIterator() {
Observable<String> obs = Observable.just("one", "two", "three");
Iterator<String> it = obs.blockingIterable().iterator();
assertTrue(it.hasNext());
Reported by PMD.
Line: 36
@Test
public void toIterator() {
Observable<String> obs = Observable.just("one", "two", "three");
Iterator<String> it = obs.blockingIterable().iterator();
assertTrue(it.hasNext());
assertEquals("one", it.next());
Reported by PMD.
Line: 38
public void toIterator() {
Observable<String> obs = Observable.just("one", "two", "three");
Iterator<String> it = obs.blockingIterable().iterator();
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertTrue(it.hasNext());
Reported by PMD.
Line: 38
public void toIterator() {
Observable<String> obs = Observable.just("one", "two", "three");
Iterator<String> it = obs.blockingIterable().iterator();
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertTrue(it.hasNext());
Reported by PMD.
Line: 40
Iterator<String> it = obs.blockingIterable().iterator();
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertTrue(it.hasNext());
assertEquals("two", it.next());
Reported by PMD.
Line: 41
Iterator<String> it = obs.blockingIterable().iterator();
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertTrue(it.hasNext());
assertEquals("two", it.next());
assertTrue(it.hasNext());
Reported by PMD.
Line: 43
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertTrue(it.hasNext());
assertEquals("two", it.next());
assertTrue(it.hasNext());
assertEquals("three", it.next());
Reported by PMD.
Line: 44
assertEquals("one", it.next());
assertTrue(it.hasNext());
assertEquals("two", it.next());
assertTrue(it.hasNext());
assertEquals("three", it.next());
assertFalse(it.hasNext());
Reported by PMD.
Line: 46
assertTrue(it.hasNext());
assertEquals("two", it.next());
assertTrue(it.hasNext());
assertEquals("three", it.next());
assertFalse(it.hasNext());
}
Reported by PMD.
Line: 47
assertEquals("two", it.next());
assertTrue(it.hasNext());
assertEquals("three", it.next());
assertFalse(it.hasNext());
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapMaybe.java
29 issues
Line: 34
*/
public final class ObservableFlatMapMaybe<T, R> extends AbstractObservableWithUpstream<T, R> {
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
public ObservableFlatMapMaybe(ObservableSource<T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper,
boolean delayError) {
Reported by PMD.
Line: 36
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
public ObservableFlatMapMaybe(ObservableSource<T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper,
boolean delayError) {
super(source);
this.mapper = mapper;
Reported by PMD.
Line: 50
source.subscribe(new FlatMapMaybeObserver<>(observer, mapper, delayErrors));
}
static final class FlatMapMaybeObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = 8600231336733376951L;
Reported by PMD.
Line: 50
source.subscribe(new FlatMapMaybeObserver<>(observer, mapper, delayErrors));
}
static final class FlatMapMaybeObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = 8600231336733376951L;
Reported by PMD.
Line: 52
static final class FlatMapMaybeObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = 8600231336733376951L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 56
private static final long serialVersionUID = 8600231336733376951L;
final Observer<? super R> downstream;
final boolean delayErrors;
final CompositeDisposable set;
Reported by PMD.
Line: 58
final Observer<? super R> downstream;
final boolean delayErrors;
final CompositeDisposable set;
final AtomicInteger active;
Reported by PMD.
Line: 60
final boolean delayErrors;
final CompositeDisposable set;
final AtomicInteger active;
final AtomicThrowable errors;
Reported by PMD.
Line: 62
final CompositeDisposable set;
final AtomicInteger active;
final AtomicThrowable errors;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
Reported by PMD.
Line: 64
final AtomicInteger active;
final AtomicThrowable errors;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final AtomicReference<SpscLinkedArrayQueue<R>> queue;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatWithSingleTest.java
29 issues
Line: 28
public class FlowableConcatWithSingleTest extends RxJavaTest {
@Test
public void normal() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Single.just(100))
.subscribe(ts);
Reported by PMD.
Line: 31
public void normal() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Single.just(100))
.subscribe(ts);
ts.assertResult(1, 2, 3, 4, 5, 100);
}
Reported by PMD.
Line: 31
public void normal() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Single.just(100))
.subscribe(ts);
ts.assertResult(1, 2, 3, 4, 5, 100);
}
Reported by PMD.
Line: 39
}
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
Reported by PMD.
Line: 40
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 40
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 40
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 40
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 40
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 40
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Single.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.