The following issues were found
src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStream.java
5 issues
Line: 32
*/
public final class ParallelFlatMapStream<T, R> extends ParallelFlowable<R> {
final ParallelFlowable<T> source;
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 34
final ParallelFlowable<T> source;
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
public ParallelFlatMapStream(
ParallelFlowable<T> source,
Reported by PMD.
Line: 36
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
public ParallelFlatMapStream(
ParallelFlowable<T> source,
Function<? super T, ? extends Stream<? extends R>> mapper,
int prefetch) {
Reported by PMD.
Line: 61
int n = subscribers.length;
@SuppressWarnings("unchecked")
final Subscriber<T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
parents[i] = FlowableFlatMapStream.subscribe(subscribers[i], mapper, prefetch);
}
Reported by PMD.
Line: 64
final Subscriber<T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
parents[i] = FlowableFlatMapStream.subscribe(subscribers[i], mapper, prefetch);
}
source.subscribe(parents);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMap.java
5 issues
Line: 24
import java.util.Objects;
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
Reported by PMD.
Line: 37
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
Reported by PMD.
Line: 59
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
Reported by PMD.
Line: 17
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.observers.BasicFuseableObserver;
import java.util.Objects;
Reported by PMD.
Line: 58
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDefer.java
5 issues
Line: 25
public final class CompletableDefer extends Completable {
final Supplier<? extends CompletableSource> completableSupplier;
public CompletableDefer(Supplier<? extends CompletableSource> completableSupplier) {
this.completableSupplier = completableSupplier;
}
Reported by PMD.
Line: 37
try {
c = Objects.requireNonNull(completableSupplier.get(), "The completableSupplier returned a null CompletableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
Reported by PMD.
Line: 43
return;
}
c.subscribe(observer);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.completable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import java.util.Objects;
Reported by PMD.
Line: 36
CompletableSource c;
try {
c = Objects.requireNonNull(completableSupplier.get(), "The completableSupplier returned a null CompletableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableLift.java
5 issues
Line: 55
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
source.subscribe(liftedObserver);
}
}
Reported by PMD.
Line: 33
*/
public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
/** The actual operator. */
final ObservableOperator<? extends R, ? super T> operator;
public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
Reported by PMD.
Line: 47
liftedObserver = Objects.requireNonNull(operator.apply(observer), "Operator " + operator + " returned a null Observer");
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Disposable already
RxJavaPlugins.onError(e);
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.util.Objects;
Reported by PMD.
Line: 44
public void subscribeActual(Observer<? super R> observer) {
Observer<? super T> liftedObserver;
try {
liftedObserver = Objects.requireNonNull(operator.apply(observer), "Operator " + operator + " returned a null Observer");
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/subscribers/FutureSubscriber.java
5 issues
Line: 39
public final class FutureSubscriber<T> extends CountDownLatch
implements FlowableSubscriber<T>, Future<T>, Subscription {
T value;
Throwable error;
final AtomicReference<Subscription> upstream;
public FutureSubscriber() {
Reported by PMD.
Line: 40
implements FlowableSubscriber<T>, Future<T>, Subscription {
T value;
Throwable error;
final AtomicReference<Subscription> upstream;
public FutureSubscriber() {
super(1);
Reported by PMD.
Line: 42
T value;
Throwable error;
final AtomicReference<Subscription> upstream;
public FutureSubscriber() {
super(1);
this.upstream = new AtomicReference<>();
}
Reported by PMD.
Line: 59
if (upstream.compareAndSet(a, SubscriptionHelper.CANCELLED)) {
if (a != null) {
a.cancel();
}
countDown();
return true;
}
}
Reported by PMD.
Line: 122
@Override
public void onNext(T t) {
if (value != null) {
upstream.get().cancel();
onError(new IndexOutOfBoundsException("More than one element received"));
return;
}
value = t;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromPublisher.java
5 issues
Line: 24
public final class CompletableFromPublisher<T> extends Completable {
final Publisher<T> flowable;
public CompletableFromPublisher(Publisher<T> flowable) {
this.flowable = flowable;
}
Reported by PMD.
Line: 37
static final class FromPublisherSubscriber<T> implements FlowableSubscriber<T>, Disposable {
final CompletableObserver downstream;
Subscription upstream;
FromPublisherSubscriber(CompletableObserver downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 39
final CompletableObserver downstream;
Subscription upstream;
FromPublisherSubscriber(CompletableObserver downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.completable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
public final class CompletableFromPublisher<T> extends Completable {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCountSingle.java
5 issues
Line: 26
public final class FlowableCountSingle<T> extends Single<Long> implements FuseToFlowable<Long> {
final Flowable<T> source;
public FlowableCountSingle(Flowable<T> source) {
this.source = source;
}
Reported by PMD.
Line: 44
static final class CountSubscriber implements FlowableSubscriber<Object>, Disposable {
final SingleObserver<? super Long> downstream;
Subscription upstream;
long count;
Reported by PMD.
Line: 46
final SingleObserver<? super Long> downstream;
Subscription upstream;
long count;
CountSubscriber(SingleObserver<? super Long> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 48
Subscription upstream;
long count;
CountSubscriber(SingleObserver<? super Long> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 18
import org.reactivestreams.Subscription;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.fuseable.FuseToFlowable;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromSingle.java
5 issues
Line: 28
*/
public final class MaybeFromSingle<T> extends Maybe<T> implements HasUpstreamSingleSource<T> {
final SingleSource<T> source;
public MaybeFromSingle(SingleSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 28
*/
public final class MaybeFromSingle<T> extends Maybe<T> implements HasUpstreamSingleSource<T> {
final SingleSource<T> source;
public MaybeFromSingle(SingleSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 45
}
static final class FromSingleObserver<T> implements SingleObserver<T>, Disposable {
final MaybeObserver<? super T> downstream;
Disposable upstream;
FromSingleObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 47
static final class FromSingleObserver<T> implements SingleObserver<T>, Disposable {
final MaybeObserver<? super T> downstream;
Disposable upstream;
FromSingleObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamSingleSource;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleLift.java
5 issues
Line: 24
public final class SingleLift<T, R> extends Single<R> {
final SingleSource<T> source;
final SingleOperator<? extends R, ? super T> onLift;
public SingleLift(SingleSource<T> source, SingleOperator<? extends R, ? super T> onLift) {
this.source = source;
Reported by PMD.
Line: 26
final SingleSource<T> source;
final SingleOperator<? extends R, ? super T> onLift;
public SingleLift(SingleSource<T> source, SingleOperator<? extends R, ? super T> onLift) {
this.source = source;
this.onLift = onLift;
}
Reported by PMD.
Line: 39
try {
sr = Objects.requireNonNull(onLift.apply(observer), "The onLift returned a null SingleObserver");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import java.util.Objects;
Reported by PMD.
Line: 38
SingleObserver<? super T> sr;
try {
sr = Objects.requireNonNull(onLift.apply(observer), "The onLift returned a null SingleObserver");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUnsubscribeOn.java
5 issues
Line: 24
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableUnsubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
Reported by PMD.
Line: 39
private static final long serialVersionUID = 1015244841293359600L;
final Observer<? super T> downstream;
final Scheduler scheduler;
Disposable upstream;
UnsubscribeObserver(Observer<? super T> actual, Scheduler scheduler) {
Reported by PMD.
Line: 40
private static final long serialVersionUID = 1015244841293359600L;
final Observer<? super T> downstream;
final Scheduler scheduler;
Disposable upstream;
UnsubscribeObserver(Observer<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
Reported by PMD.
Line: 42
final Observer<? super T> downstream;
final Scheduler scheduler;
Disposable upstream;
UnsubscribeObserver(Observer<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicBoolean;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableUnsubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
Reported by PMD.