The following issues were found
src/main/java/io/reactivex/rxjava3/internal/observers/BasicFuseableObserver.java
8 issues
Line: 28
* @param <T> the upstream value type
* @param <R> the downstream value type
*/
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
/** The downstream subscriber. */
protected final Observer<? super R> downstream;
/** The upstream subscription. */
Reported by PMD.
Line: 31
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
/** The downstream subscriber. */
protected final Observer<? super R> downstream;
/** The upstream subscription. */
protected Disposable upstream;
/** The upstream's QueueDisposable if not null. */
Reported by PMD.
Line: 34
protected final Observer<? super R> downstream;
/** The upstream subscription. */
protected Disposable upstream;
/** The upstream's QueueDisposable if not null. */
protected QueueDisposable<T> qd;
/** Flag indicating no further onXXX event should be accepted. */
Reported by PMD.
Line: 37
protected Disposable upstream;
/** The upstream's QueueDisposable if not null. */
protected QueueDisposable<T> qd;
/** Flag indicating no further onXXX event should be accepted. */
protected boolean done;
/** Holds the established fusion mode of the upstream. */
Reported by PMD.
Line: 40
protected QueueDisposable<T> qd;
/** Flag indicating no further onXXX event should be accepted. */
protected boolean done;
/** Holds the established fusion mode of the upstream. */
protected int sourceMode;
/**
Reported by PMD.
Line: 43
protected boolean done;
/** Holds the established fusion mode of the upstream. */
protected int sourceMode;
/**
* Construct a BasicFuseableObserver by wrapping the given subscriber.
* @param downstream the subscriber, not null (not verified)
*/
Reported by PMD.
Line: 135
protected final int transitiveBoundaryFusion(int mode) {
QueueDisposable<T> qd = this.qd;
if (qd != null) {
if ((mode & BOUNDARY) == 0) {
int m = qd.requestFusion(mode);
if (m != NONE) {
sourceMode = m;
}
return m;
Reported by PMD.
Line: 137
if (qd != null) {
if ((mode & BOUNDARY) == 0) {
int m = qd.requestFusion(mode);
if (m != NONE) {
sourceMode = m;
}
return m;
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableMapOptional.java
8 issues
Line: 31
*/
public final class ObservableMapOptional<T, R> extends Observable<R> {
final Observable<T> source;
final Function<? super T, Optional<? extends R>> mapper;
public ObservableMapOptional(Observable<T> source, Function<? super T, Optional<? extends R>> mapper) {
this.source = source;
Reported by PMD.
Line: 33
final Observable<T> source;
final Function<? super T, Optional<? extends R>> mapper;
public ObservableMapOptional(Observable<T> source, Function<? super T, Optional<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}
Reported by PMD.
Line: 47
static final class MapOptionalObserver<T, R> extends BasicFuseableObserver<T, R> {
final Function<? super T, Optional<? extends R>> mapper;
MapOptionalObserver(Observer<? super R> downstream, Function<? super T, Optional<? extends R>> mapper) {
super(downstream);
this.mapper = mapper;
}
Reported by PMD.
Line: 68
Optional<? extends R> result;
try {
result = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional");
} catch (Throwable ex) {
fail(ex);
return;
}
if (result.isPresent()) {
Reported by PMD.
Line: 73
return;
}
if (result.isPresent()) {
downstream.onNext(result.get());
}
}
@Override
Reported by PMD.
Line: 91
return null;
}
Optional<? extends R> result = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null Optional");
if (result.isPresent()) {
return result.get();
}
}
}
}
Reported by PMD.
Line: 92
}
Optional<? extends R> result = Objects.requireNonNull(mapper.apply(item), "The mapper returned a null Optional");
if (result.isPresent()) {
return result.get();
}
}
}
}
}
Reported by PMD.
Line: 67
Optional<? extends R> result;
try {
result = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional");
} catch (Throwable ex) {
fail(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTake.java
8 issues
Line: 22
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
final long limit;
public ObservableTake(ObservableSource<T> source, long limit) {
super(source);
this.limit = limit;
}
Reported by PMD.
Line: 34
}
static final class TakeObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
boolean done;
Disposable upstream;
Reported by PMD.
Line: 36
static final class TakeObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
boolean done;
Disposable upstream;
long remaining;
TakeObserver(Observer<? super T> actual, long limit) {
Reported by PMD.
Line: 38
boolean done;
Disposable upstream;
long remaining;
TakeObserver(Observer<? super T> actual, long limit) {
this.downstream = actual;
this.remaining = limit;
Reported by PMD.
Line: 40
Disposable upstream;
long remaining;
TakeObserver(Observer<? super T> actual, long limit) {
this.downstream = actual;
this.remaining = limit;
}
Reported by PMD.
Line: 62
@Override
public void onNext(T t) {
if (!done && remaining-- > 0) {
boolean stop = remaining == 0;
downstream.onNext(t);
if (stop) {
onComplete();
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableTake<T> extends AbstractObservableWithUpstream<T, T> {
final long limit;
public ObservableTake(ObservableSource<T> source, long limit) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelaySubscriptionOther.java
8 issues
Line: 31
* @param <U> the other value type, ignored
*/
public final class FlowableDelaySubscriptionOther<T, U> extends Flowable<T> {
final Publisher<? extends T> main;
final Publisher<U> other;
public FlowableDelaySubscriptionOther(Publisher<? extends T> main, Publisher<U> other) {
this.main = main;
this.other = other;
Reported by PMD.
Line: 32
*/
public final class FlowableDelaySubscriptionOther<T, U> extends Flowable<T> {
final Publisher<? extends T> main;
final Publisher<U> other;
public FlowableDelaySubscriptionOther(Publisher<? extends T> main, Publisher<U> other) {
this.main = main;
this.other = other;
}
Reported by PMD.
Line: 50
private static final long serialVersionUID = 2259811067697317255L;
final Subscriber<? super T> downstream;
final Publisher<? extends T> main;
final OtherSubscriber other;
Reported by PMD.
Line: 52
final Subscriber<? super T> downstream;
final Publisher<? extends T> main;
final OtherSubscriber other;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 54
final Publisher<? extends T> main;
final OtherSubscriber other;
final AtomicReference<Subscription> upstream;
MainSubscriber(Subscriber<? super T> downstream, Publisher<? extends T> main) {
this.downstream = downstream;
Reported by PMD.
Line: 56
final OtherSubscriber other;
final AtomicReference<Subscription> upstream;
MainSubscriber(Subscriber<? super T> downstream, Publisher<? extends T> main) {
this.downstream = downstream;
this.main = main;
this.other = new OtherSubscriber();
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 20
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* Delays the subscription to the main source until the other
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimeInterval.java
8 issues
Line: 24
import io.reactivex.rxjava3.schedulers.Timed;
public final class ObservableTimeInterval<T> extends AbstractObservableWithUpstream<T, Timed<T>> {
final Scheduler scheduler;
final TimeUnit unit;
public ObservableTimeInterval(ObservableSource<T> source, TimeUnit unit, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
Reported by PMD.
Line: 25
public final class ObservableTimeInterval<T> extends AbstractObservableWithUpstream<T, Timed<T>> {
final Scheduler scheduler;
final TimeUnit unit;
public ObservableTimeInterval(ObservableSource<T> source, TimeUnit unit, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
this.unit = unit;
Reported by PMD.
Line: 39
}
static final class TimeIntervalObserver<T> implements Observer<T>, Disposable {
final Observer<? super Timed<T>> downstream;
final TimeUnit unit;
final Scheduler scheduler;
long lastTime;
Reported by PMD.
Line: 40
static final class TimeIntervalObserver<T> implements Observer<T>, Disposable {
final Observer<? super Timed<T>> downstream;
final TimeUnit unit;
final Scheduler scheduler;
long lastTime;
Disposable upstream;
Reported by PMD.
Line: 41
static final class TimeIntervalObserver<T> implements Observer<T>, Disposable {
final Observer<? super Timed<T>> downstream;
final TimeUnit unit;
final Scheduler scheduler;
long lastTime;
Disposable upstream;
Reported by PMD.
Line: 43
final TimeUnit unit;
final Scheduler scheduler;
long lastTime;
Disposable upstream;
TimeIntervalObserver(Observer<? super Timed<T>> actual, TimeUnit unit, Scheduler scheduler) {
this.downstream = actual;
Reported by PMD.
Line: 45
long lastTime;
Disposable upstream;
TimeIntervalObserver(Observer<? super Timed<T>> actual, TimeUnit unit, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
this.unit = unit;
Reported by PMD.
Line: 18
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.schedulers.Timed;
public final class ObservableTimeInterval<T> extends AbstractObservableWithUpstream<T, Timed<T>> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeMap.java
8 issues
Line: 32
*/
public final class MaybeMap<T, R> extends AbstractMaybeWithUpstream<T, R> {
final Function<? super T, ? extends R> mapper;
public MaybeMap(MaybeSource<T> source, Function<? super T, ? extends R> mapper) {
super(source);
this.mapper = mapper;
}
Reported by PMD.
Line: 46
static final class MapMaybeObserver<T, R> implements MaybeObserver<T>, Disposable {
final MaybeObserver<? super R> downstream;
final Function<? super T, ? extends R> mapper;
Disposable upstream;
Reported by PMD.
Line: 48
final MaybeObserver<? super R> downstream;
final Function<? super T, ? extends R> mapper;
Disposable upstream;
MapMaybeObserver(MaybeObserver<? super R> actual, Function<? super T, ? extends R> mapper) {
this.downstream = actual;
Reported by PMD.
Line: 50
final Function<? super T, ? extends R> mapper;
Disposable upstream;
MapMaybeObserver(MaybeObserver<? super R> actual, Function<? super T, ? extends R> mapper) {
this.downstream = actual;
this.mapper = mapper;
}
Reported by PMD.
Line: 61
public void dispose() {
Disposable d = this.upstream;
this.upstream = DisposableHelper.DISPOSED;
d.dispose();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
Reported by PMD.
Line: 84
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null item");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 83
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null item");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAll.java
8 issues
Line: 24
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableAll<T> extends AbstractObservableWithUpstream<T, Boolean> {
final Predicate<? super T> predicate;
public ObservableAll(ObservableSource<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
Reported by PMD.
Line: 36
}
static final class AllObserver<T> implements Observer<T>, Disposable {
final Observer<? super Boolean> downstream;
final Predicate<? super T> predicate;
Disposable upstream;
boolean done;
Reported by PMD.
Line: 37
static final class AllObserver<T> implements Observer<T>, Disposable {
final Observer<? super Boolean> downstream;
final Predicate<? super T> predicate;
Disposable upstream;
boolean done;
Reported by PMD.
Line: 39
final Observer<? super Boolean> downstream;
final Predicate<? super T> predicate;
Disposable upstream;
boolean done;
AllObserver(Observer<? super Boolean> actual, Predicate<? super T> predicate) {
this.downstream = actual;
Reported by PMD.
Line: 41
Disposable upstream;
boolean done;
AllObserver(Observer<? super Boolean> actual, Predicate<? super T> predicate) {
this.downstream = actual;
this.predicate = predicate;
}
Reported by PMD.
Line: 64
boolean b;
try {
b = predicate.test(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 63
}
boolean b;
try {
b = predicate.test(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnSubscribe.java
8 issues
Line: 30
*/
public final class SingleDoOnSubscribe<T> extends Single<T> {
final SingleSource<T> source;
final Consumer<? super Disposable> onSubscribe;
public SingleDoOnSubscribe(SingleSource<T> source, Consumer<? super Disposable> onSubscribe) {
this.source = source;
Reported by PMD.
Line: 32
final SingleSource<T> source;
final Consumer<? super Disposable> onSubscribe;
public SingleDoOnSubscribe(SingleSource<T> source, Consumer<? super Disposable> onSubscribe) {
this.source = source;
this.onSubscribe = onSubscribe;
}
Reported by PMD.
Line: 46
static final class DoOnSubscribeSingleObserver<T> implements SingleObserver<T> {
final SingleObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
boolean done;
Reported by PMD.
Line: 48
final SingleObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
boolean done;
DoOnSubscribeSingleObserver(SingleObserver<? super T> actual, Consumer<? super Disposable> onSubscribe) {
this.downstream = actual;
Reported by PMD.
Line: 48
final SingleObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
boolean done;
DoOnSubscribeSingleObserver(SingleObserver<? super T> actual, Consumer<? super Disposable> onSubscribe) {
this.downstream = actual;
Reported by PMD.
Line: 50
final Consumer<? super Disposable> onSubscribe;
boolean done;
DoOnSubscribeSingleObserver(SingleObserver<? super T> actual, Consumer<? super Disposable> onSubscribe) {
this.downstream = actual;
this.onSubscribe = onSubscribe;
}
Reported by PMD.
Line: 61
public void onSubscribe(Disposable d) {
try {
onSubscribe.accept(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
done = true;
d.dispose();
EmptyDisposable.error(ex, downstream);
return;
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRangeLong.java
8 issues
Line: 21
import io.reactivex.rxjava3.internal.observers.BasicIntQueueDisposable;
public final class ObservableRangeLong extends Observable<Long> {
private final long start;
private final long count;
public ObservableRangeLong(long start, long count) {
this.start = start;
this.count = count;
Reported by PMD.
Line: 22
public final class ObservableRangeLong extends Observable<Long> {
private final long start;
private final long count;
public ObservableRangeLong(long start, long count) {
this.start = start;
this.count = count;
}
Reported by PMD.
Line: 41
private static final long serialVersionUID = 396518478098735504L;
final Observer<? super Long> downstream;
final long end;
long index;
Reported by PMD.
Line: 43
final Observer<? super Long> downstream;
final long end;
long index;
boolean fused;
Reported by PMD.
Line: 45
final long end;
long index;
boolean fused;
RangeDisposable(Observer<? super Long> actual, long start, long end) {
this.downstream = actual;
Reported by PMD.
Line: 47
long index;
boolean fused;
RangeDisposable(Observer<? super Long> actual, long start, long end) {
this.downstream = actual;
this.index = start;
this.end = end;
Reported by PMD.
Line: 17
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.observers.BasicIntQueueDisposable;
public final class ObservableRangeLong extends Observable<Long> {
private final long start;
private final long count;
Reported by PMD.
Line: 59
if (fused) {
return;
}
Observer<? super Long> actual = this.downstream;
long e = end;
for (long i = index; i != e && get() == 0; i++) {
actual.onNext(i);
}
if (get() == 0) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRange.java
8 issues
Line: 24
* Emits a range of integer values from start to end.
*/
public final class ObservableRange extends Observable<Integer> {
private final int start;
private final long end;
public ObservableRange(int start, int count) {
this.start = start;
this.end = (long)start + count;
Reported by PMD.
Line: 25
*/
public final class ObservableRange extends Observable<Integer> {
private final int start;
private final long end;
public ObservableRange(int start, int count) {
this.start = start;
this.end = (long)start + count;
}
Reported by PMD.
Line: 44
private static final long serialVersionUID = 396518478098735504L;
final Observer<? super Integer> downstream;
final long end;
long index;
Reported by PMD.
Line: 46
final Observer<? super Integer> downstream;
final long end;
long index;
boolean fused;
Reported by PMD.
Line: 48
final long end;
long index;
boolean fused;
RangeDisposable(Observer<? super Integer> actual, long start, long end) {
this.downstream = actual;
Reported by PMD.
Line: 50
long index;
boolean fused;
RangeDisposable(Observer<? super Integer> actual, long start, long end) {
this.downstream = actual;
this.index = start;
this.end = end;
Reported by PMD.
Line: 17
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.observers.BasicIntQueueDisposable;
/**
* Emits a range of integer values from start to end.
*/
Reported by PMD.
Line: 62
if (fused) {
return;
}
Observer<? super Integer> actual = this.downstream;
long e = end;
for (long i = index; i != e && get() == 0; i++) {
actual.onNext((int)i);
}
if (get() == 0) {
Reported by PMD.