The following issues were found
src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableFromCompletionStageTest.java
15 issues
Line: 27
public class FlowableFromCompletionStageTest extends RxJavaTest {
@Test
public void syncSuccess() {
Flowable.fromCompletionStage(CompletableFuture.completedFuture(1))
.test()
.assertResult(1);
}
Reported by PMD.
Line: 28
@Test
public void syncSuccess() {
Flowable.fromCompletionStage(CompletableFuture.completedFuture(1))
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 28
@Test
public void syncSuccess() {
Flowable.fromCompletionStage(CompletableFuture.completedFuture(1))
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 34
}
@Test
public void syncFailure() {
CompletableFuture<Integer> cf = new CompletableFuture<>();
cf.completeExceptionally(new TestException());
Flowable.fromCompletionStage(cf)
.test()
Reported by PMD.
Line: 38
CompletableFuture<Integer> cf = new CompletableFuture<>();
cf.completeExceptionally(new TestException());
Flowable.fromCompletionStage(cf)
.test()
.assertFailure(TestException.class);
}
@Test
Reported by PMD.
Line: 38
CompletableFuture<Integer> cf = new CompletableFuture<>();
cf.completeExceptionally(new TestException());
Flowable.fromCompletionStage(cf)
.test()
.assertFailure(TestException.class);
}
@Test
Reported by PMD.
Line: 44
}
@Test
public void syncNull() {
Flowable.fromCompletionStage(CompletableFuture.<Integer>completedFuture(null))
.test()
.assertFailure(NullPointerException.class);
}
Reported by PMD.
Line: 45
@Test
public void syncNull() {
Flowable.fromCompletionStage(CompletableFuture.<Integer>completedFuture(null))
.test()
.assertFailure(NullPointerException.class);
}
@Test
Reported by PMD.
Line: 45
@Test
public void syncNull() {
Flowable.fromCompletionStage(CompletableFuture.<Integer>completedFuture(null))
.test()
.assertFailure(NullPointerException.class);
}
@Test
Reported by PMD.
Line: 51
}
@Test
public void cancel() {
CompletableFuture<Integer> cf = new CompletableFuture<>();
TestSubscriber<Integer> ts = Flowable.fromCompletionStage(cf)
.test();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java
15 issues
Line: 24
public class CancellableQueueFuseableTest {
@Test
public void offer() {
TestHelper.assertNoOffer(new CancellableQueueFuseable<>());
}
@Test
public void pollClear() throws Throwable {
Reported by PMD.
Line: 29
}
@Test
public void pollClear() throws Throwable {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
assertNull(qs.poll());
qs.clear();
Reported by PMD.
Line: 32
public void pollClear() throws Throwable {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
assertNull(qs.poll());
qs.clear();
assertNull(qs.poll());
}
Reported by PMD.
Line: 35
assertNull(qs.poll());
qs.clear();
assertNull(qs.poll());
}
@Test
public void cancel() {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
Reported by PMD.
Line: 39
}
@Test
public void cancel() {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
assertFalse(qs.isDisposed());
qs.cancel();
Reported by PMD.
Line: 42
public void cancel() {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
assertFalse(qs.isDisposed());
qs.cancel();
assertTrue(qs.isDisposed());
Reported by PMD.
Line: 46
qs.cancel();
assertTrue(qs.isDisposed());
qs.cancel();
assertTrue(qs.isDisposed());
}
Reported by PMD.
Line: 50
qs.cancel();
assertTrue(qs.isDisposed());
}
@Test
public void dispose() {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
Reported by PMD.
Line: 54
}
@Test
public void dispose() {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
assertFalse(qs.isDisposed());
qs.dispose();
Reported by PMD.
Line: 57
public void dispose() {
CancellableQueueFuseable<Object> qs = new CancellableQueueFuseable<>();
assertFalse(qs.isDisposed());
qs.dispose();
assertTrue(qs.isDisposed());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnSubscribeTest.java
15 issues
Line: 30
public class FlowableDoOnSubscribeTest extends RxJavaTest {
@Test
public void doOnSubscribe() throws Exception {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> f = Flowable.just(1).doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
count.incrementAndGet();
Reported by PMD.
Line: 42
f.subscribe();
f.subscribe();
f.subscribe();
assertEquals(3, count.get());
}
@Test
public void doOnSubscribe2() throws Exception {
final AtomicInteger count = new AtomicInteger();
Reported by PMD.
Line: 46
}
@Test
public void doOnSubscribe2() throws Exception {
final AtomicInteger count = new AtomicInteger();
Flowable<Integer> f = Flowable.just(1).doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
count.incrementAndGet();
Reported by PMD.
Line: 61
});
f.subscribe();
assertEquals(2, count.get());
}
@Test
public void doOnUnSubscribeWorksWithRefCount() throws Exception {
final AtomicInteger onSubscribed = new AtomicInteger();
Reported by PMD.
Line: 65
}
@Test
public void doOnUnSubscribeWorksWithRefCount() throws Exception {
final AtomicInteger onSubscribed = new AtomicInteger();
final AtomicInteger countBefore = new AtomicInteger();
final AtomicInteger countAfter = new AtomicInteger();
final AtomicReference<Subscriber<? super Integer>> sref = new AtomicReference<>();
Flowable<Integer> f = Flowable.unsafeCreate(new Publisher<Integer>() {
Reported by PMD.
Line: 65
}
@Test
public void doOnUnSubscribeWorksWithRefCount() throws Exception {
final AtomicInteger onSubscribed = new AtomicInteger();
final AtomicInteger countBefore = new AtomicInteger();
final AtomicInteger countAfter = new AtomicInteger();
final AtomicReference<Subscriber<? super Integer>> sref = new AtomicReference<>();
Flowable<Integer> f = Flowable.unsafeCreate(new Publisher<Integer>() {
Reported by PMD.
Line: 95
f.subscribe();
f.subscribe();
f.subscribe();
assertEquals(1, countBefore.get());
assertEquals(1, onSubscribed.get());
assertEquals(3, countAfter.get());
sref.get().onComplete();
f.subscribe();
f.subscribe();
Reported by PMD.
Line: 96
f.subscribe();
f.subscribe();
assertEquals(1, countBefore.get());
assertEquals(1, onSubscribed.get());
assertEquals(3, countAfter.get());
sref.get().onComplete();
f.subscribe();
f.subscribe();
f.subscribe();
Reported by PMD.
Line: 97
f.subscribe();
assertEquals(1, countBefore.get());
assertEquals(1, onSubscribed.get());
assertEquals(3, countAfter.get());
sref.get().onComplete();
f.subscribe();
f.subscribe();
f.subscribe();
assertEquals(2, countBefore.get());
Reported by PMD.
Line: 98
assertEquals(1, countBefore.get());
assertEquals(1, onSubscribed.get());
assertEquals(3, countAfter.get());
sref.get().onComplete();
f.subscribe();
f.subscribe();
f.subscribe();
assertEquals(2, countBefore.get());
assertEquals(2, onSubscribed.get());
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMerge.java
15 issues
Line: 27
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
public final class CompletableMerge extends Completable {
final Publisher<? extends CompletableSource> source;
final int maxConcurrency;
final boolean delayErrors;
public CompletableMerge(Publisher<? extends CompletableSource> source, int maxConcurrency, boolean delayErrors) {
this.source = source;
Reported by PMD.
Line: 28
public final class CompletableMerge extends Completable {
final Publisher<? extends CompletableSource> source;
final int maxConcurrency;
final boolean delayErrors;
public CompletableMerge(Publisher<? extends CompletableSource> source, int maxConcurrency, boolean delayErrors) {
this.source = source;
this.maxConcurrency = maxConcurrency;
Reported by PMD.
Line: 29
public final class CompletableMerge extends Completable {
final Publisher<? extends CompletableSource> source;
final int maxConcurrency;
final boolean delayErrors;
public CompletableMerge(Publisher<? extends CompletableSource> source, int maxConcurrency, boolean delayErrors) {
this.source = source;
this.maxConcurrency = maxConcurrency;
this.delayErrors = delayErrors;
Reported by PMD.
Line: 49
private static final long serialVersionUID = -2108443387387077490L;
final CompletableObserver downstream;
final int maxConcurrency;
final boolean delayErrors;
final AtomicThrowable errors;
Reported by PMD.
Line: 50
private static final long serialVersionUID = -2108443387387077490L;
final CompletableObserver downstream;
final int maxConcurrency;
final boolean delayErrors;
final AtomicThrowable errors;
final CompositeDisposable set;
Reported by PMD.
Line: 51
final CompletableObserver downstream;
final int maxConcurrency;
final boolean delayErrors;
final AtomicThrowable errors;
final CompositeDisposable set;
Reported by PMD.
Line: 53
final int maxConcurrency;
final boolean delayErrors;
final AtomicThrowable errors;
final CompositeDisposable set;
Subscription upstream;
Reported by PMD.
Line: 55
final AtomicThrowable errors;
final CompositeDisposable set;
Subscription upstream;
CompletableMergeSubscriber(CompletableObserver actual, int maxConcurrency, boolean delayErrors) {
this.downstream = actual;
Reported by PMD.
Line: 57
final CompositeDisposable set;
Subscription upstream;
CompletableMergeSubscriber(CompletableObserver actual, int maxConcurrency, boolean delayErrors) {
this.downstream = actual;
this.maxConcurrency = maxConcurrency;
this.delayErrors = delayErrors;
Reported by PMD.
Line: 108
set.dispose();
if (errors.tryAddThrowableOrReport(t)) {
if (getAndSet(0) > 0) {
errors.tryTerminateConsumer(downstream);
}
}
} else {
if (errors.tryAddThrowableOrReport(t)) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleCache.java
15 issues
Line: 28
@SuppressWarnings("rawtypes")
static final CacheDisposable[] TERMINATED = new CacheDisposable[0];
final SingleSource<? extends T> source;
final AtomicInteger wip;
final AtomicReference<CacheDisposable<T>[]> observers;
Reported by PMD.
Line: 30
final SingleSource<? extends T> source;
final AtomicInteger wip;
final AtomicReference<CacheDisposable<T>[]> observers;
T value;
Reported by PMD.
Line: 32
final AtomicInteger wip;
final AtomicReference<CacheDisposable<T>[]> observers;
T value;
Throwable error;
Reported by PMD.
Line: 34
final AtomicReference<CacheDisposable<T>[]> observers;
T value;
Throwable error;
@SuppressWarnings("unchecked")
public SingleCache(SingleSource<? extends T> source) {
Reported by PMD.
Line: 36
T value;
Throwable error;
@SuppressWarnings("unchecked")
public SingleCache(SingleSource<? extends T> source) {
this.source = source;
this.wip = new AtomicInteger();
Reported by PMD.
Line: 38
Throwable error;
@SuppressWarnings("unchecked")
public SingleCache(SingleSource<? extends T> source) {
this.source = source;
this.wip = new AtomicInteger();
this.observers = new AtomicReference<>(EMPTY);
}
Reported by PMD.
Line: 77
}
int n = a.length;
@SuppressWarnings("unchecked")
CacheDisposable<T>[] b = new CacheDisposable[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = observer;
if (observers.compareAndSet(a, b)) {
return true;
}
Reported by PMD.
Line: 109
CacheDisposable<T>[] b;
if (n == 1) {
b = EMPTY;
} else {
b = new CacheDisposable[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
Reported by PMD.
Line: 112
if (n == 1) {
b = EMPTY;
} else {
b = new CacheDisposable[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
if (observers.compareAndSet(a, b)) {
return;
Reported by PMD.
Line: 134
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) {
if (!d.isDisposed()) {
d.downstream.onSuccess(value);
}
}
}
@SuppressWarnings("unchecked")
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableFlatMapStream.java
15 issues
Line: 38
*/
public final class ObservableFlatMapStream<T, R> extends Observable<R> {
final Observable<T> source;
final Function<? super T, ? extends Stream<? extends R>> mapper;
public ObservableFlatMapStream(Observable<T> source, Function<? super T, ? extends Stream<? extends R>> mapper) {
this.source = source;
Reported by PMD.
Line: 40
final Observable<T> source;
final Function<? super T, ? extends Stream<? extends R>> mapper;
public ObservableFlatMapStream(Observable<T> source, Function<? super T, ? extends Stream<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}
Reported by PMD.
Line: 57
if (t != null) {
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
Line: 78
private static final long serialVersionUID = -5127032662980523968L;
final Observer<? super R> downstream;
final Function<? super T, ? extends Stream<? extends R>> mapper;
Disposable upstream;
Reported by PMD.
Line: 80
final Observer<? super R> downstream;
final Function<? super T, ? extends Stream<? extends R>> mapper;
Disposable upstream;
volatile boolean disposed;
Reported by PMD.
Line: 82
final Function<? super T, ? extends Stream<? extends R>> mapper;
Disposable upstream;
volatile boolean disposed;
boolean done;
Reported by PMD.
Line: 84
Disposable upstream;
volatile boolean disposed;
boolean done;
FlatMapStreamObserver(Observer<? super R> downstream, Function<? super T, ? extends Stream<? extends R>> mapper) {
this.downstream = downstream;
Reported by PMD.
Line: 86
volatile boolean disposed;
boolean done;
FlatMapStreamObserver(Observer<? super R> downstream, Function<? super T, ? extends Stream<? extends R>> mapper) {
this.downstream = downstream;
this.mapper = mapper;
}
Reported by PMD.
Line: 127
}
}
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.dispose();
onError(ex);
}
}
Reported by PMD.
Line: 25
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* Maps the upstream values onto {@link Stream}s and emits their items in order to the downstream.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSamplePublisher.java
15 issues
Line: 27
import io.reactivex.rxjava3.subscribers.SerializedSubscriber;
public final class FlowableSamplePublisher<T> extends Flowable<T> {
final Publisher<T> source;
final Publisher<?> other;
final boolean emitLast;
public FlowableSamplePublisher(Publisher<T> source, Publisher<?> other, boolean emitLast) {
Reported by PMD.
Line: 28
public final class FlowableSamplePublisher<T> extends Flowable<T> {
final Publisher<T> source;
final Publisher<?> other;
final boolean emitLast;
public FlowableSamplePublisher(Publisher<T> source, Publisher<?> other, boolean emitLast) {
this.source = source;
Reported by PMD.
Line: 30
final Publisher<T> source;
final Publisher<?> other;
final boolean emitLast;
public FlowableSamplePublisher(Publisher<T> source, Publisher<?> other, boolean emitLast) {
this.source = source;
this.other = other;
this.emitLast = emitLast;
Reported by PMD.
Line: 48
}
}
abstract static class SamplePublisherSubscriber<T> extends AtomicReference<T> implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -3517602651313910099L;
final Subscriber<? super T> downstream;
final Publisher<?> sampler;
Reported by PMD.
Line: 52
private static final long serialVersionUID = -3517602651313910099L;
final Subscriber<? super T> downstream;
final Publisher<?> sampler;
final AtomicLong requested = new AtomicLong();
final AtomicReference<Subscription> other = new AtomicReference<>();
Reported by PMD.
Line: 53
private static final long serialVersionUID = -3517602651313910099L;
final Subscriber<? super T> downstream;
final Publisher<?> sampler;
final AtomicLong requested = new AtomicLong();
final AtomicReference<Subscription> other = new AtomicReference<>();
Reported by PMD.
Line: 55
final Subscriber<? super T> downstream;
final Publisher<?> sampler;
final AtomicLong requested = new AtomicLong();
final AtomicReference<Subscription> other = new AtomicReference<>();
Subscription upstream;
Reported by PMD.
Line: 57
final AtomicLong requested = new AtomicLong();
final AtomicReference<Subscription> other = new AtomicReference<>();
Subscription upstream;
SamplePublisherSubscriber(Subscriber<? super T> actual, Publisher<?> other) {
this.downstream = actual;
Reported by PMD.
Line: 59
final AtomicReference<Subscription> other = new AtomicReference<>();
Subscription upstream;
SamplePublisherSubscriber(Subscriber<? super T> actual, Publisher<?> other) {
this.downstream = actual;
this.sampler = other;
}
Reported by PMD.
Line: 127
T value = getAndSet(null);
if (value != null) {
long r = requested.get();
if (r != 0L) {
downstream.onNext(value);
BackpressureHelper.produced(requested, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/observers/LambdaObserver.java
15 issues
Line: 31
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Reported by PMD.
Line: 31
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Reported by PMD.
Line: 32
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Reported by PMD.
Line: 32
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Reported by PMD.
Line: 33
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
Reported by PMD.
Line: 33
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
Reported by PMD.
Line: 34
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
Reported by PMD.
Line: 34
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
Reported by PMD.
Line: 51
if (DisposableHelper.setOnce(this, d)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
Reported by PMD.
Line: 64
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java
15 issues
Line: 32
import io.reactivex.rxjava3.subscribers.SerializedSubscriber;
public final class FlowableThrottleFirstTimed<T> extends AbstractFlowableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
public FlowableThrottleFirstTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
super(source);
Reported by PMD.
Line: 33
public final class FlowableThrottleFirstTimed<T> extends AbstractFlowableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
public FlowableThrottleFirstTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
super(source);
this.timeout = timeout;
Reported by PMD.
Line: 34
public final class FlowableThrottleFirstTimed<T> extends AbstractFlowableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
public FlowableThrottleFirstTimed(Flowable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
super(source);
this.timeout = timeout;
this.unit = unit;
Reported by PMD.
Line: 55
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = -9102637559663639004L;
final Subscriber<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
Subscription upstream;
Reported by PMD.
Line: 56
private static final long serialVersionUID = -9102637559663639004L;
final Subscriber<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
Subscription upstream;
Reported by PMD.
Line: 57
private static final long serialVersionUID = -9102637559663639004L;
final Subscriber<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
Subscription upstream;
final SequentialDisposable timer = new SequentialDisposable();
Reported by PMD.
Line: 58
final Subscriber<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
Subscription upstream;
final SequentialDisposable timer = new SequentialDisposable();
Reported by PMD.
Line: 60
final TimeUnit unit;
final Scheduler.Worker worker;
Subscription upstream;
final SequentialDisposable timer = new SequentialDisposable();
volatile boolean gate;
Reported by PMD.
Line: 62
Subscription upstream;
final SequentialDisposable timer = new SequentialDisposable();
volatile boolean gate;
boolean done;
Reported by PMD.
Line: 64
final SequentialDisposable timer = new SequentialDisposable();
volatile boolean gate;
boolean done;
DebounceTimedSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
this.downstream = actual;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRetryPredicate.java
15 issues
Line: 26
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter;
public final class FlowableRetryPredicate<T> extends AbstractFlowableWithUpstream<T, T> {
final Predicate<? super Throwable> predicate;
final long count;
public FlowableRetryPredicate(Flowable<T> source,
long count,
Predicate<? super Throwable> predicate) {
super(source);
Reported by PMD.
Line: 27
public final class FlowableRetryPredicate<T> extends AbstractFlowableWithUpstream<T, T> {
final Predicate<? super Throwable> predicate;
final long count;
public FlowableRetryPredicate(Flowable<T> source,
long count,
Predicate<? super Throwable> predicate) {
super(source);
this.predicate = predicate;
Reported by PMD.
Line: 49
private static final long serialVersionUID = -7098360935104053232L;
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final Predicate<? super Throwable> predicate;
long remaining;
Reported by PMD.
Line: 50
private static final long serialVersionUID = -7098360935104053232L;
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final Predicate<? super Throwable> predicate;
long remaining;
long produced;
Reported by PMD.
Line: 51
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final Predicate<? super Throwable> predicate;
long remaining;
long produced;
Reported by PMD.
Line: 52
final Subscriber<? super T> downstream;
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final Predicate<? super Throwable> predicate;
long remaining;
long produced;
RetrySubscriber(Subscriber<? super T> actual, long count,
Reported by PMD.
Line: 53
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final Predicate<? super Throwable> predicate;
long remaining;
long produced;
RetrySubscriber(Subscriber<? super T> actual, long count,
Predicate<? super Throwable> predicate, SubscriptionArbiter sa, Publisher<? extends T> source) {
Reported by PMD.
Line: 55
final Predicate<? super Throwable> predicate;
long remaining;
long produced;
RetrySubscriber(Subscriber<? super T> actual, long count,
Predicate<? super Throwable> predicate, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.downstream = actual;
this.sa = sa;
Reported by PMD.
Line: 89
boolean b;
try {
b = predicate.test(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (!b) {
Reported by PMD.
Line: 119
}
long p = produced;
if (p != 0L) {
produced = 0L;
sa.produced(p);
}
source.subscribe(this);
Reported by PMD.