The following issues were found
src/main/java/io/reactivex/rxjava3/internal/subscribers/DeferredScalarSubscriber.java
5 issues
Line: 33
private static final long serialVersionUID = 2984505488220891551L;
/** The upstream subscription. */
protected Subscription upstream;
/** Can indicate if there was at least on onNext call. */
protected boolean hasValue;
/**
Reported by PMD.
Line: 36
protected Subscription upstream;
/** Can indicate if there was at least on onNext call. */
protected boolean hasValue;
/**
* Creates a DeferredScalarSubscriber instance and wraps a downstream Subscriber.
* @param downstream the downstream subscriber, not null (not verified)
*/
Reported by PMD.
Line: 59
@Override
public void onError(Throwable t) {
value = null;
downstream.onError(t);
}
@Override
public void onComplete() {
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.subscribers;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.*;
/**
Reported by PMD.
Line: 19
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.*;
/**
* A subscriber, extending a DeferredScalarSubscription,
* that is unbounded-in and can generate 0 or 1 resulting value.
* @param <T> the input value type
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceWith.java
5 issues
Line: 27
public final class FlowableOnBackpressureReduceWith<T, R> extends AbstractFlowableWithUpstream<T, R> {
final BiFunction<R, ? super T, R> reducer;
final Supplier<R> supplier;
public FlowableOnBackpressureReduceWith(@NonNull Flowable<T> source,
@NonNull Supplier<R> supplier,
@NonNull BiFunction<R, ? super T, R> reducer) {
Reported by PMD.
Line: 28
public final class FlowableOnBackpressureReduceWith<T, R> extends AbstractFlowableWithUpstream<T, R> {
final BiFunction<R, ? super T, R> reducer;
final Supplier<R> supplier;
public FlowableOnBackpressureReduceWith(@NonNull Flowable<T> source,
@NonNull Supplier<R> supplier,
@NonNull BiFunction<R, ? super T, R> reducer) {
super(source);
Reported by PMD.
Line: 47
private static final long serialVersionUID = 8255923705960622424L;
final BiFunction<R, ? super T, R> reducer;
final Supplier<R> supplier;
BackpressureReduceWithSubscriber(@NonNull Subscriber<? super R> downstream,
@NonNull Supplier<R> supplier,
@NonNull BiFunction<R, ? super T, R> reducer) {
Reported by PMD.
Line: 48
private static final long serialVersionUID = 8255923705960622424L;
final BiFunction<R, ? super T, R> reducer;
final Supplier<R> supplier;
BackpressureReduceWithSubscriber(@NonNull Subscriber<? super R> downstream,
@NonNull Supplier<R> supplier,
@NonNull BiFunction<R, ? super T, R> reducer) {
super(downstream);
Reported by PMD.
Line: 73
} else {
current.lazySet(Objects.requireNonNull(reducer.apply(v, t), "The reducer returned a null value"));
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorReturn.java
5 issues
Line: 26
import java.util.Objects;
public final class FlowableOnErrorReturn<T> extends AbstractFlowableWithUpstream<T, T> {
final Function<? super Throwable, ? extends T> valueSupplier;
public FlowableOnErrorReturn(Flowable<T> source, Function<? super Throwable, ? extends T> valueSupplier) {
super(source);
this.valueSupplier = valueSupplier;
}
Reported by PMD.
Line: 41
extends SinglePostCompleteSubscriber<T, T> {
private static final long serialVersionUID = -3740826063558713822L;
final Function<? super Throwable, ? extends T> valueSupplier;
OnErrorReturnSubscriber(Subscriber<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) {
super(actual);
this.valueSupplier = valueSupplier;
}
Reported by PMD.
Line: 59
T v;
try {
v = Objects.requireNonNull(valueSupplier.apply(t), "The valueSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(t, ex));
return;
}
complete(v);
Reported by PMD.
Line: 19
import org.reactivestreams.Subscriber;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.subscribers.SinglePostCompleteSubscriber;
import java.util.Objects;
Reported by PMD.
Line: 58
public void onError(Throwable t) {
T v;
try {
v = Objects.requireNonNull(valueSupplier.apply(t), "The valueSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(t, ex));
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTimer.java
5 issues
Line: 28
*/
public final class CompletableTimer extends Completable {
final long delay;
final TimeUnit unit;
final Scheduler scheduler;
public CompletableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
Reported by PMD.
Line: 29
public final class CompletableTimer extends Completable {
final long delay;
final TimeUnit unit;
final Scheduler scheduler;
public CompletableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
Reported by PMD.
Line: 30
final long delay;
final TimeUnit unit;
final Scheduler scheduler;
public CompletableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
Reported by PMD.
Line: 48
static final class TimerDisposable extends AtomicReference<Disposable> implements Disposable, Runnable {
private static final long serialVersionUID = 3167244060586201109L;
final CompletableObserver downstream;
TimerDisposable(final CompletableObserver downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 19
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Signals an {@code onComplete} event after the specified delay.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromRunnable.java
5 issues
Line: 29
*/
public final class MaybeFromRunnable<T> extends Maybe<T> implements Supplier<T> {
final Runnable runnable;
public MaybeFromRunnable(Runnable runnable) {
this.runnable = runnable;
}
Reported by PMD.
Line: 40
Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
try {
runnable.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Reported by PMD.
Line: 44
try {
runnable.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (!d.isDisposed()) {
observer.onError(ex);
} else {
RxJavaPlugins.onError(ex);
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 17
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/PerfAsyncConsumer.java
5 issues
Line: 80
try {
await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
return this;
}
Reported by PMD.
Line: 29
public final class PerfAsyncConsumer extends CountDownLatch implements FlowableSubscriber<Object>, Observer<Object>,
SingleObserver<Object>, CompletableObserver, MaybeObserver<Object> {
final Blackhole bh;
public PerfAsyncConsumer(Blackhole bh) {
super(1);
this.bh = bh;
}
Reported by PMD.
Line: 58
@Override
public void onError(Throwable t) {
t.printStackTrace();
countDown();
}
@Override
public void onComplete() {
Reported by PMD.
Line: 74
* @return this
*/
public PerfAsyncConsumer await(int count) {
if (count <= 1000) {
while (getCount() != 0) { }
} else {
try {
await();
} catch (InterruptedException ex) {
Reported by PMD.
Line: 75
*/
public PerfAsyncConsumer await(int count) {
if (count <= 1000) {
while (getCount() != 0) { }
} else {
try {
await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleToObservableTest.java
5 issues
Line: 26
public class SingleToObservableTest extends RxJavaTest {
@Test
public void dispose() {
TestHelper.checkDisposed(PublishSubject.create().singleOrError().toObservable());
}
@Test
public void doubleOnSubscribe() {
Reported by PMD.
Line: 27
@Test
public void dispose() {
TestHelper.checkDisposed(PublishSubject.create().singleOrError().toObservable());
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingleToObservable(new Function<Single<Object>, ObservableSource<Object>>() {
Reported by PMD.
Line: 27
@Test
public void dispose() {
TestHelper.checkDisposed(PublishSubject.create().singleOrError().toObservable());
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingleToObservable(new Function<Single<Object>, ObservableSource<Object>>() {
Reported by PMD.
Line: 31
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingleToObservable(new Function<Single<Object>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Single<Object> s) throws Exception {
return s.toObservable();
}
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleToObservableTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRedoTest.java
5 issues
Line: 26
public class ObservableRedoTest extends RxJavaTest {
@Test
public void redoCancel() {
final TestObserver<Integer> to = new TestObserver<>();
Observable.just(1)
.repeatWhen(new Function<Observable<Object>, ObservableSource<Object>>() {
@Override
Reported by PMD.
Line: 37
int count;
@Override
public Object apply(Object v) throws Exception {
if (++count == 1) {
to.dispose();
}
return v;
}
});
Reported by PMD.
Line: 37
int count;
@Override
public Object apply(Object v) throws Exception {
if (++count == 1) {
to.dispose();
}
return v;
}
});
Reported by PMD.
Line: 49
}
@Test
public void managerThrows() {
Observable.just(1)
.retryWhen(new Function<Observable<Throwable>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Throwable> v) throws Exception {
throw new TestException();
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.observers.TestObserver;
public class ObservableRedoTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableSequenceEqualTest.java
5 issues
Line: 24
public class CompletableSequenceEqualTest {
@Test
public void bothComplete() {
Completable.sequenceEqual(Completable.complete(), Completable.complete())
.test()
.assertResult(true);
}
Reported by PMD.
Line: 25
@Test
public void bothComplete() {
Completable.sequenceEqual(Completable.complete(), Completable.complete())
.test()
.assertResult(true);
}
@Test
Reported by PMD.
Line: 25
@Test
public void bothComplete() {
Completable.sequenceEqual(Completable.complete(), Completable.complete())
.test()
.assertResult(true);
}
@Test
Reported by PMD.
Line: 31
}
@Test
public void firstFails() {
Completable.sequenceEqual(Completable.error(new TestException()), Completable.complete())
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 38
}
@Test
public void secondFails() {
Completable.sequenceEqual(Completable.complete(), Completable.error(new TestException()))
.test()
.assertFailure(TestException.class);
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleObserveOnTest.java
5 issues
Line: 29
public class SingleObserveOnTest extends RxJavaTest {
@Test
public void dispose() {
TestHelper.checkDisposed(Single.just(1).observeOn(Schedulers.single()));
}
@Test
public void doubleOnSubscribe() {
Reported by PMD.
Line: 30
@Test
public void dispose() {
TestHelper.checkDisposed(Single.just(1).observeOn(Schedulers.single()));
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Object>, SingleSource<Object>>() {
Reported by PMD.
Line: 34
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Object>, SingleSource<Object>>() {
@Override
public SingleSource<Object> apply(Single<Object> s) throws Exception {
return s.observeOn(Schedulers.single());
}
Reported by PMD.
Line: 44
}
@Test
public void error() {
Single.error(new TestException())
.observeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
Reported by PMD.