The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowWithObservableTest.java
206 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableWindowWithObservableTest extends RxJavaTest {
@Test
public void windowViaObservableNormal1() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> boundary = PublishSubject.create();
Reported by PMD.
Line: 69
}
};
source.window(boundary).subscribe(wo);
int n = 30;
for (int i = 0; i < n; i++) {
source.onNext(i);
if (i % 3 == 2 && i < n - 1) {
Reported by PMD.
Line: 69
}
};
source.window(boundary).subscribe(wo);
int n = 30;
for (int i = 0; i < n; i++) {
source.onNext(i);
if (i % 3 == 2 && i < n - 1) {
Reported by PMD.
Line: 78
boundary.onNext(i / 3);
}
}
source.onComplete();
verify(o, never()).onError(any(Throwable.class));
assertEquals(n / 3, values.size());
Reported by PMD.
Line: 80
}
source.onComplete();
verify(o, never()).onError(any(Throwable.class));
assertEquals(n / 3, values.size());
int j = 0;
for (Observer<Object> mo : values) {
Reported by PMD.
Line: 82
verify(o, never()).onError(any(Throwable.class));
assertEquals(n / 3, values.size());
int j = 0;
for (Observer<Object> mo : values) {
verify(mo, never()).onError(any(Throwable.class));
for (int i = 0; i < 3; i++) {
Reported by PMD.
Line: 86
int j = 0;
for (Observer<Object> mo : values) {
verify(mo, never()).onError(any(Throwable.class));
for (int i = 0; i < 3; i++) {
verify(mo).onNext(j + i);
}
verify(mo).onComplete();
j += 3;
Reported by PMD.
Line: 88
for (Observer<Object> mo : values) {
verify(mo, never()).onError(any(Throwable.class));
for (int i = 0; i < 3; i++) {
verify(mo).onNext(j + i);
}
verify(mo).onComplete();
j += 3;
}
Reported by PMD.
Line: 90
for (int i = 0; i < 3; i++) {
verify(mo).onNext(j + i);
}
verify(mo).onComplete();
j += 3;
}
verify(o).onComplete();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/SerializedObserverTest.java
203 issues
Line: 668
}
observer.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestSingleThreadedObservable thread");
Reported by PMD.
Line: 682
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Reported by PMD.
Line: 728
System.out.println("TestMultiThreadedObservable onNext: " + s + " on thread " + Thread.currentThread().getName());
if (s == null) {
// force an error
throw npe;
} else {
// allow the exception to queue up
int sleep = (fj % 3) * 10;
if (sleep != 0) {
Thread.sleep(sleep);
Reported by PMD.
Line: 754
// we are done spawning threads
threadPool.shutdown();
} catch (Throwable e) {
throw new RuntimeException(e);
}
// wait until all threads are done, then mark it as COMPLETED
try {
// wait for all the threads to finish
Reported by PMD.
Line: 764
System.out.println("Threadpool did not terminate in time.");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
observer.onComplete();
}
});
System.out.println("starting TestMultiThreadedObservable thread");
Reported by PMD.
Line: 778
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
static class BusyObserver extends DefaultObserver<String> {
Reported by PMD.
Line: 103
onSubscribe.waitToFinish();
busySubscriber.terminalEvent.await();
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busySubscriber.maxConcurrentThreads.get());
// we can't know how many onNext calls will occur since they each run on a separate thread
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
// assertEquals(3, busySubscriber.onNextCount.get());
assertTrue(busySubscriber.onNextCount.get() < 4);
Reported by PMD.
Line: 136
w.subscribe(aw);
onSubscribe.waitToFinish();
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Observer maxConcurrentThreads: " + busySubscriber.maxConcurrentThreads.get());
// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busySubscriber.maxConcurrentThreads.get());
Reported by PMD.
Line: 144
assertEquals(1, busySubscriber.maxConcurrentThreads.get());
// this should not be the full number of items since the error should stop it before it completes all 9
System.out.println("onNext count: " + busySubscriber.onNextCount.get());
assertFalse(busySubscriber.onComplete);
assertTrue(busySubscriber.onError);
assertTrue(busySubscriber.onNextCount.get() < 9);
// no onComplete because onError was invoked
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
Reported by PMD.
Line: 311
firstOnNext.await();
Thread t1 = to.lastThread();
System.out.println("first onNext on thread: " + t1);
latch.countDown();
waitOnThreads(f1, f2);
// not completed yet
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableBackpressureTests.java
201 issues
Line: 62
a.onNext(i++);
c.incrementAndGet();
}
System.out.println("unsubscribed after: " + i);
}
}
@Override
public void cancel() {
Reported by PMD.
Line: 89
incrementingIntegers(c).observeOn(Schedulers.computation()).take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("testObserveOn => Received: " + ts.values().size() + " Emitted: " + c.get());
assertEquals(num, ts.values().size());
assertTrue(c.get() < Flowable.bufferSize() * 4);
}
@Test
Reported by PMD.
Line: 114
).take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("testObserveOnWithSlowConsumer => Received: " + ts.values().size() + " Emitted: " + c.get());
assertEquals(num, ts.values().size());
assertTrue(c.get() < Flowable.bufferSize() * 2);
}
@Test
Reported by PMD.
Line: 130
merged.take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("Expected: " + num + " got: " + ts.values().size());
System.out.println("testMergeSync => Received: " + ts.values().size() + " Emitted: " + c1.get() + " / " + c2.get());
assertEquals(num, ts.values().size());
// either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
// TODO is it possible to make this deterministic rather than one possibly starving the other?
// benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
Reported by PMD.
Line: 131
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("Expected: " + num + " got: " + ts.values().size());
System.out.println("testMergeSync => Received: " + ts.values().size() + " Emitted: " + c1.get() + " / " + c2.get());
assertEquals(num, ts.values().size());
// either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
// TODO is it possible to make this deterministic rather than one possibly starving the other?
// benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
assertTrue(c1.get() < Flowable.bufferSize() * 5);
Reported by PMD.
Line: 153
merged.take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("testMergeAsync => Received: " + ts.values().size() + " Emitted: " + c1.get() + " / " + c2.get());
assertEquals(num, ts.values().size());
// either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
// TODO is it possible to make this deterministic rather than one possibly starving the other?
// benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
int max = Flowable.bufferSize() * 7;
Reported by PMD.
Line: 167
public void mergeAsyncThenObserveOnLoop() {
for (int i = 0; i < 500; i++) {
if (i % 10 == 0) {
System.out.println("testMergeAsyncThenObserveOnLoop >> " + i);
}
// Verify there is no MissingBackpressureException
int num = (int) (Flowable.bufferSize() * 4.1);
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();
Reported by PMD.
Line: 187
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertComplete();
ts.assertNoErrors();
System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.values().size() + " Emitted: " + c1.get() + " / " + c2.get());
assertEquals(num, ts.values().size());
}
}
@Test
Reported by PMD.
Line: 205
merged.observeOn(Schedulers.newThread()).take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.values().size() + " Emitted: " + c1.get() + " / " + c2.get());
assertEquals(num, ts.values().size());
// either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1)
// TODO is it possible to make this deterministic rather than one possibly starving the other?
// benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit
// akarnokd => run this in a loop over 10k times and never saw values get as high as 7*SIZE, but since observeOn delays the unsubscription non-deterministically, the test will remain unreliable
Reported by PMD.
Line: 232
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
System.out.println("testFlatMapSync => Received: " + ts.values().size() + " Emitted: " + c.get());
assertEquals(num, ts.values().size());
// expect less than 1 buffer since the flatMap is emitting 10 each time, so it is num/10 that will be taken.
assertTrue(c.get() < Flowable.bufferSize());
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybeTest.java
201 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.mixed;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSwitchMapMaybeTest extends RxJavaTest {
@Test
public void simple() {
Flowable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
Reported by PMD.
Line: 39
public class FlowableSwitchMapMaybeTest extends RxJavaTest {
@Test
public void simple() {
Flowable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 53
}
@Test
public void simpleEmpty() {
Flowable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 67
}
@Test
public void simpleMixed() {
Flowable.range(1, 10)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 84
}
@Test
public void backpressured() {
TestSubscriber<Integer> ts = Flowable.range(1, 1024)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 99
.test(0L);
// backpressure results items skipped
ts
.requestMore(1)
.assertResult(1024);
}
@Test
Reported by PMD.
Line: 105
}
@Test
public void mainError() {
Flowable.error(new TestException())
.switchMapMaybe(Functions.justFunction(Maybe.never()))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 113
}
@Test
public void innerError() {
Flowable.just(1)
.switchMapMaybe(Functions.justFunction(Maybe.error(new TestException())))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 121
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Object> f)
throws Exception {
return f
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSingleTest.java
200 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.mixed;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSwitchMapSingleTest extends RxJavaTest {
@Test
public void simple() {
Flowable.range(1, 5)
.switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
Reported by PMD.
Line: 39
public class FlowableSwitchMapSingleTest extends RxJavaTest {
@Test
public void simple() {
Flowable.range(1, 5)
.switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 53
}
@Test
public void mainError() {
Flowable.error(new TestException())
.switchMapSingle(Functions.justFunction(Single.never()))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 61
}
@Test
public void innerError() {
Flowable.just(1)
.switchMapSingle(Functions.justFunction(Single.error(new TestException())))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 69
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Object> f)
throws Exception {
return f
Reported by PMD.
Line: 82
}
@Test
public void limit() {
Flowable.range(1, 5)
.switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 97
}
@Test
public void switchOver() {
PublishProcessor<Integer> pp = PublishProcessor.create();
final SingleSubject<Integer> ms1 = SingleSubject.create();
final SingleSubject<Integer> ms2 = SingleSubject.create();
Reported by PMD.
Line: 107
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
if (v == 1) {
return ms1;
}
return ms2;
}
}).test();
Reported by PMD.
Line: 116
ts.assertEmpty();
pp.onNext(1);
ts.assertEmpty();
assertTrue(ms1.hasObservers());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupJoinTest.java
197 issues
Line: 302
Function<Integer, Observable<Integer>> fail = new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer t1) {
throw new RuntimeException("Forced failure");
}
};
Observable<Observable<Integer>> m = source1.groupJoin(source2,
fail,
Reported by PMD.
Line: 326
Function<Integer, Observable<Integer>> fail = new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer t1) {
throw new RuntimeException("Forced failure");
}
};
Observable<Observable<Integer>> m = source1.groupJoin(source2,
just(Observable.never()),
Reported by PMD.
Line: 350
BiFunction<Integer, Observable<Integer>, Integer> fail = new BiFunction<Integer, Observable<Integer>, Integer>() {
@Override
public Integer apply(Integer t1, Observable<Integer> t2) {
throw new RuntimeException("Forced failure");
}
};
Observable<Integer> m = source1.groupJoin(source2,
just(Observable.never()),
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableGroupJoinTest extends RxJavaTest {
Observer<Object> observer = TestHelper.mockObserver();
BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
@Override
Reported by PMD.
Line: 40
public class ObservableGroupJoinTest extends RxJavaTest {
Observer<Object> observer = TestHelper.mockObserver();
BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
Reported by PMD.
Line: 42
Observer<Object> observer = TestHelper.mockObserver();
BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
};
Reported by PMD.
Line: 67
};
}
BiFunction<Integer, Observable<Integer>, Observable<Integer>> add2 = new BiFunction<Integer, Observable<Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> apply(final Integer leftValue, Observable<Integer> rightValues) {
return rightValues.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer rightValue) throws Throwable {
Reported by PMD.
Line: 90
PublishSubject<Integer> source1 = PublishSubject.create();
PublishSubject<Integer> source2 = PublishSubject.create();
Observable<Integer> m = Observable.merge(source1.groupJoin(source2,
just(Observable.never()),
just(Observable.never()), add2));
m.subscribe(observer);
Reported by PMD.
Line: 94
just(Observable.never()),
just(Observable.never()), add2));
m.subscribe(observer);
source1.onNext(1);
source1.onNext(2);
source1.onNext(4);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTest.java
196 issues
Line: 35
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableSampleTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<Long> subscriber;
private Subscriber<Object> subscriber2;
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableSampleTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<Long> subscriber;
private Subscriber<Object> subscriber2;
@Before
Reported by PMD.
Line: 37
public class FlowableSampleTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<Long> subscriber;
private Subscriber<Object> subscriber2;
@Before
// due to mocking
Reported by PMD.
Line: 38
public class FlowableSampleTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<Long> subscriber;
private Subscriber<Object> subscriber2;
@Before
// due to mocking
public void before() {
Reported by PMD.
Line: 39
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<Long> subscriber;
private Subscriber<Object> subscriber2;
@Before
// due to mocking
public void before() {
scheduler = new TestScheduler();
Reported by PMD.
Line: 78
});
Flowable<Long> sampled = source.sample(400L, TimeUnit.MILLISECONDS, scheduler);
sampled.subscribe(subscriber);
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(any(Long.class));
Reported by PMD.
Line: 83
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(any(Long.class));
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext(1L);
Reported by PMD.
Line: 84
scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(any(Long.class));
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext(1L);
verify(subscriber, never()).onNext(2L);
Reported by PMD.
Line: 85
scheduler.advanceTimeTo(800L, TimeUnit.MILLISECONDS);
verify(subscriber, never()).onNext(any(Long.class));
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext(1L);
verify(subscriber, never()).onNext(2L);
verify(subscriber, never()).onComplete();
Reported by PMD.
Line: 88
verify(subscriber, never()).onError(any(Throwable.class));
scheduler.advanceTimeTo(1200L, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext(1L);
verify(subscriber, never()).onNext(2L);
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
scheduler.advanceTimeTo(1600L, TimeUnit.MILLISECONDS);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subjects/MaybeSubjectTest.java
195 issues
Line: 32
public class MaybeSubjectTest extends RxJavaTest {
@Test
public void success() {
MaybeSubject<Integer> ms = MaybeSubject.create();
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
Reported by PMD.
Line: 35
public void success() {
MaybeSubject<Integer> ms = MaybeSubject.create();
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
Reported by PMD.
Line: 35
public void success() {
MaybeSubject<Integer> ms = MaybeSubject.create();
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
Reported by PMD.
Line: 36
MaybeSubject<Integer> ms = MaybeSubject.create();
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
Reported by PMD.
Line: 36
MaybeSubject<Integer> ms = MaybeSubject.create();
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
Reported by PMD.
Line: 37
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
Reported by PMD.
Line: 37
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
Reported by PMD.
Line: 38
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
TestObserver<Integer> to = ms.test();
Reported by PMD.
Line: 38
assertFalse(ms.hasValue());
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
TestObserver<Integer> to = ms.test();
Reported by PMD.
Line: 39
assertNull(ms.getValue());
assertFalse(ms.hasComplete());
assertFalse(ms.hasThrowable());
assertNull(ms.getThrowable());
assertFalse(ms.hasObservers());
assertEquals(0, ms.observerCount());
TestObserver<Integer> to = ms.test();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterableTest.java
193 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import java.util.*;
import java.util.concurrent.Callable;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFlattenIterableTest extends RxJavaTest {
@Test
public void normal0() {
TestSubscriber<Integer> ts = new TestSubscriber<>();
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFlattenIterableTest extends RxJavaTest {
@Test
public void normal0() {
TestSubscriber<Integer> ts = new TestSubscriber<>();
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFlattenIterableTest extends RxJavaTest {
@Test
public void normal0() {
TestSubscriber<Integer> ts = new TestSubscriber<>();
Reported by PMD.
Line: 61
})
.subscribe(ts);
ts.assertValues(2, 3)
.assertNoErrors()
.assertComplete();
}
final Function<Integer, Iterable<Integer>> mapper = new Function<Integer, Iterable<Integer>>() {
Reported by PMD.
Line: 61
})
.subscribe(ts);
ts.assertValues(2, 3)
.assertNoErrors()
.assertComplete();
}
final Function<Integer, Iterable<Integer>> mapper = new Function<Integer, Iterable<Integer>>() {
Reported by PMD.
Line: 66
.assertComplete();
}
final Function<Integer, Iterable<Integer>> mapper = new Function<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> apply(Integer v) {
return Arrays.asList(v, v + 1);
}
};
Reported by PMD.
Line: 77
public void normal() {
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5).concatMapIterable(mapper)
.subscribe(ts);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
ts.assertNoErrors();
ts.assertComplete();
Reported by PMD.
Line: 77
public void normal() {
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5).concatMapIterable(mapper)
.subscribe(ts);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
ts.assertNoErrors();
ts.assertComplete();
Reported by PMD.
Line: 89
public void normalViaFlatMap() {
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5).flatMapIterable(mapper)
.subscribe(ts);
ts.assertValues(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
ts.assertNoErrors();
ts.assertComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/SourceAnnotationCheck.java
193 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.validators;
import java.io.File;
import java.nio.file.Files;
import java.util.*;
Reported by PMD.
Line: 39
* @NonNull or @Nullable annotations specified on their return type and object-type parameters
* as well as @SafeVarargs for varargs.
*/
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
Reported by PMD.
Line: 39
* @NonNull or @Nullable annotations specified on their return type and object-type parameters
* as well as @SafeVarargs for varargs.
*/
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
Reported by PMD.
Line: 39
* @NonNull or @Nullable annotations specified on their return type and object-type parameters
* as well as @SafeVarargs for varargs.
*/
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
Reported by PMD.
Line: 39
* @NonNull or @Nullable annotations specified on their return type and object-type parameters
* as well as @SafeVarargs for varargs.
*/
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
Reported by PMD.
Line: 39
* @NonNull or @Nullable annotations specified on their return type and object-type parameters
* as well as @SafeVarargs for varargs.
*/
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
Reported by PMD.
Line: 42
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
@Test
public void checkSingle() throws Exception {
Reported by PMD.
Line: 42
public class SourceAnnotationCheck {
@Test
public void checkCompletable() throws Exception {
processFile(Completable.class);
}
@Test
public void checkSingle() throws Exception {
Reported by PMD.
Line: 47
}
@Test
public void checkSingle() throws Exception {
processFile(Single.class);
}
@Test
public void checkMaybe() throws Exception {
Reported by PMD.
Line: 47
}
@Test
public void checkSingle() throws Exception {
processFile(Single.class);
}
@Test
public void checkMaybe() throws Exception {
Reported by PMD.