The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromPublisher.java
5 issues
Line: 24
public final class ObservableFromPublisher<T> extends Observable<T> {
final Publisher<? extends T> source;
public ObservableFromPublisher(Publisher<? extends T> publisher) {
this.source = publisher;
}
Reported by PMD.
Line: 38
static final class PublisherSubscriber<T>
implements FlowableSubscriber<T>, Disposable {
final Observer<? super T> downstream;
Subscription upstream;
PublisherSubscriber(Observer<? super T> o) {
this.downstream = o;
}
Reported by PMD.
Line: 39
implements FlowableSubscriber<T>, Disposable {
final Observer<? super T> downstream;
Subscription upstream;
PublisherSubscriber(Observer<? super T> o) {
this.downstream = o;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
public final class ObservableFromPublisher<T> extends Observable<T> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSubscribeOn.java
5 issues
Line: 23
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
Reported by PMD.
Line: 42
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 44
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<>();
}
Reported by PMD.
Line: 88
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDefer.java
5 issues
Line: 26
import java.util.Objects;
public final class FlowableDefer<T> extends Flowable<T> {
final Supplier<? extends Publisher<? extends T>> supplier;
public FlowableDefer(Supplier<? extends Publisher<? extends T>> supplier) {
this.supplier = supplier;
}
@Override
Reported by PMD.
Line: 36
Publisher<? extends T> pub;
try {
pub = Objects.requireNonNull(supplier.get(), "The publisher supplied is null");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptySubscription.error(t, s);
return;
}
Reported by PMD.
Line: 42
return;
}
pub.subscribe(s);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.subscriptions.EmptySubscription;
Reported by PMD.
Line: 35
public void subscribeActual(Subscriber<? super T> s) {
Publisher<? extends T> pub;
try {
pub = Objects.requireNonNull(supplier.get(), "The publisher supplied is null");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptySubscription.error(t, s);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/subscriptions/SubscriptionHelper.java
5 issues
Line: 116
}
if (field.compareAndSet(current, s)) {
if (current != null) {
current.cancel();
}
return true;
}
}
}
Reported by PMD.
Line: 180
if (current != CANCELLED) {
current = field.getAndSet(CANCELLED);
if (current != CANCELLED) {
if (current != null) {
current.cancel();
}
return true;
}
}
Reported by PMD.
Line: 201
Subscription s) {
if (SubscriptionHelper.setOnce(field, s)) {
long r = requested.getAndSet(0L);
if (r != 0L) {
s.request(r);
}
return true;
}
return false;
Reported by PMD.
Line: 227
s = field.get();
if (s != null) {
long r = requested.getAndSet(0L);
if (r != 0L) {
s.request(r);
}
}
}
}
Reported by PMD.
Line: 227
s = field.get();
if (s != null) {
long r = requested.getAndSet(0L);
if (r != 0L) {
s.request(r);
}
}
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableLift.java
5 issues
Line: 46
Subscriber<? super T> st = operator.apply(s);
if (st == null) {
throw new NullPointerException("Operator " + operator + " returned a null Subscriber");
}
source.subscribe(st);
} catch (NullPointerException e) { // NOPMD
throw e;
Reported by PMD.
Line: 60
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
}
Reported by PMD.
Line: 33
*/
public final class FlowableLift<R, T> extends AbstractFlowableWithUpstream<T, R> {
/** The actual operator. */
final FlowableOperator<? extends R, ? super T> operator;
public FlowableLift(Flowable<T> source, FlowableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
Reported by PMD.
Line: 52
source.subscribe(st);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Subscription has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
Reported by PMD.
Line: 18
import org.reactivestreams.Subscriber;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* Allows lifting operators into a chain of Publishers.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCountSingle.java
5 issues
Line: 23
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
final ObservableSource<T> source;
public ObservableCountSingle(ObservableSource<T> source) {
this.source = source;
}
@Override
Reported by PMD.
Line: 39
}
static final class CountObserver implements Observer<Object>, Disposable {
final SingleObserver<? super Long> downstream;
Disposable upstream;
long count;
Reported by PMD.
Line: 41
static final class CountObserver implements Observer<Object>, Disposable {
final SingleObserver<? super Long> downstream;
Disposable upstream;
long count;
CountObserver(SingleObserver<? super Long> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 43
Disposable upstream;
long count;
CountObserver(SingleObserver<? super Long> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.FuseToObservable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservable.java
5 issues
Line: 23
public final class FlowableFromObservable<T> extends Flowable<T> {
private final ObservableSource<T> upstream;
public FlowableFromObservable(ObservableSource<T> upstream) {
this.upstream = upstream;
}
Reported by PMD.
Line: 36
static final class SubscriberObserver<T> implements Observer<T>, Subscription {
final Subscriber<? super T> downstream;
Disposable upstream;
SubscriberObserver(Subscriber<? super T> s) {
this.downstream = s;
Reported by PMD.
Line: 38
final Subscriber<? super T> downstream;
Disposable upstream;
SubscriberObserver(Subscriber<? super T> s) {
this.downstream = s;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
public final class FlowableFromObservable<T> extends Flowable<T> {
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
public final class FlowableFromObservable<T> extends Flowable<T> {
private final ObservableSource<T> upstream;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromAction.java
5 issues
Line: 24
public final class CompletableFromAction extends Completable {
final Action run;
public CompletableFromAction(Action run) {
this.run = run;
}
Reported by PMD.
Line: 34
protected void subscribeActual(CompletableObserver observer) {
Disposable d = Disposable.empty();
observer.onSubscribe(d);
if (!d.isDisposed()) {
try {
run.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
Reported by PMD.
Line: 37
if (!d.isDisposed()) {
try {
run.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.completable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 17
package io.reactivex.rxjava3.internal.operators.completable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class CompletableFromAction extends Completable {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDefer.java
5 issues
Line: 30
*/
public final class MaybeDefer<T> extends Maybe<T> {
final Supplier<? extends MaybeSource<? extends T>> maybeSupplier;
public MaybeDefer(Supplier<? extends MaybeSource<? extends T>> maybeSupplier) {
this.maybeSupplier = maybeSupplier;
}
Reported by PMD.
Line: 42
try {
source = Objects.requireNonNull(maybeSupplier.get(), "The maybeSupplier returned a null MaybeSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
Line: 48
return;
}
source.subscribe(observer);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import java.util.Objects;
Reported by PMD.
Line: 41
MaybeSource<? extends T> source;
try {
source = Objects.requireNonNull(maybeSupplier.get(), "The maybeSupplier returned a null MaybeSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeCount.java
5 issues
Line: 28
*/
public final class MaybeCount<T> extends Single<Long> implements HasUpstreamMaybeSource<T> {
final MaybeSource<T> source;
public MaybeCount(MaybeSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 28
*/
public final class MaybeCount<T> extends Single<Long> implements HasUpstreamMaybeSource<T> {
final MaybeSource<T> source;
public MaybeCount(MaybeSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 45
}
static final class CountMaybeObserver implements MaybeObserver<Object>, Disposable {
final SingleObserver<? super Long> downstream;
Disposable upstream;
CountMaybeObserver(SingleObserver<? super Long> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 47
static final class CountMaybeObserver implements MaybeObserver<Object>, Disposable {
final SingleObserver<? super Long> downstream;
Disposable upstream;
CountMaybeObserver(SingleObserver<? super Long> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamMaybeSource;
/**
Reported by PMD.