The following issues were reported by PMD, grouped by source file:
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSinglePublisher.java
6 issues
Line: 34
*/
public final class FlowableConcatMapSinglePublisher<T, R> extends Flowable<R> {
final Publisher<T> source;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ErrorMode errorMode;
Reported by PMD.
Line: 36
final Publisher<T> source;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ErrorMode errorMode;
final int prefetch;
Reported by PMD.
Line: 38
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ErrorMode errorMode;
final int prefetch;
public FlowableConcatMapSinglePublisher(Publisher<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper,
Reported by PMD.
Line: 40
final ErrorMode errorMode;
final int prefetch;
public FlowableConcatMapSinglePublisher(Publisher<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper,
ErrorMode errorMode, int prefetch) {
this.source = source;
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.mixed;
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber;
import io.reactivex.rxjava3.internal.util.ErrorMode;
Reported by PMD.
Line: 18
import org.reactivestreams.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.operators.mixed.FlowableConcatMapSingle.ConcatMapSingleSubscriber;
import io.reactivex.rxjava3.internal.util.ErrorMode;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableMostRecent.java
6 issues
Line: 32
*/
public final class BlockingObservableMostRecent<T> implements Iterable<T> {
final ObservableSource<T> source;
final T initialValue;
public BlockingObservableMostRecent(ObservableSource<T> source, T initialValue) {
this.source = source;
Reported by PMD.
Line: 34
final ObservableSource<T> source;
final T initialValue;
public BlockingObservableMostRecent(ObservableSource<T> source, T initialValue) {
this.source = source;
this.initialValue = initialValue;
}
Reported by PMD.
Line: 51
}
static final class MostRecentObserver<T> extends DefaultObserver<T> {
volatile Object value;
MostRecentObserver(T value) {
this.value = NotificationLite.next(value);
}
Reported by PMD.
Line: 85
/**
* buffer to make sure that the state of the iterator doesn't change between calling hasNext() and next().
*/
private Object buf;
@Override
public boolean hasNext() {
buf = value;
return !NotificationLite.isComplete(buf);
Reported by PMD.
Line: 109
return NotificationLite.getValue(buf);
}
finally {
buf = null;
}
}
@Override
public void remove() {
Reported by PMD.
Line: 19
import java.util.*;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.observers.DefaultObserver;
/**
* Returns an Iterable that always returns the item most recently emitted by an Observable, or a
* seed value if no item has yet been emitted.
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBlockingSubscribe.java
6 issues
Line: 19
import java.util.Objects;
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.util.*;
Reported by PMD.
Line: 20
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.util.*;
/**
Reported by PMD.
Line: 21
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.util.*;
/**
* Utility methods to consume an Observable in a blocking manner with callbacks or Observer.
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.util.*;
/**
* Utility methods to consume an Observable in a blocking manner with callbacks or Observer.
*/
Reported by PMD.
Line: 23
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.observers.*;
import io.reactivex.rxjava3.internal.util.*;
/**
* Utility methods to consume an Observable in a blocking manner with callbacks or Observer.
*/
public final class ObservableBlockingSubscribe {
Reported by PMD.
Line: 58
Object v = queue.poll();
if (v == null) {
try {
v = queue.take();
} catch (InterruptedException ex) {
bs.dispose();
observer.onError(ex);
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelaySubscriptionOther.java
6 issues
Line: 28
* @param <U> the other value type, ignored
*/
public final class ObservableDelaySubscriptionOther<T, U> extends Observable<T> {
final ObservableSource<? extends T> main;
final ObservableSource<U> other;
public ObservableDelaySubscriptionOther(ObservableSource<? extends T> main, ObservableSource<U> other) {
this.main = main;
this.other = other;
Reported by PMD.
Line: 29
*/
public final class ObservableDelaySubscriptionOther<T, U> extends Observable<T> {
final ObservableSource<? extends T> main;
final ObservableSource<U> other;
public ObservableDelaySubscriptionOther(ObservableSource<? extends T> main, ObservableSource<U> other) {
this.main = main;
this.other = other;
}
Reported by PMD.
Line: 47
}
final class DelayObserver implements Observer<U> {
final SequentialDisposable serial;
final Observer<? super T> child;
boolean done;
DelayObserver(SequentialDisposable serial, Observer<? super T> child) {
this.serial = serial;
Reported by PMD.
Line: 48
final class DelayObserver implements Observer<U> {
final SequentialDisposable serial;
final Observer<? super T> child;
boolean done;
DelayObserver(SequentialDisposable serial, Observer<? super T> child) {
this.serial = serial;
this.child = child;
Reported by PMD.
Line: 49
final class DelayObserver implements Observer<U> {
final SequentialDisposable serial;
final Observer<? super T> child;
boolean done;
DelayObserver(SequentialDisposable serial, Observer<? super T> child) {
this.serial = serial;
this.child = child;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDetach.java
6 issues
Line: 40
static final class DetachObserver<T> implements Observer<T>, Disposable {
Observer<? super T> downstream;
Disposable upstream;
DetachObserver(Observer<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 42
Observer<? super T> downstream;
Disposable upstream;
DetachObserver(Observer<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 53
Disposable d = this.upstream;
this.upstream = EmptyComponent.INSTANCE;
this.downstream = EmptyComponent.asObserver();
d.dispose();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
Reported by PMD.
Line: 80
Observer<? super T> a = downstream;
this.upstream = EmptyComponent.INSTANCE;
this.downstream = EmptyComponent.asObserver();
a.onError(t);
}
@Override
public void onComplete() {
Observer<? super T> a = downstream;
Reported by PMD.
Line: 88
Observer<? super T> a = downstream;
this.upstream = EmptyComponent.INSTANCE;
this.downstream = EmptyComponent.asObserver();
a.onComplete();
}
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.util.EmptyComponent;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCompletable.java
6 issues
Line: 29
*/
public final class ObservableFromCompletable<T> extends Observable<T> implements HasUpstreamCompletableSource {
final CompletableSource source;
public ObservableFromCompletable(CompletableSource source) {
this.source = source;
}
Reported by PMD.
Line: 29
*/
public final class ObservableFromCompletable<T> extends Observable<T> implements HasUpstreamCompletableSource {
final CompletableSource source;
public ObservableFromCompletable(CompletableSource source) {
this.source = source;
}
Reported by PMD.
Line: 49
extends AbstractEmptyQueueFuseable<T>
implements CompletableObserver {
final Observer<? super T> downstream;
Disposable upstream;
public FromCompletableObserver(Observer<? super T> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 51
final Observer<? super T> downstream;
Disposable upstream;
public FromCompletableObserver(Observer<? super T> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.*;
/**
Reported by PMD.
Line: 19
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.*;
/**
* Wrap a Completable into an Observable.
*
* @param <T> the value type
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromFuture.java
6 issues
Line: 24
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
public final class ObservableFromFuture<T> extends Observable<T> {
final Future<? extends T> future;
final long timeout;
final TimeUnit unit;
public ObservableFromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
this.future = future;
Reported by PMD.
Line: 25
public final class ObservableFromFuture<T> extends Observable<T> {
final Future<? extends T> future;
final long timeout;
final TimeUnit unit;
public ObservableFromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
this.future = future;
this.timeout = timeout;
Reported by PMD.
Line: 26
public final class ObservableFromFuture<T> extends Observable<T> {
final Future<? extends T> future;
final long timeout;
final TimeUnit unit;
public ObservableFromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
this.future = future;
this.timeout = timeout;
this.unit = unit;
Reported by PMD.
Line: 42
T v;
try {
v = ExceptionHelper.nullCheck(unit != null ? future.get(timeout, unit) : future.get(), "Future returned a null value.");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (!d.isDisposed()) {
observer.onError(ex);
}
return;
Reported by PMD.
Line: 18
import java.util.concurrent.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.internal.observers.DeferredScalarDisposable;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
public final class ObservableFromFuture<T> extends Observable<T> {
Reported by PMD.
Line: 41
if (!d.isDisposed()) {
T v;
try {
v = ExceptionHelper.nullCheck(unit != null ? future.get(timeout, unit) : future.get(), "Future returned a null value.");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (!d.isDisposed()) {
observer.onError(ex);
}
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/FlattenCrossMapPerf.java
6 issues
Line: 32
@State(Scope.Thread)
public class FlattenCrossMapPerf {
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 34
@Param({ "1", "10", "100", "1000", "10000", "100000", "1000000" })
public int times;
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);
Reported by PMD.
Line: 36
Flowable<Integer> flowable;
Observable<Integer> observable;
@Setup
public void setup() {
Integer[] array = new Integer[times];
Arrays.fill(array, 777);
Reported by PMD.
Line: 19
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import io.reactivex.rxjava3.functions.Function;
@BenchmarkMode(Mode.Throughput)
Reported by PMD.
src/main/java/io/reactivex/rxjava3/flowables/ConnectableFlowable.java
6 issues
Line: 56
* the type of items emitted by the {@code ConnectableFlowable}
* @since 2.0.0
*/
public abstract class ConnectableFlowable<T> extends Flowable<T> {
/**
* Instructs the {@code ConnectableFlowable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
* <dl>
Reported by PMD.
Line: 19
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
Reported by PMD.
Line: 21
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
Reported by PMD.
Line: 22
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.util.ConnectConsumer;
Reported by PMD.
Line: 25
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.util.ConnectConsumer;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
Reported by PMD.
Line: 26
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.internal.functions.*;
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.util.ConnectConsumer;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Schedulers;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSkip.java
6 issues
Line: 21
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableSkip<T> extends AbstractObservableWithUpstream<T, T> {
final long n;
public ObservableSkip(ObservableSource<T> source, long n) {
super(source);
this.n = n;
}
Reported by PMD.
Line: 33
}
static final class SkipObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
long remaining;
Disposable upstream;
SkipObserver(Observer<? super T> actual, long n) {
Reported by PMD.
Line: 34
static final class SkipObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
long remaining;
Disposable upstream;
SkipObserver(Observer<? super T> actual, long n) {
this.downstream = actual;
Reported by PMD.
Line: 36
final Observer<? super T> downstream;
long remaining;
Disposable upstream;
SkipObserver(Observer<? super T> actual, long n) {
this.downstream = actual;
this.remaining = n;
}
Reported by PMD.
Line: 53
@Override
public void onNext(T t) {
if (remaining != 0L) {
remaining--;
} else {
downstream.onNext(t);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableSkip<T> extends AbstractObservableWithUpstream<T, T> {
final long n;
Reported by PMD.