The following issues were reported by PMD, grouped by source file:
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableCache.java
12 issues
Line: 32
static final InnerCompletableCache[] TERMINATED = new InnerCompletableCache[0];
final CompletableSource source;
final AtomicReference<InnerCompletableCache[]> observers;
final AtomicBoolean once;
Reported by PMD.
Line: 34
final CompletableSource source;
final AtomicReference<InnerCompletableCache[]> observers;
final AtomicBoolean once;
Throwable error;
Reported by PMD.
Line: 36
final AtomicReference<InnerCompletableCache[]> observers;
final AtomicBoolean once;
Throwable error;
public CompletableCache(CompletableSource source) {
this.source = source;
Reported by PMD.
Line: 38
final AtomicBoolean once;
Throwable error;
public CompletableCache(CompletableSource source) {
this.source = source;
this.observers = new AtomicReference<>(EMPTY);
this.once = new AtomicBoolean();
Reported by PMD.
Line: 79
error = e;
for (InnerCompletableCache inner : observers.getAndSet(TERMINATED)) {
if (!inner.get()) {
inner.downstream.onError(e);
}
}
}
@Override
Reported by PMD.
Line: 88
public void onComplete() {
for (InnerCompletableCache inner : observers.getAndSet(TERMINATED)) {
if (!inner.get()) {
inner.downstream.onComplete();
}
}
}
boolean add(InnerCompletableCache inner) {
Reported by PMD.
Line: 100
return false;
}
int n = a.length;
InnerCompletableCache[] b = new InnerCompletableCache[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
Reported by PMD.
Line: 132
InnerCompletableCache[] b;
if (n == 1) {
b = EMPTY;
} else {
b = new InnerCompletableCache[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
Reported by PMD.
Line: 135
if (n == 1) {
b = EMPTY;
} else {
b = new InnerCompletableCache[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
if (observers.compareAndSet(a, b)) {
Reported by PMD.
Line: 152
private static final long serialVersionUID = 8943152917179642732L;
final CompletableObserver downstream;
InnerCompletableCache(CompletableObserver downstream) {
this.downstream = downstream;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDoOnLifecycle.java
12 issues
Line: 33
*/
public final class MaybeDoOnLifecycle<T> extends AbstractMaybeWithUpstream<T, T> {
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
public MaybeDoOnLifecycle(Maybe<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
Reported by PMD.
Line: 35
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
public MaybeDoOnLifecycle(Maybe<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
super(upstream);
this.onSubscribe = onSubscribe;
Reported by PMD.
Line: 51
static final class MaybeLifecycleObserver<T> implements MaybeObserver<T>, Disposable {
final MaybeObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Reported by PMD.
Line: 53
final MaybeObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Disposable upstream;
Reported by PMD.
Line: 53
final MaybeObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Disposable upstream;
Reported by PMD.
Line: 55
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Disposable upstream;
MaybeLifecycleObserver(MaybeObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
Reported by PMD.
Line: 57
final Action onDispose;
Disposable upstream;
MaybeLifecycleObserver(MaybeObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
Reported by PMD.
Line: 70
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
try {
onSubscribe.accept(d);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
d.dispose();
this.upstream = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, downstream);
return;
Reported by PMD.
Line: 113
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
Reported by PMD.
Line: 17
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReduceMaybe.java
12 issues
Line: 37
extends Maybe<T>
implements HasUpstreamPublisher<T>, FuseToFlowable<T> {
final Flowable<T> source;
final BiFunction<T, T, T> reducer;
public FlowableReduceMaybe(Flowable<T> source, BiFunction<T, T, T> reducer) {
this.source = source;
Reported by PMD.
Line: 37
extends Maybe<T>
implements HasUpstreamPublisher<T>, FuseToFlowable<T> {
final Flowable<T> source;
final BiFunction<T, T, T> reducer;
public FlowableReduceMaybe(Flowable<T> source, BiFunction<T, T, T> reducer) {
this.source = source;
Reported by PMD.
Line: 39
final Flowable<T> source;
final BiFunction<T, T, T> reducer;
public FlowableReduceMaybe(Flowable<T> source, BiFunction<T, T, T> reducer) {
this.source = source;
this.reducer = reducer;
}
Reported by PMD.
Line: 62
}
static final class ReduceSubscriber<T> implements FlowableSubscriber<T>, Disposable {
final MaybeObserver<? super T> downstream;
final BiFunction<T, T, T> reducer;
T value;
Reported by PMD.
Line: 64
static final class ReduceSubscriber<T> implements FlowableSubscriber<T>, Disposable {
final MaybeObserver<? super T> downstream;
final BiFunction<T, T, T> reducer;
T value;
Subscription upstream;
Reported by PMD.
Line: 66
final BiFunction<T, T, T> reducer;
T value;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 68
T value;
Subscription upstream;
boolean done;
ReduceSubscriber(MaybeObserver<? super T> actual, BiFunction<T, T, T> reducer) {
this.downstream = actual;
Reported by PMD.
Line: 70
Subscription upstream;
boolean done;
ReduceSubscriber(MaybeObserver<? super T> actual, BiFunction<T, T, T> reducer) {
this.downstream = actual;
this.reducer = reducer;
}
Reported by PMD.
Line: 110
} else {
try {
value = Objects.requireNonNull(reducer.apply(v, t), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiFunction;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReduceSeedSingle.java
12 issues
Line: 36
*/
public final class FlowableReduceSeedSingle<T, R> extends Single<R> {
final Publisher<T> source;
final R seed;
final BiFunction<R, ? super T, R> reducer;
Reported by PMD.
Line: 38
final Publisher<T> source;
final R seed;
final BiFunction<R, ? super T, R> reducer;
public FlowableReduceSeedSingle(Publisher<T> source, R seed, BiFunction<R, ? super T, R> reducer) {
this.source = source;
Reported by PMD.
Line: 40
final R seed;
final BiFunction<R, ? super T, R> reducer;
public FlowableReduceSeedSingle(Publisher<T> source, R seed, BiFunction<R, ? super T, R> reducer) {
this.source = source;
this.seed = seed;
this.reducer = reducer;
Reported by PMD.
Line: 55
static final class ReduceSeedObserver<T, R> implements FlowableSubscriber<T>, Disposable {
final SingleObserver<? super R> downstream;
final BiFunction<R, ? super T, R> reducer;
R value;
Reported by PMD.
Line: 57
final SingleObserver<? super R> downstream;
final BiFunction<R, ? super T, R> reducer;
R value;
Subscription upstream;
Reported by PMD.
Line: 59
final BiFunction<R, ? super T, R> reducer;
R value;
Subscription upstream;
ReduceSeedObserver(SingleObserver<? super R> actual, BiFunction<R, ? super T, R> reducer, R value) {
this.downstream = actual;
Reported by PMD.
Line: 61
R value;
Subscription upstream;
ReduceSeedObserver(SingleObserver<? super R> actual, BiFunction<R, ? super T, R> reducer, R value) {
this.downstream = actual;
this.value = value;
this.reducer = reducer;
Reported by PMD.
Line: 86
if (v != null) {
try {
this.value = Objects.requireNonNull(reducer.apply(v, value), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
}
Reported by PMD.
Line: 97
@Override
public void onError(Throwable e) {
if (value != null) {
value = null;
upstream = SubscriptionHelper.CANCELLED;
downstream.onError(e);
} else {
RxJavaPlugins.onError(e);
}
Reported by PMD.
Line: 109
public void onComplete() {
R v = value;
if (v != null) {
value = null;
upstream = SubscriptionHelper.CANCELLED;
downstream.onSuccess(v);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRepeatUntil.java
12 issues
Line: 26
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter;
public final class FlowableRepeatUntil<T> extends AbstractFlowableWithUpstream<T, T> {
final BooleanSupplier until;
public FlowableRepeatUntil(Flowable<T> source, BooleanSupplier until) {
super(source);
this.until = until;
}
Reported by PMD.
Line: 45
private static final long serialVersionUID = -7098360935104053232L;
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final BooleanSupplier stop;
long produced;
Reported by PMD.
Line: 46
private static final long serialVersionUID = -7098360935104053232L;
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final BooleanSupplier stop;
long produced;
Reported by PMD.
Line: 47
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final BooleanSupplier stop;
long produced;
RepeatSubscriber(Subscriber<? super T> actual, BooleanSupplier until, SubscriptionArbiter sa, Publisher<? extends T> source) {
Reported by PMD.
Line: 48
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final BooleanSupplier stop;
long produced;
RepeatSubscriber(Subscriber<? super T> actual, BooleanSupplier until, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.downstream = actual;
Reported by PMD.
Line: 50
final Publisher<? extends T> source;
final BooleanSupplier stop;
long produced;
RepeatSubscriber(Subscriber<? super T> actual, BooleanSupplier until, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.downstream = actual;
this.sa = sa;
this.source = source;
Reported by PMD.
Line: 80
boolean b;
try {
b = stop.getAsBoolean();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(e);
return;
}
if (b) {
Reported by PMD.
Line: 104
}
long p = produced;
if (p != 0L) {
produced = 0L;
sa.produced(p);
}
source.subscribe(this);
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicInteger;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BooleanSupplier;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter;
Reported by PMD.
Line: 20
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BooleanSupplier;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter;
public final class FlowableRepeatUntil<T> extends AbstractFlowableWithUpstream<T, T> {
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
11 issues
Line: 34
@State(Scope.Thread)
public class FlowableSwitchMapMaybeEmptyPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int count;
Flowable<Integer> flowableConvert;
Flowable<Integer> flowableDedicated;
Reported by PMD.
Line: 36
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int count;
Flowable<Integer> flowableConvert;
Flowable<Integer> flowableDedicated;
Flowable<Integer> flowablePlain;
Reported by PMD.
Line: 36
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int count;
Flowable<Integer> flowableConvert;
Flowable<Integer> flowableDedicated;
Flowable<Integer> flowablePlain;
Reported by PMD.
Line: 38
Flowable<Integer> flowableConvert;
Flowable<Integer> flowableDedicated;
Flowable<Integer> flowablePlain;
@Setup
public void setup() {
Reported by PMD.
Line: 38
Flowable<Integer> flowableConvert;
Flowable<Integer> flowableDedicated;
Flowable<Integer> flowablePlain;
@Setup
public void setup() {
Reported by PMD.
Line: 40
Flowable<Integer> flowableDedicated;
Flowable<Integer> flowablePlain;
@Setup
public void setup() {
Integer[] sourceArray = new Integer[count];
Arrays.fill(sourceArray, 777);
Reported by PMD.
Line: 40
Flowable<Integer> flowableDedicated;
Flowable<Integer> flowablePlain;
@Setup
public void setup() {
Integer[] sourceArray = new Integer[count];
Arrays.fill(sourceArray, 777);
Reported by PMD.
Line: 59
flowableConvert = source.switchMap(new Function<Integer, Publisher<? extends Integer>>() {
@Override
public Publisher<? extends Integer> apply(Integer v) {
return Maybe.<Integer>empty().toFlowable();
}
});
flowableDedicated = source.switchMapMaybe(new Function<Integer, Maybe<? extends Integer>>() {
@Override
Reported by PMD.
Line: 59
flowableConvert = source.switchMap(new Function<Integer, Publisher<? extends Integer>>() {
@Override
public Publisher<? extends Integer> apply(Integer v) {
return Maybe.<Integer>empty().toFlowable();
}
});
flowableDedicated = source.switchMapMaybe(new Function<Integer, Maybe<? extends Integer>>() {
@Override
Reported by PMD.
Line: 19
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRetryBiPredicate.java
11 issues
Line: 25
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
public final class ObservableRetryBiPredicate<T> extends AbstractObservableWithUpstream<T, T> {
final BiPredicate<? super Integer, ? super Throwable> predicate;
public ObservableRetryBiPredicate(
Observable<T> source,
BiPredicate<? super Integer, ? super Throwable> predicate) {
super(source);
this.predicate = predicate;
Reported by PMD.
Line: 46
private static final long serialVersionUID = -7098360935104053232L;
final Observer<? super T> downstream;
final SequentialDisposable upstream;
final ObservableSource<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;
RetryBiObserver(Observer<? super T> actual,
Reported by PMD.
Line: 47
private static final long serialVersionUID = -7098360935104053232L;
final Observer<? super T> downstream;
final SequentialDisposable upstream;
final ObservableSource<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;
RetryBiObserver(Observer<? super T> actual,
BiPredicate<? super Integer, ? super Throwable> predicate, SequentialDisposable sa, ObservableSource<? extends T> source) {
Reported by PMD.
Line: 48
final Observer<? super T> downstream;
final SequentialDisposable upstream;
final ObservableSource<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;
RetryBiObserver(Observer<? super T> actual,
BiPredicate<? super Integer, ? super Throwable> predicate, SequentialDisposable sa, ObservableSource<? extends T> source) {
this.downstream = actual;
Reported by PMD.
Line: 49
final Observer<? super T> downstream;
final SequentialDisposable upstream;
final ObservableSource<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;
RetryBiObserver(Observer<? super T> actual,
BiPredicate<? super Integer, ? super Throwable> predicate, SequentialDisposable sa, ObservableSource<? extends T> source) {
this.downstream = actual;
this.upstream = sa;
Reported by PMD.
Line: 50
final SequentialDisposable upstream;
final ObservableSource<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;
RetryBiObserver(Observer<? super T> actual,
BiPredicate<? super Integer, ? super Throwable> predicate, SequentialDisposable sa, ObservableSource<? extends T> source) {
this.downstream = actual;
this.upstream = sa;
this.source = source;
Reported by PMD.
Line: 74
boolean b;
try {
b = predicate.test(++retries, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (!b) {
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.BiPredicate;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.BiPredicate;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
public final class ObservableRetryBiPredicate<T> extends AbstractObservableWithUpstream<T, T> {
final BiPredicate<? super Integer, ? super Throwable> predicate;
Reported by PMD.
Line: 73
public void onError(Throwable t) {
boolean b;
try {
b = predicate.test(++retries, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeoutPublisher.java
11 issues
Line: 36
*/
public final class MaybeTimeoutPublisher<T, U> extends AbstractMaybeWithUpstream<T, T> {
final Publisher<U> other;
final MaybeSource<? extends T> fallback;
public MaybeTimeoutPublisher(MaybeSource<T> source, Publisher<U> other, MaybeSource<? extends T> fallback) {
super(source);
Reported by PMD.
Line: 38
final Publisher<U> other;
final MaybeSource<? extends T> fallback;
public MaybeTimeoutPublisher(MaybeSource<T> source, Publisher<U> other, MaybeSource<? extends T> fallback) {
super(source);
this.other = other;
this.fallback = fallback;
Reported by PMD.
Line: 62
private static final long serialVersionUID = -5955289211445418871L;
final MaybeObserver<? super T> downstream;
final TimeoutOtherMaybeObserver<T, U> other;
final MaybeSource<? extends T> fallback;
Reported by PMD.
Line: 64
final MaybeObserver<? super T> downstream;
final TimeoutOtherMaybeObserver<T, U> other;
final MaybeSource<? extends T> fallback;
final TimeoutFallbackMaybeObserver<T> otherObserver;
Reported by PMD.
Line: 66
final TimeoutOtherMaybeObserver<T, U> other;
final MaybeSource<? extends T> fallback;
final TimeoutFallbackMaybeObserver<T> otherObserver;
TimeoutMainMaybeObserver(MaybeObserver<? super T> actual, MaybeSource<? extends T> fallback) {
this.downstream = actual;
Reported by PMD.
Line: 68
final MaybeSource<? extends T> fallback;
final TimeoutFallbackMaybeObserver<T> otherObserver;
TimeoutMainMaybeObserver(MaybeObserver<? super T> actual, MaybeSource<? extends T> fallback) {
this.downstream = actual;
this.other = new TimeoutOtherMaybeObserver<>(this);
this.fallback = fallback;
Reported by PMD.
Line: 148
private static final long serialVersionUID = 8663801314800248617L;
final TimeoutMainMaybeObserver<T, U> parent;
TimeoutOtherMaybeObserver(TimeoutMainMaybeObserver<T, U> parent) {
this.parent = parent;
}
Reported by PMD.
Line: 161
@Override
public void onNext(Object value) {
get().cancel();
parent.otherComplete();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 182
private static final long serialVersionUID = 8663801314800248617L;
final MaybeObserver<? super T> downstream;
TimeoutFallbackMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 19
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScan.java
11 issues
Line: 27
import java.util.Objects;
public final class FlowableScan<T> extends AbstractFlowableWithUpstream<T, T> {
final BiFunction<T, T, T> accumulator;
public FlowableScan(Flowable<T> source, BiFunction<T, T, T> accumulator) {
super(source);
this.accumulator = accumulator;
}
Reported by PMD.
Line: 39
}
static final class ScanSubscriber<T> implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super T> downstream;
final BiFunction<T, T, T> accumulator;
Subscription upstream;
T value;
Reported by PMD.
Line: 40
static final class ScanSubscriber<T> implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super T> downstream;
final BiFunction<T, T, T> accumulator;
Subscription upstream;
T value;
Reported by PMD.
Line: 42
final Subscriber<? super T> downstream;
final BiFunction<T, T, T> accumulator;
Subscription upstream;
T value;
boolean done;
Reported by PMD.
Line: 44
Subscription upstream;
T value;
boolean done;
ScanSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> accumulator) {
this.downstream = actual;
Reported by PMD.
Line: 46
T value;
boolean done;
ScanSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> accumulator) {
this.downstream = actual;
this.accumulator = accumulator;
}
Reported by PMD.
Line: 76
try {
u = Objects.requireNonNull(accumulator.apply(v, t), "The value returned by the accumulator is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.cancel();
onError(e);
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 66
if (done) {
return;
}
final Subscriber<? super T> a = downstream;
T v = value;
if (v == null) {
value = t;
a.onNext(t);
} else {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInterval.java
11 issues
Line: 31
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
public final class FlowableInterval extends Flowable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public FlowableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
Reported by PMD.
Line: 32
public final class FlowableInterval extends Flowable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public FlowableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
Reported by PMD.
Line: 33
public final class FlowableInterval extends Flowable<Long> {
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public FlowableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
Reported by PMD.
Line: 34
final Scheduler scheduler;
final long initialDelay;
final long period;
final TimeUnit unit;
public FlowableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
this.initialDelay = initialDelay;
this.period = period;
this.unit = unit;
Reported by PMD.
Line: 53
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
Reported by PMD.
Line: 65
private static final long serialVersionUID = -2809475196591179431L;
final Subscriber<? super Long> downstream;
long count;
final AtomicReference<Disposable> resource = new AtomicReference<>();
Reported by PMD.
Line: 67
final Subscriber<? super Long> downstream;
long count;
final AtomicReference<Disposable> resource = new AtomicReference<>();
IntervalSubscriber(Subscriber<? super Long> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 69
long count;
final AtomicReference<Disposable> resource = new AtomicReference<>();
IntervalSubscriber(Subscriber<? super Long> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 92
if (resource.get() != DisposableHelper.DISPOSED) {
long r = get();
if (r != 0L) {
downstream.onNext(count++);
BackpressureHelper.produced(this, 1);
} else {
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
DisposableHelper.dispose(resource);
Reported by PMD.
Line: 19
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
Reported by PMD.