The following issues were found:

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSerializeTest.java
101 issues
Avoid throwing raw exception types.
Design

Line: 268

                                      }
                        subscriber.onComplete();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }

            });
            System.out.println("starting TestSingleThreadedObservable thread");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 282

                          try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }


            

Reported by PMD.

Avoid throwing null pointer exceptions.
Design

Line: 325

                                                      if (s == null) {
                                            System.out.println("TestMultiThreadedObservable onNext: null");
                                            // force an error
                                            throw npe;
                                        } else {
                                            try {
                                                Thread.sleep(10);
                                            } catch (InterruptedException ex) {
                                                // ignored

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 352

                                      // we are done spawning threads
                        threadPool.shutdown();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }

                    // wait until all threads are done, then mark it as COMPLETED
                    try {
                        // wait for all the threads to finish

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 360

                                      // wait for all the threads to finish
                        threadPool.awaitTermination(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    subscriber.onComplete();
                }
            });
            System.out.println("starting TestMultiThreadedObservable thread");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 374

                          try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class BusyObserver extends DefaultSubscriber<String> {

            

Reported by PMD.

System.out.println is used
Design

Line: 106

                      w.serialize().subscribe(busyobserver);
        onSubscribe.waitToFinish();

        System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());

        // we can't know how many onNext calls will occur since they each run on a separate thread
        // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
        // assertEquals(3, busyobserver.onNextCount.get());
        assertTrue(busyobserver.onNextCount.get() < 4);

            

Reported by PMD.

System.out.println is used
Design

Line: 152

                          w.serialize().subscribe(busyobserver);
            onSubscribe.waitToFinish();

            System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
            // this should not always be the full number of items since the error should (very often)
            // stop it before it completes all 9
            System.out.println("onNext count: " + busyobserver.onNextCount.get());
            if (busyobserver.onNextCount.get() < 9) {
                lessThan9 = true;

            

Reported by PMD.

System.out.println is used
Design

Line: 155

                          System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
            // this should not always be the full number of items since the error should (very often)
            // stop it before it completes all 9
            System.out.println("onNext count: " + busyobserver.onNextCount.get());
            if (busyobserver.onNextCount.get() < 9) {
                lessThan9 = true;
            }
            assertTrue(busyobserver.onError);
            // no onComplete because onError was invoked

            

Reported by PMD.

System.err.println is used
Design

Line: 218

                                  try {
                        f.get();
                    } catch (Throwable e) {
                        System.err.println("Error while waiting on future in CompletionThread");
                    }
                }
            }

            /* send the event */

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableElementAtTest.java
100 issues
This class has too many methods, consider refactoring it.
Design

Line: 32

              import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class ObservableElementAtTest extends RxJavaTest {

    @Test
    public void elementAtObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
                .intValue());

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 36

              
    @Test
    public void elementAtObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
                .intValue());
    }

    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 36

              
    @Test
    public void elementAtObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
                .intValue());
    }

    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 36

              
    @Test
    public void elementAtObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
                .intValue());
    }

    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 36

              
    @Test
    public void elementAtObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
                .intValue());
    }

    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 36

              
    @Test
    public void elementAtObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
                .intValue());
    }

    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 42

              
    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {
        assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
    }

    @Test
    public void elementAtOrDefaultObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 42

              
    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {
        assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
    }

    @Test
    public void elementAtOrDefaultObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 42

              
    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {
        assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
    }

    @Test
    public void elementAtOrDefaultObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 42

              
    @Test
    public void elementAtWithIndexOutOfBoundsObservable() {
        assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
    }

    @Test
    public void elementAtOrDefaultObservable() {
        assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/core/Observable.java
99 issues
Avoid throwing null pointer exceptions.
Design

Line: 13187

              
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

    /**
     * Operator implementations (both source and intermediate) should implement this method that

            

Reported by PMD.

Avoid reassigning parameters such as 'observer'
Design

Line: 13169

              
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            

Reported by PMD.

This class has a bunch of public methods and attributes
Design

Line: 14

               * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.core;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;


            

Reported by PMD.

The class 'Observable' has a NCSS line count of 1752 (Highest = 14).
Design

Line: 102

               * @see Flowable
 * @see io.reactivex.rxjava3.observers.DisposableObserver
 */
public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    /**
     * Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
     * a termination notification.
     * <p>

            

Reported by PMD.

The class 'Observable' has a total cyclomatic complexity of 591 (highest 8).
Design

Line: 102

               * @see Flowable
 * @see io.reactivex.rxjava3.observers.DisposableObserver
 */
public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    /**
     * Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
     * a termination notification.
     * <p>

            

Reported by PMD.

The type has an NCSS line count of 1752
Design

Line: 102

               * @see Flowable
 * @see io.reactivex.rxjava3.observers.DisposableObserver
 */
public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    /**
     * Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
     * a termination notification.
     * <p>

            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 102

               * @see Flowable
 * @see io.reactivex.rxjava3.observers.DisposableObserver
 */
public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    /**
     * Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
     * a termination notification.
     * <p>

            

Reported by PMD.

Avoid really long classes.
Design

Line: 102

               * @see Flowable
 * @see io.reactivex.rxjava3.observers.DisposableObserver
 */
public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    /**
     * Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
     * a termination notification.
     * <p>

            

Reported by PMD.

Possible God Class (WMC=591, ATFD=79, TCC=0.000%)
Design

Line: 102

               * @see Flowable
 * @see io.reactivex.rxjava3.observers.DisposableObserver
 */
public abstract class Observable<@NonNull T> implements ObservableSource<T> {

    /**
     * Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
     * a termination notification.
     * <p>

            

Reported by PMD.

The String literal 'sources is null' appears 21 times in this file; the first occurrence is on line 134
Error

Line: 134

                  @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <@NonNull T> Observable<T> amb(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
        Objects.requireNonNull(sources, "sources is null");
        return RxJavaPlugins.onAssembly(new ObservableAmb<>(null, sources));
    }

    /**
     * Mirrors the one {@link ObservableSource} in an array of several {@code ObservableSource}s that first either emits an item or sends

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeTest.java
98 issues
Avoid throwing raw exception types.
Design

Line: 232

                                      }
                        observer.onComplete();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }

            });
            System.out.println("starting TestObservable thread");

            

Reported by PMD.

System.out.println is used
Design

Line: 155

                          fail(e.getMessage());
        }

        System.out.println("TestObservable thread finished");
        verify(observer).onSubscribe((Disposable)notNull());
        verify(observer, times(1)).onNext("one");
        verify(observer, never()).onNext("two");
        verify(observer, never()).onNext("three");
        verify(observer, times(1)).onComplete();

            

Reported by PMD.

System.out.println is used
Design

Line: 188

                              Disposable bs = Disposable.empty();
                observer.onSubscribe(bs);
                for (int i = 0; !bs.isDisposed(); i++) {
                    System.out.println("Emit: " + i);
                    count.incrementAndGet();
                    observer.onNext(i);
                }
            }


            

Reported by PMD.

System.out.println is used
Design

Line: 198

              
            @Override
            public void accept(Integer t1) {
                System.out.println("Receive: " + t1);

            }

        });


            

Reported by PMD.

System.out.println is used
Design

Line: 219

                      @Override
        public void subscribe(final Observer<? super String> observer) {
            observer.onSubscribe(Disposable.empty());
            System.out.println("TestObservable subscribed to ...");
            t = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {

            

Reported by PMD.

System.out.println is used
Design

Line: 225

                              @Override
                public void run() {
                    try {
                        System.out.println("running TestObservable thread");
                        for (String s : values) {
                            System.out.println("TestObservable onNext: " + s);
                            observer.onNext(s);
                        }
                        observer.onComplete();

            

Reported by PMD.

System.out.println is used
Design

Line: 227

                                  try {
                        System.out.println("running TestObservable thread");
                        for (String s : values) {
                            System.out.println("TestObservable onNext: " + s);
                            observer.onNext(s);
                        }
                        observer.onComplete();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);

            

Reported by PMD.

System.out.println is used
Design

Line: 237

                              }

            });
            System.out.println("starting TestObservable thread");
            t.start();
            System.out.println("done starting TestObservable thread");
        }
    }


            

Reported by PMD.

System.out.println is used
Design

Line: 239

                          });
            System.out.println("starting TestObservable thread");
            t.start();
            System.out.println("done starting TestObservable thread");
        }
    }

    private static Observable<Long> INFINITE_OBSERVABLE = Observable.unsafeCreate(new ObservableSource<Long>() {


            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 39

              import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;

public class ObservableTakeTest extends RxJavaTest {

    @Test
    public void take1() {
        Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
        Observable<String> take = w.take(2);

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/schedulers/TestSchedulerTest.java
98 issues
System.out.println is used
Design

Line: 51

                          inner.schedulePeriodically(new Runnable() {
                @Override
                public void run() {
                    System.out.println(scheduler.now(TimeUnit.MILLISECONDS));
                    try {
                        calledOp.apply(scheduler.now(TimeUnit.MILLISECONDS));
                    } catch (Throwable ex) {
                        ExceptionHelper.wrapOrThrow(ex);
                    }

            

Reported by PMD.

System.out.println is used
Design

Line: 101

                          final Disposable subscription = inner.schedulePeriodically(new Runnable() {
                @Override
                public void run() {
                    System.out.println(scheduler.now(TimeUnit.MILLISECONDS));
                    try {
                        calledOp.apply(scheduler.now(TimeUnit.MILLISECONDS));
                    } catch (Throwable ex) {
                        ExceptionHelper.wrapOrThrow(ex);
                    }

            

Reported by PMD.

System.out.println is used
Design

Line: 150

                              @Override
                public void run() {
                    counter.incrementAndGet();
                    System.out.println("counter: " + counter.get());
                    inner.schedule(this);
                }

            });
            inner.dispose();

            

Reported by PMD.

System.out.println is used
Design

Line: 174

                              @Override
                public void run() {
                    counter.incrementAndGet();
                    System.out.println("counter: " + counter.get());
                    inner.schedule(this);
                }

            });
            subscription.dispose();

            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 36

              import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.TestScheduler.*;

public class TestSchedulerTest extends RxJavaTest {

    @SuppressWarnings("unchecked")
    // mocking is unchecked, unfortunately
    @Test
    public final void periodicScheduling() throws Throwable {

            

Reported by PMD.

A catch statement should never catch throwable since it includes errors.
Error

Line: 54

                                  System.out.println(scheduler.now(TimeUnit.MILLISECONDS));
                    try {
                        calledOp.apply(scheduler.now(TimeUnit.MILLISECONDS));
                    } catch (Throwable ex) {
                        ExceptionHelper.wrapOrThrow(ex);
                    }
                }
            }, 1, 2, TimeUnit.SECONDS);


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 60

                              }
            }, 1, 2, TimeUnit.SECONDS);

            verify(calledOp, never()).apply(anyLong());

            InOrder inOrder = Mockito.inOrder(calledOp);

            scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, never()).apply(anyLong());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 65

                          InOrder inOrder = Mockito.inOrder(calledOp);

            scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, never()).apply(anyLong());

            scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, times(1)).apply(1000L);

            scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 65

                          InOrder inOrder = Mockito.inOrder(calledOp);

            scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, never()).apply(anyLong());

            scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, times(1)).apply(1000L);

            scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 68

                          inOrder.verify(calledOp, never()).apply(anyLong());

            scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, times(1)).apply(1000L);

            scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
            inOrder.verify(calledOp, never()).apply(3000L);

            scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSerializeTest.java
97 issues
Avoid throwing raw exception types.
Design

Line: 268

                                      }
                        observer.onComplete();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }
                }

            });
            System.out.println("starting TestSingleThreadedObservable thread");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 282

                          try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

    }


            

Reported by PMD.

Avoid throwing null pointer exceptions.
Design

Line: 325

                                                      if (s == null) {
                                            System.out.println("TestMultiThreadedObservable onNext: null");
                                            // force an error
                                            throw npe;
                                        } else {
                                            System.out.println("TestMultiThreadedObservable onNext: " + s);
                                        }
                                        observer.onNext(s);
                                        // capture 'maxThreads'

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 347

                                      // we are done spawning threads
                        threadPool.shutdown();
                    } catch (Throwable e) {
                        throw new RuntimeException(e);
                    }

                    // wait until all threads are done, then mark it as COMPLETED
                    try {
                        // wait for all the threads to finish

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 355

                                      // wait for all the threads to finish
                        threadPool.awaitTermination(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    observer.onComplete();
                }
            });
            System.out.println("starting TestMultiThreadedObservable thread");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 369

                          try {
                t.join();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    private static class BusyObserver extends DefaultObserver<String> {

            

Reported by PMD.

System.out.println is used
Design

Line: 105

                      w.serialize().subscribe(busyobserver);
        onSubscribe.waitToFinish();

        System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());

        // we can't know how many onNext calls will occur since they each run on a separate thread
        // that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
        // assertEquals(3, busyobserver.onNextCount.get());
        assertTrue(busyobserver.onNextCount.get() < 4);

            

Reported by PMD.

System.out.println is used
Design

Line: 151

                          w.serialize().subscribe(busyobserver);
            onSubscribe.waitToFinish();

            System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
            // this should not always be the full number of items since the error should (very often)
            // stop it before it completes all 9
            System.out.println("onNext count: " + busyobserver.onNextCount.get());
            if (busyobserver.onNextCount.get() < 9) {
                lessThan9 = true;

            

Reported by PMD.

System.out.println is used
Design

Line: 154

                          System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
            // this should not always be the full number of items since the error should (very often)
            // stop it before it completes all 9
            System.out.println("onNext count: " + busyobserver.onNextCount.get());
            if (busyobserver.onNextCount.get() < 9) {
                lessThan9 = true;
            }
            assertTrue(busyobserver.onError);
            // no onComplete because onError was invoked

            

Reported by PMD.

System.err.println is used
Design

Line: 218

                                  try {
                        f.get();
                    } catch (Throwable e) {
                        System.err.println("Error while waiting on future in CompletionThread");
                    }
                }
            }

            /* send the event */

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapSingleTest.java
96 issues
This class has too many methods, consider refactoring it.
Design

Line: 34

              import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;

public class ObservableFlatMapSingleTest extends RxJavaTest {

    @Test
    public void normal() {
        Observable.range(1, 10)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 37

              public class ObservableFlatMapSingleTest extends RxJavaTest {

    @Test
    public void normal() {
        Observable.range(1, 10)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer v) throws Exception {
                return Single.just(v);

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 50

                  }

    @Test
    public void normalDelayError() {
        Observable.range(1, 10)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer v) throws Exception {
                return Single.just(v);

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 63

                  }

    @Test
    public void normalAsync() {
        TestObserverEx<Integer> to = Observable.range(1, 10)
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer v) throws Exception {
                return Single.just(v).subscribeOn(Schedulers.computation());

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 68

                      .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override
            public SingleSource<Integer> apply(Integer v) throws Exception {
                return Single.just(v).subscribeOn(Schedulers.computation());
            }
        })
        .to(TestHelper.<Integer>testConsumer())
        .awaitDone(5, TimeUnit.SECONDS)
        .assertSubscribed()

            

Reported by PMD.

Unit tests should not contain more than 1 assert(s).
Design

Line: 81

                  }

    @Test
    public void mapperThrowsObservable() {
        PublishSubject<Integer> ps = PublishSubject.create();

        TestObserver<Integer> to = ps
        .flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
            @Override

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 93

                      })
        .test();

        assertTrue(ps.hasObservers());

        ps.onNext(1);

        to.assertFailure(TestException.class);


            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 93

                      })
        .test();

        assertTrue(ps.hasObservers());

        ps.onNext(1);

        to.assertFailure(TestException.class);


            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 95

              
        assertTrue(ps.hasObservers());

        ps.onNext(1);

        to.assertFailure(TestException.class);

        assertFalse(ps.hasObservers());
    }

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 99

              
        to.assertFailure(TestException.class);

        assertFalse(ps.hasObservers());
    }

    @Test
    public void mapperReturnsNullObservable() {
        PublishSubject<Integer> ps = PublishSubject.create();

            

Reported by PMD.

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java
95 issues
Avoid reassigning parameters such as 'window'
Design

Line: 453

                          }
        }

        UnicastSubject<T> createNewWindow(UnicastSubject<T> window) {
            if (window != null) {
                window.onComplete();
                window = null;
            }


            

Reported by PMD.

Avoid reassigning parameters such as 'window'
Design

Line: 453

                          }
        }

        UnicastSubject<T> createNewWindow(UnicastSubject<T> window) {
            if (window != null) {
                window.onComplete();
                window = null;
            }


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 31

              import io.reactivex.rxjava3.subjects.UnicastSubject;

public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
    final long timespan;
    final long timeskip;
    final TimeUnit unit;
    final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 32

              
public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
    final long timespan;
    final long timeskip;
    final TimeUnit unit;
    final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 33

              public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
    final long timespan;
    final long timeskip;
    final TimeUnit unit;
    final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 34

                  final long timespan;
    final long timeskip;
    final TimeUnit unit;
    final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;

    public ObservableWindowTimed(Observable<T> source,

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 35

                  final long timeskip;
    final TimeUnit unit;
    final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;

    public ObservableWindowTimed(Observable<T> source,
            long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 36

                  final TimeUnit unit;
    final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;

    public ObservableWindowTimed(Observable<T> source,
            long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
            int bufferSize, boolean restartTimerOnMaxSize) {

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 37

                  final Scheduler scheduler;
    final long maxSize;
    final int bufferSize;
    final boolean restartTimerOnMaxSize;

    public ObservableWindowTimed(Observable<T> source,
            long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
            int bufferSize, boolean restartTimerOnMaxSize) {
        super(source);

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 76

                  implements Observer<T>, Disposable {
        private static final long serialVersionUID = 5724293814035355511L;

        final Observer<? super Observable<T>> downstream;

        final SimplePlainQueue<Object> queue;

        final long timespan;
        final TimeUnit unit;

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferTest.java
95 issues
Avoid throwing raw exception types.
Design

Line: 175

              
        @Override
        public void run() {
            throw new RuntimeException();
        }
    };

    @Test
    public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {

            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 38

              import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;

public class FlowableOnBackpressureBufferTest extends RxJavaTest {

    @Test
    public void noBackpressureSupport() {
        TestSubscriber<Long> ts = new TestSubscriber<>(0L);
        // this will be ignored

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 46

                      // this will be ignored
        ts.request(100);
        // we take 500 so it unsubscribes
        infinite.take(500).subscribe(ts);
        // it completely ignores the `request(100)` and we get 500
        assertEquals(500, ts.values().size());
        ts.assertNoErrors();
    }


            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 48

                      // we take 500 so it unsubscribes
        infinite.take(500).subscribe(ts);
        // it completely ignores the `request(100)` and we get 500
        assertEquals(500, ts.values().size());
        ts.assertNoErrors();
    }

    @Test
    public void fixBackpressureWithBuffer() throws InterruptedException {

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 48

                      // we take 500 so it unsubscribes
        infinite.take(500).subscribe(ts);
        // it completely ignores the `request(100)` and we get 500
        assertEquals(500, ts.values().size());
        ts.assertNoErrors();
    }

    @Test
    public void fixBackpressureWithBuffer() throws InterruptedException {

            

Reported by PMD.

Unit tests should not contain more than 1 assert(s).
Design

Line: 53

                  }

    @Test
    public void fixBackpressureWithBuffer() throws InterruptedException {
        final CountDownLatch l1 = new CountDownLatch(100);
        final CountDownLatch l2 = new CountDownLatch(150);
        TestSubscriber<Long> ts = new TestSubscriber<>(new DefaultSubscriber<Long>() {

            @Override

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 80

                      // this will be ignored
        ts.request(100);
        // we take 500 so it unsubscribes
        infinite.subscribeOn(Schedulers.computation())
        .onBackpressureBuffer()
        .take(500)
        .subscribe(ts);

        // it completely ignores the `request(100)` and we get 500

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 80

                      // this will be ignored
        ts.request(100);
        // we take 500 so it unsubscribes
        infinite.subscribeOn(Schedulers.computation())
        .onBackpressureBuffer()
        .take(500)
        .subscribe(ts);

        // it completely ignores the `request(100)` and we get 500

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 80

                      // this will be ignored
        ts.request(100);
        // we take 500 so it unsubscribes
        infinite.subscribeOn(Schedulers.computation())
        .onBackpressureBuffer()
        .take(500)
        .subscribe(ts);

        // it completely ignores the `request(100)` and we get 500

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 87

              
        // it completely ignores the `request(100)` and we get 500
        l1.await();
        assertEquals(100, ts.values().size());
        ts.request(50);
        l2.await();
        assertEquals(150, ts.values().size());
        ts.request(350);
        ts.awaitDone(5, TimeUnit.SECONDS);

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromIterableTest.java
95 issues
This class has too many methods, consider refactoring it.
Design

Line: 37

              import io.reactivex.rxjava3.observers.*;
import io.reactivex.rxjava3.testsupport.*;

public class ObservableFromIterableTest extends RxJavaTest {

    @Test
    public void listIterable() {
        Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));


            

Reported by PMD.

The String literal 'two' appears 4 times in this file; the first occurrence is on line 41
Error

Line: 41

              
    @Test
    public void listIterable() {
        Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));

        Observer<String> observer = TestHelper.mockObserver();

        o.subscribe(observer);


            

Reported by PMD.

The String literal 'three' appears 4 times in this file; the first occurrence is on line 41
Error

Line: 41

              
    @Test
    public void listIterable() {
        Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));

        Observer<String> observer = TestHelper.mockObserver();

        o.subscribe(observer);


            

Reported by PMD.

The String literal 'one' appears 4 times in this file; the first occurrence is on line 41
Error

Line: 41

              
    @Test
    public void listIterable() {
        Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));

        Observer<String> observer = TestHelper.mockObserver();

        o.subscribe(observer);


            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 45

              
        Observer<String> observer = TestHelper.mockObserver();

        o.subscribe(observer);

        verify(observer, times(1)).onNext("one");
        verify(observer, times(1)).onNext("two");
        verify(observer, times(1)).onNext("three");
        verify(observer, Mockito.never()).onError(any(Throwable.class));

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 47

              
        o.subscribe(observer);

        verify(observer, times(1)).onNext("one");
        verify(observer, times(1)).onNext("two");
        verify(observer, times(1)).onNext("three");
        verify(observer, Mockito.never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 48

                      o.subscribe(observer);

        verify(observer, times(1)).onNext("one");
        verify(observer, times(1)).onNext("two");
        verify(observer, times(1)).onNext("three");
        verify(observer, Mockito.never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }


            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 49

              
        verify(observer, times(1)).onNext("one");
        verify(observer, times(1)).onNext("two");
        verify(observer, times(1)).onNext("three");
        verify(observer, Mockito.never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

    /**

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 50

                      verify(observer, times(1)).onNext("one");
        verify(observer, times(1)).onNext("two");
        verify(observer, times(1)).onNext("three");
        verify(observer, Mockito.never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

    /**
     * This tests the path that can not optimize based on size so must use setProducer.

            

Reported by PMD.

Potential violation of Law of Demeter (method chain calls)
Design

Line: 51

                      verify(observer, times(1)).onNext("two");
        verify(observer, times(1)).onNext("three");
        verify(observer, Mockito.never()).onError(any(Throwable.class));
        verify(observer, times(1)).onComplete();
    }

    /**
     * This tests the path that can not optimize based on size so must use setProducer.
     */

            

Reported by PMD.