The following issues were found:
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleZipArray.java
20 issues
Line: 28
public final class SingleZipArray<T, R> extends Single<R> {
final SingleSource<? extends T>[] sources;
final Function<? super Object[], ? extends R> zipper;
public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
Reported by PMD.
Line: 30
final SingleSource<? extends T>[] sources;
final Function<? super Object[], ? extends R> zipper;
public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
Reported by PMD.
Line: 32
final Function<? super Object[], ? extends R> zipper;
public SingleZipArray(SingleSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
@Override
Reported by PMD.
Line: 42
SingleSource<? extends T>[] sources = this.sources;
int n = sources.length;
if (n == 1) {
sources[0].subscribe(new SingleMap.MapSingleObserver<>(observer, new SingletonArrayFunc()));
return;
}
ZipCoordinator<T, R> parent = new ZipCoordinator<>(observer, n, zipper);
Reported by PMD.
Line: 59
SingleSource<? extends T> source = sources[i];
if (source == null) {
parent.innerError(new NullPointerException("One of the sources is null"), i);
return;
}
source.subscribe(parent.observers[i]);
}
Reported by PMD.
Line: 63
return;
}
source.subscribe(parent.observers[i]);
}
}
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
Reported by PMD.
Line: 71
private static final long serialVersionUID = -5556924161382950569L;
final SingleObserver<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipSingleObserver<T>[] observers;
Reported by PMD.
Line: 73
final SingleObserver<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipSingleObserver<T>[] observers;
Object[] values;
Reported by PMD.
Line: 75
final Function<? super Object[], ? extends R> zipper;
final ZipSingleObserver<T>[] observers;
Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
Reported by PMD.
Line: 77
final ZipSingleObserver<T>[] observers;
Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(SingleObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
super(n);
this.downstream = observer;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableCollectWithCollector.java
20 issues
Line: 38
*/
public final class FlowableCollectWithCollector<T, A, R> extends Flowable<R> {
final Flowable<T> source;
final Collector<? super T, A, R> collector;
public FlowableCollectWithCollector(Flowable<T> source, Collector<? super T, A, R> collector) {
this.source = source;
Reported by PMD.
Line: 40
final Flowable<T> source;
final Collector<? super T, A, R> collector;
public FlowableCollectWithCollector(Flowable<T> source, Collector<? super T, A, R> collector) {
this.source = source;
this.collector = collector;
}
Reported by PMD.
Line: 54
Function<A, R> finisher;
try {
container = collector.supplier().get();
accumulator = collector.accumulator();
finisher = collector.finisher();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
Reported by PMD.
Line: 57
container = collector.supplier().get();
accumulator = collector.accumulator();
finisher = collector.finisher();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 72
private static final long serialVersionUID = -229544830565448758L;
final BiConsumer<A, T> accumulator;
final Function<A, R> finisher;
Subscription upstream;
Reported by PMD.
Line: 74
final BiConsumer<A, T> accumulator;
final Function<A, R> finisher;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 76
final Function<A, R> finisher;
Subscription upstream;
boolean done;
A container;
Reported by PMD.
Line: 78
Subscription upstream;
boolean done;
A container;
CollectorSubscriber(Subscriber<? super R> downstream, A container, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
super(downstream);
Reported by PMD.
Line: 80
boolean done;
A container;
CollectorSubscriber(Subscriber<? super R> downstream, A container, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
super(downstream);
this.container = container;
this.accumulator = accumulator;
Reported by PMD.
Line: 107
}
try {
accumulator.accept(container, t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableCollectWithCollectorSingle.java
20 issues
Line: 38
*/
public final class ObservableCollectWithCollectorSingle<T, A, R> extends Single<R> implements FuseToObservable<R> {
final Observable<T> source;
final Collector<? super T, A, R> collector;
public ObservableCollectWithCollectorSingle(Observable<T> source, Collector<? super T, A, R> collector) {
this.source = source;
Reported by PMD.
Line: 40
final Observable<T> source;
final Collector<? super T, A, R> collector;
public ObservableCollectWithCollectorSingle(Observable<T> source, Collector<? super T, A, R> collector) {
this.source = source;
this.collector = collector;
}
Reported by PMD.
Line: 59
Function<A, R> finisher;
try {
container = collector.supplier().get();
accumulator = collector.accumulator();
finisher = collector.finisher();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
Reported by PMD.
Line: 62
container = collector.supplier().get();
accumulator = collector.accumulator();
finisher = collector.finisher();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
Line: 73
static final class CollectorSingleObserver<T, A, R> implements Observer<T>, Disposable {
final SingleObserver<? super R> downstream;
final BiConsumer<A, T> accumulator;
final Function<A, R> finisher;
Reported by PMD.
Line: 75
final SingleObserver<? super R> downstream;
final BiConsumer<A, T> accumulator;
final Function<A, R> finisher;
Disposable upstream;
Reported by PMD.
Line: 77
final BiConsumer<A, T> accumulator;
final Function<A, R> finisher;
Disposable upstream;
boolean done;
Reported by PMD.
Line: 79
final Function<A, R> finisher;
Disposable upstream;
boolean done;
A container;
Reported by PMD.
Line: 81
Disposable upstream;
boolean done;
A container;
CollectorSingleObserver(SingleObserver<? super R> downstream, A container, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
this.downstream = downstream;
Reported by PMD.
Line: 83
boolean done;
A container;
CollectorSingleObserver(SingleObserver<? super R> downstream, A container, BiConsumer<A, T> accumulator, Function<A, R> finisher) {
this.downstream = downstream;
this.container = container;
this.accumulator = accumulator;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ScalarXMapZHelper.java
20 issues
Line: 57
Supplier<T> supplier = (Supplier<T>) source;
CompletableSource cs = null;
try {
T item = supplier.get();
if (item != null) {
cs = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Reported by PMD.
Line: 61
if (item != null) {
cs = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null CompletableSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return true;
}
Reported by PMD.
Line: 70
if (cs == null) {
EmptyDisposable.complete(observer);
} else {
cs.subscribe(observer);
}
return true;
}
return false;
}
Reported by PMD.
Line: 97
Supplier<T> supplier = (Supplier<T>) source;
MaybeSource<? extends R> cs = null;
try {
T item = supplier.get();
if (item != null) {
cs = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null MaybeSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Reported by PMD.
Line: 101
if (item != null) {
cs = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null MaybeSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return true;
}
Reported by PMD.
Line: 110
if (cs == null) {
EmptyDisposable.complete(observer);
} else {
cs.subscribe(MaybeToObservable.create(observer));
}
return true;
}
return false;
}
Reported by PMD.
Line: 137
Supplier<T> supplier = (Supplier<T>) source;
SingleSource<? extends R> cs = null;
try {
T item = supplier.get();
if (item != null) {
cs = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null SingleSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Reported by PMD.
Line: 141
if (item != null) {
cs = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null SingleSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return true;
}
Reported by PMD.
Line: 150
if (cs == null) {
EmptyDisposable.complete(observer);
} else {
cs.subscribe(SingleToObservable.create(observer));
}
return true;
}
return false;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.mixed;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.operators.maybe.MaybeToObservable;
import io.reactivex.rxjava3.internal.operators.single.SingleToObservable;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java
20 issues
Line: 30
import io.reactivex.rxjava3.internal.util.*;
public final class BlockingFlowableIterable<T> implements Iterable<T> {
final Flowable<T> source;
final int bufferSize;
public BlockingFlowableIterable(Flowable<T> source, int bufferSize) {
this.source = source;
Reported by PMD.
Line: 32
public final class BlockingFlowableIterable<T> implements Iterable<T> {
final Flowable<T> source;
final int bufferSize;
public BlockingFlowableIterable(Flowable<T> source, int bufferSize) {
this.source = source;
this.bufferSize = bufferSize;
}
Reported by PMD.
Line: 46
return it;
}
static final class BlockingFlowableIterator<T>
extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Iterator<T>, Runnable, Disposable {
private static final long serialVersionUID = 6695226475494099826L;
Reported by PMD.
Line: 46
return it;
}
static final class BlockingFlowableIterator<T>
extends AtomicReference<Subscription>
implements FlowableSubscriber<T>, Iterator<T>, Runnable, Disposable {
private static final long serialVersionUID = 6695226475494099826L;
Reported by PMD.
Line: 52
private static final long serialVersionUID = 6695226475494099826L;
final SpscArrayQueue<T> queue;
final long batchSize;
final long limit;
Reported by PMD.
Line: 54
final SpscArrayQueue<T> queue;
final long batchSize;
final long limit;
final Lock lock;
Reported by PMD.
Line: 56
final long batchSize;
final long limit;
final Lock lock;
final Condition condition;
Reported by PMD.
Line: 58
final long limit;
final Lock lock;
final Condition condition;
long produced;
Reported by PMD.
Line: 60
final Lock lock;
final Condition condition;
long produced;
volatile boolean done;
volatile Throwable error;
Reported by PMD.
Line: 62
final Condition condition;
long produced;
volatile boolean done;
volatile Throwable error;
BlockingFlowableIterator(int batchSize) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableDoOnTest.java
20 issues
Line: 29
public class ObservableDoOnTest extends RxJavaTest {
@Test
public void doOnEach() {
final AtomicReference<String> r = new AtomicReference<>();
String output = Observable.just("one").doOnNext(new Consumer<String>() {
@Override
public void accept(String v) {
r.set(v);
Reported by PMD.
Line: 31
@Test
public void doOnEach() {
final AtomicReference<String> r = new AtomicReference<>();
String output = Observable.just("one").doOnNext(new Consumer<String>() {
@Override
public void accept(String v) {
r.set(v);
}
}).blockingSingle();
Reported by PMD.
Line: 38
}
}).blockingSingle();
assertEquals("one", output);
assertEquals("one", r.get());
}
@Test
public void doOnError() {
Reported by PMD.
Line: 39
}).blockingSingle();
assertEquals("one", output);
assertEquals("one", r.get());
}
@Test
public void doOnError() {
final AtomicReference<Throwable> r = new AtomicReference<>();
Reported by PMD.
Line: 43
}
@Test
public void doOnError() {
final AtomicReference<Throwable> r = new AtomicReference<>();
Throwable t = null;
try {
Observable.<String> error(new RuntimeException("an error"))
.doOnError(new Consumer<Throwable>() {
Reported by PMD.
Line: 55
}
}).blockingSingle();
fail("expected exception, not a return value");
} catch (Throwable e) {
t = e;
}
assertNotNull(t);
assertEquals(t, r.get());
Reported by PMD.
Line: 59
t = e;
}
assertNotNull(t);
assertEquals(t, r.get());
}
@Test
public void doOnCompleted() {
Reported by PMD.
Line: 60
}
assertNotNull(t);
assertEquals(t, r.get());
}
@Test
public void doOnCompleted() {
final AtomicBoolean r = new AtomicBoolean();
Reported by PMD.
Line: 64
}
@Test
public void doOnCompleted() {
final AtomicBoolean r = new AtomicBoolean();
String output = Observable.just("one").doOnComplete(new Action() {
@Override
public void run() {
r.set(true);
Reported by PMD.
Line: 73
}
}).blockingSingle();
assertEquals("one", output);
assertTrue(r.get());
}
@Test
public void doOnTerminateComplete() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipTest.java
20 issues
Line: 29
public class SingleZipTest extends RxJavaTest {
@Test
public void zip2() {
Single.zip(Single.just(1), Single.just(2), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return a + "" + b;
}
Reported by PMD.
Line: 33
Single.zip(Single.just(1), Single.just(2), new BiFunction<Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b) throws Exception {
return a + "" + b;
}
})
.test()
.assertResult("12");
}
Reported by PMD.
Line: 41
}
@Test
public void zip3() {
Single.zip(Single.just(1), Single.just(2), Single.just(3), new Function3<Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c) throws Exception {
return a + "" + b + c;
}
Reported by PMD.
Line: 45
Single.zip(Single.just(1), Single.just(2), Single.just(3), new Function3<Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c) throws Exception {
return a + "" + b + c;
}
})
.test()
.assertResult("123");
}
Reported by PMD.
Line: 53
}
@Test
public void zip4() {
Single.zip(Single.just(1), Single.just(2), Single.just(3),
Single.just(4),
new Function4<Integer, Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c, Integer d) throws Exception {
Reported by PMD.
Line: 59
new Function4<Integer, Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c, Integer d) throws Exception {
return a + "" + b + c + d;
}
})
.test()
.assertResult("1234");
}
Reported by PMD.
Line: 67
}
@Test
public void zip5() {
Single.zip(Single.just(1), Single.just(2), Single.just(3),
Single.just(4), Single.just(5),
new Function5<Integer, Integer, Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c, Integer d, Integer e) throws Exception {
Reported by PMD.
Line: 73
new Function5<Integer, Integer, Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c, Integer d, Integer e) throws Exception {
return a + "" + b + c + d + e;
}
})
.test()
.assertResult("12345");
}
Reported by PMD.
Line: 81
}
@Test
public void zip6() {
Single.zip(Single.just(1), Single.just(2), Single.just(3),
Single.just(4), Single.just(5), Single.just(6),
new Function6<Integer, Integer, Integer, Integer, Integer, Integer, Object>() {
@Override
public Object apply(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f)
Reported by PMD.
Line: 88
@Override
public Object apply(Integer a, Integer b, Integer c, Integer d, Integer e, Integer f)
throws Exception {
return a + "" + b + c + d + e + f;
}
})
.test()
.assertResult("123456");
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/MergerBiFunctionTest.java
20 issues
Line: 27
public class MergerBiFunctionTest extends RxJavaTest {
@Test
public void firstEmpty() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
Reported by PMD.
Line: 36
});
List<Integer> list = merger.apply(Collections.<Integer>emptyList(), Arrays.asList(3, 5));
assertEquals(Arrays.asList(3, 5), list);
}
@Test
public void bothEmpty() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
Reported by PMD.
Line: 40
}
@Test
public void bothEmpty() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
Reported by PMD.
Line: 49
});
List<Integer> list = merger.apply(Collections.<Integer>emptyList(), Collections.<Integer>emptyList());
assertEquals(Collections.<Integer>emptyList(), list);
}
@Test
public void secondEmpty() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
Reported by PMD.
Line: 53
}
@Test
public void secondEmpty() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
Reported by PMD.
Line: 62
});
List<Integer> list = merger.apply(Arrays.asList(2, 4), Collections.<Integer>emptyList());
assertEquals(Arrays.asList(2, 4), list);
}
@Test
public void sameSize() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
Reported by PMD.
Line: 66
}
@Test
public void sameSize() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
Reported by PMD.
Line: 75
});
List<Integer> list = merger.apply(Arrays.asList(2, 4), Arrays.asList(3, 5));
assertEquals(Arrays.asList(2, 3, 4, 5), list);
}
@Test
public void sameSizeReverse() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
Reported by PMD.
Line: 79
}
@Test
public void sameSizeReverse() throws Exception {
MergerBiFunction<Integer> merger = new MergerBiFunction<>(new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o1.compareTo(o2);
}
Reported by PMD.
Line: 88
});
List<Integer> list = merger.apply(Arrays.asList(3, 5), Arrays.asList(2, 4));
assertEquals(Arrays.asList(2, 3, 4, 5), list);
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeHideTest.java
20 issues
Line: 30
public class MaybeHideTest extends RxJavaTest {
@Test
public void normal() {
Maybe.just(1)
.hide()
.test()
.assertResult(1);
}
Reported by PMD.
Line: 31
@Test
public void normal() {
Maybe.just(1)
.hide()
.test()
.assertResult(1);
}
Reported by PMD.
Line: 31
@Test
public void normal() {
Maybe.just(1)
.hide()
.test()
.assertResult(1);
}
Reported by PMD.
Line: 31
@Test
public void normal() {
Maybe.just(1)
.hide()
.test()
.assertResult(1);
}
Reported by PMD.
Line: 38
}
@Test
public void empty() {
Maybe.empty()
.hide()
.test()
.assertResult();
}
Reported by PMD.
Line: 39
@Test
public void empty() {
Maybe.empty()
.hide()
.test()
.assertResult();
}
Reported by PMD.
Line: 39
@Test
public void empty() {
Maybe.empty()
.hide()
.test()
.assertResult();
}
Reported by PMD.
Line: 39
@Test
public void empty() {
Maybe.empty()
.hide()
.test()
.assertResult();
}
Reported by PMD.
Line: 46
}
@Test
public void error() {
Maybe.error(new TestException())
.hide()
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 54
}
@Test
public void hidden() {
assertTrue(Maybe.just(1) instanceof ScalarSupplier);
assertFalse(Maybe.just(1).hide() instanceof ScalarSupplier);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableThrottleLastTests.java
20 issues
Line: 32
public class FlowableThrottleLastTests extends RxJavaTest {
@Test
public void throttle() {
Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
TestScheduler s = new TestScheduler();
PublishProcessor<Integer> o = PublishProcessor.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(subscriber);
Reported by PMD.
Line: 37
TestScheduler s = new TestScheduler();
PublishProcessor<Integer> o = PublishProcessor.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(subscriber);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
Reported by PMD.
Line: 37
TestScheduler s = new TestScheduler();
PublishProcessor<Integer> o = PublishProcessor.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(subscriber);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
Reported by PMD.
Line: 41
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
Reported by PMD.
Line: 42
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 44
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
Reported by PMD.
Line: 46
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
Reported by PMD.
Line: 48
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onComplete();
Reported by PMD.
Line: 49
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onComplete();
Reported by PMD.
Line: 51
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onComplete();
InOrder inOrder = inOrder(subscriber);
inOrder.verify(subscriber).onNext(2);
Reported by PMD.