The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleConcatTest.java
81 issues
Line: 27
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleConcatTest extends RxJavaTest {
@Test
public void concatWith() {
Single.just(1).concatWith(Single.just(2))
.test()
.assertResult(1, 2);
Reported by PMD.
Line: 29
public class SingleConcatTest extends RxJavaTest {
@Test
public void concatWith() {
Single.just(1).concatWith(Single.just(2))
.test()
.assertResult(1, 2);
}
Reported by PMD.
Line: 30
public class SingleConcatTest extends RxJavaTest {
@Test
public void concatWith() {
Single.just(1).concatWith(Single.just(2))
.test()
.assertResult(1, 2);
}
@Test
Reported by PMD.
Line: 30
public class SingleConcatTest extends RxJavaTest {
@Test
public void concatWith() {
Single.just(1).concatWith(Single.just(2))
.test()
.assertResult(1, 2);
}
@Test
Reported by PMD.
Line: 30
public class SingleConcatTest extends RxJavaTest {
@Test
public void concatWith() {
Single.just(1).concatWith(Single.just(2))
.test()
.assertResult(1, 2);
}
@Test
Reported by PMD.
Line: 36
}
@Test
public void concat2() {
Single.concat(Single.just(1), Single.just(2))
.test()
.assertResult(1, 2);
}
Reported by PMD.
Line: 37
@Test
public void concat2() {
Single.concat(Single.just(1), Single.just(2))
.test()
.assertResult(1, 2);
}
@Test
Reported by PMD.
Line: 37
@Test
public void concat2() {
Single.concat(Single.just(1), Single.just(2))
.test()
.assertResult(1, 2);
}
@Test
Reported by PMD.
Line: 43
}
@Test
public void concat3() {
Single.concat(Single.just(1), Single.just(2), Single.just(3))
.test()
.assertResult(1, 2, 3);
}
Reported by PMD.
Line: 44
@Test
public void concat3() {
Single.concat(Single.just(1), Single.just(2), Single.just(3))
.test()
.assertResult(1, 2, 3);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelCollectorTest.java
81 issues
Line: 34
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class ParallelCollectorTest extends RxJavaTest {
static Set<Integer> set(int count) {
return IntStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toSet());
Reported by PMD.
Line: 37
public class ParallelCollectorTest extends RxJavaTest {
static Set<Integer> set(int count) {
return IntStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toSet());
}
@Test
Reported by PMD.
Line: 37
public class ParallelCollectorTest extends RxJavaTest {
static Set<Integer> set(int count) {
return IntStream.rangeClosed(1, count)
.boxed()
.collect(Collectors.toSet());
}
@Test
Reported by PMD.
Line: 43
}
@Test
public void basic() {
TestSubscriberEx<List<Integer>> ts = Flowable.range(1, 5)
.parallel()
.collect(Collectors.toList())
.subscribeWith(new TestSubscriberEx<>());
Reported by PMD.
Line: 49
.collect(Collectors.toList())
.subscribeWith(new TestSubscriberEx<>());
ts
.assertValueCount(1)
.assertNoErrors()
.assertComplete();
assertEquals(5, ts.values().get(0).size());
Reported by PMD.
Line: 49
.collect(Collectors.toList())
.subscribeWith(new TestSubscriberEx<>());
ts
.assertValueCount(1)
.assertNoErrors()
.assertComplete();
assertEquals(5, ts.values().get(0).size());
Reported by PMD.
Line: 54
.assertNoErrors()
.assertComplete();
assertEquals(5, ts.values().get(0).size());
assertTrue(ts.values().get(0).containsAll(set(5)));
}
@Test
public void empty() {
Reported by PMD.
Line: 54
.assertNoErrors()
.assertComplete();
assertEquals(5, ts.values().get(0).size());
assertTrue(ts.values().get(0).containsAll(set(5)));
}
@Test
public void empty() {
Reported by PMD.
Line: 54
.assertNoErrors()
.assertComplete();
assertEquals(5, ts.values().get(0).size());
assertTrue(ts.values().get(0).containsAll(set(5)));
}
@Test
public void empty() {
Reported by PMD.
Line: 55
.assertComplete();
assertEquals(5, ts.values().get(0).size());
assertTrue(ts.values().get(0).containsAll(set(5)));
}
@Test
public void empty() {
Flowable.empty()
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTakeUntilPublisherTest.java
81 issues
Line: 33
public class MaybeTakeUntilPublisherTest extends RxJavaTest {
@Test
public void disposed() {
TestHelper.checkDisposed(PublishProcessor.create().singleElement().takeUntil(Flowable.never()));
}
@Test
public void doubleOnSubscribe() {
Reported by PMD.
Line: 34
@Test
public void disposed() {
TestHelper.checkDisposed(PublishProcessor.create().singleElement().takeUntil(Flowable.never()));
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Object>, MaybeSource<Object>>() {
Reported by PMD.
Line: 34
@Test
public void disposed() {
TestHelper.checkDisposed(PublishProcessor.create().singleElement().takeUntil(Flowable.never()));
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Object>, MaybeSource<Object>>() {
Reported by PMD.
Line: 38
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Object>, MaybeSource<Object>>() {
@Override
public MaybeSource<Object> apply(Maybe<Object> m) throws Exception {
return m.takeUntil(Flowable.never());
}
Reported by PMD.
Line: 48
}
@Test
public void mainErrors() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().takeUntil(pp2).test();
Reported by PMD.
Line: 52
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().takeUntil(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
Reported by PMD.
Line: 52
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().takeUntil(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
Reported by PMD.
Line: 52
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().takeUntil(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
Reported by PMD.
Line: 54
TestObserver<Integer> to = pp1.singleElement().takeUntil(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
Reported by PMD.
Line: 54
TestObserver<Integer> to = pp1.singleElement().takeUntil(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMap.java
81 issues
Line: 140
}
@SuppressWarnings("unchecked")
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Supplier) {
if (tryEmitScalar(((Supplier<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
Reported by PMD.
Line: 440
}
}
void subscribeMore(int innerCompleted) {
while (innerCompleted-- != 0) {
ObservableSource<? extends U> p;
synchronized (this) {
p = sources.poll();
if (p == null) {
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Reported by PMD.
Line: 32
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
Reported by PMD.
Line: 33
public final class ObservableFlatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
Reported by PMD.
Line: 34
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
Reported by PMD.
Line: 56
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
private static final long serialVersionUID = -2117620485640801370L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
Reported by PMD.
Line: 56
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
private static final long serialVersionUID = -2117620485640801370L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
Reported by PMD.
Line: 56
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
private static final long serialVersionUID = -2117620485640801370L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
Reported by PMD.
Line: 56
source.subscribe(new MergeObserver<>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
static final class MergeObserver<T, U> extends AtomicInteger implements Disposable, Observer<T> {
private static final long serialVersionUID = -2117620485640801370L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleMiscTest.java
80 issues
Line: 282
}
@Test
public void equals() {
Single.sequenceEqual(Single.just(1), Single.just(1).hide())
.test()
.assertResult(true);
Single.sequenceEqual(Single.just(1), Single.just(2))
Reported by PMD.
Line: 29
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class SingleMiscTest extends RxJavaTest {
@Test
public void never() {
Single.never()
.test()
.assertNoValues()
Reported by PMD.
Line: 31
public class SingleMiscTest extends RxJavaTest {
@Test
public void never() {
Single.never()
.test()
.assertNoValues()
.assertNoErrors()
.assertNotComplete();
Reported by PMD.
Line: 32
public class SingleMiscTest extends RxJavaTest {
@Test
public void never() {
Single.never()
.test()
.assertNoValues()
.assertNoErrors()
.assertNotComplete();
}
Reported by PMD.
Line: 32
public class SingleMiscTest extends RxJavaTest {
@Test
public void never() {
Single.never()
.test()
.assertNoValues()
.assertNoErrors()
.assertNotComplete();
}
Reported by PMD.
Line: 32
public class SingleMiscTest extends RxJavaTest {
@Test
public void never() {
Single.never()
.test()
.assertNoValues()
.assertNoErrors()
.assertNotComplete();
}
Reported by PMD.
Line: 32
public class SingleMiscTest extends RxJavaTest {
@Test
public void never() {
Single.never()
.test()
.assertNoValues()
.assertNoErrors()
.assertNotComplete();
}
Reported by PMD.
Line: 40
}
@Test
public void timer() throws Exception {
Single.timer(100, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(0L);
}
Reported by PMD.
Line: 40
}
@Test
public void timer() throws Exception {
Single.timer(100, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(0L);
}
Reported by PMD.
Line: 41
@Test
public void timer() throws Exception {
Single.timer(100, TimeUnit.MILLISECONDS)
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertResult(0L);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeWhileTest.java
80 issues
Line: 208
}
observer.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestObservable thread");
Reported by PMD.
Line: 174
fail(e.getMessage());
}
System.out.println("TestObservable thread finished");
verify(observer, times(1)).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, never()).onNext("three");
verify(upstream, times(1)).dispose();
}
Reported by PMD.
Line: 194
@Override
public void subscribe(final Observer<? super String> observer) {
System.out.println("TestObservable subscribed to ...");
observer.onSubscribe(upstream);
t = new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 201
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
observer.onComplete();
Reported by PMD.
Line: 203
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
observer.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
Reported by PMD.
Line: 213
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 215
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
@Test
public void noUnsubscribeDownstream() {
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableTakeWhileTest extends RxJavaTest {
@Test
public void takeWhile1() {
Observable<Integer> w = Observable.just(1, 2, 3);
Observable<Integer> take = w.takeWhile(new Predicate<Integer>() {
Reported by PMD.
Line: 45
Observer<Integer> observer = TestHelper.mockObserver();
take.subscribe(observer);
verify(observer, times(1)).onNext(1);
verify(observer, times(1)).onNext(2);
verify(observer, never()).onNext(3);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 46
Observer<Integer> observer = TestHelper.mockObserver();
take.subscribe(observer);
verify(observer, times(1)).onNext(1);
verify(observer, times(1)).onNext(2);
verify(observer, never()).onNext(3);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeWhileTest.java
80 issues
Line: 220
}
subscriber.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestFlowable thread");
Reported by PMD.
Line: 186
fail(e.getMessage());
}
System.out.println("TestFlowable thread finished");
verify(subscriber, times(1)).onNext("one");
verify(subscriber, never()).onNext("two");
verify(subscriber, never()).onNext("three");
verify(s, times(1)).cancel();
}
Reported by PMD.
Line: 206
@Override
public void subscribe(final Subscriber<? super String> subscriber) {
System.out.println("TestFlowable subscribed to ...");
subscriber.onSubscribe(upstream);
t = new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 213
@Override
public void run() {
try {
System.out.println("running TestFlowable thread");
for (String s : values) {
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
subscriber.onComplete();
Reported by PMD.
Line: 215
try {
System.out.println("running TestFlowable thread");
for (String s : values) {
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
subscriber.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
Reported by PMD.
Line: 225
}
});
System.out.println("starting TestFlowable thread");
t.start();
System.out.println("done starting TestFlowable thread");
}
}
Reported by PMD.
Line: 227
});
System.out.println("starting TestFlowable thread");
t.start();
System.out.println("done starting TestFlowable thread");
}
}
@Test
public void backpressure() {
Reported by PMD.
Line: 35
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableTakeWhileTest extends RxJavaTest {
@Test
public void takeWhile1() {
Flowable<Integer> w = Flowable.just(1, 2, 3);
Flowable<Integer> take = w.takeWhile(new Predicate<Integer>() {
Reported by PMD.
Line: 50
Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
take.subscribe(subscriber);
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, never()).onNext(3);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
Reported by PMD.
Line: 51
take.subscribe(subscriber);
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, never()).onNext(3);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleTimeoutTest.java
79 issues
Line: 35
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleTimeoutTest extends RxJavaTest {
@Test
public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() {
final PublishSubject<String> subject = PublishSubject.create();
final TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 38
public class SingleTimeoutTest extends RxJavaTest {
@Test
public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() {
final PublishSubject<String> subject = PublishSubject.create();
final TestScheduler scheduler = new TestScheduler();
final TestObserver<String> observer = subject.single("")
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
Reported by PMD.
Line: 42
final PublishSubject<String> subject = PublishSubject.create();
final TestScheduler scheduler = new TestScheduler();
final TestObserver<String> observer = subject.single("")
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
.test();
assertTrue(subject.hasObservers());
Reported by PMD.
Line: 42
final PublishSubject<String> subject = PublishSubject.create();
final TestScheduler scheduler = new TestScheduler();
final TestObserver<String> observer = subject.single("")
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
.test();
assertTrue(subject.hasObservers());
Reported by PMD.
Line: 42
final PublishSubject<String> subject = PublishSubject.create();
final TestScheduler scheduler = new TestScheduler();
final TestObserver<String> observer = subject.single("")
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
.test();
assertTrue(subject.hasObservers());
Reported by PMD.
Line: 46
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
.test();
assertTrue(subject.hasObservers());
observer.dispose();
assertFalse(subject.hasObservers());
}
Reported by PMD.
Line: 46
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
.test();
assertTrue(subject.hasObservers());
observer.dispose();
assertFalse(subject.hasObservers());
}
Reported by PMD.
Line: 48
assertTrue(subject.hasObservers());
observer.dispose();
assertFalse(subject.hasObservers());
}
@Test
Reported by PMD.
Line: 50
observer.dispose();
assertFalse(subject.hasObservers());
}
@Test
public void otherErrors() {
Single.never()
Reported by PMD.
Line: 50
observer.dispose();
assertFalse(subject.hasObservers());
}
@Test
public void otherErrors() {
Single.never()
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/EndConsumerHelperTest.java
78 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.util;
import static org.junit.Assert.*;
import java.util.List;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class EndConsumerHelperTest extends RxJavaTest {
List<Throwable> errors;
@Before
public void before() {
Reported by PMD.
Line: 35
public class EndConsumerHelperTest extends RxJavaTest {
List<Throwable> errors;
@Before
public void before() {
errors = TestHelper.trackPluginErrors();
}
Reported by PMD.
Line: 53
}
@Test
public void checkDoubleDefaultSubscriber() {
Subscriber<Integer> consumer = new DefaultSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
}
Reported by PMD.
Line: 83
assertTrue(sub2.isCancelled());
TestHelper.assertError(errors, 0, ProtocolViolationException.class);
assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage());
assertEquals(errors.toString(), 1, errors.size());
}
static final class EndDefaultSubscriber extends DefaultSubscriber<Integer> {
@Override
Reported by PMD.
Line: 83
assertTrue(sub2.isCancelled());
TestHelper.assertError(errors, 0, ProtocolViolationException.class);
assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage());
assertEquals(errors.toString(), 1, errors.size());
}
static final class EndDefaultSubscriber extends DefaultSubscriber<Integer> {
@Override
Reported by PMD.
Line: 102
}
@Test
public void checkDoubleDefaultSubscriberNonAnonymous() {
Subscriber<Integer> consumer = new EndDefaultSubscriber();
BooleanSubscription sub1 = new BooleanSubscription();
consumer.onSubscribe(sub1);
Reported by PMD.
Line: 122
TestHelper.assertError(errors, 0, ProtocolViolationException.class);
// with this consumer, the class name should be predictable
assertEquals(EndConsumerHelper.composeMessage("io.reactivex.rxjava3.internal.util.EndConsumerHelperTest$EndDefaultSubscriber"), errors.get(0).getMessage());
assertEquals(errors.toString(), 1, errors.size());
}
@Test
public void checkDoubleDisposableSubscriber() {
Reported by PMD.
Line: 127
}
@Test
public void checkDoubleDisposableSubscriber() {
Subscriber<Integer> consumer = new DisposableSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
}
Reported by PMD.
Line: 157
assertTrue(sub2.isCancelled());
TestHelper.assertError(errors, 0, ProtocolViolationException.class);
assertEquals(EndConsumerHelper.composeMessage(consumer.getClass().getName()), errors.get(0).getMessage());
assertEquals(errors.toString(), 1, errors.size());
}
@Test
public void checkDoubleResourceSubscriber() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorResumeNextTest.java
78 issues
Line: 125
@Override
public Observable<String> apply(Throwable t1) {
throw new RuntimeException("exception from function");
}
};
Observable<String> o = Observable.unsafeCreate(w).onErrorResumeNext(resume);
Reported by PMD.
Line: 159
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Reported by PMD.
Line: 213
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
}
}
Reported by PMD.
Line: 161
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Observable<String> o = w.onErrorResumeNext(new Function<Throwable, Observable<String>>() {
Reported by PMD.
Line: 201
@Override
public void subscribe(final Observer<? super String> observer) {
System.out.println("TestObservable subscribed to ...");
observer.onSubscribe(Disposable.empty());
t = new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 208
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
Reported by PMD.
Line: 210
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
Reported by PMD.
Line: 220
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 222
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
@Test
Reported by PMD.
Line: 46
@Override
public void subscribe(Observer<? super String> observer) {
observer.onSubscribe(Disposable.empty());
observer.onNext("one");
observer.onError(new Throwable("injected failure"));
observer.onNext("two");
observer.onNext("three");
}
});
Reported by PMD.