The following issues were found:
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithSingle.java
55 issues
Line: 37
*/
public final class FlowableMergeWithSingle<T> extends AbstractFlowableWithUpstream<T, T> {
final SingleSource<? extends T> other;
public FlowableMergeWithSingle(Flowable<T> source, SingleSource<? extends T> other) {
super(source);
this.other = other;
}
Reported by PMD.
Line: 52
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 52
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 57
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
final AtomicReference<Subscription> mainSubscription;
final OtherObserver<T> otherObserver;
Reported by PMD.
Line: 59
final Subscriber<? super T> downstream;
final AtomicReference<Subscription> mainSubscription;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
Reported by PMD.
Line: 61
final AtomicReference<Subscription> mainSubscription;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
final AtomicLong requested;
Reported by PMD.
Line: 63
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
final AtomicLong requested;
final int prefetch;
Reported by PMD.
Line: 65
final AtomicThrowable errors;
final AtomicLong requested;
final int prefetch;
final int limit;
Reported by PMD.
Line: 67
final AtomicLong requested;
final int prefetch;
final int limit;
volatile SimplePlainQueue<T> queue;
Reported by PMD.
Line: 69
final int prefetch;
final int limit;
volatile SimplePlainQueue<T> queue;
T singleItem;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableJoin.java
55 issues
Line: 34
public final class FlowableJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends AbstractFlowableWithUpstream<TLeft, R> {
final Publisher<? extends TRight> other;
final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
Reported by PMD.
Line: 36
final Publisher<? extends TRight> other;
final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector;
Reported by PMD.
Line: 38
final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector;
public FlowableJoin(
Flowable<TLeft> source,
Reported by PMD.
Line: 40
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super TRight, ? extends R> resultSelector;
public FlowableJoin(
Flowable<TLeft> source,
Publisher<? extends TRight> other,
Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd,
Reported by PMD.
Line: 64
s.onSubscribe(parent);
LeftRightSubscriber left = new LeftRightSubscriber(parent, true);
parent.disposables.add(left);
LeftRightSubscriber right = new LeftRightSubscriber(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
Reported by PMD.
Line: 66
LeftRightSubscriber left = new LeftRightSubscriber(parent, true);
parent.disposables.add(left);
LeftRightSubscriber right = new LeftRightSubscriber(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
}
Reported by PMD.
Line: 72
other.subscribe(right);
}
static final class JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Subscription, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 72
other.subscribe(right);
}
static final class JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Subscription, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 73
}
static final class JoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Subscription, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 77
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
final AtomicLong requested;
final SpscLinkedArrayQueue<Object> queue;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimerTest.java
54 issues
Line: 37
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableTimerTest extends RxJavaTest {
@Mock
Subscriber<Object> subscriber;
@Mock
Subscriber<Long> subscriber2;
Reported by PMD.
Line: 39
public class FlowableTimerTest extends RxJavaTest {
@Mock
Subscriber<Object> subscriber;
@Mock
Subscriber<Long> subscriber2;
TestScheduler scheduler;
Reported by PMD.
Line: 41
@Mock
Subscriber<Object> subscriber;
@Mock
Subscriber<Long> subscriber2;
TestScheduler scheduler;
@Before
public void before() {
Reported by PMD.
Line: 43
@Mock
Subscriber<Long> subscriber2;
TestScheduler scheduler;
@Before
public void before() {
subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 56
@Test
public void timerOnce() {
Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(subscriber);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(subscriber, times(1)).onNext(0L);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
Reported by PMD.
Line: 59
Flowable.timer(100, TimeUnit.MILLISECONDS, scheduler).subscribe(subscriber);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(subscriber, times(1)).onNext(0L);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
Reported by PMD.
Line: 60
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
verify(subscriber, times(1)).onNext(0L);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
public void timerPeriodically() {
Reported by PMD.
Line: 61
verify(subscriber, times(1)).onNext(0L);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
public void timerPeriodically() {
TestSubscriber<Long> ts = new TestSubscriber<>();
Reported by PMD.
Line: 68
public void timerPeriodically() {
TestSubscriber<Long> ts = new TestSubscriber<>();
Flowable.interval(100, 100, TimeUnit.MILLISECONDS, scheduler).subscribe(ts);
scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS);
ts.assertValue(0L);
Reported by PMD.
Line: 95
public void interval() {
Flowable<Long> w = Flowable.interval(1, TimeUnit.SECONDS, scheduler);
TestSubscriber<Long> ts = new TestSubscriber<>();
w.subscribe(ts);
ts.assertNoValues();
ts.assertNoErrors();
ts.assertNotComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/queue/SimpleQueueTest.java
54 issues
Line: 29
import io.reactivex.rxjava3.core.RxJavaTest;
public class SimpleQueueTest extends RxJavaTest {
@Test(expected = NullPointerException.class)
public void spscArrayQueueNull() {
SpscArrayQueue<Object> q = new SpscArrayQueue<>(16);
q.offer(null);
Reported by PMD.
Line: 29
import io.reactivex.rxjava3.core.RxJavaTest;
public class SimpleQueueTest extends RxJavaTest {
@Test(expected = NullPointerException.class)
public void spscArrayQueueNull() {
SpscArrayQueue<Object> q = new SpscArrayQueue<>(16);
q.offer(null);
Reported by PMD.
Line: 50
}
@Test
public void spscArrayQueueBiOffer() {
SpscArrayQueue<Object> q = new SpscArrayQueue<>(16);
q.offer(1, 2);
assertEquals(1, q.poll());
assertEquals(2, q.poll());
Reported by PMD.
Line: 54
SpscArrayQueue<Object> q = new SpscArrayQueue<>(16);
q.offer(1, 2);
assertEquals(1, q.poll());
assertEquals(2, q.poll());
assertNull(q.poll());
}
@Test
Reported by PMD.
Line: 55
q.offer(1, 2);
assertEquals(1, q.poll());
assertEquals(2, q.poll());
assertNull(q.poll());
}
@Test
public void spscLinkedArrayQueueBiOffer() {
Reported by PMD.
Line: 56
assertEquals(1, q.poll());
assertEquals(2, q.poll());
assertNull(q.poll());
}
@Test
public void spscLinkedArrayQueueBiOffer() {
SpscLinkedArrayQueue<Object> q = new SpscLinkedArrayQueue<>(16);
Reported by PMD.
Line: 60
}
@Test
public void spscLinkedArrayQueueBiOffer() {
SpscLinkedArrayQueue<Object> q = new SpscLinkedArrayQueue<>(16);
q.offer(1, 2);
assertEquals(1, q.poll());
assertEquals(2, q.poll());
Reported by PMD.
Line: 64
SpscLinkedArrayQueue<Object> q = new SpscLinkedArrayQueue<>(16);
q.offer(1, 2);
assertEquals(1, q.poll());
assertEquals(2, q.poll());
assertNull(q.poll());
}
@Test
Reported by PMD.
Line: 65
q.offer(1, 2);
assertEquals(1, q.poll());
assertEquals(2, q.poll());
assertNull(q.poll());
}
@Test
public void mpscLinkedQueueBiOffer() {
Reported by PMD.
Line: 66
assertEquals(1, q.poll());
assertEquals(2, q.poll());
assertNull(q.poll());
}
@Test
public void mpscLinkedQueueBiOffer() {
MpscLinkedQueue<Object> q = new MpscLinkedQueue<>();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapEager.java
54 issues
Line: 32
public final class ObservableConcatMapEager<T, R> extends AbstractObservableWithUpstream<T, R> {
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final ErrorMode errorMode;
final int maxConcurrency;
Reported by PMD.
Line: 34
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final ErrorMode errorMode;
final int maxConcurrency;
final int prefetch;
Reported by PMD.
Line: 36
final ErrorMode errorMode;
final int maxConcurrency;
final int prefetch;
public ObservableConcatMapEager(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper,
Reported by PMD.
Line: 38
final int maxConcurrency;
final int prefetch;
public ObservableConcatMapEager(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper,
ErrorMode errorMode,
int maxConcurrency, int prefetch) {
Reported by PMD.
Line: 56
source.subscribe(new ConcatMapEagerMainObserver<>(observer, mapper, maxConcurrency, prefetch, errorMode));
}
static final class ConcatMapEagerMainObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable, InnerQueuedObserverSupport<R> {
private static final long serialVersionUID = 8080567949447303262L;
Reported by PMD.
Line: 56
source.subscribe(new ConcatMapEagerMainObserver<>(observer, mapper, maxConcurrency, prefetch, errorMode));
}
static final class ConcatMapEagerMainObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable, InnerQueuedObserverSupport<R> {
private static final long serialVersionUID = 8080567949447303262L;
Reported by PMD.
Line: 56
source.subscribe(new ConcatMapEagerMainObserver<>(observer, mapper, maxConcurrency, prefetch, errorMode));
}
static final class ConcatMapEagerMainObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable, InnerQueuedObserverSupport<R> {
private static final long serialVersionUID = 8080567949447303262L;
Reported by PMD.
Line: 58
static final class ConcatMapEagerMainObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable, InnerQueuedObserverSupport<R> {
private static final long serialVersionUID = 8080567949447303262L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 62
private static final long serialVersionUID = 8080567949447303262L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int maxConcurrency;
Reported by PMD.
Line: 64
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int maxConcurrency;
final int prefetch;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java
54 issues
Line: 33
import io.reactivex.rxjava3.processors.UnicastProcessor;
public final class FlowableWindowBoundarySelector<T, B, V> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
final int bufferSize;
public FlowableWindowBoundarySelector(
Flowable<T> source,
Reported by PMD.
Line: 34
public final class FlowableWindowBoundarySelector<T, B, V> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
final int bufferSize;
public FlowableWindowBoundarySelector(
Flowable<T> source,
Publisher<B> open, Function<? super B, ? extends Publisher<V>> closingIndicator,
Reported by PMD.
Line: 35
public final class FlowableWindowBoundarySelector<T, B, V> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
final int bufferSize;
public FlowableWindowBoundarySelector(
Flowable<T> source,
Publisher<B> open, Function<? super B, ? extends Publisher<V>> closingIndicator,
int bufferSize) {
Reported by PMD.
Line: 53
s, open, closingIndicator, bufferSize));
}
static final class WindowBoundaryMainSubscriber<T, B, V>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
Reported by PMD.
Line: 53
s, open, closingIndicator, bufferSize));
}
static final class WindowBoundaryMainSubscriber<T, B, V>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
Reported by PMD.
Line: 53
s, open, closingIndicator, bufferSize));
}
static final class WindowBoundaryMainSubscriber<T, B, V>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
Reported by PMD.
Line: 55
static final class WindowBoundaryMainSubscriber<T, B, V>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
Reported by PMD.
Line: 58
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
final int bufferSize;
final CompositeDisposable resources;
Reported by PMD.
Line: 59
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
final int bufferSize;
final CompositeDisposable resources;
final WindowStartSubscriber<B> startSubscriber;
Reported by PMD.
Line: 59
private static final long serialVersionUID = 8646217640096099753L;
final Subscriber<? super Flowable<T>> downstream;
final Publisher<B> open;
final Function<? super B, ? extends Publisher<V>> closingIndicator;
final int bufferSize;
final CompositeDisposable resources;
final WindowStartSubscriber<B> startSubscriber;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/subscribers/BasicFuseableConditionalSubscriberTest.java
54 issues
Line: 29
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.testsupport.*;
public class BasicFuseableConditionalSubscriberTest extends RxJavaTest {
@Test
public void offerThrows() {
ConditionalSubscriber<Integer> cs = new ConditionalSubscriber<Integer>() {
Reported by PMD.
Line: 29
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.testsupport.*;
public class BasicFuseableConditionalSubscriberTest extends RxJavaTest {
@Test
public void offerThrows() {
ConditionalSubscriber<Integer> cs = new ConditionalSubscriber<Integer>() {
Reported by PMD.
Line: 32
public class BasicFuseableConditionalSubscriberTest extends RxJavaTest {
@Test
public void offerThrows() {
ConditionalSubscriber<Integer> cs = new ConditionalSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
Reported by PMD.
Line: 32
public class BasicFuseableConditionalSubscriberTest extends RxJavaTest {
@Test
public void offerThrows() {
ConditionalSubscriber<Integer> cs = new ConditionalSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
Reported by PMD.
Line: 32
public class BasicFuseableConditionalSubscriberTest extends RxJavaTest {
@Test
public void offerThrows() {
ConditionalSubscriber<Integer> cs = new ConditionalSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
Reported by PMD.
Line: 84
TestHelper.assertNoOffer(fcs);
assertFalse(fcs.isEmpty());
fcs.clear();
assertTrue(fcs.isEmpty());
}
@Test
Reported by PMD.
Line: 86
assertFalse(fcs.isEmpty());
fcs.clear();
assertTrue(fcs.isEmpty());
}
@Test
public void implementationStopsOnSubscribe() {
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 91
@Test
public void implementationStopsOnSubscribe() {
@SuppressWarnings("unchecked")
ConditionalSubscriber<Integer> ts = mock(ConditionalSubscriber.class);
BasicFuseableConditionalSubscriber<Integer, Integer> bfs = new BasicFuseableConditionalSubscriber<Integer, Integer>(ts) {
@Override
Reported by PMD.
Line: 126
bfs.onSubscribe(new BooleanSubscription());
verify(ts, never()).onSubscribe(any());
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeFlowable(f -> f
Reported by PMD.
Line: 130
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeFlowable(f -> f
.map(v -> v)
.filter(v -> true)
);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBlockingTest.java
54 issues
Line: 35
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableBlockingTest extends RxJavaTest {
@Test
public void blockingFirst() {
assertEquals(1, Observable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
Reported by PMD.
Line: 39
@Test
public void blockingFirst() {
assertEquals(1, Observable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
@Test
public void blockingFirstDefault() {
Reported by PMD.
Line: 39
@Test
public void blockingFirst() {
assertEquals(1, Observable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
@Test
public void blockingFirstDefault() {
Reported by PMD.
Line: 39
@Test
public void blockingFirst() {
assertEquals(1, Observable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
@Test
public void blockingFirstDefault() {
Reported by PMD.
Line: 39
@Test
public void blockingFirst() {
assertEquals(1, Observable.range(1, 10)
.subscribeOn(Schedulers.computation()).blockingFirst().intValue());
}
@Test
public void blockingFirstDefault() {
Reported by PMD.
Line: 45
@Test
public void blockingFirstDefault() {
assertEquals(1, Observable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
@Test
public void blockingSubscribeConsumer() {
Reported by PMD.
Line: 45
@Test
public void blockingFirstDefault() {
assertEquals(1, Observable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
@Test
public void blockingSubscribeConsumer() {
Reported by PMD.
Line: 45
@Test
public void blockingFirstDefault() {
assertEquals(1, Observable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
@Test
public void blockingSubscribeConsumer() {
Reported by PMD.
Line: 45
@Test
public void blockingFirstDefault() {
assertEquals(1, Observable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
@Test
public void blockingSubscribeConsumer() {
Reported by PMD.
Line: 45
@Test
public void blockingFirstDefault() {
assertEquals(1, Observable.<Integer>empty()
.subscribeOn(Schedulers.computation()).blockingFirst(1).intValue());
}
@Test
public void blockingSubscribeConsumer() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/observers/FutureObserverTest.java
54 issues
Line: 32
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FutureObserverTest extends RxJavaTest {
FutureObserver<Integer> fo;
@Before
public void before() {
fo = new FutureObserver<>();
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FutureObserverTest extends RxJavaTest {
FutureObserver<Integer> fo;
@Before
public void before() {
fo = new FutureObserver<>();
}
Reported by PMD.
Line: 41
}
@Test
public void cancel2() {
fo.dispose();
assertFalse(fo.isCancelled());
assertFalse(fo.isDisposed());
Reported by PMD.
Line: 76
}
@Test
public void cancel() throws Exception {
assertFalse(fo.isDone());
assertFalse(fo.isCancelled());
fo.cancel(false);
Reported by PMD.
Line: 76
}
@Test
public void cancel() throws Exception {
assertFalse(fo.isDone());
assertFalse(fo.isCancelled());
fo.cancel(false);
Reported by PMD.
Line: 89
try {
fo.get();
fail("Should have thrown");
} catch (CancellationException ex) {
// expected
}
try {
Reported by PMD.
Line: 90
try {
fo.get();
fail("Should have thrown");
} catch (CancellationException ex) {
// expected
}
try {
fo.get(1, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 97
try {
fo.get(1, TimeUnit.MILLISECONDS);
fail("Should have thrown");
} catch (CancellationException ex) {
// expected
}
}
@Test
Reported by PMD.
Line: 103
}
@Test
public void onError() throws Exception {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
fo.onError(new TestException("One"));
Reported by PMD.
Line: 103
}
@Test
public void onError() throws Exception {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
fo.onError(new TestException("One"));
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipArrayTest.java
54 issues
Line: 34
import io.reactivex.rxjava3.subjects.SingleSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleZipArrayTest extends RxJavaTest {
final BiFunction<Object, Object, Object> addString = new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object a, Object b) throws Exception {
return "" + a + b;
Reported by PMD.
Line: 36
public class SingleZipArrayTest extends RxJavaTest {
final BiFunction<Object, Object, Object> addString = new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object a, Object b) throws Exception {
return "" + a + b;
}
};
Reported by PMD.
Line: 39
final BiFunction<Object, Object, Object> addString = new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object a, Object b) throws Exception {
return "" + a + b;
}
};
final Function3<Object, Object, Object, Object> addString3 = new Function3<Object, Object, Object, Object>() {
@Override
Reported by PMD.
Line: 43
}
};
final Function3<Object, Object, Object, Object> addString3 = new Function3<Object, Object, Object, Object>() {
@Override
public Object apply(Object a, Object b, Object c) throws Exception {
return "" + a + b + c;
}
};
Reported by PMD.
Line: 46
final Function3<Object, Object, Object, Object> addString3 = new Function3<Object, Object, Object, Object>() {
@Override
public Object apply(Object a, Object b, Object c) throws Exception {
return "" + a + b + c;
}
};
@Test
public void firstError() {
Reported by PMD.
Line: 51
};
@Test
public void firstError() {
Single.zip(Single.error(new TestException()), Single.just(1), addString)
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 58
}
@Test
public void secondError() {
Single.zip(Single.just(1), Single.<Integer>error(new TestException()), addString)
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 65
}
@Test
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestObserver<Object> to = Single.zip(pp.single(0), pp.single(0), addString)
.test();
Reported by PMD.
Line: 68
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestObserver<Object> to = Single.zip(pp.single(0), pp.single(0), addString)
.test();
assertTrue(pp.hasSubscribers());
to.dispose();
Reported by PMD.
Line: 68
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestObserver<Object> to = Single.zip(pp.single(0), pp.single(0), addString)
.test();
assertTrue(pp.hasSubscribers());
to.dispose();
Reported by PMD.