The following issues were found

src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java
499 issues
Avoid throwing raw exception types.
Design

Line: 488

                              throw new AssertionError("The wait timed out!");
            }
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
        if (errors[0] != null && errors[1] == null) {
            throw ExceptionHelper.wrapOrThrow(errors[0]);
        }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 2593

                          assertNotNull(state[2]);
            assertTrue("Did not empty", state[3]);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Checks if the source is fuseable and its isEmpty/clear works properly.

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 2659

                          assertNotNull(state[2]);
            assertTrue("Did not empty", state[3]);
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }

    /**
     * Returns an expanded error list of the given test consumer.

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 2840

                      } catch (AssertionError ex) {
            throw ex;
        } catch (Throwable ex) {
            throw new RuntimeException(ex);
        } finally {
            RxJavaPlugins.reset();
        }
    }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 2999

                      } catch (AssertionError ex) {
            throw ex;
        } catch (Throwable ex) {
            throw new RuntimeException(ex);
        } finally {
            RxJavaPlugins.reset();
        }
    }


            

Reported by PMD.

Avoid reassigning parameters such as 'ex'
Design

Line: 509

                   * @param ex the target Throwable
     * @return the list of Throwables
     */
    public static List<Throwable> compositeList(Throwable ex) {
        if (ex instanceof UndeliverableException) {
            ex = ex.getCause();
        }
        return ((CompositeException)ex).getExceptions();
    }

            

Reported by PMD.

Avoid reassigning parameters such as 'parentPackage'
Design

Line: 3510

                   * @return the File pointing to the source
     * @throws Exception on error
     */
    public static File findSource(String baseClassName, String parentPackage) throws Exception {
        URL u = TestHelper.class.getResource(TestHelper.class.getSimpleName() + ".class");

        String path = new File(u.toURI()).toString().replace('\\', '/');

        parentPackage = parentPackage.replace(".", "/");

            

Reported by PMD.

System.err.println is used
Design

Line: 3535

                      }

        if (p == null) {
            System.err.println("Unable to locate the RxJava sources");
            return null;
        }

        File f = new File(p);


            

Reported by PMD.

System.out.println is used
Design

Line: 3545

                          return f;
        }

        System.out.println("Can't read " + p);
        return null;
    }

    /**
     * Cancels a flow before notifying a transformation and checks if an undeliverable exception

            

Reported by PMD.

Do not explicitly trigger a garbage collection.
Error

Line: 3698

                  public static long awaitGC(long oneSleep, int maxLoop, long expectedMemoryUsage) throws InterruptedException {
        MemoryMXBean bean = ManagementFactory.getMemoryMXBean();

        System.gc();

        int i = maxLoop;
        while (i-- != 0) {
            long usage = bean.getHeapMemoryUsage().getUsed();
            if (usage <= expectedMemoryUsage) {

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayTest.java
498 issues
System.out.println is used
Design

Line: 472

                          @Override
            public void accept(Integer v) {
                effectCounter.incrementAndGet();
                System.out.println("Sideeffect #" + v);
            }
        });

        Observable<Integer> result = source.replay(
        new Function<Observable<Integer>, Observable<Integer>>() {

            

Reported by PMD.

System.out.printf is used
Design

Line: 486

              
        for (int i = 1; i < 3; i++) {
            effectCounter.set(0);
            System.out.printf("- %d -%n", i);
            result.subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t1) {
                    System.out.println(t1);

            

Reported by PMD.

System.out.println is used
Design

Line: 491

              
                @Override
                public void accept(Integer t1) {
                    System.out.println(t1);
                }

            }, new Consumer<Throwable>() {

                @Override

            

Reported by PMD.

System.out.println is used
Design

Line: 504

                          new Action() {
                @Override
                public void run() {
                    System.out.println("Done");
                }
            });
            assertEquals(2, effectCounter.get());
        }
    }

            

Reported by PMD.

System.out.println is used
Design

Line: 901

                                  @Override
                    public void run() {
                        counter.incrementAndGet();
                        System.out.println("published Observable being executed");
                        observer.onNext("one");
                        observer.onComplete();
                    }
                }).start();
            }

            

Reported by PMD.

System.out.println is used
Design

Line: 918

                          @Override
            public void accept(String v) {
                assertEquals("one", v);
                System.out.println("v: " + v);
                latch.countDown();
            }
        });

        // subscribe again

            

Reported by PMD.

System.out.println is used
Design

Line: 929

                          @Override
            public void accept(String v) {
                assertEquals("one", v);
                System.out.println("v: " + v);
                latch.countDown();
            }
        });

        if (!latch.await(1000, TimeUnit.MILLISECONDS)) {

            

Reported by PMD.

System.out.println is used
Design

Line: 1664

                      .takeLast(1)
        ;

        System.out.println("Bounded Replay Leak check: Wait before GC");
        Thread.sleep(1000);

        System.out.println("Bounded Replay Leak check: GC");
        System.gc();


            

Reported by PMD.

System.out.println is used
Design

Line: 1667

                      System.out.println("Bounded Replay Leak check: Wait before GC");
        Thread.sleep(1000);

        System.out.println("Bounded Replay Leak check: GC");
        System.gc();

        Thread.sleep(500);

        final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

            

Reported by PMD.

Do not explicitly trigger a garbage collection.
Error

Line: 1668

                      Thread.sleep(1000);

        System.out.println("Bounded Replay Leak check: GC");
        System.gc();

        Thread.sleep(500);

        final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
        MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java
491 issues
Avoid throwing raw exception types.
Design

Line: 124

                          o1.t.join();
            o2.t.join();
        } catch (Throwable e) {
            throw new RuntimeException("failed waiting on threads");
        }

        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, times(1)).onNext("one");
        inOrder.verify(subscriber, times(1)).onNext("two");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 232

                          System.out.println("Thread2 is starting ... waiting for it to complete ...");
            o2.waitForThreadDone();
        } catch (Throwable e) {
            throw new RuntimeException("failed waiting on threads", e);
        }

        InOrder inOrder = inOrder(subscriber);
        inOrder.verify(subscriber, times(1)).onNext("one");
        inOrder.verify(subscriber, times(1)).onNext("two");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 257

                          // wait for 3rd to complete
            o3.waitForThreadDone();
        } catch (Throwable e) {
            throw new RuntimeException("failed waiting on threads", e);
        }

        try {
            // wait for the parent to complete
            if (!parentHasFinished.await(5, TimeUnit.SECONDS)) {

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 266

                              fail("Parent didn't finish within the time limit");
            }
        } catch (Throwable e) {
            throw new RuntimeException("failed waiting on threads", e);
        }

        inOrder.verify(subscriber, times(1)).onNext("seven");
        inOrder.verify(subscriber, times(1)).onNext("eight");
        inOrder.verify(subscriber, times(1)).onNext("nine");

            

Reported by PMD.

System.out.println is used
Design

Line: 140

                  public void nestedAsyncConcatLoop() throws Throwable {
        for (int i = 0; i < 500; i++) {
            if (i % 10 == 0) {
                System.out.println("testNestedAsyncConcat >> " + i);
            }
            nestedAsyncConcat();
        }
    }


            

Reported by PMD.

System.out.println is used
Design

Line: 186

                                      try {
                            // emit first
                            if (!d.isDisposed()) {
                                System.out.println("Emit o1");
                                subscriber.onNext(Flowable.unsafeCreate(o1));
                            }
                            // emit second
                            if (!d.isDisposed()) {
                                System.out.println("Emit o2");

            

Reported by PMD.

System.out.println is used
Design

Line: 191

                                          }
                            // emit second
                            if (!d.isDisposed()) {
                                System.out.println("Emit o2");
                                subscriber.onNext(Flowable.unsafeCreate(o2));
                            }

                            // wait until sometime later and emit third
                            try {

            

Reported by PMD.

System.out.println is used
Design

Line: 202

                                              subscriber.onError(e);
                            }
                            if (!d.isDisposed()) {
                                System.out.println("Emit o3");
                                subscriber.onNext(Flowable.unsafeCreate(o3));
                            }

                        } catch (Throwable e) {
                            subscriber.onError(e);

            

Reported by PMD.

System.out.println is used
Design

Line: 209

                                      } catch (Throwable e) {
                            subscriber.onError(e);
                        } finally {
                            System.out.println("Done parent Flowable");
                            subscriber.onComplete();
                            parentHasFinished.countDown();
                        }
                    }
                }));

            

Reported by PMD.

System.out.println is used
Design

Line: 227

              
        try {
            // wait for first 2 async observables to complete
            System.out.println("Thread1 is starting ... waiting for it to complete ...");
            o1.waitForThreadDone();
            System.out.println("Thread2 is starting ... waiting for it to complete ...");
            o2.waitForThreadDone();
        } catch (Throwable e) {
            throw new RuntimeException("failed waiting on threads", e);

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java
482 issues
Avoid throwing raw exception types.
Design

Line: 848

              
        latch.await(1000, TimeUnit.MILLISECONDS);
        if (!infiniteFlowable.await(2000, TimeUnit.MILLISECONDS)) {
            throw new RuntimeException("didn't unsubscribe");
        }

        assertEquals(5, list.size());
        assertEquals("1-1", list.get(0));
        assertEquals("2-2", list.get(1));

            

Reported by PMD.

Avoid reassigning parameters such as 'a1'
Design

Line: 568

                      Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {

            @Override
            public String apply(String a1, String a2, String a3) {
                if (a1 == null) {
                    a1 = "";
                }
                if (a2 == null) {
                    a2 = "";

            

Reported by PMD.

Avoid reassigning parameters such as 'a2'
Design

Line: 568

                      Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {

            @Override
            public String apply(String a1, String a2, String a3) {
                if (a1 == null) {
                    a1 = "";
                }
                if (a2 == null) {
                    a2 = "";

            

Reported by PMD.

Avoid reassigning parameters such as 'a3'
Design

Line: 568

                      Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {

            @Override
            public String apply(String a1, String a2, String a3) {
                if (a1 == null) {
                    a1 = "";
                }
                if (a2 == null) {
                    a2 = "";

            

Reported by PMD.

System.out.println is used
Design

Line: 778

              
            @Override
            public void accept(String s) {
                System.out.println(s);
                list.add(s);
            }
        });

        assertEquals(5, list.size());

            

Reported by PMD.

System.out.println is used
Design

Line: 841

              
            @Override
            public void onNext(String s) {
                System.out.println(s);
                list.add(s);
            }
        });

        latch.await(1000, TimeUnit.MILLISECONDS);

            

Reported by PMD.

System.out.println is used
Design

Line: 894

              
            @Override
            public void accept(String s) {
                System.out.println(s);
                list.add(s);
            }
        });

        assertEquals(4, list.size());

            

Reported by PMD.

System.out.println is used
Design

Line: 923

              
            @Override
            public void accept(String s) {
                System.out.println(s);
                list.add(s);
            }
        });

        assertEquals(0, list.size());

            

Reported by PMD.

System.out.println is used
Design

Line: 1040

                      ts.awaitDone(5, TimeUnit.SECONDS);
        ts.assertNoErrors();
        assertEquals(Flowable.bufferSize() * 2, ts.values().size());
        System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
        assertTrue(generatedA.get() < (Flowable.bufferSize() * 3));
        assertTrue(generatedB.get() < (Flowable.bufferSize() * 3));
    }

    @Test

            

Reported by PMD.

System.out.println is used
Design

Line: 1065

                      ts.awaitDone(5, TimeUnit.SECONDS);
        ts.assertNoErrors();
        assertEquals(Flowable.bufferSize() * 2, ts.values().size());
        System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
        assertTrue(generatedA.get() < (Flowable.bufferSize() * 4));
        assertTrue(generatedB.get() < (Flowable.bufferSize() * 4));
    }

    @Test

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java
469 issues
System.out.println is used
Design

Line: 312

                      .doOnNext(new Consumer<List<Integer>>() {
            @Override
            public void accept(List<Integer> pv) {
                System.out.println(pv);
            }
        })
        .subscribe(to);

        InOrder inOrder = Mockito.inOrder(o);

            

Reported by PMD.

System.out.println is used
Design

Line: 564

                      .doOnNext(new Consumer<List<Long>>() {
            @Override
            public void accept(List<Long> pv) {
                System.out.println(pv);
            }
        })
        .subscribe(o);

        scheduler.advanceTimeBy(5, TimeUnit.SECONDS);

            

Reported by PMD.

This class has a bunch of public methods and attributes
Design

Line: 14

               * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.observable;

import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;


            

Reported by PMD.

Possible God Class (WMC=95, ATFD=186, TCC=1.363%)
Design

Line: 43

              import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class ObservableBufferTest extends RxJavaTest {

    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;


            

Reported by PMD.

The class 'ObservableBufferTest' has a total cyclomatic complexity of 95 (highest 3).
Design

Line: 43

              import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class ObservableBufferTest extends RxJavaTest {

    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;


            

Reported by PMD.

Avoid really long classes.
Design

Line: 43

              import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class ObservableBufferTest extends RxJavaTest {

    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;


            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 43

              import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class ObservableBufferTest extends RxJavaTest {

    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;


            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 45

              
public class ObservableBufferTest extends RxJavaTest {

    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;

    @Before
    public void before() {

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 46

              public class ObservableBufferTest extends RxJavaTest {

    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;

    @Before
    public void before() {
        observer = TestHelper.mockObserver();

            

Reported by PMD.

Found non-transient, non-static member. Please mark as transient or provide accessors.
Error

Line: 47

              
    private Observer<List<String>> observer;
    private TestScheduler scheduler;
    private Scheduler.Worker innerScheduler;

    @Before
    public void before() {
        observer = TestHelper.mockObserver();
        scheduler = new TestScheduler();

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/subjects/SerializedSubjectTest.java
466 issues
This class has a bunch of public methods and attributes
Design

Line: 14

               * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.subjects;

import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;

            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 32

              import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.*;

public class SerializedSubjectTest extends RxJavaTest {

    @Test
    public void basic() {
        SerializedSubject<String> subject = new SerializedSubject<>(PublishSubject.<String>create());
        TestObserver<String> to = new TestObserver<>();

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 35

              public class SerializedSubjectTest extends RxJavaTest {

    @Test
    public void basic() {
        SerializedSubject<String> subject = new SerializedSubject<>(PublishSubject.<String>create());
        TestObserver<String> to = new TestObserver<>();
        subject.subscribe(to);
        subject.onNext("hello");
        subject.onComplete();

            

Reported by PMD.

Unit tests should not contain more than 1 assert(s).
Design

Line: 46

                  }

    @Test
    public void asyncSubjectValueRelay() {
        AsyncSubject<Integer> async = AsyncSubject.create();
        async.onNext(1);
        async.onComplete();
        Subject<Integer> serial = async.toSerialized();


            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 48

                  @Test
    public void asyncSubjectValueRelay() {
        AsyncSubject<Integer> async = AsyncSubject.create();
        async.onNext(1);
        async.onComplete();
        Subject<Integer> serial = async.toSerialized();

        assertFalse(serial.hasObservers());
        assertTrue(serial.hasComplete());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 49

                  public void asyncSubjectValueRelay() {
        AsyncSubject<Integer> async = AsyncSubject.create();
        async.onNext(1);
        async.onComplete();
        Subject<Integer> serial = async.toSerialized();

        assertFalse(serial.hasObservers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 50

                      AsyncSubject<Integer> async = AsyncSubject.create();
        async.onNext(1);
        async.onComplete();
        Subject<Integer> serial = async.toSerialized();

        assertFalse(serial.hasObservers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 52

                      async.onComplete();
        Subject<Integer> serial = async.toSerialized();

        assertFalse(serial.hasObservers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());
        assertEquals((Integer)1, async.getValue());
        assertTrue(async.hasValue());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 52

                      async.onComplete();
        Subject<Integer> serial = async.toSerialized();

        assertFalse(serial.hasObservers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());
        assertEquals((Integer)1, async.getValue());
        assertTrue(async.hasValue());

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 53

                      Subject<Integer> serial = async.toSerialized();

        assertFalse(serial.hasObservers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());
        assertEquals((Integer)1, async.getValue());
        assertTrue(async.hasValue());
    }

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/processors/SerializedProcessorTest.java
464 issues
This class has a bunch of public methods and attributes
Design

Line: 14

               * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.processors;

import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.TimeUnit;

            

Reported by PMD.

This class has too many methods, consider refactoring it.
Design

Line: 31

              import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;

public class SerializedProcessorTest extends RxJavaTest {

    @Test
    public void basic() {
        SerializedProcessor<String> processor = new SerializedProcessor<>(PublishProcessor.<String>create());
        TestSubscriber<String> ts = new TestSubscriber<>();

            

Reported by PMD.

JUnit tests should include assert() or fail()
Design

Line: 34

              public class SerializedProcessorTest extends RxJavaTest {

    @Test
    public void basic() {
        SerializedProcessor<String> processor = new SerializedProcessor<>(PublishProcessor.<String>create());
        TestSubscriber<String> ts = new TestSubscriber<>();
        processor.subscribe(ts);
        processor.onNext("hello");
        processor.onComplete();

            

Reported by PMD.

Unit tests should not contain more than 1 assert(s).
Design

Line: 45

                  }

    @Test
    public void asyncSubjectValueRelay() {
        AsyncProcessor<Integer> async = AsyncProcessor.create();
        async.onNext(1);
        async.onComplete();
        FlowableProcessor<Integer> serial = async.toSerialized();


            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 47

                  @Test
    public void asyncSubjectValueRelay() {
        AsyncProcessor<Integer> async = AsyncProcessor.create();
        async.onNext(1);
        async.onComplete();
        FlowableProcessor<Integer> serial = async.toSerialized();

        assertFalse(serial.hasSubscribers());
        assertTrue(serial.hasComplete());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 48

                  public void asyncSubjectValueRelay() {
        AsyncProcessor<Integer> async = AsyncProcessor.create();
        async.onNext(1);
        async.onComplete();
        FlowableProcessor<Integer> serial = async.toSerialized();

        assertFalse(serial.hasSubscribers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 49

                      AsyncProcessor<Integer> async = AsyncProcessor.create();
        async.onNext(1);
        async.onComplete();
        FlowableProcessor<Integer> serial = async.toSerialized();

        assertFalse(serial.hasSubscribers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());

            

Reported by PMD.

Potential violation of Law of Demeter (object not created locally)
Design

Line: 51

                      async.onComplete();
        FlowableProcessor<Integer> serial = async.toSerialized();

        assertFalse(serial.hasSubscribers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());
        assertEquals((Integer)1, async.getValue());
        assertTrue(async.hasValue());

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 51

                      async.onComplete();
        FlowableProcessor<Integer> serial = async.toSerialized();

        assertFalse(serial.hasSubscribers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());
        assertEquals((Integer)1, async.getValue());
        assertTrue(async.hasValue());

            

Reported by PMD.

JUnit assertions should include a message
Design

Line: 52

                      FlowableProcessor<Integer> serial = async.toSerialized();

        assertFalse(serial.hasSubscribers());
        assertTrue(serial.hasComplete());
        assertFalse(serial.hasThrowable());
        assertNull(serial.getThrowable());
        assertEquals((Integer)1, async.getValue());
        assertTrue(async.hasValue());
    }

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java
452 issues
System.out.println is used
Design

Line: 278

              
            @Override
            public void onNext(String v) {
                System.out.println("observer1: " + v);
                lastValueForSubscriber1.set(v);
            }

        };


            

Reported by PMD.

System.out.println is used
Design

Line: 302

              
            @Override
            public void onNext(String v) {
                System.out.println("observer2: " + v);
                if (v.equals("one")) {
                    oneReceived.countDown();
                } else {
                    try {
                        makeSlow.await();

            

Reported by PMD.

System.out.println is used
Design

Line: 327

                      // use subscribeOn to make this async otherwise we deadlock as we are using CountDownLatches
        subject.subscribeOn(Schedulers.newThread()).subscribe(observer2);

        System.out.println("before waiting for one");

        // wait until observer2 starts having replay occur
        oneReceived.await();

        System.out.println("after waiting for one");

            

Reported by PMD.

System.out.println is used
Design

Line: 332

                      // wait until observer2 starts having replay occur
        oneReceived.await();

        System.out.println("after waiting for one");

        subject.onNext("three");

        System.out.println("sent three");


            

Reported by PMD.

System.out.println is used
Design

Line: 336

              
        subject.onNext("three");

        System.out.println("sent three");

        // if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet
        assertEquals("three", lastValueForSubscriber1.get());

        System.out.println("about to send onComplete");

            

Reported by PMD.

System.out.println is used
Design

Line: 341

                      // if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet
        assertEquals("three", lastValueForSubscriber1.get());

        System.out.println("about to send onComplete");

        subject.onComplete();

        System.out.println("completed subject");


            

Reported by PMD.

System.out.println is used
Design

Line: 345

              
        subject.onComplete();

        System.out.println("completed subject");

        // release
        makeSlow.countDown();

        System.out.println("makeSlow released");

            

Reported by PMD.

System.out.println is used
Design

Line: 350

                      // release
        makeSlow.countDown();

        System.out.println("makeSlow released");

        completed.await();
        // all of them should be emitted with the last being "three"
        assertEquals("three", lastValueForSubscriber2.get());


            

Reported by PMD.

System.out.printf is used
Design

Line: 380

                          InOrder inOrder = inOrder(o);
            String v = "" + i;
            src.onNext(v);
            System.out.printf("Turn: %d%n", i);
            src.firstElement()
                .toObservable()
                .flatMap(new Function<String, Observable<String>>() {

                    @Override

            

Reported by PMD.

System.out.println is used
Design

Line: 393

                              .subscribe(new DefaultObserver<String>() {
                    @Override
                    public void onNext(String t) {
                        System.out.println(t);
                        o.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java
451 issues
System.out.println is used
Design

Line: 103

                      .doOnComplete(new Action() {
            @Override
            public void run() {
                System.out.println("^^^^^^^^^^^^^ completed FAST");
            }
        });

        Flowable<Integer> slow = is.observeOn(Schedulers.computation()).map(new Function<Integer, Integer>() {
            int c;

            

Reported by PMD.

System.out.println is used
Design

Line: 126

              
            @Override
            public void run() {
                System.out.println("^^^^^^^^^^^^^ completed SLOW");
            }

        });

        TestSubscriber<Integer> ts = new TestSubscriber<>();

            

Reported by PMD.

System.out.println is used
Design

Line: 171

                      ts.assertNoErrors();
        ts.assertValues(0, 1, 2, 3);
        assertEquals(5, emitted.get());
        System.out.println(ts.values());
    }

    // use case from https://github.com/ReactiveX/RxJava/issues/1732
    @Test
    public void takeUntilWithPublishedStream() {

            

Reported by PMD.

System.out.println is used
Design

Line: 189

              
        })).subscribe(ts);
        xsp.connect();
        System.out.println(ts.values());
    }

    @Test
    public void backpressureTwoConsumers() {
        final AtomicInteger sourceEmission = new AtomicInteger();

            

Reported by PMD.

System.out.println is used
Design

Line: 296

                      ts2.assertNoErrors();
        ts2.assertTerminated();

        System.out.println(connection);
        System.out.println(connection2);
    }

    @Test
    public void noSubscriberRetentionOnCompleted() {

            

Reported by PMD.

System.out.println is used
Design

Line: 297

                      ts2.assertTerminated();

        System.out.println(connection);
        System.out.println(connection2);
    }

    @Test
    public void noSubscriberRetentionOnCompleted() {
        FlowablePublish<Integer> source = (FlowablePublish<Integer>)Flowable.just(1).publish();

            

Reported by PMD.

System.out.println is used
Design

Line: 1376

                      .doOnSubscribe(new Consumer<Subscription>() {
            @Override
            public void accept(Subscription v) throws Exception {
                System.out.println("Subscribed");
            }
        })
        .publish(10)
        .refCount()
        ;

            

Reported by PMD.

This class has a bunch of public methods and attributes
Design

Line: 14

               * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.rxjava3.internal.operators.flowable;

import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.*;

            

Reported by PMD.

Possible God Class (WMC=104, ATFD=87, TCC=0.000%)
Design

Line: 41

              import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;

public class FlowablePublishTest extends RxJavaTest {

    @Test
    public void publish() throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger();
        ConnectableFlowable<String> f = Flowable.unsafeCreate(new Publisher<String>() {

            

Reported by PMD.

Avoid really long classes.
Design

Line: 41

              import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;

public class FlowablePublishTest extends RxJavaTest {

    @Test
    public void publish() throws InterruptedException {
        final AtomicInteger counter = new AtomicInteger();
        ConnectableFlowable<String> f = Flowable.unsafeCreate(new Publisher<String>() {

            

Reported by PMD.

src/test/java/io/reactivex/rxjava3/subscribers/TestSubscriberTest.java
437 issues
Avoid throwing raw exception types.
Design

Line: 563

              
        try {
            ts.assertNotComplete();
            throw new RuntimeException("Failed to report there were terminal event(s)!");
        } catch (AssertionError ex) {
            // expected
        }
    }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 577

              
        try {
            ts.assertNoErrors();
            throw new RuntimeException("Failed to report there were terminal event(s)!");
        } catch (AssertionError ex) {
            // expected
        }
    }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 592

              
        try {
            ts.assertNotComplete();
            throw new RuntimeException("Failed to report there were terminal event(s)!");
        } catch (AssertionError ex) {
            // expected
        }

        try {

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 599

              
        try {
            ts.assertNoErrors();
            throw new RuntimeException("Failed to report there were terminal event(s)!");
        } catch (AssertionError ex) {
            // expected
        }
    }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 615

              
        try {
            ts.assertNoErrors();
            throw new RuntimeException("Failed to report there were terminal event(s)!");
        } catch (AssertionError ex) {
            // expected
            Throwable e = ex.getCause();
            if (!(e instanceof CompositeException)) {
                fail("Multiple Error present but the reported error doesn't have a composite cause!");

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 634

              
        try {
            ts.assertNoValues();
            throw new RuntimeException("Failed to report there were values!");
        } catch (AssertionError ex) {
            // expected
        }
    }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 648

              
        try {
            ts.assertValueCount(3);
            throw new RuntimeException("Failed to report there were values!");
        } catch (AssertionError ex) {
            // expected
        }
    }


            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 718

              
        try {
            ts.assertNoValues();
            throw new RuntimeException("Should have thrown");
        } catch (AssertionError exc) {
            // expected
        }

        try {

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 725

              
        try {
            ts.assertValueCount(0);
            throw new RuntimeException("Should have thrown");
        } catch (AssertionError exc) {
            // expected
        }

        ts.assertValueSequence(Collections.singletonList(1));

            

Reported by PMD.

Avoid throwing raw exception types.
Design

Line: 734

              
        try {
            ts.assertValueSequence(Collections.singletonList(2));
            throw new RuntimeException("Should have thrown");
        } catch (AssertionError exc) {
            // expected
        }
    }


            

Reported by PMD.