The following issues were found
src/test/java/io/reactivex/rxjava3/testsupport/TestHelper.java
499 issues
Line: 488
throw new AssertionError("The wait timed out!");
}
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
if (errors[0] != null && errors[1] == null) {
throw ExceptionHelper.wrapOrThrow(errors[0]);
}
Reported by PMD.
Line: 2593
assertNotNull(state[2]);
assertTrue("Did not empty", state[3]);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
/**
* Checks if the source is fuseable and its isEmpty/clear works properly.
Reported by PMD.
Line: 2659
assertNotNull(state[2]);
assertTrue("Did not empty", state[3]);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
/**
* Returns an expanded error list of the given test consumer.
Reported by PMD.
Line: 2840
} catch (AssertionError ex) {
throw ex;
} catch (Throwable ex) {
throw new RuntimeException(ex);
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 2999
} catch (AssertionError ex) {
throw ex;
} catch (Throwable ex) {
throw new RuntimeException(ex);
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 509
* @param ex the target Throwable
* @return the list of Throwables
*/
public static List<Throwable> compositeList(Throwable ex) {
if (ex instanceof UndeliverableException) {
ex = ex.getCause();
}
return ((CompositeException)ex).getExceptions();
}
Reported by PMD.
Line: 3510
* @return the File pointing to the source
* @throws Exception on error
*/
public static File findSource(String baseClassName, String parentPackage) throws Exception {
URL u = TestHelper.class.getResource(TestHelper.class.getSimpleName() + ".class");
String path = new File(u.toURI()).toString().replace('\\', '/');
parentPackage = parentPackage.replace(".", "/");
Reported by PMD.
Line: 3535
}
if (p == null) {
System.err.println("Unable to locate the RxJava sources");
return null;
}
File f = new File(p);
Reported by PMD.
Line: 3545
return f;
}
System.out.println("Can't read " + p);
return null;
}
/**
* Cancels a flow before notifying a transformation and checks if an undeliverable exception
Reported by PMD.
Line: 3698
public static long awaitGC(long oneSleep, int maxLoop, long expectedMemoryUsage) throws InterruptedException {
MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
System.gc();
int i = maxLoop;
while (i-- != 0) {
long usage = bean.getHeapMemoryUsage().getUsed();
if (usage <= expectedMemoryUsage) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayTest.java
498 issues
Line: 472
@Override
public void accept(Integer v) {
effectCounter.incrementAndGet();
System.out.println("Sideeffect #" + v);
}
});
Observable<Integer> result = source.replay(
new Function<Observable<Integer>, Observable<Integer>>() {
Reported by PMD.
Line: 486
for (int i = 1; i < 3; i++) {
effectCounter.set(0);
System.out.printf("- %d -%n", i);
result.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
System.out.println(t1);
Reported by PMD.
Line: 491
@Override
public void accept(Integer t1) {
System.out.println(t1);
}
}, new Consumer<Throwable>() {
@Override
Reported by PMD.
Line: 504
new Action() {
@Override
public void run() {
System.out.println("Done");
}
});
assertEquals(2, effectCounter.get());
}
}
Reported by PMD.
Line: 901
@Override
public void run() {
counter.incrementAndGet();
System.out.println("published Observable being executed");
observer.onNext("one");
observer.onComplete();
}
}).start();
}
Reported by PMD.
Line: 918
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
// subscribe again
Reported by PMD.
Line: 929
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
Reported by PMD.
Line: 1664
.takeLast(1)
;
System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Reported by PMD.
Line: 1667
System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Thread.sleep(500);
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
Reported by PMD.
Line: 1668
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Thread.sleep(500);
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatTest.java
491 issues
Line: 124
o1.t.join();
o2.t.join();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads");
}
InOrder inOrder = inOrder(subscriber);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(1)).onNext("two");
Reported by PMD.
Line: 232
System.out.println("Thread2 is starting ... waiting for it to complete ...");
o2.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
InOrder inOrder = inOrder(subscriber);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(1)).onNext("two");
Reported by PMD.
Line: 257
// wait for 3rd to complete
o3.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
try {
// wait for the parent to complete
if (!parentHasFinished.await(5, TimeUnit.SECONDS)) {
Reported by PMD.
Line: 266
fail("Parent didn't finish within the time limit");
}
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
inOrder.verify(subscriber, times(1)).onNext("seven");
inOrder.verify(subscriber, times(1)).onNext("eight");
inOrder.verify(subscriber, times(1)).onNext("nine");
Reported by PMD.
Line: 140
public void nestedAsyncConcatLoop() throws Throwable {
for (int i = 0; i < 500; i++) {
if (i % 10 == 0) {
System.out.println("testNestedAsyncConcat >> " + i);
}
nestedAsyncConcat();
}
}
Reported by PMD.
Line: 186
try {
// emit first
if (!d.isDisposed()) {
System.out.println("Emit o1");
subscriber.onNext(Flowable.unsafeCreate(o1));
}
// emit second
if (!d.isDisposed()) {
System.out.println("Emit o2");
Reported by PMD.
Line: 191
}
// emit second
if (!d.isDisposed()) {
System.out.println("Emit o2");
subscriber.onNext(Flowable.unsafeCreate(o2));
}
// wait until sometime later and emit third
try {
Reported by PMD.
Line: 202
subscriber.onError(e);
}
if (!d.isDisposed()) {
System.out.println("Emit o3");
subscriber.onNext(Flowable.unsafeCreate(o3));
}
} catch (Throwable e) {
subscriber.onError(e);
Reported by PMD.
Line: 209
} catch (Throwable e) {
subscriber.onError(e);
} finally {
System.out.println("Done parent Flowable");
subscriber.onComplete();
parentHasFinished.countDown();
}
}
}));
Reported by PMD.
Line: 227
try {
// wait for first 2 async observables to complete
System.out.println("Thread1 is starting ... waiting for it to complete ...");
o1.waitForThreadDone();
System.out.println("Thread2 is starting ... waiting for it to complete ...");
o2.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java
482 issues
Line: 848
latch.await(1000, TimeUnit.MILLISECONDS);
if (!infiniteFlowable.await(2000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("didn't unsubscribe");
}
assertEquals(5, list.size());
assertEquals("1-1", list.get(0));
assertEquals("2-2", list.get(1));
Reported by PMD.
Line: 568
Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 568
Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 568
Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 778
@Override
public void accept(String s) {
System.out.println(s);
list.add(s);
}
});
assertEquals(5, list.size());
Reported by PMD.
Line: 841
@Override
public void onNext(String s) {
System.out.println(s);
list.add(s);
}
});
latch.await(1000, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 894
@Override
public void accept(String s) {
System.out.println(s);
list.add(s);
}
});
assertEquals(4, list.size());
Reported by PMD.
Line: 923
@Override
public void accept(String s) {
System.out.println(s);
list.add(s);
}
});
assertEquals(0, list.size());
Reported by PMD.
Line: 1040
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 2, ts.values().size());
System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
assertTrue(generatedA.get() < (Flowable.bufferSize() * 3));
assertTrue(generatedB.get() < (Flowable.bufferSize() * 3));
}
@Test
Reported by PMD.
Line: 1065
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
assertEquals(Flowable.bufferSize() * 2, ts.values().size());
System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
assertTrue(generatedA.get() < (Flowable.bufferSize() * 4));
assertTrue(generatedB.get() < (Flowable.bufferSize() * 4));
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferTest.java
469 issues
Line: 312
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> pv) {
System.out.println(pv);
}
})
.subscribe(to);
InOrder inOrder = Mockito.inOrder(o);
Reported by PMD.
Line: 564
.doOnNext(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> pv) {
System.out.println(pv);
}
})
.subscribe(o);
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableBufferTest extends RxJavaTest {
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableBufferTest extends RxJavaTest {
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableBufferTest extends RxJavaTest {
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableBufferTest extends RxJavaTest {
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 45
public class ObservableBufferTest extends RxJavaTest {
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
Reported by PMD.
Line: 46
public class ObservableBufferTest extends RxJavaTest {
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
observer = TestHelper.mockObserver();
Reported by PMD.
Line: 47
private Observer<List<String>> observer;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
observer = TestHelper.mockObserver();
scheduler = new TestScheduler();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subjects/SerializedSubjectTest.java
466 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.subjects;
import static org.junit.Assert.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
Reported by PMD.
Line: 32
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.*;
public class SerializedSubjectTest extends RxJavaTest {
@Test
public void basic() {
SerializedSubject<String> subject = new SerializedSubject<>(PublishSubject.<String>create());
TestObserver<String> to = new TestObserver<>();
Reported by PMD.
Line: 35
public class SerializedSubjectTest extends RxJavaTest {
@Test
public void basic() {
SerializedSubject<String> subject = new SerializedSubject<>(PublishSubject.<String>create());
TestObserver<String> to = new TestObserver<>();
subject.subscribe(to);
subject.onNext("hello");
subject.onComplete();
Reported by PMD.
Line: 46
}
@Test
public void asyncSubjectValueRelay() {
AsyncSubject<Integer> async = AsyncSubject.create();
async.onNext(1);
async.onComplete();
Subject<Integer> serial = async.toSerialized();
Reported by PMD.
Line: 48
@Test
public void asyncSubjectValueRelay() {
AsyncSubject<Integer> async = AsyncSubject.create();
async.onNext(1);
async.onComplete();
Subject<Integer> serial = async.toSerialized();
assertFalse(serial.hasObservers());
assertTrue(serial.hasComplete());
Reported by PMD.
Line: 49
public void asyncSubjectValueRelay() {
AsyncSubject<Integer> async = AsyncSubject.create();
async.onNext(1);
async.onComplete();
Subject<Integer> serial = async.toSerialized();
assertFalse(serial.hasObservers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
Reported by PMD.
Line: 50
AsyncSubject<Integer> async = AsyncSubject.create();
async.onNext(1);
async.onComplete();
Subject<Integer> serial = async.toSerialized();
assertFalse(serial.hasObservers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
Reported by PMD.
Line: 52
async.onComplete();
Subject<Integer> serial = async.toSerialized();
assertFalse(serial.hasObservers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
assertEquals((Integer)1, async.getValue());
assertTrue(async.hasValue());
Reported by PMD.
Line: 52
async.onComplete();
Subject<Integer> serial = async.toSerialized();
assertFalse(serial.hasObservers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
assertEquals((Integer)1, async.getValue());
assertTrue(async.hasValue());
Reported by PMD.
Line: 53
Subject<Integer> serial = async.toSerialized();
assertFalse(serial.hasObservers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
assertEquals((Integer)1, async.getValue());
assertTrue(async.hasValue());
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/processors/SerializedProcessorTest.java
464 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.processors;
import static org.junit.Assert.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class SerializedProcessorTest extends RxJavaTest {
@Test
public void basic() {
SerializedProcessor<String> processor = new SerializedProcessor<>(PublishProcessor.<String>create());
TestSubscriber<String> ts = new TestSubscriber<>();
Reported by PMD.
Line: 34
public class SerializedProcessorTest extends RxJavaTest {
@Test
public void basic() {
SerializedProcessor<String> processor = new SerializedProcessor<>(PublishProcessor.<String>create());
TestSubscriber<String> ts = new TestSubscriber<>();
processor.subscribe(ts);
processor.onNext("hello");
processor.onComplete();
Reported by PMD.
Line: 45
}
@Test
public void asyncSubjectValueRelay() {
AsyncProcessor<Integer> async = AsyncProcessor.create();
async.onNext(1);
async.onComplete();
FlowableProcessor<Integer> serial = async.toSerialized();
Reported by PMD.
Line: 47
@Test
public void asyncSubjectValueRelay() {
AsyncProcessor<Integer> async = AsyncProcessor.create();
async.onNext(1);
async.onComplete();
FlowableProcessor<Integer> serial = async.toSerialized();
assertFalse(serial.hasSubscribers());
assertTrue(serial.hasComplete());
Reported by PMD.
Line: 48
public void asyncSubjectValueRelay() {
AsyncProcessor<Integer> async = AsyncProcessor.create();
async.onNext(1);
async.onComplete();
FlowableProcessor<Integer> serial = async.toSerialized();
assertFalse(serial.hasSubscribers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
Reported by PMD.
Line: 49
AsyncProcessor<Integer> async = AsyncProcessor.create();
async.onNext(1);
async.onComplete();
FlowableProcessor<Integer> serial = async.toSerialized();
assertFalse(serial.hasSubscribers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
Reported by PMD.
Line: 51
async.onComplete();
FlowableProcessor<Integer> serial = async.toSerialized();
assertFalse(serial.hasSubscribers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
assertEquals((Integer)1, async.getValue());
assertTrue(async.hasValue());
Reported by PMD.
Line: 51
async.onComplete();
FlowableProcessor<Integer> serial = async.toSerialized();
assertFalse(serial.hasSubscribers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
assertEquals((Integer)1, async.getValue());
assertTrue(async.hasValue());
Reported by PMD.
Line: 52
FlowableProcessor<Integer> serial = async.toSerialized();
assertFalse(serial.hasSubscribers());
assertTrue(serial.hasComplete());
assertFalse(serial.hasThrowable());
assertNull(serial.getThrowable());
assertEquals((Integer)1, async.getValue());
assertTrue(async.hasValue());
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectTest.java
452 issues
Line: 278
@Override
public void onNext(String v) {
System.out.println("observer1: " + v);
lastValueForSubscriber1.set(v);
}
};
Reported by PMD.
Line: 302
@Override
public void onNext(String v) {
System.out.println("observer2: " + v);
if (v.equals("one")) {
oneReceived.countDown();
} else {
try {
makeSlow.await();
Reported by PMD.
Line: 327
// use subscribeOn to make this async otherwise we deadlock as we are using CountDownLatches
subject.subscribeOn(Schedulers.newThread()).subscribe(observer2);
System.out.println("before waiting for one");
// wait until observer2 starts having replay occur
oneReceived.await();
System.out.println("after waiting for one");
Reported by PMD.
Line: 332
// wait until observer2 starts having replay occur
oneReceived.await();
System.out.println("after waiting for one");
subject.onNext("three");
System.out.println("sent three");
Reported by PMD.
Line: 336
subject.onNext("three");
System.out.println("sent three");
// if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet
assertEquals("three", lastValueForSubscriber1.get());
System.out.println("about to send onComplete");
Reported by PMD.
Line: 341
// if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet
assertEquals("three", lastValueForSubscriber1.get());
System.out.println("about to send onComplete");
subject.onComplete();
System.out.println("completed subject");
Reported by PMD.
Line: 345
subject.onComplete();
System.out.println("completed subject");
// release
makeSlow.countDown();
System.out.println("makeSlow released");
Reported by PMD.
Line: 350
// release
makeSlow.countDown();
System.out.println("makeSlow released");
completed.await();
// all of them should be emitted with the last being "three"
assertEquals("three", lastValueForSubscriber2.get());
Reported by PMD.
Line: 380
InOrder inOrder = inOrder(o);
String v = "" + i;
src.onNext(v);
System.out.printf("Turn: %d%n", i);
src.firstElement()
.toObservable()
.flatMap(new Function<String, Observable<String>>() {
@Override
Reported by PMD.
Line: 393
.subscribe(new DefaultObserver<String>() {
@Override
public void onNext(String t) {
System.out.println(t);
o.onNext(t);
}
@Override
public void onError(Throwable e) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishTest.java
451 issues
Line: 103
.doOnComplete(new Action() {
@Override
public void run() {
System.out.println("^^^^^^^^^^^^^ completed FAST");
}
});
Flowable<Integer> slow = is.observeOn(Schedulers.computation()).map(new Function<Integer, Integer>() {
int c;
Reported by PMD.
Line: 126
@Override
public void run() {
System.out.println("^^^^^^^^^^^^^ completed SLOW");
}
});
TestSubscriber<Integer> ts = new TestSubscriber<>();
Reported by PMD.
Line: 171
ts.assertNoErrors();
ts.assertValues(0, 1, 2, 3);
assertEquals(5, emitted.get());
System.out.println(ts.values());
}
// use case from https://github.com/ReactiveX/RxJava/issues/1732
@Test
public void takeUntilWithPublishedStream() {
Reported by PMD.
Line: 189
})).subscribe(ts);
xsp.connect();
System.out.println(ts.values());
}
@Test
public void backpressureTwoConsumers() {
final AtomicInteger sourceEmission = new AtomicInteger();
Reported by PMD.
Line: 296
ts2.assertNoErrors();
ts2.assertTerminated();
System.out.println(connection);
System.out.println(connection2);
}
@Test
public void noSubscriberRetentionOnCompleted() {
Reported by PMD.
Line: 297
ts2.assertTerminated();
System.out.println(connection);
System.out.println(connection2);
}
@Test
public void noSubscriberRetentionOnCompleted() {
FlowablePublish<Integer> source = (FlowablePublish<Integer>)Flowable.just(1).publish();
Reported by PMD.
Line: 1376
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription v) throws Exception {
System.out.println("Subscribed");
}
})
.publish(10)
.refCount()
;
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import java.util.*;
import java.util.concurrent.*;
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowablePublishTest extends RxJavaTest {
@Test
public void publish() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
ConnectableFlowable<String> f = Flowable.unsafeCreate(new Publisher<String>() {
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowablePublishTest extends RxJavaTest {
@Test
public void publish() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
ConnectableFlowable<String> f = Flowable.unsafeCreate(new Publisher<String>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subscribers/TestSubscriberTest.java
437 issues
Line: 563
try {
ts.assertNotComplete();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 577
try {
ts.assertNoErrors();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 592
try {
ts.assertNotComplete();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 599
try {
ts.assertNoErrors();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 615
try {
ts.assertNoErrors();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
Throwable e = ex.getCause();
if (!(e instanceof CompositeException)) {
fail("Multiple Error present but the reported error doesn't have a composite cause!");
Reported by PMD.
Line: 634
try {
ts.assertNoValues();
throw new RuntimeException("Failed to report there were values!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 648
try {
ts.assertValueCount(3);
throw new RuntimeException("Failed to report there were values!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 718
try {
ts.assertNoValues();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 725
try {
ts.assertValueCount(0);
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
ts.assertValueSequence(Collections.singletonList(1));
Reported by PMD.
Line: 734
try {
ts.assertValueSequence(Collections.singletonList(2));
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
}
Reported by PMD.