The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleBlockingSubscribeTest.java
46 issues
Line: 31
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleBlockingSubscribeTest {
@Test
public void noArgSuccess() {
Single.just(1)
.blockingSubscribe();
Reported by PMD.
Line: 34
public class SingleBlockingSubscribeTest {
@Test
public void noArgSuccess() {
Single.just(1)
.blockingSubscribe();
}
@Test
Reported by PMD.
Line: 35
@Test
public void noArgSuccess() {
Single.just(1)
.blockingSubscribe();
}
@Test
public void noArgSuccessAsync() {
Reported by PMD.
Line: 40
}
@Test
public void noArgSuccessAsync() {
Single.just(1)
.delay(100, TimeUnit.MILLISECONDS)
.blockingSubscribe();
}
Reported by PMD.
Line: 41
@Test
public void noArgSuccessAsync() {
Single.just(1)
.delay(100, TimeUnit.MILLISECONDS)
.blockingSubscribe();
}
@Test
Reported by PMD.
Line: 41
@Test
public void noArgSuccessAsync() {
Single.just(1)
.delay(100, TimeUnit.MILLISECONDS)
.blockingSubscribe();
}
@Test
Reported by PMD.
Line: 47
}
@Test
public void noArgError() throws Throwable {
TestHelper.withErrorTracking(errors -> {
Single.error(new TestException())
.blockingSubscribe();
TestHelper.assertUndeliverable(errors, 0, TestException.class);
Reported by PMD.
Line: 57
}
@Test
public void noArgErrorAsync() throws Throwable {
TestHelper.withErrorTracking(errors -> {
Single.error(new TestException())
.delay(100, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
.blockingSubscribe();
Reported by PMD.
Line: 69
@Test
public void oneArgSuccess() throws Throwable {
@SuppressWarnings("unchecked")
Consumer<Integer> success = mock(Consumer.class);
Single.just(1)
.blockingSubscribe(success);
Reported by PMD.
Line: 72
@SuppressWarnings("unchecked")
Consumer<Integer> success = mock(Consumer.class);
Single.just(1)
.blockingSubscribe(success);
verify(success).accept(1);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/subscriptions/SubscriptionArbiterTest.java
46 issues
Line: 26
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SubscriptionArbiterTest extends RxJavaTest {
@Test
public void setSubscriptionMissed() {
SubscriptionArbiter sa = new SubscriptionArbiter(true);
Reported by PMD.
Line: 29
public class SubscriptionArbiterTest extends RxJavaTest {
@Test
public void setSubscriptionMissed() {
SubscriptionArbiter sa = new SubscriptionArbiter(true);
sa.getAndIncrement();
BooleanSubscription bs1 = new BooleanSubscription();
Reported by PMD.
Line: 42
sa.setSubscription(bs2);
assertTrue(bs1.isCancelled());
assertFalse(bs2.isCancelled());
}
@Test
Reported by PMD.
Line: 44
assertTrue(bs1.isCancelled());
assertFalse(bs2.isCancelled());
}
@Test
public void invalidDeferredRequest() {
SubscriptionArbiter sa = new SubscriptionArbiter(true);
Reported by PMD.
Line: 48
}
@Test
public void invalidDeferredRequest() {
SubscriptionArbiter sa = new SubscriptionArbiter(true);
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
sa.request(-99);
Reported by PMD.
Line: 62
}
@Test
public void unbounded() {
SubscriptionArbiter sa = new SubscriptionArbiter(true);
sa.request(Long.MAX_VALUE);
assertEquals(Long.MAX_VALUE, sa.requested);
Reported by PMD.
Line: 67
sa.request(Long.MAX_VALUE);
assertEquals(Long.MAX_VALUE, sa.requested);
assertTrue(sa.isUnbounded());
sa.unbounded = false;
Reported by PMD.
Line: 69
assertEquals(Long.MAX_VALUE, sa.requested);
assertTrue(sa.isUnbounded());
sa.unbounded = false;
sa.request(Long.MAX_VALUE);
Reported by PMD.
Line: 75
sa.request(Long.MAX_VALUE);
assertEquals(Long.MAX_VALUE, sa.requested);
sa.produced(1);
assertEquals(Long.MAX_VALUE, sa.requested);
Reported by PMD.
Line: 79
sa.produced(1);
assertEquals(Long.MAX_VALUE, sa.requested);
sa.unbounded = false;
sa.produced(Long.MAX_VALUE);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableBlockingSubscribeTest.java
46 issues
Line: 31
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class CompletableBlockingSubscribeTest {
@Test
public void noArgComplete() {
Completable.complete()
.blockingSubscribe();
Reported by PMD.
Line: 34
public class CompletableBlockingSubscribeTest {
@Test
public void noArgComplete() {
Completable.complete()
.blockingSubscribe();
}
@Test
Reported by PMD.
Line: 35
@Test
public void noArgComplete() {
Completable.complete()
.blockingSubscribe();
}
@Test
public void noArgCompleteAsync() {
Reported by PMD.
Line: 40
}
@Test
public void noArgCompleteAsync() {
Completable.complete()
.delay(100, TimeUnit.MILLISECONDS)
.blockingSubscribe();
}
Reported by PMD.
Line: 41
@Test
public void noArgCompleteAsync() {
Completable.complete()
.delay(100, TimeUnit.MILLISECONDS)
.blockingSubscribe();
}
@Test
Reported by PMD.
Line: 41
@Test
public void noArgCompleteAsync() {
Completable.complete()
.delay(100, TimeUnit.MILLISECONDS)
.blockingSubscribe();
}
@Test
Reported by PMD.
Line: 47
}
@Test
public void noArgError() throws Throwable {
TestHelper.withErrorTracking(errors -> {
Completable.error(new TestException())
.blockingSubscribe();
TestHelper.assertUndeliverable(errors, 0, TestException.class);
Reported by PMD.
Line: 57
}
@Test
public void noArgErrorAsync() throws Throwable {
TestHelper.withErrorTracking(errors -> {
Completable.error(new TestException())
.delay(100, TimeUnit.MILLISECONDS, Schedulers.computation(), true)
.blockingSubscribe();
Reported by PMD.
Line: 71
public void oneArgComplete() throws Throwable {
Action action = mock(Action.class);
Completable.complete()
.blockingSubscribe(action);
verify(action).run();
}
Reported by PMD.
Line: 74
Completable.complete()
.blockingSubscribe(action);
verify(action).run();
}
@Test
public void oneArgCompleteAsync() throws Throwable {
Action action = mock(Action.class);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/subscribers/QueueDrainSubscriberTest.java
46 issues
Line: 32
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class QueueDrainSubscriberTest extends RxJavaTest {
static final QueueDrainSubscriber<Integer, Integer, Integer> createUnordered(TestSubscriber<Integer> ts, final Disposable d) {
return new QueueDrainSubscriber<Integer, Integer, Integer>(ts, new SpscArrayQueue<>(4)) {
@Override
public void onNext(Integer t) {
Reported by PMD.
Line: 153
QueueDrainSubscriber<Integer, Integer, Integer> qd = createUnordered(ts, d);
ts.onSubscribe(new BooleanSubscription());
qd.onNext(1);
ts.assertFailure(MissingBackpressureException.class);
assertTrue(d.isDisposed());
}
Reported by PMD.
Line: 157
ts.assertFailure(MissingBackpressureException.class);
assertTrue(d.isDisposed());
}
@Test
public void orderedFastPathNoRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
Reported by PMD.
Line: 157
ts.assertFailure(MissingBackpressureException.class);
assertTrue(d.isDisposed());
}
@Test
public void orderedFastPathNoRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
Reported by PMD.
Line: 167
QueueDrainSubscriber<Integer, Integer, Integer> qd = createOrdered(ts, d);
ts.onSubscribe(new BooleanSubscription());
qd.onNext(1);
ts.assertFailure(MissingBackpressureException.class);
assertTrue(d.isDisposed());
}
Reported by PMD.
Line: 171
ts.assertFailure(MissingBackpressureException.class);
assertTrue(d.isDisposed());
}
@Test
public void acceptBadRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
Reported by PMD.
Line: 171
ts.assertFailure(MissingBackpressureException.class);
assertTrue(d.isDisposed());
}
@Test
public void acceptBadRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
Reported by PMD.
Line: 181
QueueDrainSubscriber<Integer, Integer, Integer> qd = createUnordered(ts, d);
ts.onSubscribe(new BooleanSubscription());
assertTrue(qd.accept(ts, 0));
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
qd.requested(-1);
TestHelper.assertError(errors, 0, IllegalArgumentException.class);
Reported by PMD.
Line: 181
QueueDrainSubscriber<Integer, Integer, Integer> qd = createUnordered(ts, d);
ts.onSubscribe(new BooleanSubscription());
assertTrue(qd.accept(ts, 0));
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
qd.requested(-1);
TestHelper.assertError(errors, 0, IllegalArgumentException.class);
Reported by PMD.
Line: 193
}
@Test
public void unorderedFastPathRequest1() {
TestSubscriber<Integer> ts = new TestSubscriber<>(1);
Disposable d = Disposable.empty();
QueueDrainSubscriber<Integer, Integer, Integer> qd = createUnordered(ts, d);
ts.onSubscribe(new BooleanSubscription());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUnsubscribeOnTest.java
46 issues
Line: 188
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("failed to initialize and get inner thread");
}
}
@NonNull
@Override
Reported by PMD.
Line: 75
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread.toString(), unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
Reported by PMD.
Line: 76
// True for Schedulers.newThread()
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread.toString(), unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
} finally {
Reported by PMD.
Line: 125
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
Reported by PMD.
Line: 126
// True for Schedulers.newThread()
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
Reported by PMD.
Line: 127
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
observer.assertValues(1, 2);
observer.assertTerminated();
} finally {
Reported by PMD.
Line: 148
@Override
public void dispose() {
set(true);
System.out.println("unsubscribe invoked: " + Thread.currentThread());
thread = Thread.currentThread();
latch.countDown();
}
@Override public boolean isDisposed() {
Reported by PMD.
Line: 37
public class ObservableUnsubscribeOnTest extends RxJavaTest {
@Test
public void unsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnSameThread() throws InterruptedException {
UIEventLoopScheduler uiEventLoop = new UIEventLoopScheduler();
try {
final ThreadSubscription subscription = new ThreadSubscription();
final AtomicReference<Thread> subscribeThread = new AtomicReference<>();
Observable<Integer> w = Observable.unsafeCreate(new ObservableSource<Integer>() {
Reported by PMD.
Line: 59
TestObserverEx<Integer> observer = new TestObserverEx<>();
w.subscribeOn(uiEventLoop).observeOn(Schedulers.computation())
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);
observer.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.
Line: 59
TestObserverEx<Integer> observer = new TestObserverEx<>();
w.subscribeOn(uiEventLoop).observeOn(Schedulers.computation())
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(observer);
observer.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/OperatorMergePerf.java
46 issues
Line: 40
}
});
PerfSubscriber o = input.newLatchedObserver();
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
Reported by PMD.
Line: 42
PerfSubscriber o = input.newLatchedObserver();
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
Reported by PMD.
Line: 43
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
Reported by PMD.
Line: 43
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
Reported by PMD.
Line: 45
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
// flatMap
@Benchmark
Reported by PMD.
Line: 59
}
});
PerfSubscriber o = input.newLatchedObserver();
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
Reported by PMD.
Line: 61
PerfSubscriber o = input.newLatchedObserver();
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
Reported by PMD.
Line: 62
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
Reported by PMD.
Line: 62
Flowable.merge(os).subscribe(o);
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
Reported by PMD.
Line: 64
if (input.size == 1) {
while (o.latch.getCount() != 0) { }
} else {
o.latch.await();
}
}
@Benchmark
public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedException {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableJoin.java
45 issues
Line: 31
public final class ObservableJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends AbstractObservableWithUpstream<TLeft, R> {
final ObservableSource<? extends TRight> other;
final Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
Reported by PMD.
Line: 33
final ObservableSource<? extends TRight> other;
final Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector;
Reported by PMD.
Line: 35
final Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector;
public ObservableJoin(
ObservableSource<TLeft> source,
Reported by PMD.
Line: 37
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector;
public ObservableJoin(
ObservableSource<TLeft> source,
ObservableSource<? extends TRight> other,
Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
Reported by PMD.
Line: 62
observer.onSubscribe(parent);
LeftRightObserver left = new LeftRightObserver(parent, true);
parent.disposables.add(left);
LeftRightObserver right = new LeftRightObserver(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
Reported by PMD.
Line: 64
LeftRightObserver left = new LeftRightObserver(parent, true);
parent.disposables.add(left);
LeftRightObserver right = new LeftRightObserver(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
}
Reported by PMD.
Line: 70
other.subscribe(right);
}
static final class JoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Disposable, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 70
other.subscribe(right);
}
static final class JoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Disposable, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 75
private static final long serialVersionUID = -6071216598687999801L;
final Observer<? super R> downstream;
final SpscLinkedArrayQueue<Object> queue;
final CompositeDisposable disposables;
Reported by PMD.
Line: 77
final Observer<? super R> downstream;
final SpscLinkedArrayQueue<Object> queue;
final CompositeDisposable disposables;
final Map<Integer, TLeft> lefts;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZip.java
45 issues
Line: 29
public final class ObservableZip<T, R> extends Observable<R> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> zipper;
final int bufferSize;
final boolean delayError;
Reported by PMD.
Line: 30
public final class ObservableZip<T, R> extends Observable<R> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> zipper;
final int bufferSize;
final boolean delayError;
public ObservableZip(ObservableSource<? extends T>[] sources,
Reported by PMD.
Line: 31
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> zipper;
final int bufferSize;
final boolean delayError;
public ObservableZip(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Reported by PMD.
Line: 32
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> zipper;
final int bufferSize;
final boolean delayError;
public ObservableZip(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> zipper,
Reported by PMD.
Line: 33
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> zipper;
final int bufferSize;
final boolean delayError;
public ObservableZip(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> zipper,
int bufferSize,
Reported by PMD.
Line: 35
final int bufferSize;
final boolean delayError;
public ObservableZip(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> zipper,
int bufferSize,
boolean delayError) {
this.sources = sources;
Reported by PMD.
Line: 56
sources = new ObservableSource[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
Reported by PMD.
Line: 75
zc.subscribe(sources, bufferSize);
}
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
private static final long serialVersionUID = 2983708048395377667L;
final Observer<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipObserver<T, R>[] observers;
Reported by PMD.
Line: 75
zc.subscribe(sources, bufferSize);
}
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
private static final long serialVersionUID = 2983708048395377667L;
final Observer<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipObserver<T, R>[] observers;
Reported by PMD.
Line: 78
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
private static final long serialVersionUID = 2983708048395377667L;
final Observer<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipObserver<T, R>[] observers;
final T[] row;
final boolean delayError;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRangeLongTest.java
45 issues
Line: 31
import io.reactivex.rxjava3.observers.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableRangeLongTest extends RxJavaTest {
@Test
public void rangeStartAt2Count3() {
Observer<Long> observer = TestHelper.mockObserver();
Observable.rangeLong(2, 3).subscribe(observer);
Reported by PMD.
Line: 36
public void rangeStartAt2Count3() {
Observer<Long> observer = TestHelper.mockObserver();
Observable.rangeLong(2, 3).subscribe(observer);
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
Reported by PMD.
Line: 38
Observable.rangeLong(2, 3).subscribe(observer);
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
Reported by PMD.
Line: 39
Observable.rangeLong(2, 3).subscribe(observer);
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 40
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 41
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 42
verify(observer, times(1)).onNext(3L);
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void rangeUnsubscribe() {
Reported by PMD.
Line: 43
verify(observer, times(1)).onNext(4L);
verify(observer, never()).onNext(5L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void rangeUnsubscribe() {
Observer<Long> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 60
})
.take(3).subscribe(observer);
verify(observer, times(1)).onNext(1L);
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, never()).onNext(4L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
Reported by PMD.
Line: 61
.take(3).subscribe(observer);
verify(observer, times(1)).onNext(1L);
verify(observer, times(1)).onNext(2L);
verify(observer, times(1)).onNext(3L);
verify(observer, never()).onNext(4L);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
assertEquals(3, count.get());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimerTest.java
45 issues
Line: 38
import io.reactivex.rxjava3.schedulers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableTimerTest extends RxJavaTest {
@Mock
Observer<Object> observer;
@Mock
Observer<Long> observer2;
Reported by PMD.
Line: 40
public class ObservableTimerTest extends RxJavaTest {
@Mock
Observer<Object> observer;
@Mock
Observer<Long> observer2;
TestScheduler scheduler;
Reported by PMD.
Line: 42
@Mock
Observer<Object> observer;
@Mock
Observer<Long> observer2;
TestScheduler scheduler;
@Before
public void before() {
Reported by PMD.
Line: 44
@Mock
Observer<Long> observer2;
TestScheduler scheduler;
@Before
public void before() {
observer = TestHelper.mockObserver();
Reported by PMD.
Line: 57
@Test
public void timerOnce() {
Observable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(observer, times(1)).onNext(0L);
verify(observer, times(1)).onComplete();
verify(observer, never()).onError(any(Throwable.class));
Reported by PMD.
Line: 60
Observable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(observer);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(observer, times(1)).onNext(0L);
verify(observer, times(1)).onComplete();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
Reported by PMD.
Line: 61
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(observer, times(1)).onNext(0L);
verify(observer, times(1)).onComplete();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void timerPeriodically() {
Reported by PMD.
Line: 62
verify(observer, times(1)).onNext(0L);
verify(observer, times(1)).onComplete();
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void timerPeriodically() {
TestObserver<Long> to = new TestObserver<>();
Reported by PMD.
Line: 69
public void timerPeriodically() {
TestObserver<Long> to = new TestObserver<>();
Observable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler).subscribe(to);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
to.assertValue(0L);
Reported by PMD.
Line: 96
public void interval() {
Observable<Long> w = Observable.interval(1, TimeUnit.SECONDS, scheduler);
TestObserver<Long> to = new TestObserver<>();
w.subscribe(to);
to.assertNoValues();
to.assertNoErrors();
to.assertNotComplete();
Reported by PMD.