The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollect.java
13 issues
Line: 28
public final class FlowableCollect<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Supplier<? extends U> initialSupplier;
final BiConsumer<? super U, ? super T> collector;
public FlowableCollect(Flowable<T> source, Supplier<? extends U> initialSupplier, BiConsumer<? super U, ? super T> collector) {
super(source);
this.initialSupplier = initialSupplier;
Reported by PMD.
Line: 29
public final class FlowableCollect<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Supplier<? extends U> initialSupplier;
final BiConsumer<? super U, ? super T> collector;
public FlowableCollect(Flowable<T> source, Supplier<? extends U> initialSupplier, BiConsumer<? super U, ? super T> collector) {
super(source);
this.initialSupplier = initialSupplier;
this.collector = collector;
Reported by PMD.
Line: 42
U u;
try {
u = Objects.requireNonNull(initialSupplier.get(), "The initial value supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
Reported by PMD.
Line: 55
private static final long serialVersionUID = -3589550218733891694L;
final BiConsumer<? super U, ? super T> collector;
final U u;
Subscription upstream;
Reported by PMD.
Line: 57
final BiConsumer<? super U, ? super T> collector;
final U u;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 59
final U u;
Subscription upstream;
boolean done;
CollectSubscriber(Subscriber<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
super(actual);
Reported by PMD.
Line: 61
Subscription upstream;
boolean done;
CollectSubscriber(Subscriber<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
super(actual);
this.collector = collector;
this.u = u;
Reported by PMD.
Line: 85
}
try {
collector.accept(u, t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.cancel();
onError(e);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnLifecycle.java
13 issues
Line: 33
*/
public final class SingleDoOnLifecycle<T> extends Single<T> {
final Single<T> source;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Reported by PMD.
Line: 35
final Single<T> source;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
public SingleDoOnLifecycle(Single<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
Reported by PMD.
Line: 37
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
public SingleDoOnLifecycle(Single<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
this.source = upstream;
this.onSubscribe = onSubscribe;
Reported by PMD.
Line: 53
static final class SingleLifecycleObserver<T> implements SingleObserver<T>, Disposable {
final SingleObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Reported by PMD.
Line: 55
final SingleObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Disposable upstream;
Reported by PMD.
Line: 55
final SingleObserver<? super T> downstream;
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Disposable upstream;
Reported by PMD.
Line: 57
final Consumer<? super Disposable> onSubscribe;
final Action onDispose;
Disposable upstream;
SingleLifecycleObserver(SingleObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
Reported by PMD.
Line: 59
final Action onDispose;
Disposable upstream;
SingleLifecycleObserver(SingleObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
Reported by PMD.
Line: 72
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
try {
onSubscribe.accept(d);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
d.dispose();
this.upstream = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, downstream);
return;
Reported by PMD.
Line: 107
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/AbstractBackpressureThrottlingSubscriber.java
13 issues
Line: 37
private static final long serialVersionUID = -5050301752721603566L;
final Subscriber<? super R> downstream;
Subscription upstream;
volatile boolean done;
Throwable error;
Reported by PMD.
Line: 39
final Subscriber<? super R> downstream;
Subscription upstream;
volatile boolean done;
Throwable error;
volatile boolean cancelled;
Reported by PMD.
Line: 41
Subscription upstream;
volatile boolean done;
Throwable error;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
Reported by PMD.
Line: 42
Subscription upstream;
volatile boolean done;
Throwable error;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
Reported by PMD.
Line: 44
volatile boolean done;
Throwable error;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
final AtomicReference<R> current = new AtomicReference<>();
Reported by PMD.
Line: 46
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
final AtomicReference<R> current = new AtomicReference<>();
AbstractBackpressureThrottlingSubscriber(Subscriber<? super R> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 48
final AtomicLong requested = new AtomicLong();
final AtomicReference<R> current = new AtomicReference<>();
AbstractBackpressureThrottlingSubscriber(Subscriber<? super R> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 99
}
}
void drain() {
if (getAndIncrement() != 0) {
return;
}
final Subscriber<? super R> a = downstream;
int missed = 1;
Reported by PMD.
Line: 133
return;
}
if (e != 0L) {
BackpressureHelper.produced(r, e);
}
missed = addAndGet(-missed);
if (missed == 0) {
Reported by PMD.
Line: 103
if (getAndIncrement() != 0) {
return;
}
final Subscriber<? super R> a = downstream;
int missed = 1;
final AtomicLong r = requested;
final AtomicReference<R> q = current;
for (;;) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ConcatMapXMainSubscriber.java
13 issues
Line: 38
private static final long serialVersionUID = -3214213361171757852L;
final AtomicThrowable errors;
final int prefetch;
final ErrorMode errorMode;
Reported by PMD.
Line: 40
final AtomicThrowable errors;
final int prefetch;
final ErrorMode errorMode;
SimpleQueue<T> queue;
Reported by PMD.
Line: 42
final int prefetch;
final ErrorMode errorMode;
SimpleQueue<T> queue;
Subscription upstream;
Reported by PMD.
Line: 44
final ErrorMode errorMode;
SimpleQueue<T> queue;
Subscription upstream;
volatile boolean done;
Reported by PMD.
Line: 46
SimpleQueue<T> queue;
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
Reported by PMD.
Line: 48
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
boolean syncFused;
Reported by PMD.
Line: 50
volatile boolean done;
volatile boolean cancelled;
boolean syncFused;
public ConcatMapXMainSubscriber(int prefetch, ErrorMode errorMode) {
this.errorMode = errorMode;
Reported by PMD.
Line: 52
volatile boolean cancelled;
boolean syncFused;
public ConcatMapXMainSubscriber(int prefetch, ErrorMode errorMode) {
this.errorMode = errorMode;
this.errors = new AtomicThrowable();
this.prefetch = prefetch;
Reported by PMD.
Line: 67
if (s instanceof QueueSubscription) {
@SuppressWarnings("unchecked")
QueueSubscription<T> qs = (QueueSubscription<T>)s;
int mode = qs.requestFusion(QueueFuseable.ANY | QueueFuseable.BOUNDARY);
if (mode == QueueFuseable.SYNC) {
queue = qs;
syncFused = true;
done = true;
Reported by PMD.
Line: 78
drain();
return;
}
else if (mode == QueueFuseable.ASYNC) {
queue = qs;
onSubscribeDownstream();
upstream.request(prefetch);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/observers/ForEachWhileObserver.java
13 issues
Line: 31
private static final long serialVersionUID = -4403180040475402120L;
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
Reported by PMD.
Line: 31
private static final long serialVersionUID = -4403180040475402120L;
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
Reported by PMD.
Line: 33
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
Reported by PMD.
Line: 33
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
Reported by PMD.
Line: 35
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
public ForEachWhileObserver(Predicate<? super T> onNext,
Consumer<? super Throwable> onError, Action onComplete) {
Reported by PMD.
Line: 35
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
public ForEachWhileObserver(Predicate<? super T> onNext,
Consumer<? super Throwable> onError, Action onComplete) {
Reported by PMD.
Line: 37
final Action onComplete;
boolean done;
public ForEachWhileObserver(Predicate<? super T> onNext,
Consumer<? super Throwable> onError, Action onComplete) {
this.onNext = onNext;
this.onError = onError;
Reported by PMD.
Line: 60
boolean b;
try {
b = onNext.test(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
onError(ex);
return;
}
Reported by PMD.
Line: 82
done = true;
try {
onError.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(new CompositeException(t, ex));
}
}
Reported by PMD.
Line: 96
done = true;
try {
onComplete.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/FlowableFlatMapCompletableAsyncPerf.java
13 issues
Line: 35
public class FlowableFlatMapCompletableAsyncPerf implements Action {
@Param({"1", "10", "100", "1000", "10000", "100000", "1000000"})
int items;
@Param({"1", "8", "32", "128", "256"})
int maxConcurrency;
@Param({"1", "10", "100", "1000"})
Reported by PMD.
Line: 38
int items;
@Param({"1", "8", "32", "128", "256"})
int maxConcurrency;
@Param({"1", "10", "100", "1000"})
int work;
Completable flatMapCompletable;
Reported by PMD.
Line: 41
int maxConcurrency;
@Param({"1", "10", "100", "1000"})
int work;
Completable flatMapCompletable;
Flowable<Object> flatMap;
Reported by PMD.
Line: 43
@Param({"1", "10", "100", "1000"})
int work;
Completable flatMapCompletable;
Flowable<Object> flatMap;
@Override
public void run() {
Reported by PMD.
Line: 43
@Param({"1", "10", "100", "1000"})
int work;
Completable flatMapCompletable;
Flowable<Object> flatMap;
@Override
public void run() {
Reported by PMD.
Line: 45
Completable flatMapCompletable;
Flowable<Object> flatMap;
@Override
public void run() {
Blackhole.consumeCPU(work);
}
Reported by PMD.
Line: 45
Completable flatMapCompletable;
Flowable<Object> flatMap;
@Override
public void run() {
Blackhole.consumeCPU(work);
}
Reported by PMD.
Line: 57
Integer[] array = new Integer[items];
Arrays.fill(array, 777);
flatMapCompletable = Flowable.fromArray(array)
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
}
Reported by PMD.
Line: 58
Arrays.fill(array, 777);
flatMapCompletable = Flowable.fromArray(array)
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
}
Reported by PMD.
Line: 60
flatMapCompletable = Flowable.fromArray(array)
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);
flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
}
// @Benchmark
public Object flatMap(Blackhole bh) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDematerialize.java
13 issues
Line: 27
public final class ObservableDematerialize<T, R> extends AbstractObservableWithUpstream<T, R> {
final Function<? super T, ? extends Notification<R>> selector;
public ObservableDematerialize(ObservableSource<T> source, Function<? super T, ? extends Notification<R>> selector) {
super(source);
this.selector = selector;
}
Reported by PMD.
Line: 40
}
static final class DematerializeObserver<T, R> implements Observer<T>, Disposable {
final Observer<? super R> downstream;
final Function<? super T, ? extends Notification<R>> selector;
boolean done;
Reported by PMD.
Line: 42
static final class DematerializeObserver<T, R> implements Observer<T>, Disposable {
final Observer<? super R> downstream;
final Function<? super T, ? extends Notification<R>> selector;
boolean done;
Disposable upstream;
Reported by PMD.
Line: 44
final Function<? super T, ? extends Notification<R>> selector;
boolean done;
Disposable upstream;
DematerializeObserver(Observer<? super R> downstream, Function<? super T, ? extends Notification<R>> selector) {
this.downstream = downstream;
Reported by PMD.
Line: 46
boolean done;
Disposable upstream;
DematerializeObserver(Observer<? super R> downstream, Function<? super T, ? extends Notification<R>> selector) {
this.downstream = downstream;
this.selector = selector;
}
Reported by PMD.
Line: 77
if (done) {
if (item instanceof Notification) {
Notification<?> notification = (Notification<?>)item;
if (notification.isOnError()) {
RxJavaPlugins.onError(notification.getError());
}
}
return;
}
Reported by PMD.
Line: 77
if (done) {
if (item instanceof Notification) {
Notification<?> notification = (Notification<?>)item;
if (notification.isOnError()) {
RxJavaPlugins.onError(notification.getError());
}
}
return;
}
Reported by PMD.
Line: 78
if (item instanceof Notification) {
Notification<?> notification = (Notification<?>)item;
if (notification.isOnError()) {
RxJavaPlugins.onError(notification.getError());
}
}
return;
}
Reported by PMD.
Line: 88
try {
notification = Objects.requireNonNull(selector.apply(item), "The selector returned a null Notification");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
onError(ex);
return;
}
Reported by PMD.
Line: 94
onError(ex);
return;
}
if (notification.isOnError()) {
upstream.dispose();
onError(notification.getError());
}
else if (notification.isOnComplete()) {
upstream.dispose();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScalarXMap.java
13 issues
Line: 54
try {
t = ((Supplier<T>)source).get();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, subscriber);
return true;
}
Reported by PMD.
Line: 69
try {
r = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, subscriber);
return true;
}
Reported by PMD.
Line: 80
try {
u = ((Supplier<R>)r).get();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, subscriber);
return true;
}
Reported by PMD.
Line: 92
}
subscriber.onSubscribe(new ScalarSubscription<>(subscriber, u));
} else {
r.subscribe(subscriber);
}
return true;
}
return false;
Reported by PMD.
Line: 122
*/
static final class ScalarXMapFlowable<T, R> extends Flowable<R> {
final T value;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
ScalarXMapFlowable(T value,
Function<? super T, ? extends Publisher<? extends R>> mapper) {
Reported by PMD.
Line: 124
final T value;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
ScalarXMapFlowable(T value,
Function<? super T, ? extends Publisher<? extends R>> mapper) {
this.value = value;
this.mapper = mapper;
Reported by PMD.
Line: 138
Publisher<? extends R> other;
try {
other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null Publisher");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
if (other instanceof Supplier) {
Reported by PMD.
Line: 148
try {
u = ((Supplier<R>)other).get();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.util.Objects;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSampleTimed.java
13 issues
Line: 25
import io.reactivex.rxjava3.observers.SerializedObserver;
public final class ObservableSampleTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
Reported by PMD.
Line: 26
public final class ObservableSampleTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
public ObservableSampleTimed(ObservableSource<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
Reported by PMD.
Line: 27
public final class ObservableSampleTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
public ObservableSampleTimed(ObservableSource<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
super(source);
Reported by PMD.
Line: 29
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
public ObservableSampleTimed(ObservableSource<T> source, long period, TimeUnit unit, Scheduler scheduler, boolean emitLast) {
super(source);
this.period = period;
this.unit = unit;
Reported by PMD.
Line: 53
private static final long serialVersionUID = -3517602651313910099L;
final Observer<? super T> downstream;
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final AtomicReference<Disposable> timer = new AtomicReference<>();
Reported by PMD.
Line: 54
private static final long serialVersionUID = -3517602651313910099L;
final Observer<? super T> downstream;
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final AtomicReference<Disposable> timer = new AtomicReference<>();
Reported by PMD.
Line: 55
final Observer<? super T> downstream;
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final AtomicReference<Disposable> timer = new AtomicReference<>();
Disposable upstream;
Reported by PMD.
Line: 56
final Observer<? super T> downstream;
final long period;
final TimeUnit unit;
final Scheduler scheduler;
final AtomicReference<Disposable> timer = new AtomicReference<>();
Disposable upstream;
Reported by PMD.
Line: 58
final TimeUnit unit;
final Scheduler scheduler;
final AtomicReference<Disposable> timer = new AtomicReference<>();
Disposable upstream;
SampleTimedObserver(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
this.downstream = actual;
Reported by PMD.
Line: 60
final AtomicReference<Disposable> timer = new AtomicReference<>();
Disposable upstream;
SampleTimedObserver(Observer<? super T> actual, long period, TimeUnit unit, Scheduler scheduler) {
this.downstream = actual;
this.period = period;
this.unit = unit;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFilter.java
13 issues
Line: 25
import io.reactivex.rxjava3.internal.subscribers.*;
public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public FlowableFilter(Flowable<T> source, Predicate<? super T> predicate) {
super(source);
this.predicate = predicate;
}
Reported by PMD.
Line: 43
static final class FilterSubscriber<T> extends BasicFuseableSubscriber<T, T>
implements ConditionalSubscriber<T> {
final Predicate<? super T> filter;
FilterSubscriber(Subscriber<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
Reported by PMD.
Line: 69
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
if (b) {
downstream.onNext(t);
Reported by PMD.
Line: 108
}
static final class FilterConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
final Predicate<? super T> filter;
FilterConditionalSubscriber(ConditionalSubscriber<? super T> actual, Predicate<? super T> filter) {
super(actual);
this.filter = filter;
}
Reported by PMD.
Line: 135
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
return b && downstream.tryOnNext(t);
}
Reported by PMD.
Line: 21
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.subscribers.*;
public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public FlowableFilter(Flowable<T> source, Predicate<? super T> predicate) {
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.fuseable.*;
import io.reactivex.rxjava3.internal.subscribers.*;
public final class FlowableFilter<T> extends AbstractFlowableWithUpstream<T, T> {
final Predicate<? super T> predicate;
public FlowableFilter(Flowable<T> source, Predicate<? super T> predicate) {
super(source);
Reported by PMD.
Line: 68
}
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
if (b) {
Reported by PMD.
Line: 87
@Nullable
@Override
public T poll() throws Throwable {
QueueSubscription<T> qs = this.qs;
Predicate<? super T> f = filter;
for (;;) {
T t = qs.poll();
if (t == null) {
Reported by PMD.
Line: 88
@Override
public T poll() throws Throwable {
QueueSubscription<T> qs = this.qs;
Predicate<? super T> f = filter;
for (;;) {
T t = qs.poll();
if (t == null) {
return null;
Reported by PMD.