The following issues were found
src/main/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserver.java
8 issues
Line: 32
private static final long serialVersionUID = -4361286194466301354L;
final Consumer<? super Throwable> onError;
final Action onComplete;
public CallbackCompletableObserver(Action onComplete) {
this.onError = this;
this.onComplete = onComplete;
Reported by PMD.
Line: 32
private static final long serialVersionUID = -4361286194466301354L;
final Consumer<? super Throwable> onError;
final Action onComplete;
public CallbackCompletableObserver(Action onComplete) {
this.onError = this;
this.onComplete = onComplete;
Reported by PMD.
Line: 33
private static final long serialVersionUID = -4361286194466301354L;
final Consumer<? super Throwable> onError;
final Action onComplete;
public CallbackCompletableObserver(Action onComplete) {
this.onError = this;
this.onComplete = onComplete;
}
Reported by PMD.
Line: 33
private static final long serialVersionUID = -4361286194466301354L;
final Consumer<? super Throwable> onError;
final Action onComplete;
public CallbackCompletableObserver(Action onComplete) {
this.onError = this;
this.onComplete = onComplete;
}
Reported by PMD.
Line: 54
public void onComplete() {
try {
onComplete.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
lazySet(DisposableHelper.DISPOSED);
}
Reported by PMD.
Line: 65
public void onError(Throwable e) {
try {
onError.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
lazySet(DisposableHelper.DISPOSED);
}
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 21
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class CallbackCompletableObserver
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRepeat.java
8 issues
Line: 23
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
public final class ObservableRepeat<T> extends AbstractObservableWithUpstream<T, T> {
final long count;
public ObservableRepeat(Observable<T> source, long count) {
super(source);
this.count = count;
}
Reported by PMD.
Line: 42
private static final long serialVersionUID = -7098360935104053232L;
final Observer<? super T> downstream;
final SequentialDisposable sd;
final ObservableSource<? extends T> source;
long remaining;
RepeatObserver(Observer<? super T> actual, long count, SequentialDisposable sd, ObservableSource<? extends T> source) {
this.downstream = actual;
Reported by PMD.
Line: 43
private static final long serialVersionUID = -7098360935104053232L;
final Observer<? super T> downstream;
final SequentialDisposable sd;
final ObservableSource<? extends T> source;
long remaining;
RepeatObserver(Observer<? super T> actual, long count, SequentialDisposable sd, ObservableSource<? extends T> source) {
this.downstream = actual;
this.sd = sd;
Reported by PMD.
Line: 44
final Observer<? super T> downstream;
final SequentialDisposable sd;
final ObservableSource<? extends T> source;
long remaining;
RepeatObserver(Observer<? super T> actual, long count, SequentialDisposable sd, ObservableSource<? extends T> source) {
this.downstream = actual;
this.sd = sd;
this.source = source;
Reported by PMD.
Line: 45
final Observer<? super T> downstream;
final SequentialDisposable sd;
final ObservableSource<? extends T> source;
long remaining;
RepeatObserver(Observer<? super T> actual, long count, SequentialDisposable sd, ObservableSource<? extends T> source) {
this.downstream = actual;
this.sd = sd;
this.source = source;
this.remaining = count;
Reported by PMD.
Line: 74
if (r != Long.MAX_VALUE) {
remaining = r - 1;
}
if (r != 0L) {
subscribeNext();
} else {
downstream.onComplete();
}
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
public final class ObservableRepeat<T> extends AbstractObservableWithUpstream<T, T> {
final long count;
Reported by PMD.
Line: 86
*/
void subscribeNext() {
if (getAndIncrement() == 0) {
int missed = 1;
for (;;) {
if (sd.isDisposed()) {
return;
}
source.subscribe(this);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/observers/BlockingMultiObserver.java
8 issues
Line: 33
extends CountDownLatch
implements SingleObserver<T>, CompletableObserver, MaybeObserver<T> {
T value;
Throwable error;
Disposable upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 34
implements SingleObserver<T>, CompletableObserver, MaybeObserver<T> {
T value;
Throwable error;
Disposable upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 36
T value;
Throwable error;
Disposable upstream;
volatile boolean cancelled;
public BlockingMultiObserver() {
super(1);
Reported by PMD.
Line: 38
Disposable upstream;
volatile boolean cancelled;
public BlockingMultiObserver() {
super(1);
}
Reported by PMD.
Line: 179
} else {
onComplete.run();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaPlugins.onError(t);
}
}
}
Reported by PMD.
Line: 18
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 21
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* A combined Observer that awaits the success or error signal via a CountDownLatch.
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* A combined Observer that awaits the success or error signal via a CountDownLatch.
* @param <T> the value type
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelConcatMap.java
8 issues
Line: 58
}
@Override
public void subscribe(Subscriber<? super R>[] subscribers) {
subscribers = RxJavaPlugins.onSubscribe(this, subscribers);
if (!validate(subscribers)) {
return;
}
Reported by PMD.
Line: 34
*/
public final class ParallelConcatMap<T, R> extends ParallelFlowable<R> {
final ParallelFlowable<T> source;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 36
final ParallelFlowable<T> source;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final ErrorMode errorMode;
Reported by PMD.
Line: 38
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final ErrorMode errorMode;
public ParallelConcatMap(
ParallelFlowable<T> source,
Reported by PMD.
Line: 40
final int prefetch;
final ErrorMode errorMode;
public ParallelConcatMap(
ParallelFlowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch, ErrorMode errorMode) {
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.parallel;
import org.reactivestreams.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap;
import io.reactivex.rxjava3.internal.util.ErrorMode;
import io.reactivex.rxjava3.parallel.ParallelFlowable;
Reported by PMD.
Line: 68
int n = subscribers.length;
@SuppressWarnings("unchecked")
final Subscriber<T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
parents[i] = FlowableConcatMap.subscribe(subscribers[i], mapper, prefetch, errorMode);
}
Reported by PMD.
Line: 71
final Subscriber<T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
parents[i] = FlowableConcatMap.subscribe(subscribers[i], mapper, prefetch, errorMode);
}
source.subscribe(parents);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeEqualSingle.java
8 issues
Line: 32
* @param <T> the common base type of the sources
*/
public final class MaybeEqualSingle<T> extends Single<Boolean> {
final MaybeSource<? extends T> source1;
final MaybeSource<? extends T> source2;
final BiPredicate<? super T, ? super T> isEqual;
Reported by PMD.
Line: 34
public final class MaybeEqualSingle<T> extends Single<Boolean> {
final MaybeSource<? extends T> source1;
final MaybeSource<? extends T> source2;
final BiPredicate<? super T, ? super T> isEqual;
public MaybeEqualSingle(MaybeSource<? extends T> source1, MaybeSource<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual) {
Reported by PMD.
Line: 36
final MaybeSource<? extends T> source2;
final BiPredicate<? super T, ? super T> isEqual;
public MaybeEqualSingle(MaybeSource<? extends T> source1, MaybeSource<? extends T> source2,
BiPredicate<? super T, ? super T> isEqual) {
this.source1 = source1;
this.source2 = source2;
Reported by PMD.
Line: 99
try {
b = isEqual.test((T)o1, (T)o2);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 132
private static final long serialVersionUID = -3031974433025990931L;
final EqualCoordinator<T> parent;
Object value;
EqualObserver(EqualCoordinator<T> parent) {
this.parent = parent;
Reported by PMD.
Line: 134
final EqualCoordinator<T> parent;
Object value;
EqualObserver(EqualCoordinator<T> parent) {
this.parent = parent;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiPredicate;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 98
boolean b;
try {
b = isEqual.test((T)o1, (T)o2);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/InputWithIncrementingInteger.java
8 issues
Line: 49
static final class IncrementingIterable implements Iterable<Integer> {
final class IncrementingIterator implements Iterator<Integer> {
int i;
@Override
public boolean hasNext() {
return i < size;
}
Reported by PMD.
Line: 68
}
}
final int size;
IncrementingIterable(int size) {
this.size = size;
}
Reported by PMD.
Line: 82
static final class IncrementingPublisher implements Publisher<Integer> {
final int size;
IncrementingPublisher(int size) {
this.size = size;
}
Reported by PMD.
Line: 98
}
}
public Iterable<Integer> iterable;
public Flowable<Integer> flowable;
public Flowable<Integer> firehose;
public Blackhole bh;
public abstract int getSize();
Reported by PMD.
Line: 99
}
public Iterable<Integer> iterable;
public Flowable<Integer> flowable;
public Flowable<Integer> firehose;
public Blackhole bh;
public abstract int getSize();
Reported by PMD.
Line: 100
public Iterable<Integer> iterable;
public Flowable<Integer> flowable;
public Flowable<Integer> firehose;
public Blackhole bh;
public abstract int getSize();
@Setup
Reported by PMD.
Line: 101
public Iterable<Integer> iterable;
public Flowable<Integer> flowable;
public Flowable<Integer> firehose;
public Blackhole bh;
public abstract int getSize();
@Setup
public void setup(final Blackhole bh) {
Reported by PMD.
Line: 20
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.*;
import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableInterval.java
8 issues
Line: 26
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;
public final class ObservableInterval extends Observable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
Reported by PMD.
Line: 27
public final class ObservableInterval extends Observable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
Reported by PMD.
Line: 28
public final class ObservableInterval extends Observable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
Reported by PMD.
Line: 29
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
Reported by PMD.
Line: 48
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
Reported by PMD.
Line: 61
private static final long serialVersionUID = 346773832286157679L;
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 63
final Observer<? super Long> downstream;
long count;
IntervalObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 19
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.schedulers.TrampolineScheduler;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLastMaybe.java
8 issues
Line: 29
*/
public final class FlowableLastMaybe<T> extends Maybe<T> {
final Publisher<T> source;
public FlowableLastMaybe(Publisher<T> source) {
this.source = source;
}
Reported by PMD.
Line: 44
static final class LastSubscriber<T> implements FlowableSubscriber<T>, Disposable {
final MaybeObserver<? super T> downstream;
Subscription upstream;
T item;
Reported by PMD.
Line: 46
final MaybeObserver<? super T> downstream;
Subscription upstream;
T item;
LastSubscriber(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 48
Subscription upstream;
T item;
LastSubscriber(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 84
@Override
public void onError(Throwable t) {
upstream = SubscriptionHelper.CANCELLED;
item = null;
downstream.onError(t);
}
@Override
public void onComplete() {
Reported by PMD.
Line: 93
upstream = SubscriptionHelper.CANCELLED;
T v = item;
if (v != null) {
item = null;
downstream.onSuccess(v);
} else {
downstream.onComplete();
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
/**
* Consumes the source Publisher and emits its last item or completes.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorReturn.java
8 issues
Line: 23
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableOnErrorReturn<T> extends AbstractObservableWithUpstream<T, T> {
final Function<? super Throwable, ? extends T> valueSupplier;
public ObservableOnErrorReturn(ObservableSource<T> source, Function<? super Throwable, ? extends T> valueSupplier) {
super(source);
this.valueSupplier = valueSupplier;
}
Reported by PMD.
Line: 35
}
static final class OnErrorReturnObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Function<? super Throwable, ? extends T> valueSupplier;
Disposable upstream;
OnErrorReturnObserver(Observer<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) {
Reported by PMD.
Line: 36
static final class OnErrorReturnObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Function<? super Throwable, ? extends T> valueSupplier;
Disposable upstream;
OnErrorReturnObserver(Observer<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) {
this.downstream = actual;
Reported by PMD.
Line: 38
final Observer<? super T> downstream;
final Function<? super Throwable, ? extends T> valueSupplier;
Disposable upstream;
OnErrorReturnObserver(Observer<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) {
this.downstream = actual;
this.valueSupplier = valueSupplier;
}
Reported by PMD.
Line: 73
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableOnErrorReturn<T> extends AbstractObservableWithUpstream<T, T> {
final Function<? super Throwable, ? extends T> valueSupplier;
Reported by PMD.
Line: 72
public void onError(Throwable t) {
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnEvent.java
8 issues
Line: 63
}
@Override
public void onError(Throwable e) {
try {
onEvent.accept(null, e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.functions.BiConsumer;
public final class SingleDoOnEvent<T> extends Single<T> {
final SingleSource<T> source;
final BiConsumer<? super T, ? super Throwable> onEvent;
public SingleDoOnEvent(SingleSource<T> source, BiConsumer<? super T, ? super Throwable> onEvent) {
this.source = source;
Reported by PMD.
Line: 24
public final class SingleDoOnEvent<T> extends Single<T> {
final SingleSource<T> source;
final BiConsumer<? super T, ? super Throwable> onEvent;
public SingleDoOnEvent(SingleSource<T> source, BiConsumer<? super T, ? super Throwable> onEvent) {
this.source = source;
this.onEvent = onEvent;
}
Reported by PMD.
Line: 38
}
final class DoOnEvent implements SingleObserver<T> {
private final SingleObserver<? super T> downstream;
DoOnEvent(SingleObserver<? super T> observer) {
this.downstream = observer;
}
Reported by PMD.
Line: 53
public void onSuccess(T value) {
try {
onEvent.accept(value, null);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 66
public void onError(Throwable e) {
try {
onEvent.accept(null, e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}
downstream.onError(e);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.BiConsumer;
public final class SingleDoOnEvent<T> extends Single<T> {
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.BiConsumer;
public final class SingleDoOnEvent<T> extends Single<T> {
final SingleSource<T> source;
Reported by PMD.