The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSingleMaybe.java
7 issues
Line: 23
public final class ObservableSingleMaybe<T> extends Maybe<T> {
final ObservableSource<T> source;
public ObservableSingleMaybe(ObservableSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 35
}
static final class SingleElementObserver<T> implements Observer<T>, Disposable {
final MaybeObserver<? super T> downstream;
Disposable upstream;
T value;
Reported by PMD.
Line: 37
static final class SingleElementObserver<T> implements Observer<T>, Disposable {
final MaybeObserver<? super T> downstream;
Disposable upstream;
T value;
boolean done;
Reported by PMD.
Line: 39
Disposable upstream;
T value;
boolean done;
SingleElementObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 41
T value;
boolean done;
SingleElementObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 96
}
done = true;
T v = value;
value = null;
if (v == null) {
downstream.onComplete();
} else {
downstream.onSuccess(v);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableSingleMaybe<T> extends Maybe<T> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReduceWithSingle.java
7 issues
Line: 35
*/
public final class FlowableReduceWithSingle<T, R> extends Single<R> {
final Publisher<T> source;
final Supplier<R> seedSupplier;
final BiFunction<R, ? super T, R> reducer;
Reported by PMD.
Line: 37
final Publisher<T> source;
final Supplier<R> seedSupplier;
final BiFunction<R, ? super T, R> reducer;
public FlowableReduceWithSingle(Publisher<T> source, Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
this.source = source;
Reported by PMD.
Line: 39
final Supplier<R> seedSupplier;
final BiFunction<R, ? super T, R> reducer;
public FlowableReduceWithSingle(Publisher<T> source, Supplier<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
this.source = source;
this.seedSupplier = seedSupplier;
this.reducer = reducer;
Reported by PMD.
Line: 53
try {
seed = Objects.requireNonNull(seedSupplier.get(), "The seedSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
source.subscribe(new ReduceSeedObserver<>(observer, reducer, seed));
Reported by PMD.
Line: 18
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableReduceSeedSingle.ReduceSeedObserver;
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableReduceSeedSingle.ReduceSeedObserver;
import java.util.Objects;
Reported by PMD.
Line: 52
R seed;
try {
seed = Objects.requireNonNull(seedSupplier.get(), "The seedSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDelayWithCompletable.java
7 issues
Line: 24
public final class MaybeDelayWithCompletable<T> extends Maybe<T> {
final MaybeSource<T> source;
final CompletableSource other;
public MaybeDelayWithCompletable(MaybeSource<T> source, CompletableSource other) {
this.source = source;
Reported by PMD.
Line: 26
final MaybeSource<T> source;
final CompletableSource other;
public MaybeDelayWithCompletable(MaybeSource<T> source, CompletableSource other) {
this.source = source;
this.other = other;
}
Reported by PMD.
Line: 43
implements CompletableObserver, Disposable {
private static final long serialVersionUID = 703409937383992161L;
final MaybeObserver<? super T> downstream;
final MaybeSource<T> source;
OtherObserver(MaybeObserver<? super T> actual, MaybeSource<T> source) {
this.downstream = actual;
Reported by PMD.
Line: 45
final MaybeObserver<? super T> downstream;
final MaybeSource<T> source;
OtherObserver(MaybeObserver<? super T> actual, MaybeSource<T> source) {
this.downstream = actual;
this.source = source;
}
Reported by PMD.
Line: 83
static final class DelayWithMainObserver<T> implements MaybeObserver<T> {
final AtomicReference<Disposable> parent;
final MaybeObserver<? super T> downstream;
DelayWithMainObserver(AtomicReference<Disposable> parent, MaybeObserver<? super T> downstream) {
this.parent = parent;
Reported by PMD.
Line: 85
final AtomicReference<Disposable> parent;
final MaybeObserver<? super T> downstream;
DelayWithMainObserver(AtomicReference<Disposable> parent, MaybeObserver<? super T> downstream) {
this.parent = parent;
this.downstream = downstream;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class MaybeDelayWithCompletable<T> extends Maybe<T> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeDetach.java
7 issues
Line: 38
static final class DetachMaybeObserver<T> implements MaybeObserver<T>, Disposable {
MaybeObserver<? super T> downstream;
Disposable upstream;
DetachMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 40
MaybeObserver<? super T> downstream;
Disposable upstream;
DetachMaybeObserver(MaybeObserver<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 48
@Override
public void dispose() {
downstream = null;
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}
@Override
Reported by PMD.
Line: 72
upstream = DisposableHelper.DISPOSED;
MaybeObserver<? super T> a = downstream;
if (a != null) {
downstream = null;
a.onSuccess(value);
}
}
@Override
Reported by PMD.
Line: 82
upstream = DisposableHelper.DISPOSED;
MaybeObserver<? super T> a = downstream;
if (a != null) {
downstream = null;
a.onError(e);
}
}
@Override
Reported by PMD.
Line: 92
upstream = DisposableHelper.DISPOSED;
MaybeObserver<? super T> a = downstream;
if (a != null) {
downstream = null;
a.onComplete();
}
}
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Breaks the references between the upstream and downstream when the Maybe terminates.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReduce.java
7 issues
Line: 34
*/
public final class FlowableReduce<T> extends AbstractFlowableWithUpstream<T, T> {
final BiFunction<T, T, T> reducer;
public FlowableReduce(Flowable<T> source, BiFunction<T, T, T> reducer) {
super(source);
this.reducer = reducer;
}
Reported by PMD.
Line: 50
private static final long serialVersionUID = -4663883003264602070L;
final BiFunction<T, T, T> reducer;
Subscription upstream;
ReduceSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> reducer) {
super(actual);
Reported by PMD.
Line: 52
final BiFunction<T, T, T> reducer;
Subscription upstream;
ReduceSubscriber(Subscriber<? super T> actual, BiFunction<T, T, T> reducer) {
super(actual);
this.reducer = reducer;
}
Reported by PMD.
Line: 82
} else {
try {
value = Objects.requireNonNull(reducer.apply(v, t), "The reducer returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.cancel();
onError(ex);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.flowable;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.internal.subscriptions.*;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 21
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.internal.subscriptions.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import java.util.Objects;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableTakeUntilCompletable.java
7 issues
Line: 30
*/
public final class CompletableTakeUntilCompletable extends Completable {
final Completable source;
final CompletableSource other;
public CompletableTakeUntilCompletable(Completable source,
CompletableSource other) {
Reported by PMD.
Line: 32
final Completable source;
final CompletableSource other;
public CompletableTakeUntilCompletable(Completable source,
CompletableSource other) {
this.source = source;
this.other = other;
Reported by PMD.
Line: 54
private static final long serialVersionUID = 3533011714830024923L;
final CompletableObserver downstream;
final OtherObserver other;
final AtomicBoolean once;
Reported by PMD.
Line: 56
final CompletableObserver downstream;
final OtherObserver other;
final AtomicBoolean once;
TakeUntilMainObserver(CompletableObserver downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 58
final OtherObserver other;
final AtomicBoolean once;
TakeUntilMainObserver(CompletableObserver downstream) {
this.downstream = downstream;
this.other = new OtherObserver(this);
this.once = new AtomicBoolean();
Reported by PMD.
Line: 122
implements CompletableObserver {
private static final long serialVersionUID = 5176264485428790318L;
final TakeUntilMainObserver parent;
OtherObserver(TakeUntilMainObserver parent) {
this.parent = parent;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/OperatorFlatMapPerf.java
7 issues
Line: 33
public static class Input extends InputWithIncrementingInteger {
@Param({ "1", "1000", "1000000" })
public int size;
@Override
public int getSize() {
return size;
}
Reported by PMD.
Line: 58
input.flowable.flatMap(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer i) {
return Flowable.just(i).subscribeOn(Schedulers.computation());
}
}).subscribe(latchedObserver);
if (input.size == 1) {
while (latchedObserver.latch.getCount() != 0) { }
} else {
Reported by PMD.
Line: 61
return Flowable.just(i).subscribeOn(Schedulers.computation());
}
}).subscribe(latchedObserver);
if (input.size == 1) {
while (latchedObserver.latch.getCount() != 0) { }
} else {
latchedObserver.latch.await();
}
}
Reported by PMD.
Line: 62
}
}).subscribe(latchedObserver);
if (input.size == 1) {
while (latchedObserver.latch.getCount() != 0) { }
} else {
latchedObserver.latch.await();
}
}
Reported by PMD.
Line: 62
}
}).subscribe(latchedObserver);
if (input.size == 1) {
while (latchedObserver.latch.getCount() != 0) { }
} else {
latchedObserver.latch.await();
}
}
Reported by PMD.
Line: 64
if (input.size == 1) {
while (latchedObserver.latch.getCount() != 0) { }
} else {
latchedObserver.latch.await();
}
}
@Benchmark
public void flatMapTwoNestedSync(final Input input) {
Reported by PMD.
Line: 18
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatMapCompletable.java
7 issues
Line: 31
*/
public final class MaybeFlatMapCompletable<T> extends Completable {
final MaybeSource<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
public MaybeFlatMapCompletable(MaybeSource<T> source, Function<? super T, ? extends CompletableSource> mapper) {
this.source = source;
Reported by PMD.
Line: 33
final MaybeSource<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
public MaybeFlatMapCompletable(MaybeSource<T> source, Function<? super T, ? extends CompletableSource> mapper) {
this.source = source;
this.mapper = mapper;
}
Reported by PMD.
Line: 53
private static final long serialVersionUID = -2177128922851101253L;
final CompletableObserver downstream;
final Function<? super T, ? extends CompletableSource> mapper;
FlatMapCompletableObserver(CompletableObserver actual,
Function<? super T, ? extends CompletableSource> mapper) {
Reported by PMD.
Line: 55
final CompletableObserver downstream;
final Function<? super T, ? extends CompletableSource> mapper;
FlatMapCompletableObserver(CompletableObserver actual,
Function<? super T, ? extends CompletableSource> mapper) {
this.downstream = actual;
this.mapper = mapper;
Reported by PMD.
Line: 84
try {
cs = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
return;
}
Reported by PMD.
Line: 19
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 83
CompletableSource cs;
try {
cs = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAndThenCompletable.java
7 issues
Line: 24
public final class CompletableAndThenCompletable extends Completable {
final CompletableSource source;
final CompletableSource next;
public CompletableAndThenCompletable(CompletableSource source, CompletableSource next) {
this.source = source;
Reported by PMD.
Line: 26
final CompletableSource source;
final CompletableSource next;
public CompletableAndThenCompletable(CompletableSource source, CompletableSource next) {
this.source = source;
this.next = next;
}
Reported by PMD.
Line: 44
private static final long serialVersionUID = -4101678820158072998L;
final CompletableObserver actualObserver;
final CompletableSource next;
SourceObserver(CompletableObserver actualObserver, CompletableSource next) {
this.actualObserver = actualObserver;
Reported by PMD.
Line: 46
final CompletableObserver actualObserver;
final CompletableSource next;
SourceObserver(CompletableObserver actualObserver, CompletableSource next) {
this.actualObserver = actualObserver;
this.next = next;
}
Reported by PMD.
Line: 83
static final class NextObserver implements CompletableObserver {
final AtomicReference<Disposable> parent;
final CompletableObserver downstream;
NextObserver(AtomicReference<Disposable> parent, CompletableObserver downstream) {
this.parent = parent;
Reported by PMD.
Line: 85
final AtomicReference<Disposable> parent;
final CompletableObserver downstream;
NextObserver(AtomicReference<Disposable> parent, CompletableObserver downstream) {
this.parent = parent;
this.downstream = downstream;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class CompletableAndThenCompletable extends Completable {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFlatten.java
7 issues
Line: 33
*/
public final class MaybeFlatten<T, R> extends AbstractMaybeWithUpstream<T, R> {
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
public MaybeFlatten(MaybeSource<T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
super(source);
this.mapper = mapper;
}
Reported by PMD.
Line: 51
private static final long serialVersionUID = 4375739915521278546L;
final MaybeObserver<? super R> downstream;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
Disposable upstream;
Reported by PMD.
Line: 53
final MaybeObserver<? super R> downstream;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
Disposable upstream;
FlatMapMaybeObserver(MaybeObserver<? super R> actual,
Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
Reported by PMD.
Line: 55
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
Disposable upstream;
FlatMapMaybeObserver(MaybeObserver<? super R> actual,
Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
this.downstream = actual;
this.mapper = mapper;
Reported by PMD.
Line: 89
try {
source = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 19
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 88
MaybeSource<? extends R> source;
try {
source = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(ex);
return;
}
Reported by PMD.