The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastOne.java
7 issues
Line: 32
}
static final class TakeLastOneObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
Disposable upstream;
T value;
Reported by PMD.
Line: 34
static final class TakeLastOneObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
Disposable upstream;
T value;
TakeLastOneObserver(Observer<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 36
Disposable upstream;
T value;
TakeLastOneObserver(Observer<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 57
@Override
public void onError(Throwable t) {
value = null;
downstream.onError(t);
}
@Override
public void onComplete() {
Reported by PMD.
Line: 69
void emit() {
T v = value;
if (v != null) {
value = null;
downstream.onNext(v);
}
downstream.onComplete();
}
Reported by PMD.
Line: 77
@Override
public void dispose() {
value = null;
upstream.dispose();
}
@Override
public boolean isDisposed() {
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableTakeLastOne<T> extends AbstractObservableWithUpstream<T, T> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLast.java
7 issues
Line: 23
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableTakeLast<T> extends AbstractObservableWithUpstream<T, T> {
final int count;
public ObservableTakeLast(ObservableSource<T> source, int count) {
super(source);
this.count = count;
}
Reported by PMD.
Line: 38
static final class TakeLastObserver<T> extends ArrayDeque<T> implements Observer<T>, Disposable {
private static final long serialVersionUID = 7240042530241604978L;
final Observer<? super T> downstream;
final int count;
Disposable upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 39
private static final long serialVersionUID = 7240042530241604978L;
final Observer<? super T> downstream;
final int count;
Disposable upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 41
final Observer<? super T> downstream;
final int count;
Disposable upstream;
volatile boolean cancelled;
TakeLastObserver(Observer<? super T> actual, int count) {
this.downstream = actual;
Reported by PMD.
Line: 43
Disposable upstream;
volatile boolean cancelled;
TakeLastObserver(Observer<? super T> actual, int count) {
this.downstream = actual;
this.count = count;
}
Reported by PMD.
Line: 18
import java.util.ArrayDeque;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableTakeLast<T> extends AbstractObservableWithUpstream<T, T> {
final int count;
Reported by PMD.
Line: 73
@Override
public void onComplete() {
Observer<? super T> a = downstream;
for (;;) {
if (cancelled) {
return;
}
T v = poll();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReduceWithSingle.java
7 issues
Line: 33
*/
public final class ObservableReduceWithSingle<T, R> extends Single<R> {
final ObservableSource<T> source;
final Supplier<R> seedSupplier;
final BiFunction<R, ? super T, R> reducer;
Reported by PMD.
Line: 35
final ObservableSource<T> source;
final Supplier<R> seedSupplier;
final BiFunction<R, ? super T, R> reducer;
public ObservableReduceWithSingle(ObservableSource<T> source, Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
this.source = source;
Reported by PMD.
Line: 37
final Supplier<R> seedSupplier;
final BiFunction<R, ? super T, R> reducer;
public ObservableReduceWithSingle(ObservableSource<T> source, Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
this.source = source;
this.seedSupplier = seedSupplier;
this.reducer = reducer;
Reported by PMD.
Line: 51
try {
seed = Objects.requireNonNull(seedSupplier.get(), "The seedSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
source.subscribe(new ReduceSeedObserver<>(observer, reducer, seed));
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.operators.observable.ObservableReduceSeedSingle.ReduceSeedObserver;
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.operators.observable.ObservableReduceSeedSingle.ReduceSeedObserver;
import java.util.Objects;
Reported by PMD.
Line: 50
R seed;
try {
seed = Objects.requireNonNull(seedSupplier.get(), "The seedSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/MaybeFlatMapObservable.java
7 issues
Line: 35
*/
public final class MaybeFlatMapObservable<T, R> extends Observable<R> {
final MaybeSource<T> source;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
public MaybeFlatMapObservable(MaybeSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
Reported by PMD.
Line: 37
final MaybeSource<T> source;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
public MaybeFlatMapObservable(MaybeSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
Reported by PMD.
Line: 58
private static final long serialVersionUID = -8948264376121066672L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
FlatMapObserver(Observer<? super R> downstream, Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.downstream = downstream;
Reported by PMD.
Line: 60
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
FlatMapObserver(Observer<? super R> downstream, Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.downstream = downstream;
this.mapper = mapper;
}
Reported by PMD.
Line: 103
try {
o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 19
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 102
ObservableSource<? extends R> o;
try {
o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableLastMaybe.java
7 issues
Line: 28
*/
public final class ObservableLastMaybe<T> extends Maybe<T> {
final ObservableSource<T> source;
public ObservableLastMaybe(ObservableSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 43
static final class LastObserver<T> implements Observer<T>, Disposable {
final MaybeObserver<? super T> downstream;
Disposable upstream;
T item;
Reported by PMD.
Line: 45
final MaybeObserver<? super T> downstream;
Disposable upstream;
T item;
LastObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 47
Disposable upstream;
T item;
LastObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 81
@Override
public void onError(Throwable t) {
upstream = DisposableHelper.DISPOSED;
item = null;
downstream.onError(t);
}
@Override
public void onComplete() {
Reported by PMD.
Line: 90
upstream = DisposableHelper.DISPOSED;
T v = item;
if (v != null) {
item = null;
downstream.onSuccess(v);
} else {
downstream.onComplete();
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Consumes the source ObservableSource and emits its last item, the defaultItem
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/observers/ConsumerSingleObserver.java
7 issues
Line: 33
private static final long serialVersionUID = -7012088219455310787L;
final Consumer<? super T> onSuccess;
final Consumer<? super Throwable> onError;
public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
this.onSuccess = onSuccess;
Reported by PMD.
Line: 33
private static final long serialVersionUID = -7012088219455310787L;
final Consumer<? super T> onSuccess;
final Consumer<? super Throwable> onError;
public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
this.onSuccess = onSuccess;
Reported by PMD.
Line: 35
final Consumer<? super T> onSuccess;
final Consumer<? super Throwable> onError;
public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
this.onSuccess = onSuccess;
this.onError = onError;
}
Reported by PMD.
Line: 35
final Consumer<? super T> onSuccess;
final Consumer<? super Throwable> onError;
public ConsumerSingleObserver(Consumer<? super T> onSuccess, Consumer<? super Throwable> onError) {
this.onSuccess = onSuccess;
this.onError = onError;
}
Reported by PMD.
Line: 47
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(new CompositeException(e, ex));
}
}
Reported by PMD.
Line: 63
lazySet(DisposableHelper.DISPOSED);
try {
onSuccess.accept(value);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.SingleObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservable.java
7 issues
Line: 35
*/
public final class SingleFlatMapObservable<T, R> extends Observable<R> {
final SingleSource<T> source;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
public SingleFlatMapObservable(SingleSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
Reported by PMD.
Line: 37
final SingleSource<T> source;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
public SingleFlatMapObservable(SingleSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
Reported by PMD.
Line: 58
private static final long serialVersionUID = -8948264376121066672L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
FlatMapObserver(Observer<? super R> downstream, Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.downstream = downstream;
Reported by PMD.
Line: 60
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
FlatMapObserver(Observer<? super R> downstream, Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
this.downstream = downstream;
this.mapper = mapper;
}
Reported by PMD.
Line: 103
try {
o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 19
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 102
ObservableSource<? extends R> o;
try {
o = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSkip.java
7 issues
Line: 22
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
public final class FlowableSkip<T> extends AbstractFlowableWithUpstream<T, T> {
final long n;
public FlowableSkip(Flowable<T> source, long n) {
super(source);
this.n = n;
}
Reported by PMD.
Line: 34
}
static final class SkipSubscriber<T> implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super T> downstream;
long remaining;
Subscription upstream;
SkipSubscriber(Subscriber<? super T> actual, long n) {
Reported by PMD.
Line: 35
static final class SkipSubscriber<T> implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super T> downstream;
long remaining;
Subscription upstream;
SkipSubscriber(Subscriber<? super T> actual, long n) {
this.downstream = actual;
Reported by PMD.
Line: 37
final Subscriber<? super T> downstream;
long remaining;
Subscription upstream;
SkipSubscriber(Subscriber<? super T> actual, long n) {
this.downstream = actual;
this.remaining = n;
}
Reported by PMD.
Line: 56
@Override
public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
downstream.onNext(t);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
public final class FlowableSkip<T> extends AbstractFlowableWithUpstream<T, T> {
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
public final class FlowableSkip<T> extends AbstractFlowableWithUpstream<T, T> {
final long n;
public FlowableSkip(Flowable<T> source, long n) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSingleMaybe.java
7 issues
Line: 26
public final class FlowableSingleMaybe<T> extends Maybe<T> implements FuseToFlowable<T> {
final Flowable<T> source;
public FlowableSingleMaybe(Flowable<T> source) {
this.source = source;
}
Reported by PMD.
Line: 45
static final class SingleElementSubscriber<T>
implements FlowableSubscriber<T>, Disposable {
final MaybeObserver<? super T> downstream;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 47
final MaybeObserver<? super T> downstream;
Subscription upstream;
boolean done;
T value;
Reported by PMD.
Line: 49
Subscription upstream;
boolean done;
T value;
SingleElementSubscriber(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 51
boolean done;
T value;
SingleElementSubscriber(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 100
done = true;
upstream = SubscriptionHelper.CANCELLED;
T v = value;
value = null;
if (v == null) {
downstream.onComplete();
} else {
downstream.onSuccess(v);
}
Reported by PMD.
Line: 18
import org.reactivestreams.Subscription;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.fuseable.FuseToFlowable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/queue/SpscArrayQueue.java
7 issues
Line: 61
@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
Reported by PMD.
Line: 44
public final class SpscArrayQueue<E> extends AtomicReferenceArray<E> implements SimplePlainQueue<E> {
private static final long serialVersionUID = -1296597691183856449L;
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
final int mask;
final AtomicLong producerIndex;
long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
Reported by PMD.
Line: 45
private static final long serialVersionUID = -1296597691183856449L;
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
final int mask;
final AtomicLong producerIndex;
long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
Reported by PMD.
Line: 46
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
final int mask;
final AtomicLong producerIndex;
long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
super(Pow2.roundToPowerOfTwo(capacity));
Reported by PMD.
Line: 47
final int mask;
final AtomicLong producerIndex;
long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
super(Pow2.roundToPowerOfTwo(capacity));
this.mask = length() - 1;
Reported by PMD.
Line: 48
final AtomicLong producerIndex;
long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
super(Pow2.roundToPowerOfTwo(capacity));
this.mask = length() - 1;
this.producerIndex = new AtomicLong();
Reported by PMD.
Line: 103
@Override
public boolean isEmpty() {
return producerIndex.get() == consumerIndex.get();
}
void soProducerIndex(long newIndex) {
producerIndex.lazySet(newIndex);
}
Reported by PMD.