The following issues were found

src/test/java/io/reactivex/rxjava3/internal/observers/SingleConsumersTest.java
26 issues
Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 51

              
public class SingleConsumersTest implements Consumer<Object> {

    final CompositeDisposable composite = new CompositeDisposable();

    final SingleSubject<Integer> processor = SingleSubject.create();

    final List<Object> events = new ArrayList<>();


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 53

              
    final CompositeDisposable composite = new CompositeDisposable();

    final SingleSubject<Integer> processor = SingleSubject.create();

    final List<Object> events = new ArrayList<>();

    @Override
    public void accept(Object t) throws Exception {

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 55

              
    final SingleSubject<Integer> processor = SingleSubject.create();

    final List<Object> events = new ArrayList<>();

    @Override
    public void accept(Object t) throws Exception {
        events.add(t);
    }

            

Reported by PMD.

Unit tests should not contain more than 1 assert(s).
Design

Line: 68

                  }

    @Test
    public void onSuccessNormal() {

        Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);

        assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 72

              
        Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);

        assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());

        assertTrue(composite.size() > 0);

        assertTrue(events.toString(), events.isEmpty());


            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 72

              
        Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);

        assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());

        assertTrue(composite.size() > 0);

        assertTrue(events.toString(), events.isEmpty());


            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 74

              
        assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());

        assertTrue(composite.size() > 0);

        assertTrue(events.toString(), events.isEmpty());

        processor.onSuccess(1);


            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 80

              
        processor.onSuccess(1);

        assertEquals(0, composite.size());

        assertEquals(Arrays.<Object>asList(1), events);

    }


            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 82

              
        assertEquals(0, composite.size());

        assertEquals(Arrays.<Object>asList(1), events);

    }

    @Test
    public void onErrorNormal() {

            

Reported by PMD.

Unit tests should not contain more than 1 assert(s).
Design

Line: 87

                  }

    @Test
    public void onErrorNormal() {

        subscribeAutoDispose(processor, composite, this, this);

        assertTrue(composite.size() > 0);


            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeLast.java
26 issues
Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 26

              import io.reactivex.rxjava3.internal.util.BackpressureHelper;

public final class FlowableTakeLast<T> extends AbstractFlowableWithUpstream<T, T> {
    final int count;

    public FlowableTakeLast(Flowable<T> source, int count) {
        super(source);
        this.count = count;
    }

            

Reported by PMD.

The class 'TakeLastSubscriber' has a Modified Cyclomatic Complexity of 3 (Highest = 10).
Design

Line: 38

                      source.subscribe(new TakeLastSubscriber<>(s, count));
    }

    static final class TakeLastSubscriber<T> extends ArrayDeque<T> implements FlowableSubscriber<T>, Subscription {

        private static final long serialVersionUID = 7240042530241604978L;
        final Subscriber<? super T> downstream;
        final int count;


            

Reported by PMD.

The class 'TakeLastSubscriber' has a Standard Cyclomatic Complexity of 3 (Highest = 10).
Design

Line: 38

                      source.subscribe(new TakeLastSubscriber<>(s, count));
    }

    static final class TakeLastSubscriber<T> extends ArrayDeque<T> implements FlowableSubscriber<T>, Subscription {

        private static final long serialVersionUID = 7240042530241604978L;
        final Subscriber<? super T> downstream;
        final int count;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 41

                  static final class TakeLastSubscriber<T> extends ArrayDeque<T> implements FlowableSubscriber<T>, Subscription {

        private static final long serialVersionUID = 7240042530241604978L;
        final Subscriber<? super T> downstream;
        final int count;

        Subscription upstream;
        volatile boolean done;
        volatile boolean cancelled;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 42

              
        private static final long serialVersionUID = 7240042530241604978L;
        final Subscriber<? super T> downstream;
        final int count;

        Subscription upstream;
        volatile boolean done;
        volatile boolean cancelled;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 44

                      final Subscriber<? super T> downstream;
        final int count;

        Subscription upstream;
        volatile boolean done;
        volatile boolean cancelled;

        final AtomicLong requested = new AtomicLong();


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 45

                      final int count;

        Subscription upstream;
        volatile boolean done;
        volatile boolean cancelled;

        final AtomicLong requested = new AtomicLong();

        final AtomicInteger wip = new AtomicInteger();

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 46

              
        Subscription upstream;
        volatile boolean done;
        volatile boolean cancelled;

        final AtomicLong requested = new AtomicLong();

        final AtomicInteger wip = new AtomicInteger();


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 48

                      volatile boolean done;
        volatile boolean cancelled;

        final AtomicLong requested = new AtomicLong();

        final AtomicInteger wip = new AtomicInteger();

        TakeLastSubscriber(Subscriber<? super T> actual, int count) {
            this.downstream = actual;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 50

              
        final AtomicLong requested = new AtomicLong();

        final AtomicInteger wip = new AtomicInteger();

        TakeLastSubscriber(Subscriber<? super T> actual, int count) {
            this.downstream = actual;
            this.count = count;
        }

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableIntervalRangeTest.java
26 issues
JUnit tests should include assert() or fail()
Design

Line: 29

              public class ObservableIntervalRangeTest extends RxJavaTest {

    @Test
    public void simple() throws Exception {
        Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(5L, 6L, 7L, 8L, 9L);
    }

            

Reported by PMD.

A method/constructor should not explicitly throw java.lang.Exception
Design

Line: 29

              public class ObservableIntervalRangeTest extends RxJavaTest {

    @Test
    public void simple() throws Exception {
        Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(5L, 6L, 7L, 8L, 9L);
    }

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 30

              
    @Test
    public void simple() throws Exception {
        Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(5L, 6L, 7L, 8L, 9L);
    }


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 30

              
    @Test
    public void simple() throws Exception {
        Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(5L, 6L, 7L, 8L, 9L);
    }


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 30

              
    @Test
    public void simple() throws Exception {
        Observable.intervalRange(5, 5, 50, 50, TimeUnit.MILLISECONDS)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(5L, 6L, 7L, 8L, 9L);
    }


            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 37

                  }

    @Test
    public void customScheduler() {
        Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1L, 2L, 3L, 4L, 5L);
    }

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 38

              
    @Test
    public void customScheduler() {
        Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1L, 2L, 3L, 4L, 5L);
    }


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 38

              
    @Test
    public void customScheduler() {
        Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1L, 2L, 3L, 4L, 5L);
    }


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 38

              
    @Test
    public void customScheduler() {
        Observable.intervalRange(1, 5, 1, 1, TimeUnit.MILLISECONDS, Schedulers.single())
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult(1L, 2L, 3L, 4L, 5L);
    }


            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 45

                  }

    @Test
    public void countZero() {
        Observable.intervalRange(1, 0, 1, 1, TimeUnit.MILLISECONDS)
        .test()
        .awaitDone(5, TimeUnit.SECONDS)
        .assertResult();
    }

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/util/QueueDrainHelper.java
26 issues
Avoid reassigning parameters such as 'n'
Design

Line: 302

                   * @param isCancelled a supplier that returns true if the drain has been cancelled
     * @return true if the queue was completely drained or the drain process was cancelled
     */
    static <T> boolean postCompleteDrain(long n,
                                         Subscriber<? super T> actual,
                                         Queue<T> queue,
                                         AtomicLong state,
                                         BooleanSupplier isCancelled) {


            

Reported by PMD.

Avoid reassigning parameters such as 'n'
Design

Line: 302

                   * @param isCancelled a supplier that returns true if the drain has been cancelled
     * @return true if the queue was completely drained or the drain process was cancelled
     */
    static <T> boolean postCompleteDrain(long n,
                                         Subscriber<? super T> actual,
                                         Queue<T> queue,
                                         AtomicLong state,
                                         BooleanSupplier isCancelled) {


            

Reported by PMD.

The class 'QueueDrainHelper' has a Modified Cyclomatic Complexity of 6 (Highest = 11).
Design

Line: 31

              /**
 * Utility class to help with the queue-drain serialization idiom.
 */
public final class QueueDrainHelper {
    /** Utility class. */
    private QueueDrainHelper() {
        throw new IllegalStateException("No instances!");
    }


            

Reported by PMD.

Possible God Class (WMC=65, ATFD=10, TCC=6.667%)
Design

Line: 31

              /**
 * Utility class to help with the queue-drain serialization idiom.
 */
public final class QueueDrainHelper {
    /** Utility class. */
    private QueueDrainHelper() {
        throw new IllegalStateException("No instances!");
    }


            

Reported by PMD.

The class 'QueueDrainHelper' has a Standard Cyclomatic Complexity of 6 (Highest = 11).
Design

Line: 31

              /**
 * Utility class to help with the queue-drain serialization idiom.
 */
public final class QueueDrainHelper {
    /** Utility class. */
    private QueueDrainHelper() {
        throw new IllegalStateException("No instances!");
    }


            

Reported by PMD.

The method 'drainMaxLoop(SimplePlainQueue, Subscriber, boolean, Disposable, QueueDrain)' has a cyclomatic complexity of 11.
Design

Line: 47

                   * @param dispose the disposable to call when termination happens and cleanup is necessary
     * @param qd the QueueDrain instance that gives status information to the drain logic
     */
    public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, Subscriber<? super U> a, boolean delayError,
            Disposable dispose, QueueDrain<T, U> qd) {
        int missed = 1;

        for (;;) {
            for (;;) {

            

Reported by PMD.

The method 'drainMaxLoop' has a Standard Cyclomatic Complexity of 11.
Design

Line: 47

                   * @param dispose the disposable to call when termination happens and cleanup is necessary
     * @param qd the QueueDrain instance that gives status information to the drain logic
     */
    public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, Subscriber<? super U> a, boolean delayError,
            Disposable dispose, QueueDrain<T, U> qd) {
        int missed = 1;

        for (;;) {
            for (;;) {

            

Reported by PMD.

The method 'drainMaxLoop' has a Modified Cyclomatic Complexity of 11.
Design

Line: 47

                   * @param dispose the disposable to call when termination happens and cleanup is necessary
     * @param qd the QueueDrain instance that gives status information to the drain logic
     */
    public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, Subscriber<? super U> a, boolean delayError,
            Disposable dispose, QueueDrain<T, U> qd) {
        int missed = 1;

        for (;;) {
            for (;;) {

            

Reported by PMD.

Avoid using Literals in Conditional Statements
Error

Line: 71

                              }

                long r = qd.requested();
                if (r != 0L) {
                    if (qd.accept(a, v)) {
                        if (r != Long.MAX_VALUE) {
                            qd.produced(1);
                        }
                    }

            

Reported by PMD.

These nested if statements could be combined
Design

Line: 73

                              long r = qd.requested();
                if (r != 0L) {
                    if (qd.accept(a, v)) {
                        if (r != Long.MAX_VALUE) {
                            qd.produced(1);
                        }
                    }
                } else {
                    q.clear();

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleAmb.java
26 issues
The class 'SingleAmb' has a Modified Cyclomatic Complexity of 6 (Highest = 10).
Design

Line: 24

              import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class SingleAmb<T> extends Single<T> {
    private final SingleSource<? extends T>[] sources;
    private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;

    public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
        this.sources = sources;

            

Reported by PMD.

The class 'SingleAmb' has a Standard Cyclomatic Complexity of 6 (Highest = 10).
Design

Line: 24

              import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class SingleAmb<T> extends Single<T> {
    private final SingleSource<? extends T>[] sources;
    private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;

    public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
        this.sources = sources;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 25

              import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class SingleAmb<T> extends Single<T> {
    private final SingleSource<? extends T>[] sources;
    private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;

    public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 26

              
public final class SingleAmb<T> extends Single<T> {
    private final SingleSource<? extends T>[] sources;
    private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;

    public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;
    }

            

Reported by PMD.

The user-supplied array 'sources' is stored directly.
Design

Line: 28

                  private final SingleSource<? extends T>[] sources;
    private final Iterable<? extends SingleSource<? extends T>> sourcesIterable;

    public SingleAmb(SingleSource<? extends T>[] sources, Iterable<? extends SingleSource<? extends T>> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;
    }

    @Override

            

Reported by PMD.

The method 'subscribeActual' has a Modified Cyclomatic Complexity of 10.
Design

Line: 35

              
    @Override
    @SuppressWarnings("unchecked")
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        SingleSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new SingleSource[8];
            try {

            

Reported by PMD.

The method 'subscribeActual' has a Standard Cyclomatic Complexity of 10.
Design

Line: 35

              
    @Override
    @SuppressWarnings("unchecked")
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        SingleSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new SingleSource[8];
            try {

            

Reported by PMD.

The method 'subscribeActual(SingleObserver)' has a cyclomatic complexity of 10.
Design

Line: 35

              
    @Override
    @SuppressWarnings("unchecked")
    protected void subscribeActual(final SingleObserver<? super T> observer) {
        SingleSource<? extends T>[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new SingleSource[8];
            try {

            

Reported by PMD.

Avoid instantiating new objects inside loops
Performance

Line: 43

                          try {
                for (SingleSource<? extends T> element : sourcesIterable) {
                    if (element == null) {
                        EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
                        return;
                    }
                    if (count == sources.length) {
                        SingleSource<? extends T>[] b = new SingleSource[count + (count >> 2)];
                        System.arraycopy(sources, 0, b, 0, count);

            

Reported by PMD.

Avoid instantiating new objects inside loops
Performance

Line: 47

                                      return;
                    }
                    if (count == sources.length) {
                        SingleSource<? extends T>[] b = new SingleSource[count + (count >> 2)];
                        System.arraycopy(sources, 0, b, 0, count);
                        sources = b;
                    }
                    sources[count++] = element;
                }

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmb.java
26 issues
The class 'CompletableAmb' has a Standard Cyclomatic Complexity of 6 (Highest = 11).
Design

Line: 24

              import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CompletableAmb extends Completable {
    private final CompletableSource[] sources;
    private final Iterable<? extends CompletableSource> sourcesIterable;

    public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
        this.sources = sources;

            

Reported by PMD.

The class 'CompletableAmb' has a Modified Cyclomatic Complexity of 6 (Highest = 11).
Design

Line: 24

              import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CompletableAmb extends Completable {
    private final CompletableSource[] sources;
    private final Iterable<? extends CompletableSource> sourcesIterable;

    public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
        this.sources = sources;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 25

              import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class CompletableAmb extends Completable {
    private final CompletableSource[] sources;
    private final Iterable<? extends CompletableSource> sourcesIterable;

    public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 26

              
public final class CompletableAmb extends Completable {
    private final CompletableSource[] sources;
    private final Iterable<? extends CompletableSource> sourcesIterable;

    public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;
    }

            

Reported by PMD.

The user-supplied array 'sources' is stored directly.
Design

Line: 28

                  private final CompletableSource[] sources;
    private final Iterable<? extends CompletableSource> sourcesIterable;

    public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
        this.sources = sources;
        this.sourcesIterable = sourcesIterable;
    }

    @Override

            

Reported by PMD.

The method 'subscribeActual(CompletableObserver)' has a cyclomatic complexity of 11.
Design

Line: 34

                  }

    @Override
    public void subscribeActual(final CompletableObserver observer) {
        CompletableSource[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new CompletableSource[8];
            try {

            

Reported by PMD.

The method 'subscribeActual' has a Modified Cyclomatic Complexity of 11.
Design

Line: 34

                  }

    @Override
    public void subscribeActual(final CompletableObserver observer) {
        CompletableSource[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new CompletableSource[8];
            try {

            

Reported by PMD.

The method 'subscribeActual' has a Standard Cyclomatic Complexity of 11.
Design

Line: 34

                  }

    @Override
    public void subscribeActual(final CompletableObserver observer) {
        CompletableSource[] sources = this.sources;
        int count = 0;
        if (sources == null) {
            sources = new CompletableSource[8];
            try {

            

Reported by PMD.

Avoid instantiating new objects inside loops
Performance

Line: 42

                          try {
                for (CompletableSource element : sourcesIterable) {
                    if (element == null) {
                        EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
                        return;
                    }
                    if (count == sources.length) {
                        CompletableSource[] b = new CompletableSource[count + (count >> 2)];
                        System.arraycopy(sources, 0, b, 0, count);

            

Reported by PMD.

Avoid instantiating new objects inside loops
Performance

Line: 46

                                      return;
                    }
                    if (count == sources.length) {
                        CompletableSource[] b = new CompletableSource[count + (count >> 2)];
                        System.arraycopy(sources, 0, b, 0, count);
                        sources = b;
                    }
                    sources[count++] = element;
                }

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimeout.java
26 issues
Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 29

              import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
    final ObservableSource<U> firstTimeoutIndicator;
    final Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator;
    final ObservableSource<? extends T> other;

    public ObservableTimeout(
            Observable<T> source,

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 30

              
public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
    final ObservableSource<U> firstTimeoutIndicator;
    final Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator;
    final ObservableSource<? extends T> other;

    public ObservableTimeout(
            Observable<T> source,
            ObservableSource<U> firstTimeoutIndicator,

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 31

              public final class ObservableTimeout<T, U, V> extends AbstractObservableWithUpstream<T, T> {
    final ObservableSource<U> firstTimeoutIndicator;
    final Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator;
    final ObservableSource<? extends T> other;

    public ObservableTimeout(
            Observable<T> source,
            ObservableSource<U> firstTimeoutIndicator,
            Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 68

              
        private static final long serialVersionUID = 3764492702657003550L;

        final Observer<? super T> downstream;

        final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;

        final SequentialDisposable task;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 70

              
        final Observer<? super T> downstream;

        final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;

        final SequentialDisposable task;

        final AtomicReference<Disposable> upstream;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 72

              
        final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;

        final SequentialDisposable task;

        final AtomicReference<Disposable> upstream;

        TimeoutObserver(Observer<? super T> actual, Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator) {
            this.downstream = actual;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 74

              
        final SequentialDisposable task;

        final AtomicReference<Disposable> upstream;

        TimeoutObserver(Observer<? super T> actual, Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator) {
            this.downstream = actual;
            this.itemTimeoutIndicator = itemTimeoutIndicator;
            this.task = new SequentialDisposable();

            

Reported by PMD.

A catch statement should never catch throwable since it includes errors.
Error

Line: 108

                              itemTimeoutObservableSource = Objects.requireNonNull(
                        itemTimeoutIndicator.apply(t),
                        "The itemTimeoutIndicator returned a null ObservableSource.");
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                upstream.get().dispose();
                getAndSet(Long.MAX_VALUE);
                downstream.onError(ex);
                return;

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 110

                                      "The itemTimeoutIndicator returned a null ObservableSource.");
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                upstream.get().dispose();
                getAndSet(Long.MAX_VALUE);
                downstream.onError(ex);
                return;
            }


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 189

              
        private static final long serialVersionUID = -7508389464265974549L;

        final Observer<? super T> downstream;

        final Function<? super T, ? extends ObservableSource<?>> itemTimeoutIndicator;

        final SequentialDisposable task;


            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeCallbackObserverTest.java
26 issues
Unit tests should not contain more than 1 assert(s).
Design

Line: 33

              public class MaybeCallbackObserverTest extends RxJavaTest {

    @Test
    public void dispose() {
        MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(Functions.emptyConsumer(), Functions.emptyConsumer(), Functions.EMPTY_ACTION);

        Disposable d = Disposable.empty();

        mo.onSubscribe(d);

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 40

              
        mo.onSubscribe(d);

        assertFalse(mo.isDisposed());

        mo.dispose();

        assertTrue(mo.isDisposed());


            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 44

              
        mo.dispose();

        assertTrue(mo.isDisposed());

        assertTrue(d.isDisposed());
    }

    @Test

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 46

              
        assertTrue(mo.isDisposed());

        assertTrue(d.isDisposed());
    }

    @Test
    public void onSuccessCrashes() {
        List<Throwable> errors = TestHelper.trackPluginErrors();

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 46

              
        assertTrue(mo.isDisposed());

        assertTrue(d.isDisposed());
    }

    @Test
    public void onSuccessCrashes() {
        List<Throwable> errors = TestHelper.trackPluginErrors();

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 50

                  }

    @Test
    public void onSuccessCrashes() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(
                    new Consumer<Object>() {
                        @Override

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 74

                  }

    @Test
    public void onErrorCrashes() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(
                    Functions.emptyConsumer(),
                    new Consumer<Object>() {

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 103

                  }

    @Test
    public void onCompleteCrashes() {
        List<Throwable> errors = TestHelper.trackPluginErrors();
        try {
            MaybeCallbackObserver<Object> mo = new MaybeCallbackObserver<>(
                    Functions.emptyConsumer(),
                    Functions.emptyConsumer(),

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 132

                              Functions.ON_ERROR_MISSING,
                Functions.EMPTY_ACTION);

        assertFalse(o.hasCustomOnError());
    }

    @Test
    public void customOnErrorShouldReportCustomOnError() {
        MaybeCallbackObserver<Integer> o = new MaybeCallbackObserver<>(Functions.<Integer>emptyConsumer(),

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 141

                              Functions.<Throwable>emptyConsumer(),
                Functions.EMPTY_ACTION);

        assertTrue(o.hasCustomOnError());
    }
}

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/core/Scheduler.java
26 issues
This class has too many methods, consider refactoring it.
Design

Line: 91

               * <p>
 * All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
 */
public abstract class Scheduler {
    /**
     * Value representing whether to use {@link System#nanoTime()}, or default as clock for {@link #now(TimeUnit)}
     * and {@link Scheduler.Worker#now(TimeUnit)}.
     * <p>
     * Associated system parameter:

            

Reported by PMD.

Potential violation of Law of Demeter (static property access)
Design

Line: 144

                   */
    static long computeClockDrift(long time, String timeUnit) {
        if ("seconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.SECONDS.toNanos(time);
        } else if ("milliseconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.MILLISECONDS.toNanos(time);
        }
        return TimeUnit.MINUTES.toNanos(time);
    }

            

Reported by PMD.

Potential violation of Law of Demeter (static property access)
Design

Line: 146

                      if ("seconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.SECONDS.toNanos(time);
        } else if ("milliseconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.MILLISECONDS.toNanos(time);
        }
        return TimeUnit.MINUTES.toNanos(time);
    }

    /**

            

Reported by PMD.

Potential violation of Law of Demeter (static property access)
Design

Line: 148

                      } else if ("milliseconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.MILLISECONDS.toNanos(time);
        }
        return TimeUnit.MINUTES.toNanos(time);
    }

    /**
     * Returns the clock drift tolerance in nanoseconds.
     * <p>Related system properties:

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 259

              
        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

    /**

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 292

              
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }

        return periodicTask;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 524

                       */
        final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
            @NonNull
            final Runnable decoratedRun;
            @NonNull
            final SequentialDisposable sd;
            final long periodInNanoseconds;
            long count;
            long lastNowNanoseconds;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 526

                          @NonNull
            final Runnable decoratedRun;
            @NonNull
            final SequentialDisposable sd;
            final long periodInNanoseconds;
            long count;
            long lastNowNanoseconds;
            long startInNanoseconds;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 527

                          final Runnable decoratedRun;
            @NonNull
            final SequentialDisposable sd;
            final long periodInNanoseconds;
            long count;
            long lastNowNanoseconds;
            long startInNanoseconds;

            PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 528

                          @NonNull
            final SequentialDisposable sd;
            final long periodInNanoseconds;
            long count;
            long lastNowNanoseconds;
            long startInNanoseconds;

            PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
                    long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOn.java
26 issues
Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 28

              import io.reactivex.rxjava3.plugins.RxJavaPlugins;

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 29

              
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 30

              public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;

            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 50

                  }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 53

                  implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 54

              
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 55

                      private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 56

                      final Observer<? super T> downstream;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 58

                      final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 60

              
        SimpleQueue<T> queue;

        Disposable upstream;

        Throwable error;
        volatile boolean done;

        volatile boolean disposed;

            

Reported by PMD.