The following issues were found:
src/test/java/io/reactivex/rxjava3/flowable/FlowableTests.java
322 issues
Line: 361
.safeSubscribe(new DefaultSubscriber<String>() {
@Override
public void onComplete() {
System.out.println("completed");
latch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 368
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
latch.countDown();
}
@Override
Reported by PMD.
Line: 376
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
Reported by PMD.
Line: 409
@Override
public void onComplete() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
Reported by PMD.
Line: 415
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
Reported by PMD.
Line: 422
@Override
public void onNext(String v) {
int num = Integer.parseInt(v);
System.out.println(num);
// doSomething(num);
count.incrementAndGet();
}
});
Reported by PMD.
Line: 455
@Override
public void onComplete() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
Reported by PMD.
Line: 461
@Override
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
}
@Override
public void onNext(String v) {
Reported by PMD.
Line: 467
@Override
public void onNext(String v) {
System.out.println(v);
count.incrementAndGet();
}
});
assertEquals(2, count.get());
Reported by PMD.
Line: 669
@Override
public void onComplete() {
System.out.println("completed");
}
@Override
public void onError(Throwable e) {
error.set(e);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java
321 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.processors;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.List;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class MulticastProcessorTest extends RxJavaTest {
@Test
public void complete() {
MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class MulticastProcessorTest extends RxJavaTest {
@Test
public void complete() {
MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
Reported by PMD.
Line: 36
public class MulticastProcessorTest extends RxJavaTest {
@Test
public void complete() {
MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
Reported by PMD.
Line: 38
@Test
public void complete() {
MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
assertFalse(mp.hasThrowable());
assertNull(mp.getThrowable());
Reported by PMD.
Line: 40
MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
assertFalse(mp.hasThrowable());
assertNull(mp.getThrowable());
TestSubscriber<Integer> ts = mp.test();
Reported by PMD.
Line: 40
MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
assertFalse(mp.hasThrowable());
assertNull(mp.getThrowable());
TestSubscriber<Integer> ts = mp.test();
Reported by PMD.
Line: 41
mp.start();
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
assertFalse(mp.hasThrowable());
assertNull(mp.getThrowable());
TestSubscriber<Integer> ts = mp.test();
Reported by PMD.
Line: 41
mp.start();
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
assertFalse(mp.hasThrowable());
assertNull(mp.getThrowable());
TestSubscriber<Integer> ts = mp.test();
Reported by PMD.
Line: 42
assertFalse(mp.hasSubscribers());
assertFalse(mp.hasComplete());
assertFalse(mp.hasThrowable());
assertNull(mp.getThrowable());
TestSubscriber<Integer> ts = mp.test();
assertTrue(mp.hasSubscribers());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapSchedulerTest.java
320 issues
Line: 219
return Flowable.fromCallable(new Callable<Integer>() {
@Override public Integer call() throws Exception {
if (integer >= 100) {
throw new NullPointerException("test null exp");
}
return integer;
}
});
}
Reported by PMD.
Line: 238
return Flowable.fromCallable(new Callable<Integer>() {
@Override public Integer call() throws Exception {
if (integer >= 100) {
throw new NullPointerException("test null exp");
}
return integer;
}
});
}
Reported by PMD.
Line: 387
// ignored
}
if (counter.getAndIncrement() % 100 == 0) {
System.out.print("testIssue2890NoStackoverflow -> ");
System.out.println(counter.get());
};
}
@Override
Reported by PMD.
Line: 388
}
if (counter.getAndIncrement() % 100 == 0) {
System.out.print("testIssue2890NoStackoverflow -> ");
System.out.println(counter.get());
};
}
@Override
public void onComplete() {
Reported by PMD.
Line: 422
return;
}
if (i % 1000 == 0) {
System.out.println("concatMapRangeAsyncLoop > " + i);
}
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Flowable.range(0, 1000)
.concatMap(new Function<Integer, Flowable<Integer>>() {
@Override
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import java.lang.reflect.Method;
import java.util.*;
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableConcatMapSchedulerTest extends RxJavaTest {
@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableConcatMapSchedulerTest extends RxJavaTest {
@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableConcatMapSchedulerTest extends RxJavaTest {
@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableConcatMapSchedulerTest extends RxJavaTest {
@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimeoutWithSelectorTest.java
306 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableTimeoutWithSelectorTest extends RxJavaTest {
@Test
public void timeoutSelectorNormal1() {
PublishProcessor<Integer> source = PublishProcessor.create();
final PublishProcessor<Integer> timeout = PublishProcessor.create();
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableTimeoutWithSelectorTest extends RxJavaTest {
@Test
public void timeoutSelectorNormal1() {
PublishProcessor<Integer> source = PublishProcessor.create();
final PublishProcessor<Integer> timeout = PublishProcessor.create();
Reported by PMD.
Line: 43
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableTimeoutWithSelectorTest extends RxJavaTest {
@Test
public void timeoutSelectorNormal1() {
PublishProcessor<Integer> source = PublishProcessor.create();
final PublishProcessor<Integer> timeout = PublishProcessor.create();
Reported by PMD.
Line: 61
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
InOrder inOrder = inOrder(subscriber);
source.timeout(timeout, timeoutFunc, other).subscribe(subscriber);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
Reported by PMD.
Line: 61
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
InOrder inOrder = inOrder(subscriber);
source.timeout(timeout, timeoutFunc, other).subscribe(subscriber);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
Reported by PMD.
Line: 63
source.timeout(timeout, timeoutFunc, other).subscribe(subscriber);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(subscriber).onNext(1);
Reported by PMD.
Line: 64
source.timeout(timeout, timeoutFunc, other).subscribe(subscriber);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(subscriber).onNext(1);
inOrder.verify(subscriber).onNext(2);
Reported by PMD.
Line: 65
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(subscriber).onNext(1);
inOrder.verify(subscriber).onNext(2);
inOrder.verify(subscriber).onNext(3);
Reported by PMD.
Line: 66
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(subscriber).onNext(1);
inOrder.verify(subscriber).onNext(2);
inOrder.verify(subscriber).onNext(3);
inOrder.verify(subscriber).onNext(100);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithTimeTest.java
304 issues
Line: 199
.doOnComplete(new Action() {
@Override
public void run() {
System.out.println("Main done!");
}
})
.flatMap(new Function<Flowable<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Flowable<Integer> w) {
Reported by PMD.
Line: 209
.doOnComplete(new Action() {
@Override
public void run() {
System.out.println("inner done: " + wip.incrementAndGet());
}
})
;
}
})
Reported by PMD.
Line: 218
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer pv) {
System.out.println(pv);
}
})
.subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.
Line: 950
int count;
@Override
public void accept(Flowable<Integer> v) throws Exception {
System.out.println(Thread.currentThread());
if (count++ == 1) {
secondWindowProcessing.countDown();
try {
Thread.sleep(200);
isInterrupted.set(Thread.interrupted());
Reported by PMD.
Line: 991
int count;
@Override
public void accept(Flowable<Integer> v) throws Exception {
System.out.println(Thread.currentThread());
if (count++ == 1) {
secondWindowProcessing.countDown();
try {
Thread.sleep(200);
isInterrupted.set(Thread.interrupted());
Reported by PMD.
Line: 1031
int count;
@Override
public void accept(Flowable<Integer> v) throws Exception {
System.out.println(Thread.currentThread());
if (count++ == 1) {
secondWindowProcessing.countDown();
try {
Thread.sleep(200);
isInterrupted.set(Thread.interrupted());
Reported by PMD.
Line: 1072
int count;
@Override
public void accept(Flowable<Integer> v) throws Exception {
System.out.println(Thread.currentThread());
if (count++ == 1) {
secondWindowProcessing.countDown();
try {
Thread.sleep(200);
isInterrupted.set(Thread.interrupted());
Reported by PMD.
Line: 1112
int count;
@Override
public void accept(Flowable<Integer> v) throws Exception {
System.out.println(Thread.currentThread());
if (count++ == 1) {
secondWindowProcessing.countDown();
try {
Thread.sleep(200);
isInterrupted.set(Thread.interrupted());
Reported by PMD.
Line: 1153
int count;
@Override
public void accept(Flowable<Integer> v) throws Exception {
System.out.println(Thread.currentThread());
if (count++ == 1) {
secondWindowProcessing.countDown();
try {
Thread.sleep(200);
isInterrupted.set(Thread.interrupted());
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import java.util.*;
import java.util.concurrent.*;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/testsupport/TestObserverExTest.java
302 issues
Line: 272
try {
to.assertNotSubscribed();
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
assertTrue(to.hasSubscription());
Reported by PMD.
Line: 295
try {
to.assertNoValues();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 302
try {
to.assertValueCount(0);
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
to.assertValueSequence(Collections.singletonList(1));
Reported by PMD.
Line: 311
try {
to.assertValueSequence(Collections.singletonList(2));
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
}
Reported by PMD.
Line: 323
try {
to.assertError(TestException.class);
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 330
try {
to.assertError(new TestException());
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 337
try {
to.assertError(Functions.<Throwable>alwaysTrue());
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
try {
Reported by PMD.
Line: 344
try {
to.assertErrorMessage("");
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 351
try {
to.assertSubscribed();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 358
try {
to.assertTerminated();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
to.onSubscribe(Disposable.empty());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchTest.java
298 issues
Line: 542
ts.awaitDone(10, TimeUnit.SECONDS);
System.out.println("> testIssue2654: " + ts.values().size());
ts.assertTerminated();
ts.assertNoErrors();
Assert.assertEquals(250, ts.values().size());
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 40
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
Reported by PMD.
Line: 40
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
Reported by PMD.
Line: 40
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
Reported by PMD.
Line: 42
public class FlowableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
@Before
public void before() {
Reported by PMD.
Line: 43
public class FlowableSwitchTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
@Before
public void before() {
scheduler = new TestScheduler();
Reported by PMD.
Line: 44
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
@Before
public void before() {
scheduler = new TestScheduler();
innerScheduler = scheduler.createWorker();
Reported by PMD.
Line: 63
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
publishNext(subscriber, 70, "one");
publishNext(subscriber, 100, "two");
publishCompleted(subscriber, 200);
}
}));
publishCompleted(subscriber, 60);
Reported by PMD.
Line: 64
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
publishNext(subscriber, 70, "one");
publishNext(subscriber, 100, "two");
publishCompleted(subscriber, 200);
}
}));
publishCompleted(subscriber, 60);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimeoutWithSelectorTest.java
292 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableTimeoutWithSelectorTest extends RxJavaTest {
@Test
public void timeoutSelectorNormal1() {
PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> timeout = PublishSubject.create();
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableTimeoutWithSelectorTest extends RxJavaTest {
@Test
public void timeoutSelectorNormal1() {
PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> timeout = PublishSubject.create();
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableTimeoutWithSelectorTest extends RxJavaTest {
@Test
public void timeoutSelectorNormal1() {
PublishSubject<Integer> source = PublishSubject.create();
final PublishSubject<Integer> timeout = PublishSubject.create();
Reported by PMD.
Line: 61
Observer<Object> o = TestHelper.mockObserver();
InOrder inOrder = inOrder(o);
source.timeout(timeout, timeoutFunc, other).subscribe(o);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
Reported by PMD.
Line: 61
Observer<Object> o = TestHelper.mockObserver();
InOrder inOrder = inOrder(o);
source.timeout(timeout, timeoutFunc, other).subscribe(o);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
Reported by PMD.
Line: 63
source.timeout(timeout, timeoutFunc, other).subscribe(o);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(o).onNext(1);
Reported by PMD.
Line: 64
source.timeout(timeout, timeoutFunc, other).subscribe(o);
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(o).onNext(1);
inOrder.verify(o).onNext(2);
Reported by PMD.
Line: 65
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(o).onNext(1);
inOrder.verify(o).onNext(2);
inOrder.verify(o).onNext(3);
Reported by PMD.
Line: 66
source.onNext(1);
source.onNext(2);
source.onNext(3);
timeout.onNext(1);
inOrder.verify(o).onNext(1);
inOrder.verify(o).onNext(2);
inOrder.verify(o).onNext(3);
inOrder.verify(o).onNext(100);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelFlowableTest.java
272 issues
Line: 395
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 442
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 490
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 538
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 586
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 635
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 690
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> v) throws Exception {
System.out.println(v.size());
}
})
.sequential()
.subscribe(ts);
Reported by PMD.
Line: 742
public void parallelismAndPrefetchAsync() {
for (int parallelism = 1; parallelism <= 8; parallelism *= 2) {
for (int prefetch = 1; prefetch <= 1024; prefetch *= 2) {
System.out.println("parallelismAndPrefetchAsync >> " + parallelism + ", " + prefetch);
Flowable.range(1, 1024 * 1024)
.parallel(parallelism, prefetch)
.runOn(Schedulers.computation())
.map(Functions.<Integer>identity())
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.parallel;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.*;
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class ParallelFlowableTest extends RxJavaTest {
@Test
public void sequentialMode() {
Flowable<Integer> source = Flowable.range(1, 1000000).hide();
for (int i = 1; i < 33; i++) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableObserveOnTest.java
272 issues
Line: 338
// don't let this thing finish yet
try {
if (!nextLatch.await(1000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("it shouldn't have timed out");
}
} catch (InterruptedException e) {
throw new RuntimeException("it shouldn't have failed");
}
}
Reported by PMD.
Line: 341
throw new RuntimeException("it shouldn't have timed out");
}
} catch (InterruptedException e) {
throw new RuntimeException("it shouldn't have failed");
}
}
});
Reported by PMD.
Line: 92
@Test
public void threadName() throws InterruptedException {
System.out.println("Main Thread: " + Thread.currentThread().getName());
// FIXME null values not allowed
// Observable<String> obs = Observable.just("one", null, "two", "three", "four");
Observable<String> obs = Observable.just("one", "null", "two", "three", "four");
Observer<String> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 108
@Override
public void accept(String s) {
String threadName = Thread.currentThread().getName();
System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName);
assertEquals(parentThreadName, threadName);
}
});
Reported by PMD.
Line: 121
public void accept(String t1) {
String threadName = Thread.currentThread().getName();
boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler");
System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName);
assertTrue(correctThreadName);
}
}).doAfterTerminate(new Action() {
Reported by PMD.
Line: 323
@Override
public void onComplete() {
System.out.println("onComplete");
completeTime.set(System.nanoTime());
completedLatch.countDown();
}
@Override
Reported by PMD.
Line: 348
});
long afterSubscribeTime = System.nanoTime();
System.out.println("After subscribe: " + completedLatch.getCount());
assertEquals(1, completedLatch.getCount());
nextLatch.countDown();
completedLatch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(completeTime.get() > afterSubscribeTime);
System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime));
Reported by PMD.
Line: 353
nextLatch.countDown();
completedLatch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(completeTime.get() > afterSubscribeTime);
System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime));
}
private static int randomIntFrom0to100() {
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
long x = System.nanoTime();
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 44
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableObserveOnTest extends RxJavaTest {
/**
* This is testing a no-op path since it uses Schedulers.immediate() which will not do scheduling.
*/
@Test
Reported by PMD.