The following issues were found:
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatArrayTest.java
72 issues
Line: 31
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class MaybeConcatArrayTest extends RxJavaTest {
@Test
public void cancel() {
Maybe.concatArray(Maybe.just(1), Maybe.just(2))
.take(1)
Reported by PMD.
Line: 34
public class MaybeConcatArrayTest extends RxJavaTest {
@Test
public void cancel() {
Maybe.concatArray(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void cancel() {
Maybe.concatArray(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void cancel() {
Maybe.concatArray(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void cancel() {
Maybe.concatArray(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 42
}
@Test
public void cancelDelayError() {
Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 43
@Test
public void cancelDelayError() {
Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 43
@Test
public void cancelDelayError() {
Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 43
@Test
public void cancelDelayError() {
Maybe.concatArrayDelayError(Maybe.just(1), Maybe.just(2))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 50
}
@Test
public void backpressure() {
TestSubscriber<Integer> ts = Maybe.concatArray(Maybe.just(1), Maybe.just(2))
.test(0L);
ts.assertEmpty();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFromPublisher.java
72 issues
Line: 54
}
@Override
public void subscribe(Subscriber<? super T>[] subscribers) {
subscribers = RxJavaPlugins.onSubscribe(this, subscribers);
if (!validate(subscribers)) {
return;
}
Reported by PMD.
Line: 36
* @param <T> the value type
*/
public final class ParallelFromPublisher<T> extends ParallelFlowable<T> {
final Publisher<? extends T> source;
final int parallelism;
final int prefetch;
Reported by PMD.
Line: 38
public final class ParallelFromPublisher<T> extends ParallelFlowable<T> {
final Publisher<? extends T> source;
final int parallelism;
final int prefetch;
public ParallelFromPublisher(Publisher<? extends T> source, int parallelism, int prefetch) {
this.source = source;
Reported by PMD.
Line: 38
public final class ParallelFromPublisher<T> extends ParallelFlowable<T> {
final Publisher<? extends T> source;
final int parallelism;
final int prefetch;
public ParallelFromPublisher(Publisher<? extends T> source, int parallelism, int prefetch) {
this.source = source;
Reported by PMD.
Line: 40
final int parallelism;
final int prefetch;
public ParallelFromPublisher(Publisher<? extends T> source, int parallelism, int prefetch) {
this.source = source;
this.parallelism = parallelism;
this.prefetch = prefetch;
Reported by PMD.
Line: 64
source.subscribe(new ParallelDispatcher<>(subscribers, prefetch));
}
static final class ParallelDispatcher<T>
extends AtomicInteger
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -4470634016609963609L;
Reported by PMD.
Line: 64
source.subscribe(new ParallelDispatcher<>(subscribers, prefetch));
}
static final class ParallelDispatcher<T>
extends AtomicInteger
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -4470634016609963609L;
Reported by PMD.
Line: 70
private static final long serialVersionUID = -4470634016609963609L;
final Subscriber<? super T>[] subscribers;
final AtomicLongArray requests;
final long[] emissions;
Reported by PMD.
Line: 72
final Subscriber<? super T>[] subscribers;
final AtomicLongArray requests;
final long[] emissions;
final int prefetch;
Reported by PMD.
Line: 74
final AtomicLongArray requests;
final long[] emissions;
final int prefetch;
final int limit;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmbTest.java
72 issues
Line: 39
public class MaybeAmbTest extends RxJavaTest {
@Test
public void ambLots() {
List<Maybe<Integer>> ms = new ArrayList<>();
for (int i = 0; i < 32; i++) {
ms.add(Maybe.<Integer>never());
}
Reported by PMD.
Line: 48
ms.add(Maybe.just(1));
Maybe.amb(ms)
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 48
ms.add(Maybe.just(1));
Maybe.amb(ms)
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 54
}
@Test
public void ambFirstDone() {
Maybe.amb(Arrays.asList(Maybe.just(1), Maybe.just(2)))
.test()
.assertResult(1);
}
Reported by PMD.
Line: 55
@Test
public void ambFirstDone() {
Maybe.amb(Arrays.asList(Maybe.just(1), Maybe.just(2)))
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 55
@Test
public void ambFirstDone() {
Maybe.amb(Arrays.asList(Maybe.just(1), Maybe.just(2)))
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 61
}
@Test
public void dispose() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = Maybe.amb(Arrays.asList(pp1.singleElement(), pp2.singleElement()))
.test();
Reported by PMD.
Line: 65
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = Maybe.amb(Arrays.asList(pp1.singleElement(), pp2.singleElement()))
.test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
Reported by PMD.
Line: 65
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = Maybe.amb(Arrays.asList(pp1.singleElement(), pp2.singleElement()))
.test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
Reported by PMD.
Line: 65
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = Maybe.amb(Arrays.asList(pp1.singleElement(), pp2.singleElement()))
.test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java
71 issues
Line: 453
current = null;
}
return r;
}
}
@Override
public int requestFusion(int requestedMode) {
Reported by PMD.
Line: 33
public final class FlowableFlattenIterable<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
public FlowableFlattenIterable(Flowable<T> source,
Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
Reported by PMD.
Line: 35
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
public FlowableFlattenIterable(Flowable<T> source,
Function<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
super(source);
this.mapper = mapper;
Reported by PMD.
Line: 52
try {
v = ((Supplier<T>)source).get();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 68
try {
Iterable<? extends R> iterable = mapper.apply(v);
it = iterable.iterator();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 69
Iterable<? extends R> iterable = mapper.apply(v);
it = iterable.iterator();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 95
return new FlattenIterableSubscriber<>(downstream, mapper, prefetch);
}
static final class FlattenIterableSubscriber<T, R>
extends BasicIntQueueSubscription<R>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -3096000382929934955L;
Reported by PMD.
Line: 95
return new FlattenIterableSubscriber<>(downstream, mapper, prefetch);
}
static final class FlattenIterableSubscriber<T, R>
extends BasicIntQueueSubscription<R>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -3096000382929934955L;
Reported by PMD.
Line: 97
static final class FlattenIterableSubscriber<T, R>
extends BasicIntQueueSubscription<R>
implements FlowableSubscriber<T> {
private static final long serialVersionUID = -3096000382929934955L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 101
private static final long serialVersionUID = -3096000382929934955L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoAfterNextTest.java
71 issues
Line: 31
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableDoAfterNextTest extends RxJavaTest {
final List<Integer> values = new ArrayList<>();
final Consumer<Integer> afterNext = new Consumer<Integer>() {
@Override
Reported by PMD.
Line: 33
public class FlowableDoAfterNextTest extends RxJavaTest {
final List<Integer> values = new ArrayList<>();
final Consumer<Integer> afterNext = new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
values.add(-e);
Reported by PMD.
Line: 35
final List<Integer> values = new ArrayList<>();
final Consumer<Integer> afterNext = new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
values.add(-e);
}
};
Reported by PMD.
Line: 42
}
};
final TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
super.onNext(t);
FlowableDoAfterNextTest.this.values.add(t);
}
Reported by PMD.
Line: 46
@Override
public void onNext(Integer t) {
super.onNext(t);
FlowableDoAfterNextTest.this.values.add(t);
}
};
@Test
public void just() {
Reported by PMD.
Line: 46
@Override
public void onNext(Integer t) {
super.onNext(t);
FlowableDoAfterNextTest.this.values.add(t);
}
};
@Test
public void just() {
Reported by PMD.
Line: 46
@Override
public void onNext(Integer t) {
super.onNext(t);
FlowableDoAfterNextTest.this.values.add(t);
}
};
@Test
public void just() {
Reported by PMD.
Line: 52
@Test
public void just() {
Flowable.just(1)
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
Reported by PMD.
Line: 52
@Test
public void just() {
Flowable.just(1)
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
Reported by PMD.
Line: 52
@Test
public void just() {
Flowable.just(1)
.doAfterNext(afterNext)
.subscribeWith(ts)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java
71 issues
Line: 31
public final class FlowableConcatMap<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final ErrorMode errorMode;
Reported by PMD.
Line: 33
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final ErrorMode errorMode;
public FlowableConcatMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
Reported by PMD.
Line: 35
final int prefetch;
final ErrorMode errorMode;
public FlowableConcatMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch, ErrorMode errorMode) {
super(source);
Reported by PMD.
Line: 74
private static final long serialVersionUID = -3511336836796789179L;
final ConcatMapInner<R> inner;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 76
final ConcatMapInner<R> inner;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final int limit;
Reported by PMD.
Line: 78
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final int limit;
Subscription upstream;
Reported by PMD.
Line: 80
final int prefetch;
final int limit;
Subscription upstream;
int consumed;
Reported by PMD.
Line: 82
final int limit;
Subscription upstream;
int consumed;
SimpleQueue<T> queue;
Reported by PMD.
Line: 84
Subscription upstream;
int consumed;
SimpleQueue<T> queue;
volatile boolean done;
Reported by PMD.
Line: 86
int consumed;
SimpleQueue<T> queue;
volatile boolean done;
volatile boolean cancelled;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindow.java
71 issues
Line: 28
import io.reactivex.rxjava3.processors.UnicastProcessor;
public final class FlowableWindow<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final long size;
final long skip;
final int bufferSize;
Reported by PMD.
Line: 30
public final class FlowableWindow<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final long size;
final long skip;
final int bufferSize;
public FlowableWindow(Flowable<T> source, long size, long skip, int bufferSize) {
super(source);
Reported by PMD.
Line: 32
final long skip;
final int bufferSize;
public FlowableWindow(Flowable<T> source, long size, long skip, int bufferSize) {
super(source);
this.size = size;
this.skip = skip;
Reported by PMD.
Line: 59
private static final long serialVersionUID = -2365647875069161133L;
final Subscriber<? super Flowable<T>> downstream;
final long size;
final AtomicBoolean once;
Reported by PMD.
Line: 61
final Subscriber<? super Flowable<T>> downstream;
final long size;
final AtomicBoolean once;
final int bufferSize;
Reported by PMD.
Line: 63
final long size;
final AtomicBoolean once;
final int bufferSize;
long index;
Reported by PMD.
Line: 65
final AtomicBoolean once;
final int bufferSize;
long index;
Subscription upstream;
Reported by PMD.
Line: 67
final int bufferSize;
long index;
Subscription upstream;
UnicastProcessor<T> window;
Reported by PMD.
Line: 69
long index;
Subscription upstream;
UnicastProcessor<T> window;
WindowExactSubscriber(Subscriber<? super Flowable<T>> actual, long size, int bufferSize) {
super(1);
Reported by PMD.
Line: 71
Subscription upstream;
UnicastProcessor<T> window;
WindowExactSubscriber(Subscriber<? super Flowable<T>> actual, long size, int bufferSize) {
super(1);
this.downstream = actual;
this.size = size;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java
71 issues
Line: 31
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int prefetch;
Reported by PMD.
Line: 33
public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int prefetch;
public FlowableObserveOn(
Flowable<T> source,
Reported by PMD.
Line: 35
final boolean delayError;
final int prefetch;
public FlowableObserveOn(
Flowable<T> source,
Scheduler scheduler,
boolean delayError,
Reported by PMD.
Line: 62
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
private static final long serialVersionUID = -8241002408341274697L;
final Worker worker;
final boolean delayError;
Reported by PMD.
Line: 65
implements FlowableSubscriber<T>, Runnable {
private static final long serialVersionUID = -8241002408341274697L;
final Worker worker;
final boolean delayError;
final int prefetch;
Reported by PMD.
Line: 67
final Worker worker;
final boolean delayError;
final int prefetch;
final int limit;
Reported by PMD.
Line: 69
final boolean delayError;
final int prefetch;
final int limit;
final AtomicLong requested;
Reported by PMD.
Line: 71
final int prefetch;
final int limit;
final AtomicLong requested;
Subscription upstream;
Reported by PMD.
Line: 73
final int limit;
final AtomicLong requested;
Subscription upstream;
SimpleQueue<T> queue;
Reported by PMD.
Line: 75
final AtomicLong requested;
Subscription upstream;
SimpleQueue<T> queue;
volatile boolean cancelled;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/ResourceObserverTest.java
71 issues
Line: 33
public class ResourceObserverTest extends RxJavaTest {
static final class TestResourceObserver<T> extends ResourceObserver<T> {
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
int complete;
Reported by PMD.
Line: 35
static final class TestResourceObserver<T> extends ResourceObserver<T> {
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
int complete;
int start;
Reported by PMD.
Line: 37
final List<Throwable> errors = new ArrayList<>();
int complete;
int start;
@Override
protected void onStart() {
Reported by PMD.
Line: 39
int complete;
int start;
@Override
protected void onStart() {
super.onStart();
Reported by PMD.
Line: 75
}
@Test
public void addResources() {
TestResourceObserver<Integer> ro = new TestResourceObserver<>();
assertFalse(ro.isDisposed());
Disposable d = Disposable.empty();
Reported by PMD.
Line: 78
public void addResources() {
TestResourceObserver<Integer> ro = new TestResourceObserver<>();
assertFalse(ro.isDisposed());
Disposable d = Disposable.empty();
ro.add(d);
Reported by PMD.
Line: 84
ro.add(d);
assertFalse(d.isDisposed());
ro.dispose();
assertTrue(ro.isDisposed());
Reported by PMD.
Line: 84
ro.add(d);
assertFalse(d.isDisposed());
ro.dispose();
assertTrue(ro.isDisposed());
Reported by PMD.
Line: 88
ro.dispose();
assertTrue(ro.isDisposed());
assertTrue(d.isDisposed());
ro.dispose();
Reported by PMD.
Line: 90
assertTrue(ro.isDisposed());
assertTrue(d.isDisposed());
ro.dispose();
assertTrue(ro.isDisposed());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableToFutureTest.java
70 issues
Line: 33
public class FlowableToFutureTest extends RxJavaTest {
@Test
public void success() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Reported by PMD.
Line: 34
@Test
public void success() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 37
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
TestSubscriber<Object> ts = new TestSubscriber<>(subscriber);
Reported by PMD.
Line: 37
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
TestSubscriber<Object> ts = new TestSubscriber<>(subscriber);
Reported by PMD.
Line: 43
TestSubscriber<Object> ts = new TestSubscriber<>(subscriber);
Flowable.fromFuture(future).subscribe(ts);
ts.cancel();
verify(subscriber, times(1)).onNext(value);
verify(subscriber, times(1)).onComplete();
Reported by PMD.
Line: 47
ts.cancel();
verify(subscriber, times(1)).onNext(value);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(future, never()).cancel(anyBoolean());
}
Reported by PMD.
Line: 48
ts.cancel();
verify(subscriber, times(1)).onNext(value);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(future, never()).cancel(anyBoolean());
}
@Test
Reported by PMD.
Line: 49
verify(subscriber, times(1)).onNext(value);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(future, never()).cancel(anyBoolean());
}
@Test
public void successOperatesOnSuppliedScheduler() throws Exception {
Reported by PMD.
Line: 50
verify(subscriber, times(1)).onNext(value);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(future, never()).cancel(anyBoolean());
}
@Test
public void successOperatesOnSuppliedScheduler() throws Exception {
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 54
}
@Test
public void successOperatesOnSuppliedScheduler() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Reported by PMD.