The following issues were found:
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchIfEmpty.java
6 issues
Line: 21
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
public final class ObservableSwitchIfEmpty<T> extends AbstractObservableWithUpstream<T, T> {
final ObservableSource<? extends T> other;
public ObservableSwitchIfEmpty(ObservableSource<T> source, ObservableSource<? extends T> other) {
super(source);
this.other = other;
}
Reported by PMD.
Line: 35
}
static final class SwitchIfEmptyObserver<T> implements Observer<T> {
final Observer<? super T> downstream;
final ObservableSource<? extends T> other;
final SequentialDisposable arbiter;
boolean empty;
Reported by PMD.
Line: 36
static final class SwitchIfEmptyObserver<T> implements Observer<T> {
final Observer<? super T> downstream;
final ObservableSource<? extends T> other;
final SequentialDisposable arbiter;
boolean empty;
SwitchIfEmptyObserver(Observer<? super T> actual, ObservableSource<? extends T> other) {
Reported by PMD.
Line: 37
static final class SwitchIfEmptyObserver<T> implements Observer<T> {
final Observer<? super T> downstream;
final ObservableSource<? extends T> other;
final SequentialDisposable arbiter;
boolean empty;
SwitchIfEmptyObserver(Observer<? super T> actual, ObservableSource<? extends T> other) {
this.downstream = actual;
Reported by PMD.
Line: 39
final ObservableSource<? extends T> other;
final SequentialDisposable arbiter;
boolean empty;
SwitchIfEmptyObserver(Observer<? super T> actual, ObservableSource<? extends T> other) {
this.downstream = actual;
this.other = other;
this.empty = true;
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
public final class ObservableSwitchIfEmpty<T> extends AbstractObservableWithUpstream<T, T> {
final ObservableSource<? extends T> other;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimer.java
6 issues
Line: 24
import io.reactivex.rxjava3.internal.disposables.*;
public final class ObservableTimer extends Observable<Long> {
final Scheduler scheduler;
final long delay;
final TimeUnit unit;
public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
Reported by PMD.
Line: 25
public final class ObservableTimer extends Observable<Long> {
final Scheduler scheduler;
final long delay;
final TimeUnit unit;
public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
Reported by PMD.
Line: 26
public final class ObservableTimer extends Observable<Long> {
final Scheduler scheduler;
final long delay;
final TimeUnit unit;
public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
}
Reported by PMD.
Line: 48
private static final long serialVersionUID = -2809475196591179431L;
final Observer<? super Long> downstream;
TimerObserver(Observer<? super Long> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 19
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
public final class ObservableTimer extends Observable<Long> {
final Scheduler scheduler;
Reported by PMD.
Line: 21
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.*;
public final class ObservableTimer extends Observable<Long> {
final Scheduler scheduler;
final long delay;
final TimeUnit unit;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFlatMapIterable.java
6 issues
Line: 53
}
@Override
public void subscribe(Subscriber<? super R>[] subscribers) {
subscribers = RxJavaPlugins.onSubscribe(this, subscribers);
if (!validate(subscribers)) {
return;
}
Reported by PMD.
Line: 32
*/
public final class ParallelFlatMapIterable<T, R> extends ParallelFlowable<R> {
final ParallelFlowable<T> source;
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 34
final ParallelFlowable<T> source;
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
public ParallelFlatMapIterable(
ParallelFlowable<T> source,
Reported by PMD.
Line: 36
final Function<? super T, ? extends Iterable<? extends R>> mapper;
final int prefetch;
public ParallelFlatMapIterable(
ParallelFlowable<T> source,
Function<? super T, ? extends Iterable<? extends R>> mapper,
int prefetch) {
Reported by PMD.
Line: 63
int n = subscribers.length;
@SuppressWarnings("unchecked")
final Subscriber<T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
parents[i] = FlowableFlattenIterable.subscribe(subscribers[i], mapper, prefetch);
}
Reported by PMD.
Line: 66
final Subscriber<T>[] parents = new Subscriber[n];
for (int i = 0; i < n; i++) {
parents[i] = FlowableFlattenIterable.subscribe(subscribers[i], mapper, prefetch);
}
source.subscribe(parents);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnDispose.java
6 issues
Line: 26
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class SingleDoOnDispose<T> extends Single<T> {
final SingleSource<T> source;
final Action onDispose;
public SingleDoOnDispose(SingleSource<T> source, Action onDispose) {
this.source = source;
Reported by PMD.
Line: 28
public final class SingleDoOnDispose<T> extends Single<T> {
final SingleSource<T> source;
final Action onDispose;
public SingleDoOnDispose(SingleSource<T> source, Action onDispose) {
this.source = source;
this.onDispose = onDispose;
}
Reported by PMD.
Line: 46
implements SingleObserver<T>, Disposable {
private static final long serialVersionUID = -8583764624474935784L;
final SingleObserver<? super T> downstream;
Disposable upstream;
DoOnDisposeObserver(SingleObserver<? super T> actual, Action onDispose) {
this.downstream = actual;
Reported by PMD.
Line: 48
final SingleObserver<? super T> downstream;
Disposable upstream;
DoOnDisposeObserver(SingleObserver<? super T> actual, Action onDispose) {
this.downstream = actual;
this.lazySet(onDispose);
}
Reported by PMD.
Line: 61
if (a != null) {
try {
a.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
upstream.dispose();
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/FlatMapJustPerf.java
6 issues
Line: 32
@State(Scope.Thread)
public class FlatMapJustPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Reported by PMD.
Line: 18
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.functions.Function;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleUnsubscribeOn.java
6 issues
Line: 29
*/
public final class SingleUnsubscribeOn<T> extends Single<T> {
final SingleSource<T> source;
final Scheduler scheduler;
public SingleUnsubscribeOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
Reported by PMD.
Line: 31
final SingleSource<T> source;
final Scheduler scheduler;
public SingleUnsubscribeOn(SingleSource<T> source, Scheduler scheduler) {
this.source = source;
this.scheduler = scheduler;
}
Reported by PMD.
Line: 48
private static final long serialVersionUID = 3256698449646456986L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
Disposable ds;
Reported by PMD.
Line: 50
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
Disposable ds;
UnsubscribeOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
Reported by PMD.
Line: 52
final Scheduler scheduler;
Disposable ds;
UnsubscribeOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Makes sure a dispose() call from downstream happens on the specified scheduler.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java
6 issues
Line: 60
@Override
public boolean offer(final T e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
final LinkedQueueNode<T> nextNode = new LinkedQueueNode<>(e);
final LinkedQueueNode<T> prevProducerNode = xchgProducerNode(nextNode);
// Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
// and completes the store in prev.next.
Reported by PMD.
Line: 31
* @param <T> the contained value type
*/
public final class MpscLinkedQueue<T> implements SimplePlainQueue<T> {
private final AtomicReference<LinkedQueueNode<T>> producerNode;
private final AtomicReference<LinkedQueueNode<T>> consumerNode;
public MpscLinkedQueue() {
producerNode = new AtomicReference<>();
consumerNode = new AtomicReference<>();
Reported by PMD.
Line: 32
*/
public final class MpscLinkedQueue<T> implements SimplePlainQueue<T> {
private final AtomicReference<LinkedQueueNode<T>> producerNode;
private final AtomicReference<LinkedQueueNode<T>> consumerNode;
public MpscLinkedQueue() {
producerNode = new AtomicReference<>();
consumerNode = new AtomicReference<>();
LinkedQueueNode<T> node = new LinkedQueueNode<>();
Reported by PMD.
Line: 66
final LinkedQueueNode<T> prevProducerNode = xchgProducerNode(nextNode);
// Should a producer thread get interrupted here the chain WILL be broken until that thread is resumed
// and completes the store in prev.next.
prevProducerNode.soNext(nextNode); // StoreStore
return true;
}
/**
* {@inheritDoc} <br>
Reported by PMD.
Line: 89
@Override
public T poll() {
LinkedQueueNode<T> currConsumerNode = lpConsumerNode(); // don't load twice, it's alright
LinkedQueueNode<T> nextNode = currConsumerNode.lvNext();
if (nextNode != null) {
// we have to null out the value because we are going to hang on to the node
final T nextValue = nextNode.getAndNullValue();
spConsumerNode(nextNode);
return nextValue;
Reported by PMD.
Line: 154
private static final long serialVersionUID = 2404266111789071508L;
private E value;
LinkedQueueNode() {
}
LinkedQueueNode(E val) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/util/BackpressureHelper.java
6 issues
Line: 38
*/
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {
return Long.MAX_VALUE;
}
return u;
}
Reported by PMD.
Line: 53
public static long multiplyCap(long a, long b) {
long u = a * b;
if (((a | b) >>> 31) != 0) {
if (u / a != b) {
return Long.MAX_VALUE;
}
}
return u;
}
Reported by PMD.
Line: 117
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
Reported by PMD.
Line: 118
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
Reported by PMD.
Line: 144
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
Reported by PMD.
Line: 145
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/FlattenRangePerf.java
6 issues
Line: 32
@State(Scope.Thread)
public class FlattenRangePerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);
Reported by PMD.
Line: 19
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import io.reactivex.rxjava3.functions.Function;
@BenchmarkMode(Mode.Throughput)
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/FlattenJustPerf.java
6 issues
Line: 32
@State(Scope.Thread)
public class FlattenJustPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);
Reported by PMD.
Line: 19
import java.util.*;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import io.reactivex.rxjava3.functions.Function;
@BenchmarkMode(Mode.Throughput)
Reported by PMD.