The following issues were found
src/test/java/io/reactivex/rxjava3/internal/observers/SingleConsumersTest.java
26 issues
Line: 51
public class SingleConsumersTest implements Consumer<Object> {
final CompositeDisposable composite = new CompositeDisposable();
final SingleSubject<Integer> processor = SingleSubject.create();
final List<Object> events = new ArrayList<>();
Reported by PMD.
Line: 53
final CompositeDisposable composite = new CompositeDisposable();
final SingleSubject<Integer> processor = SingleSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void accept(Object t) throws Exception {
Reported by PMD.
Line: 55
final SingleSubject<Integer> processor = SingleSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void accept(Object t) throws Exception {
events.add(t);
}
Reported by PMD.
Line: 68
}
@Test
public void onSuccessNormal() {
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
Reported by PMD.
Line: 72
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 72
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 74
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
processor.onSuccess(1);
Reported by PMD.
Line: 80
processor.onSuccess(1);
assertEquals(0, composite.size());
assertEquals(Arrays.<Object>asList(1), events);
}
Reported by PMD.
Line: 82
assertEquals(0, composite.size());
assertEquals(Arrays.<Object>asList(1), events);
}
@Test
public void onErrorNormal() {
Reported by PMD.
Line: 87
}
@Test
public void onErrorNormal() {
subscribeAutoDispose(processor, composite, this, this);
assertTrue(composite.size() > 0);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLast.java
26 issues
Line: 26
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
public final class FlowableTakeLast<T> extends AbstractFlowableWithUpstream<T, T> {
final int count;
public FlowableTakeLast(Flowable<T> source, int count) {
super(source);
this.count = count;
}
Reported by PMD.
Line: 38
source.subscribe(new TakeLastSubscriber<>(s, count));
}
static final class TakeLastSubscriber<T> extends ArrayDeque<T> implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = 7240042530241604978L;
final Subscriber<? super T> downstream;
final int count;
Reported by PMD.
Line: 38
source.subscribe(new TakeLastSubscriber<>(s, count));
}
static final class TakeLastSubscriber<T> extends ArrayDeque<T> implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = 7240042530241604978L;
final Subscriber<? super T> downstream;
final int count;
Reported by PMD.
Line: 41
static final class TakeLastSubscriber<T> extends ArrayDeque<T> implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = 7240042530241604978L;
final Subscriber<? super T> downstream;
final int count;
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
Reported by PMD.
Line: 42
private static final long serialVersionUID = 7240042530241604978L;
final Subscriber<? super T> downstream;
final int count;
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
Reported by PMD.
Line: 44
final Subscriber<? super T> downstream;
final int count;
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
Reported by PMD.
Line: 45
final int count;
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
final AtomicInteger wip = new AtomicInteger();
Reported by PMD.
Line: 46
Subscription upstream;
volatile boolean done;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
final AtomicInteger wip = new AtomicInteger();
Reported by PMD.
Line: 48
volatile boolean done;
volatile boolean cancelled;
final AtomicLong requested = new AtomicLong();
final AtomicInteger wip = new AtomicInteger();
TakeLastSubscriber(Subscriber<? super T> actual, int count) {
this.downstream = actual;
Reported by PMD.
Line: 50
final AtomicLong requested = new AtomicLong();
final AtomicInteger wip = new AtomicInteger();
TakeLastSubscriber(Subscriber<? super T> actual, int count) {
this.downstream = actual;
this.count = count;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableIntervalRangeTest.java
26 issues
Line: 29
public class ObservableIntervalRangeTest extends RxJavaTest {
@Test
public void simple() throws Exception {
Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 29
public class ObservableIntervalRangeTest extends RxJavaTest {
@Test
public void simple() throws Exception {
Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 30
@Test
public void simple() throws Exception {
Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 30
@Test
public void simple() throws Exception {
Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 30
@Test
public void simple() throws Exception {
Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 37
}
@Test
public void customScheduler() {
Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1L, 2L, 3L, 4L, 5L);
}
Reported by PMD.
Line: 38
@Test
public void customScheduler() {
Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1L, 2L, 3L, 4L, 5L);
}
Reported by PMD.
Line: 38
@Test
public void customScheduler() {
Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1L, 2L, 3L, 4L, 5L);
}
Reported by PMD.
Line: 38
@Test
public void customScheduler() {
Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(1L, 2L, 3L, 4L, 5L);
}
Reported by PMD.
Line: 45
}
@Test
public void countZero() {
Observable.intervalRange(1, 0, 1, 1, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
26 issues
Line: 302
* @param isCancelled a supplier that returns true if the drain has been cancelled
* @return true if the queue was completely drained or the drain process was cancelled
*/
static <T> boolean postCompleteDrain(long n,
Subscriber<? super T> actual,
Queue<T> queue,
AtomicLong state,
BooleanSupplier isCancelled) {
Reported by PMD.
Line: 302
* @param isCancelled a supplier that returns true if the drain has been cancelled
* @return true if the queue was completely drained or the drain process was cancelled
*/
static <T> boolean postCompleteDrain(long n,
Subscriber<? super T> actual,
Queue<T> queue,
AtomicLong state,
BooleanSupplier isCancelled) {
Reported by PMD.
Line: 31
/**
* Utility class to help with the queue-drain serialization idiom.
*/
public final class QueueDrainHelper {
/** Utility class. */
private QueueDrainHelper() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 31
/**
* Utility class to help with the queue-drain serialization idiom.
*/
public final class QueueDrainHelper {
/** Utility class. */
private QueueDrainHelper() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 31
/**
* Utility class to help with the queue-drain serialization idiom.
*/
public final class QueueDrainHelper {
/** Utility class. */
private QueueDrainHelper() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 47
* @param dispose the disposable to call when termination happens and cleanup is necessary
* @param qd the QueueDrain instance that gives status information to the drain logic
*/
public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, Subscriber<? super U> a, boolean delayError,
Disposable dispose, QueueDrain<T, U> qd) {
int missed = 1;
for (;;) {
for (;;) {
Reported by PMD.
Line: 47
* @param dispose the disposable to call when termination happens and cleanup is necessary
* @param qd the QueueDrain instance that gives status information to the drain logic
*/
public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, Subscriber<? super U> a, boolean delayError,
Disposable dispose, QueueDrain<T, U> qd) {
int missed = 1;
for (;;) {
for (;;) {
Reported by PMD.
Line: 47
* @param dispose the disposable to call when termination happens and cleanup is necessary
* @param qd the QueueDrain instance that gives status information to the drain logic
*/
public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, Subscriber<? super U> a, boolean delayError,
Disposable dispose, QueueDrain<T, U> qd) {
int missed = 1;
for (;;) {
for (;;) {
Reported by PMD.
Line: 71
}
long r = qd.requested();
if (r != 0L) {
if (qd.accept(a, v)) {
if (r != Long.MAX_VALUE) {
qd.produced(1);
}
}
Reported by PMD.
Line: 73
long r = qd.requested();
if (r != 0L) {
if (qd.accept(a, v)) {
if (r != Long.MAX_VALUE) {
qd.produced(1);
}
}
} else {
q.clear();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleAmb.java
26 issues
Line: 24
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class SingleAmb<T> extends Single<T> {
private final SingleSource<? extends T>[] sources;
private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;
public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
this.sources = sources;
Reported by PMD.
Line: 24
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class SingleAmb<T> extends Single<T> {
private final SingleSource<? extends T>[] sources;
private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;
public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
this.sources = sources;
Reported by PMD.
Line: 25
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class SingleAmb<T> extends Single<T> {
private final SingleSource<? extends T>[] sources;
private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;
public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
Reported by PMD.
Line: 26
public final class SingleAmb<T> extends Single<T> {
private final SingleSource<? extends T>[] sources;
private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;
public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
Reported by PMD.
Line: 28
private final SingleSource<? extends T>[] sources;
private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;
public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
@Override
Reported by PMD.
Line: 35
@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(final SingleObserver<? super T> observer) {
SingleSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new SingleSource[8];
try {
Reported by PMD.
Line: 35
@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(final SingleObserver<? super T> observer) {
SingleSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new SingleSource[8];
try {
Reported by PMD.
Line: 35
@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(final SingleObserver<? super T> observer) {
SingleSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new SingleSource[8];
try {
Reported by PMD.
Line: 43
try {
for (SingleSource<? extends T> element : sourcesIterable) {
if (element == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
SingleSource<? extends T>[] b = new SingleSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Reported by PMD.
Line: 47
return;
}
if (count == sources.length) {
SingleSource<? extends T>[] b = new SingleSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = element;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmb.java
26 issues
Line: 24
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class CompletableAmb extends Completable {
private final CompletableSource[] sources;
private final Iterable<? extends CompletableSource> sourcesIterable;
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
Reported by PMD.
Line: 24
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class CompletableAmb extends Completable {
private final CompletableSource[] sources;
private final Iterable<? extends CompletableSource> sourcesIterable;
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
Reported by PMD.
Line: 25
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class CompletableAmb extends Completable {
private final CompletableSource[] sources;
private final Iterable<? extends CompletableSource> sourcesIterable;
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
Reported by PMD.
Line: 26
public final class CompletableAmb extends Completable {
private final CompletableSource[] sources;
private final Iterable<? extends CompletableSource> sourcesIterable;
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
Reported by PMD.
Line: 28
private final CompletableSource[] sources;
private final Iterable<? extends CompletableSource> sourcesIterable;
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
@Override
Reported by PMD.
Line: 34
}
@Override
public void subscribeActual(final CompletableObserver observer) {
CompletableSource[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new CompletableSource[8];
try {
Reported by PMD.
Line: 34
}
@Override
public void subscribeActual(final CompletableObserver observer) {
CompletableSource[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new CompletableSource[8];
try {
Reported by PMD.
Line: 34
}
@Override
public void subscribeActual(final CompletableObserver observer) {
CompletableSource[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new CompletableSource[8];
try {
Reported by PMD.
Line: 42
try {
for (CompletableSource element : sourcesIterable) {
if (element == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
CompletableSource[] b = new CompletableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Reported by PMD.
Line: 46
return;
}
if (count == sources.length) {
CompletableSource[] b = new CompletableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = element;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimeout.java
26 issues
Line: 29
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
final ObservableSource<U> firstTimeoutIndicator;
final Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator;
final ObservableSource<? extends T> other;
public ObservableTimeout(
Observable<T> source,
Reported by PMD.
Line: 30
public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
final ObservableSource<U> firstTimeoutIndicator;
final Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator;
final ObservableSource<? extends T> other;
public ObservableTimeout(
Observable<T> source,
ObservableSource<U> firstTimeoutIndicator,
Reported by PMD.
Line: 31
public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
final ObservableSource<U> firstTimeoutIndicator;
final Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator;
final ObservableSource<? extends T> other;
public ObservableTimeout(
Observable<T> source,
ObservableSource<U> firstTimeoutIndicator,
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
Reported by PMD.
Line: 68
private static final long serialVersionUID = 3764492702657003550L;
final Observer<? super T> downstream;
final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;
final SequentialDisposable task;
Reported by PMD.
Line: 70
final Observer<? super T> downstream;
final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;
final SequentialDisposable task;
final AtomicReference<Disposable> upstream;
Reported by PMD.
Line: 72
final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;
final SequentialDisposable task;
final AtomicReference<Disposable> upstream;
TimeoutObserver(Observer<? super T> actual, Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator) {
this.downstream = actual;
Reported by PMD.
Line: 74
final SequentialDisposable task;
final AtomicReference<Disposable> upstream;
TimeoutObserver(Observer<? super T> actual, Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator) {
this.downstream = actual;
this.itemTimeoutIndicator = itemTimeoutIndicator;
this.task = new SequentialDisposable();
Reported by PMD.
Line: 108
itemTimeoutObservableSource = Objects.requireNonNull(
itemTimeoutIndicator.apply(t),
"The itemTimeoutIndicator returned a null ObservableSource.");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.get().dispose();
getAndSet(Long.MAX_VALUE);
downstream.onError(ex);
return;
Reported by PMD.
Line: 110
"The itemTimeoutIndicator returned a null ObservableSource.");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
upstream.get().dispose();
getAndSet(Long.MAX_VALUE);
downstream.onError(ex);
return;
}
Reported by PMD.
Line: 189
private static final long serialVersionUID = -7508389464265974549L;
final Observer<? super T> downstream;
final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;
final SequentialDisposable task;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeCallbackObserverTest.java
26 issues
Line: 33
public class MaybeCallbackObserverTest extends RxJavaTest {
@Test
public void dispose() {
MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION);
Disposable d = Disposable.empty();
mo.onSubscribe(d);
Reported by PMD.
Line: 40
mo.onSubscribe(d);
assertFalse(mo.isDisposed());
mo.dispose();
assertTrue(mo.isDisposed());
Reported by PMD.
Line: 44
mo.dispose();
assertTrue(mo.isDisposed());
assertTrue(d.isDisposed());
}
@Test
Reported by PMD.
Line: 46
assertTrue(mo.isDisposed());
assertTrue(d.isDisposed());
}
@Test
public void onSuccessCrashes() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 46
assertTrue(mo.isDisposed());
assertTrue(d.isDisposed());
}
@Test
public void onSuccessCrashes() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 50
}
@Test
public void onSuccessCrashes() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(
new Consumer<Object>() {
@Override
Reported by PMD.
Line: 74
}
@Test
public void onErrorCrashes() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(
Functions.emptyConsumer(),
new Consumer<Object>() {
Reported by PMD.
Line: 103
}
@Test
public void onCompleteCrashes() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(
Functions.emptyConsumer(),
Functions.emptyConsumer(),
Reported by PMD.
Line: 132
Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION);
assertFalse(o.hasCustomOnError());
}
@Test
public void customOnErrorShouldReportCustomOnError() {
MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<>(Functions.<Integer>emptyConsumer(),
Reported by PMD.
Line: 141
Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);
assertTrue(o.hasCustomOnError());
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/core/Scheduler.java
26 issues
Line: 91
* <p>
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
*/
public abstract class Scheduler {
/**
* Value representing whether to use {@link System#nanoTime()}, or default as clock for {@link #now(TimeUnit)}
* and {@link Scheduler.Worker#now(TimeUnit)}.
* <p>
* Associated system parameter:
Reported by PMD.
Line: 144
*/
static long computeClockDrift(long time, String timeUnit) {
if ("seconds".equalsIgnoreCase(timeUnit)) {
return TimeUnit.SECONDS.toNanos(time);
} else if ("milliseconds".equalsIgnoreCase(timeUnit)) {
return TimeUnit.MILLISECONDS.toNanos(time);
}
return TimeUnit.MINUTES.toNanos(time);
}
Reported by PMD.
Line: 146
if ("seconds".equalsIgnoreCase(timeUnit)) {
return TimeUnit.SECONDS.toNanos(time);
} else if ("milliseconds".equalsIgnoreCase(timeUnit)) {
return TimeUnit.MILLISECONDS.toNanos(time);
}
return TimeUnit.MINUTES.toNanos(time);
}
/**
Reported by PMD.
Line: 148
} else if ("milliseconds".equalsIgnoreCase(timeUnit)) {
return TimeUnit.MILLISECONDS.toNanos(time);
}
return TimeUnit.MINUTES.toNanos(time);
}
/**
* Returns the clock drift tolerance in nanoseconds.
* <p>Related system properties:
Reported by PMD.
Line: 259
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
/**
Reported by PMD.
Line: 292
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
Reported by PMD.
Line: 524
*/
final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
Reported by PMD.
Line: 526
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
Reported by PMD.
Line: 527
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
Reported by PMD.
Line: 528
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java
26 issues
Line: 28
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
Reported by PMD.
Line: 29
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
Reported by PMD.
Line: 30
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
Reported by PMD.
Line: 50
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
Reported by PMD.
Line: 53
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Reported by PMD.
Line: 54
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Reported by PMD.
Line: 55
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Reported by PMD.
Line: 56
final Observer<? super T> downstream;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Reported by PMD.
Line: 58
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
Reported by PMD.
Line: 60
SimpleQueue<T> queue;
Disposable upstream;
Throwable error;
volatile boolean done;
volatile boolean disposed;
Reported by PMD.