The following issues were found:
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableSwitchMapMaybeTest.java
215 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.mixed;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableSwitchMapMaybeTest extends RxJavaTest {
@Test
public void simple() {
Observable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
Reported by PMD.
Line: 36
public class ObservableSwitchMapMaybeTest extends RxJavaTest {
@Test
public void simple() {
Observable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 50
}
@Test
public void simpleEmpty() {
Observable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 64
}
@Test
public void simpleMixed() {
Observable.range(1, 10)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 81
}
@Test
public void mainError() {
Observable.error(new TestException())
.switchMapMaybe(Functions.justFunction(Maybe.never()))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 89
}
@Test
public void innerError() {
Observable.just(1)
.switchMapMaybe(Functions.justFunction(Maybe.error(new TestException())))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 97
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeObservable(new Function<Observable<Object>, Observable<Object>>() {
@Override
public Observable<Object> apply(Observable<Object> f)
throws Exception {
return f
Reported by PMD.
Line: 110
}
@Test
public void take() {
Observable.range(1, 5)
.switchMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 125
}
@Test
public void switchOver() {
PublishSubject<Integer> ps = PublishSubject.create();
final MaybeSubject<Integer> ms1 = MaybeSubject.create();
final MaybeSubject<Integer> ms2 = MaybeSubject.create();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSingleTest.java
214 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableSingleTest extends RxJavaTest {
@Test
public void singleObservable() {
Observable<Integer> o = Observable.just(1).singleElement().toObservable();
Reported by PMD.
Line: 34
public class ObservableSingleTest extends RxJavaTest {
@Test
public void singleObservable() {
Observable<Integer> o = Observable.just(1).singleElement().toObservable();
Observer<Integer> observer = TestHelper.mockObserver();
o.subscribe(observer);
Reported by PMD.
Line: 35
@Test
public void singleObservable() {
Observable<Integer> o = Observable.just(1).singleElement().toObservable();
Observer<Integer> observer = TestHelper.mockObserver();
o.subscribe(observer);
InOrder inOrder = inOrder(observer);
Reported by PMD.
Line: 35
@Test
public void singleObservable() {
Observable<Integer> o = Observable.just(1).singleElement().toObservable();
Observer<Integer> observer = TestHelper.mockObserver();
o.subscribe(observer);
InOrder inOrder = inOrder(observer);
Reported by PMD.
Line: 38
Observable<Integer> o = Observable.just(1).singleElement().toObservable();
Observer<Integer> observer = TestHelper.mockObserver();
o.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(1);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
Reported by PMD.
Line: 41
o.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(1);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
Reported by PMD.
Line: 41
o.subscribe(observer);
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(1);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
Reported by PMD.
Line: 42
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(1);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
public void singleWithTooManyElementsObservable() {
Reported by PMD.
Line: 42
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(1);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
public void singleWithTooManyElementsObservable() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableSwitchMapSingleTest.java
212 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.mixed;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableSwitchMapSingleTest extends RxJavaTest {
@Test
public void simple() {
Observable.range(1, 5)
.switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
Reported by PMD.
Line: 36
public class ObservableSwitchMapSingleTest extends RxJavaTest {
@Test
public void simple() {
Observable.range(1, 5)
.switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 50
}
@Test
public void mainError() {
Observable.error(new TestException())
.switchMapSingle(Functions.justFunction(Single.never()))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 58
}
@Test
public void innerError() {
Observable.just(1)
.switchMapSingle(Functions.justFunction(Single.error(new TestException())))
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 66
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeObservable(new Function<Observable<Object>, Observable<Object>>() {
@Override
public Observable<Object> apply(Observable<Object> f)
throws Exception {
return f
Reported by PMD.
Line: 79
}
@Test
public void take() {
Observable.range(1, 5)
.switchMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 94
}
@Test
public void switchOver() {
PublishSubject<Integer> ps = PublishSubject.create();
final SingleSubject<Integer> ms1 = SingleSubject.create();
final SingleSubject<Integer> ms2 = SingleSubject.create();
Reported by PMD.
Line: 104
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
if (v == 1) {
return ms1;
}
return ms2;
}
}).test();
Reported by PMD.
Line: 113
to.assertEmpty();
ps.onNext(1);
to.assertEmpty();
assertTrue(ms1.hasObservers());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/ObservableStageSubscriberOrDefaultTest.java
211 issues
Line: 28
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableStageSubscriberOrDefaultTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstStage(null)
Reported by PMD.
Line: 31
public class ObservableStageSubscriberOrDefaultTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
Reported by PMD.
Line: 32
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 32
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 32
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 37
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
}
@Test
public void firstEmpty() throws Exception {
Integer v = Observable.<Integer>empty()
Reported by PMD.
Line: 41
}
@Test
public void firstEmpty() throws Exception {
Integer v = Observable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
Reported by PMD.
Line: 42
@Test
public void firstEmpty() throws Exception {
Integer v = Observable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
assertEquals((Integer)2, v);
Reported by PMD.
Line: 42
@Test
public void firstEmpty() throws Exception {
Integer v = Observable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
assertEquals((Integer)2, v);
Reported by PMD.
Line: 42
@Test
public void firstEmpty() throws Exception {
Integer v = Observable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
assertEquals((Integer)2, v);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrDefaultTest.java
211 issues
Line: 29
import io.reactivex.rxjava3.processors.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableStageSubscriberOrDefaultTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstStage(null)
Reported by PMD.
Line: 32
public class FlowableStageSubscriberOrDefaultTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
Reported by PMD.
Line: 33
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 33
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 33
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstStage(null)
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 38
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
}
@Test
public void firstEmpty() throws Exception {
Integer v = Flowable.<Integer>empty()
Reported by PMD.
Line: 42
}
@Test
public void firstEmpty() throws Exception {
Integer v = Flowable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
Reported by PMD.
Line: 43
@Test
public void firstEmpty() throws Exception {
Integer v = Flowable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
assertEquals((Integer)2, v);
Reported by PMD.
Line: 43
@Test
public void firstEmpty() throws Exception {
Integer v = Flowable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
assertEquals((Integer)2, v);
Reported by PMD.
Line: 43
@Test
public void firstEmpty() throws Exception {
Integer v = Flowable.<Integer>empty()
.firstStage(2)
.toCompletableFuture()
.get();
assertEquals((Integer)2, v);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/FlowableStageSubscriberOrErrorTest.java
208 issues
Line: 30
import io.reactivex.rxjava3.processors.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableStageSubscriberOrErrorTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstOrErrorStage()
Reported by PMD.
Line: 33
public class FlowableStageSubscriberOrErrorTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
Reported by PMD.
Line: 34
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 34
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 34
@Test
public void firstJust() throws Exception {
Integer v = Flowable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 39
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
}
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Reported by PMD.
Line: 43
}
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Flowable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
Reported by PMD.
Line: 43
}
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Flowable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
Reported by PMD.
Line: 45
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Flowable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
@Test
Reported by PMD.
Line: 45
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Flowable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/ObservableStageSubscriberOrErrorTest.java
208 issues
Line: 29
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableStageSubscriberOrErrorTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstOrErrorStage()
Reported by PMD.
Line: 32
public class ObservableStageSubscriberOrErrorTest extends RxJavaTest {
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
Reported by PMD.
Line: 33
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 33
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 33
@Test
public void firstJust() throws Exception {
Integer v = Observable.just(1)
.firstOrErrorStage()
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
Reported by PMD.
Line: 38
.toCompletableFuture()
.get();
assertEquals((Integer)1, v);
}
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Reported by PMD.
Line: 42
}
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Observable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
Reported by PMD.
Line: 42
}
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Observable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
Reported by PMD.
Line: 44
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Observable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
@Test
Reported by PMD.
Line: 44
@Test
public void firstEmpty() throws Exception {
TestHelper.assertError(
Observable.<Integer>empty()
.firstOrErrorStage()
.toCompletableFuture(), NoSuchElementException.class);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoinTest.java
207 issues
Line: 301
Function<Integer, Flowable<Integer>> fail = new Function<Integer, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Integer t1) {
throw new RuntimeException("Forced failure");
}
};
Flowable<Flowable<Integer>> m = source1.groupJoin(source2,
fail,
Reported by PMD.
Line: 325
Function<Integer, Flowable<Integer>> fail = new Function<Integer, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(Integer t1) {
throw new RuntimeException("Forced failure");
}
};
Flowable<Flowable<Integer>> m = source1.groupJoin(source2,
just(Flowable.never()),
Reported by PMD.
Line: 349
BiFunction<Integer, Flowable<Integer>, Integer> fail = new BiFunction<Integer, Flowable<Integer>, Integer>() {
@Override
public Integer apply(Integer t1, Flowable<Integer> t2) {
throw new RuntimeException("Forced failure");
}
};
Flowable<Integer> m = source1.groupJoin(source2,
just(Flowable.never()),
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableGroupJoinTest extends RxJavaTest {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
@Override
Reported by PMD.
Line: 38
public class FlowableGroupJoinTest extends RxJavaTest {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
Reported by PMD.
Line: 40
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
BiFunction<Integer, Integer, Integer> add = new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
};
Reported by PMD.
Line: 65
};
}
BiFunction<Integer, Flowable<Integer>, Flowable<Integer>> add2 = new BiFunction<Integer, Flowable<Integer>, Flowable<Integer>>() {
@Override
public Flowable<Integer> apply(final Integer leftValue, Flowable<Integer> rightValues) {
return rightValues.map(new Function<Integer, Integer>() {
@Override
public Integer apply(Integer rightValue) throws Throwable {
Reported by PMD.
Line: 88
PublishProcessor<Integer> source1 = PublishProcessor.create();
PublishProcessor<Integer> source2 = PublishProcessor.create();
Flowable<Integer> m = Flowable.merge(source1.groupJoin(source2,
just(Flowable.never()),
just(Flowable.never()), add2));
m.subscribe(subscriber);
Reported by PMD.
Line: 92
just(Flowable.never()),
just(Flowable.never()), add2));
m.subscribe(subscriber);
source1.onNext(1);
source1.onNext(2);
source1.onNext(4);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subscribers/SerializedSubscriberTest.java
206 issues
Line: 669
}
subscriber.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestSingleThreadedObservable thread");
Reported by PMD.
Line: 683
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Reported by PMD.
Line: 729
System.out.println("TestMultiThreadedObservable onNext: " + s + " on thread " + Thread.currentThread().getName());
if (s == null) {
// force an error
throw npe;
} else {
// allow the exception to queue up
int sleep = (fj % 3) * 10;
if (sleep != 0) {
Thread.sleep(sleep);
Reported by PMD.
Line: 755
// we are done spawning threads
threadPool.shutdown();
} catch (Throwable e) {
throw new RuntimeException(e);
}
// wait until all threads are done, then mark it as COMPLETED
try {
// wait for all the threads to finish
Reported by PMD.
Line: 765
System.out.println("Threadpool did not terminate in time.");
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscriber.onComplete();
}
});
System.out.println("starting TestMultiThreadedObservable thread");
Reported by PMD.
Line: 779
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class BusySubscriber extends DefaultSubscriber<String> {
Reported by PMD.
Line: 104
onSubscribe.waitToFinish();
busySubscriber.terminalEvent.await();
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Subscriber maxConcurrentThreads: " + busySubscriber.maxConcurrentThreads.get());
// we can't know how many onNext calls will occur since they each run on a separate thread
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
// assertEquals(3, busySubscriber.onNextCount.get());
assertTrue(busySubscriber.onNextCount.get() < 4);
Reported by PMD.
Line: 137
w.subscribe(aw);
onSubscribe.waitToFinish();
System.out.println("OnSubscribe maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get() + " Subscriber maxConcurrentThreads: " + busySubscriber.maxConcurrentThreads.get());
// we can have concurrency ...
assertTrue(onSubscribe.maxConcurrentThreads.get() > 1);
// ... but the onNext execution should be single threaded
assertEquals(1, busySubscriber.maxConcurrentThreads.get());
Reported by PMD.
Line: 145
assertEquals(1, busySubscriber.maxConcurrentThreads.get());
// this should not be the full number of items since the error should stop it before it completes all 9
System.out.println("onNext count: " + busySubscriber.onNextCount.get());
assertFalse(busySubscriber.onComplete);
assertTrue(busySubscriber.onError);
assertTrue(busySubscriber.onNextCount.get() < 9);
// no onComplete because onError was invoked
// non-deterministic because unsubscribe happens after 'waitToFinish' releases
Reported by PMD.
Line: 312
firstOnNext.await();
Thread t1 = ts.lastThread();
System.out.println("first onNext on thread: " + t1);
latch.countDown();
waitOnThreads(f1, f2);
// not completed yet
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithFlowableTest.java
206 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableWindowWithFlowableTest extends RxJavaTest {
@Test
public void windowViaFlowableNormal1() {
PublishProcessor<Integer> source = PublishProcessor.create();
PublishProcessor<Integer> boundary = PublishProcessor.create();
Reported by PMD.
Line: 68
}
};
source.window(boundary).subscribe(wo);
int n = 30;
for (int i = 0; i < n; i++) {
source.onNext(i);
if (i % 3 == 2 && i < n - 1) {
Reported by PMD.
Line: 68
}
};
source.window(boundary).subscribe(wo);
int n = 30;
for (int i = 0; i < n; i++) {
source.onNext(i);
if (i % 3 == 2 && i < n - 1) {
Reported by PMD.
Line: 77
boundary.onNext(i / 3);
}
}
source.onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
assertEquals(n / 3, values.size());
Reported by PMD.
Line: 79
}
source.onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
assertEquals(n / 3, values.size());
int j = 0;
for (Subscriber<Object> mo : values) {
Reported by PMD.
Line: 81
verify(subscriber, never()).onError(any(Throwable.class));
assertEquals(n / 3, values.size());
int j = 0;
for (Subscriber<Object> mo : values) {
verify(mo, never()).onError(any(Throwable.class));
for (int i = 0; i < 3; i++) {
Reported by PMD.
Line: 85
int j = 0;
for (Subscriber<Object> mo : values) {
verify(mo, never()).onError(any(Throwable.class));
for (int i = 0; i < 3; i++) {
verify(mo).onNext(j + i);
}
verify(mo).onComplete();
j += 3;
Reported by PMD.
Line: 87
for (Subscriber<Object> mo : values) {
verify(mo, never()).onError(any(Throwable.class));
for (int i = 0; i < 3; i++) {
verify(mo).onNext(j + i);
}
verify(mo).onComplete();
j += 3;
}
Reported by PMD.
Line: 89
for (int i = 0; i < 3; i++) {
verify(mo).onNext(j + i);
}
verify(mo).onComplete();
j += 3;
}
verify(subscriber).onComplete();
}
Reported by PMD.