The following issues were found:
src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptionalTest.java
68 issues
Line: 32
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ParallelMapOptionalTest extends RxJavaTest {
@Test
public void doubleFilter() {
Flowable.range(1, 10)
.parallel()
Reported by PMD.
Line: 35
public class ParallelMapOptionalTest extends RxJavaTest {
@Test
public void doubleFilter() {
Flowable.range(1, 10)
.parallel()
.mapOptional(Optional::of)
.filter(new Predicate<Integer>() {
@Override
Reported by PMD.
Line: 57
}
@Test
public void doubleFilterAsync() {
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.mapOptional(Optional::of)
.filter(new Predicate<Integer>() {
Reported by PMD.
Line: 81
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.mapOptional(Optional::of)
.sequential()
Reported by PMD.
Line: 90
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 100
}
@Test
public void doubleError2() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.mapOptional(Optional::of)
.filter(Functions.alwaysTrue())
Reported by PMD.
Line: 110
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 120
}
@Test
public void error() {
Flowable.error(new TestException())
.parallel()
.mapOptional(Optional::of)
.sequential()
.test()
Reported by PMD.
Line: 130
}
@Test
public void mapCrash() {
Flowable.just(1)
.parallel()
.mapOptional(v -> { throw new TestException(); })
.sequential()
.test()
Reported by PMD.
Line: 131
@Test
public void mapCrash() {
Flowable.just(1)
.parallel()
.mapOptional(v -> { throw new TestException(); })
.sequential()
.test()
.assertFailure(TestException.class);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMergeMaxConcurrentTest.java
68 issues
Line: 33
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableMergeMaxConcurrentTest extends RxJavaTest {
Observer<String> stringObserver;
@Before
public void before() {
Reported by PMD.
Line: 35
public class ObservableMergeMaxConcurrentTest extends RxJavaTest {
Observer<String> stringObserver;
@Before
public void before() {
stringObserver = TestHelper.mockObserver();
}
Reported by PMD.
Line: 45
@Test
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Reported by PMD.
Line: 46
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
Reported by PMD.
Line: 46
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
Reported by PMD.
Line: 46
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
Reported by PMD.
Line: 46
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
Reported by PMD.
Line: 46
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
Reported by PMD.
Line: 46
public void whenMaxConcurrentIsOne() {
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
Reported by PMD.
Line: 47
for (int i = 0; i < 100; i++) {
List<Observable<String>> os = new ArrayList<>();
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
os.add(Observable.just("one", "two", "three", "four", "five").subscribeOn(Schedulers.newThread()));
List<String> expected = Arrays.asList("one", "two", "three", "four", "five", "one", "two", "three", "four", "five", "one", "two", "three", "four", "five");
Iterator<String> iter = Observable.merge(os, 1).blockingIterable().iterator();
List<String> actual = new ArrayList<>();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureReduceTest.java
68 issues
Line: 158
private <T> void assertValuesDropped(TestSubscriberEx<T> ts, int totalValues) {
int n = ts.values().size();
System.out.println("testAsynchronousDrop -> " + n);
Assert.assertTrue("All events received?", n < totalValues);
}
private void assertIncreasingSequence(TestSubscriberEx<Integer> ts) {
int previous = 0;
Reported by PMD.
Line: 31
import java.util.Random;
import java.util.concurrent.TimeUnit;
public class FlowableOnBackpressureReduceTest extends RxJavaTest {
static final BiFunction<Integer, Integer, Integer> TEST_INT_REDUCER = (previous, current) -> previous + current + 50;
static final BiFunction<Object, Object, Object> TEST_OBJECT_REDUCER = (previous, current) -> current;
Reported by PMD.
Line: 38
static final BiFunction<Object, Object, Object> TEST_OBJECT_REDUCER = (previous, current) -> current;
@Test
public void simple() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Flowable.range(1, 5).onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
ts.assertNoErrors();
Reported by PMD.
Line: 41
public void simple() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Flowable.range(1, 5).onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
ts.assertNoErrors();
ts.assertTerminated();
ts.assertValues(1, 2, 3, 4, 5);
}
Reported by PMD.
Line: 41
public void simple() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Flowable.range(1, 5).onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
ts.assertNoErrors();
ts.assertTerminated();
ts.assertValues(1, 2, 3, 4, 5);
}
Reported by PMD.
Line: 49
}
@Test
public void simpleError() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Flowable.range(1, 5).concatWith(Flowable.error(new TestException()))
.onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
Reported by PMD.
Line: 61
}
@Test
public void simpleBackpressure() {
TestSubscriber<Integer> ts = new TestSubscriber<>(2L);
Flowable.range(1, 5).onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
ts.assertNoErrors();
Reported by PMD.
Line: 64
public void simpleBackpressure() {
TestSubscriber<Integer> ts = new TestSubscriber<>(2L);
Flowable.range(1, 5).onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
ts.assertNoErrors();
ts.assertValues(1, 2);
ts.assertNotComplete();
}
Reported by PMD.
Line: 64
public void simpleBackpressure() {
TestSubscriber<Integer> ts = new TestSubscriber<>(2L);
Flowable.range(1, 5).onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
ts.assertNoErrors();
ts.assertValues(1, 2);
ts.assertNotComplete();
}
Reported by PMD.
Line: 72
}
@Test
public void synchronousDrop() {
PublishProcessor<Integer> source = PublishProcessor.create();
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0L);
source.onBackpressureReduce(TEST_INT_REDUCER).subscribe(ts);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSkipUntilTest.java
68 issues
Line: 28
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableSkipUntilTest extends RxJavaTest {
Subscriber<Object> subscriber;
@Before
public void before() {
subscriber = TestHelper.mockSubscriber();
}
Reported by PMD.
Line: 40
PublishProcessor<Integer> source = PublishProcessor.create();
PublishProcessor<Integer> other = PublishProcessor.create();
Flowable<Integer> m = source.skipUntil(other);
m.subscribe(subscriber);
source.onNext(0);
source.onNext(1);
Reported by PMD.
Line: 41
PublishProcessor<Integer> other = PublishProcessor.create();
Flowable<Integer> m = source.skipUntil(other);
m.subscribe(subscriber);
source.onNext(0);
source.onNext(1);
other.onNext(100);
Reported by PMD.
Line: 43
Flowable<Integer> m = source.skipUntil(other);
m.subscribe(subscriber);
source.onNext(0);
source.onNext(1);
other.onNext(100);
source.onNext(2);
Reported by PMD.
Line: 44
m.subscribe(subscriber);
source.onNext(0);
source.onNext(1);
other.onNext(100);
source.onNext(2);
source.onNext(3);
Reported by PMD.
Line: 46
source.onNext(0);
source.onNext(1);
other.onNext(100);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
Reported by PMD.
Line: 48
other.onNext(100);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
Reported by PMD.
Line: 49
other.onNext(100);
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onNext(2);
Reported by PMD.
Line: 50
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onNext(3);
Reported by PMD.
Line: 51
source.onNext(2);
source.onNext(3);
source.onNext(4);
source.onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onNext(3);
verify(subscriber, times(1)).onNext(4);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTaskTest.java
67 issues
Line: 32
public class InstantPeriodicTaskTest extends RxJavaTest {
@Test
public void taskCrash() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() {
Reported by PMD.
Line: 46
try {
task.call();
fail("Should have thrown!");
} catch (TestException excepted) {
// excepted
}
TestHelper.assertUndeliverable(errors, 0, TestException.class);
Reported by PMD.
Line: 47
try {
task.call();
fail("Should have thrown!");
} catch (TestException excepted) {
// excepted
}
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
Reported by PMD.
Line: 59
}
@Test
public void dispose() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() {
@Override
Reported by PMD.
Line: 59
}
@Test
public void dispose() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() {
@Override
Reported by PMD.
Line: 70
}
}, exec);
assertFalse(task.isDisposed());
task.dispose();
assertTrue(task.isDisposed());
Reported by PMD.
Line: 74
task.dispose();
assertTrue(task.isDisposed());
task.dispose();
assertTrue(task.isDisposed());
} finally {
Reported by PMD.
Line: 78
task.dispose();
assertTrue(task.isDisposed());
} finally {
exec.shutdownNow();
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 86
}
@Test
public void dispose2() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() {
@Override
Reported by PMD.
Line: 86
}
@Test
public void dispose2() throws Exception {
ExecutorService exec = Executors.newSingleThreadExecutor();
try {
InstantPeriodicTask task = new InstantPeriodicTask(new Runnable() {
@Override
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatTest.java
67 issues
Line: 281
try {
Thread.sleep(30);
} catch (InterruptedException e) {
System.out.println("Interrupted! " + Thread.currentThread());
interrupted[0] = true;
}
}
});
Completable.concat(Arrays.asList(Completable.complete()
Reported by PMD.
Line: 35
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class CompletableConcatTest extends RxJavaTest {
@Test
public void overflowReported() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Reported by PMD.
Line: 38
public class CompletableConcatTest extends RxJavaTest {
@Test
public void overflowReported() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Completable.concat(
Flowable.fromPublisher(new Publisher<Completable>() {
@Override
Reported by PMD.
Line: 69
Completable.concat(Flowable.just(Completable.complete()), -99);
fail("Should have thrown IllegalArgumentExceptio");
} catch (IllegalArgumentException ex) {
assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
}
}
@Test
public void dispose() {
Reported by PMD.
Line: 74
}
@Test
public void dispose() {
TestHelper.checkDisposed(Completable.concat(Flowable.just(Completable.complete())));
}
@Test
public void errorRace() {
Reported by PMD.
Line: 79
}
@Test
public void errorRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
Reported by PMD.
Line: 87
final PublishProcessor<Integer> pp1 = PublishProcessor.create();
final PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Void> to = Completable.concat(pp1.map(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Exception {
return pp2.ignoreElements();
}
})).test();
Reported by PMD.
Line: 94
}
})).test();
pp1.onNext(1);
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
Reported by PMD.
Line: 96
pp1.onNext(1);
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
public void run() {
pp1.onError(ex);
Reported by PMD.
Line: 98
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
public void run() {
pp1.onError(ex);
}
};
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/subscribers/FlowableConsumersTest.java
67 issues
Line: 51
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableConsumersTest implements Consumer<Object>, Action {
final CompositeDisposable composite = new CompositeDisposable();
final PublishProcessor<Integer> processor = PublishProcessor.create();
Reported by PMD.
Line: 53
public class FlowableConsumersTest implements Consumer<Object>, Action {
final CompositeDisposable composite = new CompositeDisposable();
final PublishProcessor<Integer> processor = PublishProcessor.create();
final List<Object> events = new ArrayList<>();
Reported by PMD.
Line: 55
final CompositeDisposable composite = new CompositeDisposable();
final PublishProcessor<Integer> processor = PublishProcessor.create();
final List<Object> events = new ArrayList<>();
@Override
public void run() throws Exception {
Reported by PMD.
Line: 57
final PublishProcessor<Integer> processor = PublishProcessor.create();
final List<Object> events = new ArrayList<>();
@Override
public void run() throws Exception {
events.add("OnComplete");
}
Reported by PMD.
Line: 75
}
@Test
public void onNextNormal() {
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
Reported by PMD.
Line: 79
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 79
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 81
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
processor.onNext(1);
Reported by PMD.
Line: 87
processor.onNext(1);
assertTrue(composite.size() > 0);
assertEquals(Arrays.<Object>asList(1), events);
processor.onComplete();
Reported by PMD.
Line: 89
assertTrue(composite.size() > 0);
assertEquals(Arrays.<Object>asList(1), events);
processor.onComplete();
assertEquals(Arrays.<Object>asList(1), events);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromIterable.java
67 issues
Line: 212
}
@Override
void slowPath(long r) {
long e = 0L;
Iterator<? extends T> it = this.iterator;
Subscriber<? super T> a = downstream;
for (;;) {
Reported by PMD.
Line: 212
}
@Override
void slowPath(long r) {
long e = 0L;
Iterator<? extends T> it = this.iterator;
Subscriber<? super T> a = downstream;
for (;;) {
Reported by PMD.
Line: 348
}
@Override
void slowPath(long r) {
long e = 0L;
Iterator<? extends T> it = this.iterator;
ConditionalSubscriber<? super T> a = downstream;
for (;;) {
Reported by PMD.
Line: 348
}
@Override
void slowPath(long r) {
long e = 0L;
Iterator<? extends T> it = this.iterator;
ConditionalSubscriber<? super T> a = downstream;
for (;;) {
Reported by PMD.
Line: 30
public final class FlowableFromIterable<T> extends Flowable<T> {
final Iterable<? extends T> source;
public FlowableFromIterable(Iterable<? extends T> source) {
this.source = source;
}
Reported by PMD.
Line: 41
Iterator<? extends T> it;
try {
it = source.iterator();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
Reported by PMD.
Line: 54
boolean hasNext;
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptySubscription.error(e, s);
return;
}
Reported by PMD.
Line: 76
abstract static class BaseRangeSubscription<T> extends BasicQueueSubscription<T> {
private static final long serialVersionUID = -2252972430506210021L;
Iterator<? extends T> iterator;
volatile boolean cancelled;
boolean once;
Reported by PMD.
Line: 78
Iterator<? extends T> iterator;
volatile boolean cancelled;
boolean once;
BaseRangeSubscription(Iterator<? extends T> it) {
this.iterator = it;
Reported by PMD.
Line: 80
volatile boolean cancelled;
boolean once;
BaseRangeSubscription(Iterator<? extends T> it) {
this.iterator = it;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapCompletableTest.java
66 issues
Line: 31
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableConcatMapCompletableTest extends RxJavaTest {
@Test
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
Reported by PMD.
Line: 34
public class ObservableConcatMapCompletableTest extends RxJavaTest {
@Test
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
Reported by PMD.
Line: 34
public class ObservableConcatMapCompletableTest extends RxJavaTest {
@Test
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
Reported by PMD.
Line: 37
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
Reported by PMD.
Line: 37
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
Reported by PMD.
Line: 39
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
Reported by PMD.
Line: 40
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
Reported by PMD.
Line: 42
us.onNext(1);
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
@Test
public void notFused() throws Exception {
Reported by PMD.
Line: 43
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
@Test
public void notFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
Reported by PMD.
Line: 47
}
@Test
public void notFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.hide().concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onNext(2);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java
66 issues
Line: 32
public final class FlowableConcatMapScheduler<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final ErrorMode errorMode;
Reported by PMD.
Line: 34
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final ErrorMode errorMode;
final Scheduler scheduler;
Reported by PMD.
Line: 36
final int prefetch;
final ErrorMode errorMode;
final Scheduler scheduler;
public FlowableConcatMapScheduler(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
Reported by PMD.
Line: 38
final ErrorMode errorMode;
final Scheduler scheduler;
public FlowableConcatMapScheduler(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
int prefetch, ErrorMode errorMode, Scheduler scheduler) {
super(source);
Reported by PMD.
Line: 52
@Override
protected void subscribeActual(Subscriber<? super R> s) {
switch (errorMode) {
case BOUNDARY:
source.subscribe(new ConcatMapDelayed<>(s, mapper, prefetch, false, scheduler.createWorker()));
break;
case END:
source.subscribe(new ConcatMapDelayed<>(s, mapper, prefetch, true, scheduler.createWorker()));
Reported by PMD.
Line: 70
private static final long serialVersionUID = -3511336836796789179L;
final ConcatMapInner<R> inner;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 72
final ConcatMapInner<R> inner;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final int limit;
Reported by PMD.
Line: 74
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int prefetch;
final int limit;
final Scheduler.Worker worker;
Reported by PMD.
Line: 76
final int prefetch;
final int limit;
final Scheduler.Worker worker;
Subscription upstream;
Reported by PMD.
Line: 78
final int limit;
final Scheduler.Worker worker;
Subscription upstream;
int consumed;
Reported by PMD.