The following issues were found:
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchTest.java
364 issues
Line: 483
to.awaitDone(10, TimeUnit.SECONDS);
System.out.println("> testIssue2654: " + to.values().size());
to.assertTerminated();
to.assertNoErrors();
Assert.assertEquals(250, to.values().size());
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Observer<String> observer;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Observer<String> observer;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Observer<String> observer;
Reported by PMD.
Line: 45
public class ObservableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Observer<String> observer;
@Before
public void before() {
Reported by PMD.
Line: 46
public class ObservableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Observer<String> observer;
@Before
public void before() {
scheduler = new TestScheduler();
Reported by PMD.
Line: 47
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Observer<String> observer;
@Before
public void before() {
scheduler = new TestScheduler();
innerScheduler = scheduler.createWorker();
Reported by PMD.
Line: 66
@Override
public void subscribe(Observer<? super String> innerObserver) {
innerObserver.onSubscribe(Disposable.empty());
publishNext(innerObserver, 70, "one");
publishNext(innerObserver, 100, "two");
publishCompleted(innerObserver, 200);
}
}));
publishCompleted(outerObserver, 60);
Reported by PMD.
Line: 67
public void subscribe(Observer<? super String> innerObserver) {
innerObserver.onSubscribe(Disposable.empty());
publishNext(innerObserver, 70, "one");
publishNext(innerObserver, 100, "two");
publishCompleted(innerObserver, 200);
}
}));
publishCompleted(outerObserver, 60);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelayTest.java
359 issues
Line: 127
@Override
public Long apply(Long value) {
if (value == 1L) {
throw new RuntimeException("error!");
}
return value;
}
});
Flowable<Long> delayed = source.delay(1L, TimeUnit.SECONDS, scheduler);
Reported by PMD.
Line: 624
@Override
public void accept(Notification<Integer> t1) {
System.out.println(t1);
}
});
TestSubscriber<Integer> ts = new TestSubscriber<>();
delayed.subscribe(ts);
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableDelayTest extends RxJavaTest {
private Subscriber<Long> subscriber;
private Subscriber<Long> subscriber2;
private TestScheduler scheduler;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableDelayTest extends RxJavaTest {
private Subscriber<Long> subscriber;
private Subscriber<Long> subscriber2;
private TestScheduler scheduler;
@Before
Reported by PMD.
Line: 39
public class FlowableDelayTest extends RxJavaTest {
private Subscriber<Long> subscriber;
private Subscriber<Long> subscriber2;
private TestScheduler scheduler;
@Before
public void before() {
Reported by PMD.
Line: 41
private Subscriber<Long> subscriber;
private Subscriber<Long> subscriber2;
private TestScheduler scheduler;
@Before
public void before() {
subscriber = TestHelper.mockSubscriber();
subscriber2 = TestHelper.mockSubscriber();
Reported by PMD.
Line: 53
@Test
public void delay() {
Flowable<Long> source = Flowable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
Flowable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
delayed.subscribe(subscriber);
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 54
@Test
public void delay() {
Flowable<Long> source = Flowable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
Flowable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
delayed.subscribe(subscriber);
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(anyLong());
Reported by PMD.
Line: 55
public void delay() {
Flowable<Long> source = Flowable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
Flowable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
delayed.subscribe(subscriber);
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(anyLong());
verify(subscriber, never()).onComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelayTest.java
359 issues
Line: 128
@Override
public Long apply(Long value) {
if (value == 1L) {
throw new RuntimeException("error!");
}
return value;
}
});
Observable<Long> delayed = source.delay(1L, TimeUnit.SECONDS, scheduler);
Reported by PMD.
Line: 631
@Override
public void accept(Notification<Integer> t1) {
System.out.println(t1);
}
});
TestObserver<Integer> observer = new TestObserver<>();
delayed.subscribe(observer);
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableDelayTest extends RxJavaTest {
private Observer<Long> observer;
private Observer<Long> observer2;
private TestScheduler scheduler;
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableDelayTest extends RxJavaTest {
private Observer<Long> observer;
private Observer<Long> observer2;
private TestScheduler scheduler;
@Before
Reported by PMD.
Line: 40
public class ObservableDelayTest extends RxJavaTest {
private Observer<Long> observer;
private Observer<Long> observer2;
private TestScheduler scheduler;
@Before
public void before() {
Reported by PMD.
Line: 42
private Observer<Long> observer;
private Observer<Long> observer2;
private TestScheduler scheduler;
@Before
public void before() {
observer = TestHelper.mockObserver();
observer2 = TestHelper.mockObserver();
Reported by PMD.
Line: 54
@Test
public void delay() {
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
Observable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
delayed.subscribe(observer);
InOrder inOrder = inOrder(observer);
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 55
@Test
public void delay() {
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
Observable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
delayed.subscribe(observer);
InOrder inOrder = inOrder(observer);
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
verify(observer, never()).onNext(anyLong());
Reported by PMD.
Line: 56
public void delay() {
Observable<Long> source = Observable.interval(1L, TimeUnit.SECONDS, scheduler).take(3);
Observable<Long> delayed = source.delay(500L, TimeUnit.MILLISECONDS, scheduler);
delayed.subscribe(observer);
InOrder inOrder = inOrder(observer);
scheduler.advanceTimeTo(1499L, TimeUnit.MILLISECONDS);
verify(observer, never()).onNext(anyLong());
verify(observer, never()).onComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatTest.java
356 issues
Line: 121
o1.t.join();
o2.t.join();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads");
}
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onNext("two");
Reported by PMD.
Line: 219
System.out.println("Thread2 is starting ... waiting for it to complete ...");
o2.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onNext("two");
Reported by PMD.
Line: 244
// wait for 3rd to complete
o3.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
try {
// wait for the parent to complete
if (!parentHasFinished.await(5, TimeUnit.SECONDS)) {
Reported by PMD.
Line: 253
fail("Parent didn't finish within the time limit");
}
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
}
inOrder.verify(observer, times(1)).onNext("seven");
inOrder.verify(observer, times(1)).onNext("eight");
inOrder.verify(observer, times(1)).onNext("nine");
Reported by PMD.
Line: 137
public void nestedAsyncConcatLoop() throws Throwable {
for (int i = 0; i < 500; i++) {
if (i % 10 == 0) {
System.out.println("testNestedAsyncConcat >> " + i);
}
nestedAsyncConcat();
}
}
Reported by PMD.
Line: 173
try {
// emit first
if (!d.isDisposed()) {
System.out.println("Emit o1");
observer.onNext(Observable.unsafeCreate(o1));
}
// emit second
if (!d.isDisposed()) {
System.out.println("Emit o2");
Reported by PMD.
Line: 178
}
// emit second
if (!d.isDisposed()) {
System.out.println("Emit o2");
observer.onNext(Observable.unsafeCreate(o2));
}
// wait until sometime later and emit third
try {
Reported by PMD.
Line: 189
observer.onError(e);
}
if (!d.isDisposed()) {
System.out.println("Emit o3");
observer.onNext(Observable.unsafeCreate(o3));
}
} catch (Throwable e) {
observer.onError(e);
Reported by PMD.
Line: 196
} catch (Throwable e) {
observer.onError(e);
} finally {
System.out.println("Done parent Observable");
observer.onComplete();
parentHasFinished.countDown();
}
}
}));
Reported by PMD.
Line: 214
try {
// wait for first 2 async observables to complete
System.out.println("Thread1 is starting ... waiting for it to complete ...");
o1.waitForThreadDone();
System.out.println("Thread2 is starting ... waiting for it to complete ...");
o2.waitForThreadDone();
} catch (Throwable e) {
throw new RuntimeException("failed waiting on threads", e);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java
352 issues
Line: 256
InOrder inOrder = inOrder(subscriber);
String v = "" + i;
src.onNext(v);
System.out.printf("Turn: %d%n", i);
src.firstElement().toFlowable()
.flatMap(new Function<String, Flowable<String>>() {
@Override
public Flowable<String> apply(String t1) {
Reported by PMD.
Line: 373
try {
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final BehaviorProcessor<Object> rs = BehaviorProcessor.create();
final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);
Reported by PMD.
Line: 419
start.countDown();
if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasSubscribers());
rs.onComplete();
Assert.fail("Timeout @ " + i);
break;
} else {
Reported by PMD.
Line: 420
if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasSubscribers());
rs.onComplete();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.processors;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorProcessorTest extends FlowableProcessorTest<Object> {
private final Throwable testException = new Throwable();
@Override
protected FlowableProcessor<Object> create() {
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorProcessorTest extends FlowableProcessorTest<Object> {
private final Throwable testException = new Throwable();
@Override
protected FlowableProcessor<Object> create() {
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorProcessorTest extends FlowableProcessorTest<Object> {
private final Throwable testException = new Throwable();
@Override
protected FlowableProcessor<Object> create() {
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorProcessorTest extends FlowableProcessorTest<Object> {
private final Throwable testException = new Throwable();
@Override
protected FlowableProcessor<Object> create() {
Reported by PMD.
Line: 40
public class BehaviorProcessorTest extends FlowableProcessorTest<Object> {
private final Throwable testException = new Throwable();
@Override
protected FlowableProcessor<Object> create() {
return BehaviorProcessor.create();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeTest.java
348 issues
Line: 261
}
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException("failed", e);
} finally {
concurrentCounter.decrementAndGet();
}
}
Reported by PMD.
Line: 301
f1.t.join();
f2.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals(2, totalCounter.get());
assertEquals(0, concurrentCounter.get());
}
Reported by PMD.
Line: 1230
@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}
@Override
public void onNext(Integer t) {
latch.countDown();
Reported by PMD.
Line: 1261
try {
Thread.sleep(time);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
}
Reported by PMD.
Line: 150
@Override
public void cancel() {
System.out.println("*** unsubscribed");
unsubscribed.set(true);
}
};
subscriber.onSubscribe(s);
Reported by PMD.
Line: 165
while (!unsubscribed.get()) {
subscriber.onNext(Flowable.just(1L, 2L));
}
System.out.println("Done looping after unsubscribe: " + unsubscribed.get());
subscriber.onComplete();
// mark that the thread is finished
latch.countDown();
}
Reported by PMD.
Line: 181
@Override
public void accept(Long v) {
System.out.println("Value: " + v);
int c = count.incrementAndGet();
if (c > 6) {
fail("Should be only 6");
}
Reported by PMD.
Line: 192
latch.await(1000, TimeUnit.MILLISECONDS);
System.out.println("unsubscribed: " + unsubscribed.get());
assertTrue(unsubscribed.get());
}
Reported by PMD.
Line: 218
@Test
public void synchronizationOfMultipleSequencesLoop() throws Throwable {
for (int i = 0; i < 100; i++) {
System.out.println("testSynchronizationOfMultipleSequencesLoop > " + i);
synchronizationOfMultipleSequences();
}
}
@Test
Reported by PMD.
Line: 407
subscriber.onSubscribe(new BooleanSubscription());
for (String s : valuesToReturn) {
if (s == null) {
System.out.println("throwing exception");
subscriber.onError(new NullPointerException());
} else {
subscriber.onNext(s);
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/TestObserverTest.java
341 issues
Line: 187
try {
to.assertNoValues();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 194
try {
to.assertValueCount(0);
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
to.assertValueSequence(Collections.singletonList(1));
Reported by PMD.
Line: 203
try {
to.assertValueSequence(Collections.singletonList(2));
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
}
Reported by PMD.
Line: 215
try {
to.assertError(TestException.class);
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 222
try {
to.assertError(new TestException());
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 229
try {
to.assertError(Functions.<Throwable>alwaysTrue());
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 236
try {
to.assertSubscribed();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
to.onSubscribe(Disposable.empty());
Reported by PMD.
Line: 266
try {
to.assertError(new RuntimeException());
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 273
try {
to.assertError(IOException.class);
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 280
try {
to.assertError(Functions.<Throwable>alwaysFalse());
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableTest.java
328 issues
Line: 378
.safeSubscribe(new DefaultObserver<String>() {
@Override
public void onComplete() {
System.out.println("completed");
latch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 385
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
latch.countDown();
}
@Override
Reported by PMD.
Line: 393
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
Reported by PMD.
Line: 426
@Override
public void onComplete() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
Reported by PMD.
Line: 432
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
Reported by PMD.
Line: 439
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
Reported by PMD.
Line: 473
@Override
public void onComplete() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
Reported by PMD.
Line: 479
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
Reported by PMD.
Line: 485
@Override
public void onNext(String v) {
System.out.println(v);
count.incrementAndGet();
}
});
assertEquals(2, count.get());
Reported by PMD.
Line: 687
@Override
public void onComplete() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java
325 issues
Line: 391
6060, 6061, 7070, 7071, 8080, 8081, 9090, 9091, 10100, 10101
));
Assert.assertEquals(expected.size(), ts.values().size());
System.out.println("--> testFlatMapSelectorMaxConcurrent: " + ts.values());
Assert.assertTrue(expected.containsAll(ts.values()));
}
@Test
public void flatMapTransformsMaxConcurrentNormalLoop() {
Reported by PMD.
Line: 399
public void flatMapTransformsMaxConcurrentNormalLoop() {
for (int i = 0; i < 1000; i++) {
if (i % 100 == 0) {
System.out.println("testFlatMapTransformsMaxConcurrentNormalLoop => " + i);
}
flatMapTransformsMaxConcurrentNormal();
}
}
Reported by PMD.
Line: 451
public void flatMapRangeMixedAsyncLoop() {
for (int i = 0; i < 2000; i++) {
if (i % 10 == 0) {
System.out.println("flatMapRangeAsyncLoop > " + i);
}
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Flowable.range(0, 1000)
.flatMap(new Function<Integer, Flowable<Integer>>() {
final Random rnd = new Random();
Reported by PMD.
Line: 471
ts.awaitDone(2500, TimeUnit.MILLISECONDS);
if (ts.completions() == 0) {
System.out.println(ts.values().size());
}
ts.assertTerminated();
ts.assertNoErrors();
List<Integer> list = ts.values();
if (list.size() < 1000) {
Reported by PMD.
Line: 480
Set<Integer> set = new HashSet<>(list);
for (int j = 0; j < 1000; j++) {
if (!set.contains(j)) {
System.out.println(j + " missing");
}
}
}
assertEquals(1000, list.size());
}
Reported by PMD.
Line: 519
}
}).subscribe(ts);
System.out.println("flatMapTwoNestedSync >> @ " + n);
ts.assertNoErrors();
ts.assertComplete();
ts.assertValueCount(n * 2);
}
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 40
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFlatMapTest extends RxJavaTest {
@Test
public void normal() {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
final List<Integer> list = Arrays.asList(1, 2, 3);
Reported by PMD.
Line: 40
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFlatMapTest extends RxJavaTest {
@Test
public void normal() {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
final List<Integer> list = Arrays.asList(1, 2, 3);
Reported by PMD.
Line: 40
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFlatMapTest extends RxJavaTest {
@Test
public void normal() {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
final List<Integer> list = Arrays.asList(1, 2, 3);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subjects/BehaviorSubjectTest.java
323 issues
Line: 255
InOrder inOrder = inOrder(o);
String v = "" + i;
src.onNext(v);
System.out.printf("Turn: %d%n", i);
src.firstElement()
.toObservable()
.flatMap(new Function<String, Observable<String>>() {
@Override
Reported by PMD.
Line: 373
try {
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final BehaviorSubject<Object> rs = BehaviorSubject.create();
final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);
Reported by PMD.
Line: 419
start.countDown();
if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onComplete();
Assert.fail("Timeout @ " + i);
break;
} else {
Reported by PMD.
Line: 420
if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onComplete();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.subjects;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subjects.BehaviorSubject.BehaviorDisposable;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorSubjectTest extends SubjectTest<Integer> {
private final Throwable testException = new Throwable();
@Override
protected Subject<Integer> create() {
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subjects.BehaviorSubject.BehaviorDisposable;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorSubjectTest extends SubjectTest<Integer> {
private final Throwable testException = new Throwable();
@Override
protected Subject<Integer> create() {
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subjects.BehaviorSubject.BehaviorDisposable;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class BehaviorSubjectTest extends SubjectTest<Integer> {
private final Throwable testException = new Throwable();
@Override
protected Subject<Integer> create() {
Reported by PMD.
Line: 39
public class BehaviorSubjectTest extends SubjectTest<Integer> {
private final Throwable testException = new Throwable();
@Override
protected Subject<Integer> create() {
return BehaviorSubject.create();
}
Reported by PMD.
Line: 48
@Test
public void thatSubscriberReceivesDefaultValueAndSubsequentEvents() {
BehaviorSubject<String> subject = BehaviorSubject.createDefault("default");
Observer<String> observer = TestHelper.mockObserver();
subject.subscribe(observer);
subject.onNext("one");
Reported by PMD.