The following issues were found
src/main/java/io/reactivex/rxjava3/internal/subscribers/BlockingBaseSubscriber.java
7 issues
Line: 27
public abstract class BlockingBaseSubscriber<T> extends CountDownLatch
implements FlowableSubscriber<T> {
T value;
Throwable error;
Subscription upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 28
implements FlowableSubscriber<T> {
T value;
Throwable error;
Subscription upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 30
T value;
Throwable error;
Subscription upstream;
volatile boolean cancelled;
public BlockingBaseSubscriber() {
super(1);
Reported by PMD.
Line: 32
Subscription upstream;
volatile boolean cancelled;
public BlockingBaseSubscriber() {
super(1);
}
Reported by PMD.
Line: 44
this.upstream = s;
if (!cancelled) {
s.request(Long.MAX_VALUE);
if (cancelled) {
this.upstream = SubscriptionHelper.CANCELLED;
s.cancel();
}
}
}
Reported by PMD.
Line: 71
Subscription s = this.upstream;
this.upstream = SubscriptionHelper.CANCELLED;
if (s != null) {
s.cancel();
}
throw ExceptionHelper.wrapOrThrow(ex);
}
}
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.internal.util.*;
public abstract class BlockingBaseSubscriber<T> extends CountDownLatch
implements FlowableSubscriber<T> {
T value;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoAfterNext.java
7 issues
Line: 32
*/
public final class FlowableDoAfterNext<T> extends AbstractFlowableWithUpstream<T, T> {
final Consumer<? super T> onAfterNext;
public FlowableDoAfterNext(Flowable<T> source, Consumer<? super T> onAfterNext) {
super(source);
this.onAfterNext = onAfterNext;
}
Reported by PMD.
Line: 50
static final class DoAfterSubscriber<T> extends BasicFuseableSubscriber<T, T> {
final Consumer<? super T> onAfterNext;
DoAfterSubscriber(Subscriber<? super T> actual, Consumer<? super T> onAfterNext) {
super(actual);
this.onAfterNext = onAfterNext;
}
Reported by PMD.
Line: 67
if (sourceMode == NONE) {
try {
onAfterNext.accept(t);
} catch (Throwable ex) {
fail(ex);
}
}
}
Reported by PMD.
Line: 91
static final class DoAfterConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
final Consumer<? super T> onAfterNext;
DoAfterConditionalSubscriber(ConditionalSubscriber<? super T> actual, Consumer<? super T> onAfterNext) {
super(actual);
this.onAfterNext = onAfterNext;
}
Reported by PMD.
Line: 105
if (sourceMode == NONE) {
try {
onAfterNext.accept(t);
} catch (Throwable ex) {
fail(ex);
}
}
}
Reported by PMD.
Line: 116
boolean b = downstream.tryOnNext(t);
try {
onAfterNext.accept(t);
} catch (Throwable ex) {
fail(ex);
}
return b;
}
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber;
import io.reactivex.rxjava3.internal.subscribers.*;
/**
* Calls a consumer after pushing the current item to the downstream.
* <p>History: 2.0.1 - experimental
* @param <T> the value type
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnError.java
7 issues
Line: 56
}
@Override
public void onError(Throwable e) {
try {
onError.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
Reported by PMD.
Line: 23
public final class SingleDoOnError<T> extends Single<T> {
final SingleSource<T> source;
final Consumer<? super Throwable> onError;
public SingleDoOnError(SingleSource<T> source, Consumer<? super Throwable> onError) {
this.source = source;
Reported by PMD.
Line: 25
final SingleSource<T> source;
final Consumer<? super Throwable> onError;
public SingleDoOnError(SingleSource<T> source, Consumer<? super Throwable> onError) {
this.source = source;
this.onError = onError;
}
Reported by PMD.
Line: 39
}
final class DoOnError implements SingleObserver<T> {
private final SingleObserver<? super T> downstream;
DoOnError(SingleObserver<? super T> observer) {
this.downstream = observer;
}
Reported by PMD.
Line: 59
public void onError(Throwable e) {
try {
onError.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
}
downstream.onError(e);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Consumer;
public final class SingleDoOnError<T> extends Single<T> {
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Consumer;
public final class SingleDoOnError<T> extends Single<T> {
final SingleSource<T> source;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoFinally.java
7 issues
Line: 33
*/
public final class SingleDoFinally<T> extends Single<T> {
final SingleSource<T> source;
final Action onFinally;
public SingleDoFinally(SingleSource<T> source, Action onFinally) {
this.source = source;
Reported by PMD.
Line: 35
final SingleSource<T> source;
final Action onFinally;
public SingleDoFinally(SingleSource<T> source, Action onFinally) {
this.source = source;
this.onFinally = onFinally;
}
Reported by PMD.
Line: 51
private static final long serialVersionUID = 4109457741734051389L;
final SingleObserver<? super T> downstream;
final Action onFinally;
Disposable upstream;
Reported by PMD.
Line: 53
final SingleObserver<? super T> downstream;
final Action onFinally;
Disposable upstream;
DoFinallyObserver(SingleObserver<? super T> actual, Action onFinally) {
this.downstream = actual;
Reported by PMD.
Line: 55
final Action onFinally;
Disposable upstream;
DoFinallyObserver(SingleObserver<? super T> actual, Action onFinally) {
this.downstream = actual;
this.onFinally = onFinally;
}
Reported by PMD.
Line: 98
if (compareAndSet(0, 1)) {
try {
onFinally.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoAfterSuccess.java
7 issues
Line: 31
*/
public final class SingleDoAfterSuccess<T> extends Single<T> {
final SingleSource<T> source;
final Consumer<? super T> onAfterSuccess;
public SingleDoAfterSuccess(SingleSource<T> source, Consumer<? super T> onAfterSuccess) {
this.source = source;
Reported by PMD.
Line: 33
final SingleSource<T> source;
final Consumer<? super T> onAfterSuccess;
public SingleDoAfterSuccess(SingleSource<T> source, Consumer<? super T> onAfterSuccess) {
this.source = source;
this.onAfterSuccess = onAfterSuccess;
}
Reported by PMD.
Line: 47
static final class DoAfterObserver<T> implements SingleObserver<T>, Disposable {
final SingleObserver<? super T> downstream;
final Consumer<? super T> onAfterSuccess;
Disposable upstream;
Reported by PMD.
Line: 49
final SingleObserver<? super T> downstream;
final Consumer<? super T> onAfterSuccess;
Disposable upstream;
DoAfterObserver(SingleObserver<? super T> actual, Consumer<? super T> onAfterSuccess) {
this.downstream = actual;
Reported by PMD.
Line: 51
final Consumer<? super T> onAfterSuccess;
Disposable upstream;
DoAfterObserver(SingleObserver<? super T> actual, Consumer<? super T> onAfterSuccess) {
this.downstream = actual;
this.onAfterSuccess = onAfterSuccess;
}
Reported by PMD.
Line: 73
try {
onAfterSuccess.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// remember, onSuccess is a terminal event and we can't call onError
RxJavaPlugins.onError(ex);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDetach.java
7 issues
Line: 28
*/
public final class SingleDetach<T> extends Single<T> {
final SingleSource<T> source;
public SingleDetach(SingleSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 41
static final class DetachSingleObserver<T> implements SingleObserver<T>, Disposable {
SingleObserver<? super T> downstream;
Disposable upstream;
DetachSingleObserver(SingleObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 43
SingleObserver<? super T> downstream;
Disposable upstream;
DetachSingleObserver(SingleObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 51
@Override
public void dispose() {
downstream = null;
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}
@Override
Reported by PMD.
Line: 75
upstream = DisposableHelper.DISPOSED;
SingleObserver<? super T> a = downstream;
if (a != null) {
downstream = null;
a.onSuccess(value);
}
}
@Override
Reported by PMD.
Line: 85
upstream = DisposableHelper.DISPOSED;
SingleObserver<? super T> a = downstream;
if (a != null) {
downstream = null;
a.onError(e);
}
}
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Breaks the references between the upstream and downstream when the Maybe terminates.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/observers/SafeMaybeObserver.java
7 issues
Line: 34
*/
public final class SafeMaybeObserver<T> implements MaybeObserver<T> {
final MaybeObserver<? super T> downstream;
boolean onSubscribeFailed;
public SafeMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 36
final MaybeObserver<? super T> downstream;
boolean onSubscribeFailed;
public SafeMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 46
public void onSubscribe(@NonNull Disposable d) {
try {
downstream.onSubscribe(d);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onSubscribeFailed = true;
d.dispose();
RxJavaPlugins.onError(ex);
}
Reported by PMD.
Line: 59
if (!onSubscribeFailed) {
try {
downstream.onSuccess(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
}
Reported by PMD.
Line: 73
} else {
try {
downstream.onError(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(new CompositeException(e, ex));
}
}
}
Reported by PMD.
Line: 85
if (!onSubscribeFailed) {
try {
downstream.onComplete();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
}
Reported by PMD.
Line: 19
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.MaybeObserver;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* Wraps another {@link MaybeObserver} and catches exceptions thrown by its
* {@code onSubscribe}, {@code onSuccess}, {@code onError} or
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDelayWithObservable.java
7 issues
Line: 26
public final class SingleDelayWithObservable<T, U> extends Single<T> {
final SingleSource<T> source;
final ObservableSource<U> other;
public SingleDelayWithObservable(SingleSource<T> source, ObservableSource<U> other) {
this.source = source;
Reported by PMD.
Line: 28
final SingleSource<T> source;
final ObservableSource<U> other;
public SingleDelayWithObservable(SingleSource<T> source, ObservableSource<U> other) {
this.source = source;
this.other = other;
}
Reported by PMD.
Line: 46
private static final long serialVersionUID = -8565274649390031272L;
final SingleObserver<? super T> downstream;
final SingleSource<T> source;
boolean done;
Reported by PMD.
Line: 48
final SingleObserver<? super T> downstream;
final SingleSource<T> source;
boolean done;
OtherSubscriber(SingleObserver<? super T> actual, SingleSource<T> source) {
this.downstream = actual;
Reported by PMD.
Line: 50
final SingleSource<T> source;
boolean done;
OtherSubscriber(SingleObserver<? super T> actual, SingleSource<T> source) {
this.downstream = actual;
this.source = source;
}
Reported by PMD.
Line: 67
@Override
public void onNext(U value) {
get().dispose();
onComplete();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.observers.ResumeSingleObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSubscribeOn.java
7 issues
Line: 28
*/
public final class MaybeSubscribeOn<T> extends AbstractMaybeWithUpstream<T, T> {
final Scheduler scheduler;
public MaybeSubscribeOn(MaybeSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
Reported by PMD.
Line: 44
}
static final class SubscribeTask<T> implements Runnable {
final MaybeObserver<? super T> observer;
final MaybeSource<T> source;
SubscribeTask(MaybeObserver<? super T> observer, MaybeSource<T> source) {
this.observer = observer;
this.source = source;
Reported by PMD.
Line: 45
static final class SubscribeTask<T> implements Runnable {
final MaybeObserver<? super T> observer;
final MaybeSource<T> source;
SubscribeTask(MaybeObserver<? super T> observer, MaybeSource<T> source) {
this.observer = observer;
this.source = source;
}
Reported by PMD.
Line: 62
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {
final SequentialDisposable task;
private static final long serialVersionUID = 8571289934935992137L;
final MaybeObserver<? super T> downstream;
Reported by PMD.
Line: 66
private static final long serialVersionUID = 8571289934935992137L;
final MaybeObserver<? super T> downstream;
SubscribeOnMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
this.task = new SequentialDisposable();
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
/**
* Subscribes to the upstream MaybeSource on the specified scheduler.
*
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
/**
* Subscribes to the upstream MaybeSource on the specified scheduler.
*
* @param <T> the value type delivered
*/
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSwitchIfEmpty.java
7 issues
Line: 29
*/
public final class MaybeSwitchIfEmpty<T> extends AbstractMaybeWithUpstream<T, T> {
final MaybeSource<? extends T> other;
public MaybeSwitchIfEmpty(MaybeSource<T> source, MaybeSource<? extends T> other) {
super(source);
this.other = other;
}
Reported by PMD.
Line: 47
private static final long serialVersionUID = -2223459372976438024L;
final MaybeObserver<? super T> downstream;
final MaybeSource<? extends T> other;
SwitchIfEmptyMaybeObserver(MaybeObserver<? super T> actual, MaybeSource<? extends T> other) {
this.downstream = actual;
Reported by PMD.
Line: 49
final MaybeObserver<? super T> downstream;
final MaybeSource<? extends T> other;
SwitchIfEmptyMaybeObserver(MaybeObserver<? super T> actual, MaybeSource<? extends T> other) {
this.downstream = actual;
this.other = other;
}
Reported by PMD.
Line: 87
public void onComplete() {
Disposable d = get();
if (d != DisposableHelper.DISPOSED) {
if (compareAndSet(d, null)) {
other.subscribe(new OtherMaybeObserver<T>(downstream, this));
}
}
}
Reported by PMD.
Line: 95
static final class OtherMaybeObserver<T> implements MaybeObserver<T> {
final MaybeObserver<? super T> downstream;
final AtomicReference<Disposable> parent;
OtherMaybeObserver(MaybeObserver<? super T> actual, AtomicReference<Disposable> parent) {
this.downstream = actual;
this.parent = parent;
Reported by PMD.
Line: 97
final MaybeObserver<? super T> downstream;
final AtomicReference<Disposable> parent;
OtherMaybeObserver(MaybeObserver<? super T> actual, AtomicReference<Disposable> parent) {
this.downstream = actual;
this.parent = parent;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Subscribes to the other source if the main source is empty.
Reported by PMD.