The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java
66 issues
Line: 37
*/
public final class FlowablePublishMulticast<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super Flowable<T>, ? extends Publisher<? extends R>> selector;
final int prefetch;
final boolean delayError;
Reported by PMD.
Line: 39
final Function<? super Flowable<T>, ? extends Publisher<? extends R>> selector;
final int prefetch;
final boolean delayError;
public FlowablePublishMulticast(Flowable<T> source,
Function<? super Flowable<T>, ? extends Publisher<? extends R>> selector, int prefetch,
Reported by PMD.
Line: 41
final int prefetch;
final boolean delayError;
public FlowablePublishMulticast(Flowable<T> source,
Function<? super Flowable<T>, ? extends Publisher<? extends R>> selector, int prefetch,
boolean delayError) {
super(source);
Reported by PMD.
Line: 60
try {
other = Objects.requireNonNull(selector.apply(mp), "selector returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 68
OutputCanceller<R> out = new OutputCanceller<>(s, mp);
other.subscribe(out);
source.subscribe(mp);
}
static final class OutputCanceller<R> implements FlowableSubscriber<R>, Subscription {
Reported by PMD.
Line: 74
}
static final class OutputCanceller<R> implements FlowableSubscriber<R>, Subscription {
final Subscriber<? super R> downstream;
final MulticastProcessor<?> processor;
Subscription upstream;
Reported by PMD.
Line: 76
static final class OutputCanceller<R> implements FlowableSubscriber<R>, Subscription {
final Subscriber<? super R> downstream;
final MulticastProcessor<?> processor;
Subscription upstream;
OutputCanceller(Subscriber<? super R> actual, MulticastProcessor<?> processor) {
this.downstream = actual;
Reported by PMD.
Line: 78
final MulticastProcessor<?> processor;
Subscription upstream;
OutputCanceller(Subscriber<? super R> actual, MulticastProcessor<?> processor) {
this.downstream = actual;
this.processor = processor;
}
Reported by PMD.
Line: 123
}
}
static final class MulticastProcessor<T> extends Flowable<T> implements FlowableSubscriber<T> {
@SuppressWarnings("rawtypes")
static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
Line: 123
}
}
static final class MulticastProcessor<T> extends Flowable<T> implements FlowableSubscriber<T> {
@SuppressWarnings("rawtypes")
static final MulticastSubscription[] EMPTY = new MulticastSubscription[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapCompletableTest.java
66 issues
Line: 31
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableConcatMapCompletableTest extends RxJavaTest {
@Test
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
Reported by PMD.
Line: 34
public class ObservableConcatMapCompletableTest extends RxJavaTest {
@Test
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
Reported by PMD.
Line: 34
public class ObservableConcatMapCompletableTest extends RxJavaTest {
@Test
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
Reported by PMD.
Line: 37
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
Reported by PMD.
Line: 37
public void asyncFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
Reported by PMD.
Line: 39
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
Reported by PMD.
Line: 40
TestObserver<Void> to = us.concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
Reported by PMD.
Line: 42
us.onNext(1);
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
@Test
public void notFused() throws Exception {
Reported by PMD.
Line: 43
us.onComplete();
to.assertComplete();
to.assertValueCount(0);
}
@Test
public void notFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
Reported by PMD.
Line: 47
}
@Test
public void notFused() throws Exception {
UnicastSubject<Integer> us = UnicastSubject.create();
TestObserver<Void> to = us.hide().concatMapCompletable(completableComplete(), 2).test();
us.onNext(1);
us.onNext(2);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/observers/ObservableConsumersTest.java
65 issues
Line: 50
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableConsumersTest implements Consumer<Object>, Action {
final CompositeDisposable composite = new CompositeDisposable();
final PublishSubject<Integer> processor = PublishSubject.create();
Reported by PMD.
Line: 52
public class ObservableConsumersTest implements Consumer<Object>, Action {
final CompositeDisposable composite = new CompositeDisposable();
final PublishSubject<Integer> processor = PublishSubject.create();
final List<Object> events = new ArrayList<>();
Reported by PMD.
Line: 54
final CompositeDisposable composite = new CompositeDisposable();
final PublishSubject<Integer> processor = PublishSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void run() throws Exception {
Reported by PMD.
Line: 56
final PublishSubject<Integer> processor = PublishSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void run() throws Exception {
events.add("OnComplete");
}
Reported by PMD.
Line: 74
}
@Test
public void onNextNormal() {
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
Reported by PMD.
Line: 78
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 78
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING, () -> { });
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 80
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
processor.onNext(1);
Reported by PMD.
Line: 86
processor.onNext(1);
assertTrue(composite.size() > 0);
assertEquals(Arrays.<Object>asList(1), events);
processor.onComplete();
Reported by PMD.
Line: 88
assertTrue(composite.size() > 0);
assertEquals(Arrays.<Object>asList(1), events);
processor.onComplete();
assertEquals(Arrays.<Object>asList(1), events);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDistinctTest.java
65 issues
Line: 37
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableDistinctTest extends RxJavaTest {
Subscriber<String> w;
// nulls lead to exceptions
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
Reported by PMD.
Line: 39
public class FlowableDistinctTest extends RxJavaTest {
Subscriber<String> w;
// nulls lead to exceptions
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
Reported by PMD.
Line: 42
Subscriber<String> w;
// nulls lead to exceptions
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
if (s.equals("x")) {
return "XX";
}
Reported by PMD.
Line: 45
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};
Reported by PMD.
Line: 45
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};
Reported by PMD.
Line: 48
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};
@Before
public void before() {
Reported by PMD.
Line: 60
@Test
public void distinctOfNone() {
Flowable<String> src = Flowable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
Reported by PMD.
Line: 60
@Test
public void distinctOfNone() {
Flowable<String> src = Flowable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
Reported by PMD.
Line: 62
Flowable<String> src = Flowable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 63
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
@Test
public void distinctOfNoneWithKeySelector() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java
65 issues
Line: 31
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class FlowableSwitchMap<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
public FlowableSwitchMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper, int bufferSize,
Reported by PMD.
Line: 32
public final class FlowableSwitchMap<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
public FlowableSwitchMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper, int bufferSize,
boolean delayErrors) {
Reported by PMD.
Line: 33
public final class FlowableSwitchMap<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
public FlowableSwitchMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper, int bufferSize,
boolean delayErrors) {
super(source);
Reported by PMD.
Line: 52
source.subscribe(new SwitchMapSubscriber<>(s, mapper, bufferSize, delayErrors));
}
static final class SwitchMapSubscriber<T, R> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -3491074160481096299L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
Reported by PMD.
Line: 52
source.subscribe(new SwitchMapSubscriber<>(s, mapper, bufferSize, delayErrors));
}
static final class SwitchMapSubscriber<T, R> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -3491074160481096299L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
Reported by PMD.
Line: 55
static final class SwitchMapSubscriber<T, R> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -3491074160481096299L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
volatile boolean done;
Reported by PMD.
Line: 56
private static final long serialVersionUID = -3491074160481096299L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
volatile boolean done;
final AtomicThrowable errors;
Reported by PMD.
Line: 57
private static final long serialVersionUID = -3491074160481096299L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
volatile boolean done;
final AtomicThrowable errors;
Reported by PMD.
Line: 58
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
volatile boolean done;
final AtomicThrowable errors;
volatile boolean cancelled;
Reported by PMD.
Line: 60
final int bufferSize;
final boolean delayErrors;
volatile boolean done;
final AtomicThrowable errors;
volatile boolean cancelled;
Subscription upstream;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java
65 issues
Line: 80
}
doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true);
break;
}
try {
connection.accept(conn);
} catch (Throwable ex) {
Reported by PMD.
Line: 111
conn = fresh;
}
break;
}
InnerSubscription<T> inner = new InnerSubscription<>(s, conn);
s.onSubscribe(inner);
Reported by PMD.
Line: 46
public final class FlowablePublish<T> extends ConnectableFlowable<T>
implements HasUpstreamPublisher<T> {
final Publisher<T> source;
final int bufferSize;
final AtomicReference<PublishConnection<T>> current;
Reported by PMD.
Line: 46
public final class FlowablePublish<T> extends ConnectableFlowable<T>
implements HasUpstreamPublisher<T> {
final Publisher<T> source;
final int bufferSize;
final AtomicReference<PublishConnection<T>> current;
Reported by PMD.
Line: 48
final Publisher<T> source;
final int bufferSize;
final AtomicReference<PublishConnection<T>> current;
public FlowablePublish(Publisher<T> source, int bufferSize) {
this.source = source;
Reported by PMD.
Line: 50
final int bufferSize;
final AtomicReference<PublishConnection<T>> current;
public FlowablePublish(Publisher<T> source, int bufferSize) {
this.source = source;
this.bufferSize = bufferSize;
this.current = new AtomicReference<>();
Reported by PMD.
Line: 72
conn = current.get();
if (conn == null || conn.isDisposed()) {
PublishConnection<T> fresh = new PublishConnection<>(current, bufferSize);
if (!current.compareAndSet(conn, fresh)) {
continue;
}
conn = fresh;
}
Reported by PMD.
Line: 79
conn = fresh;
}
doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true);
break;
}
try {
connection.accept(conn);
Reported by PMD.
Line: 79
conn = fresh;
}
doConnect = !conn.connect.get() && conn.connect.compareAndSet(false, true);
break;
}
try {
connection.accept(conn);
Reported by PMD.
Line: 85
try {
connection.accept(conn);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
if (doConnect) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDistinctTest.java
64 issues
Line: 38
import io.reactivex.rxjava3.subjects.UnicastSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableDistinctTest extends RxJavaTest {
Observer<String> w;
// nulls lead to exceptions
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
Reported by PMD.
Line: 40
public class ObservableDistinctTest extends RxJavaTest {
Observer<String> w;
// nulls lead to exceptions
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
Reported by PMD.
Line: 43
Observer<String> w;
// nulls lead to exceptions
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
if (s.equals("x")) {
return "XX";
}
Reported by PMD.
Line: 46
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};
Reported by PMD.
Line: 46
final Function<String, String> TO_UPPER_WITH_EXCEPTION = new Function<String, String>() {
@Override
public String apply(String s) {
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};
Reported by PMD.
Line: 49
if (s.equals("x")) {
return "XX";
}
return s.toUpperCase();
}
};
@Before
public void before() {
Reported by PMD.
Line: 61
@Test
public void distinctOfNone() {
Observable<String> src = Observable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
Reported by PMD.
Line: 61
@Test
public void distinctOfNone() {
Observable<String> src = Observable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
Reported by PMD.
Line: 63
Observable<String> src = Observable.empty();
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 64
src.distinct().subscribe(w);
verify(w, never()).onNext(anyString());
verify(w, never()).onError(any(Throwable.class));
verify(w, times(1)).onComplete();
}
@Test
public void distinctOfNoneWithKeySelector() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/HalfSerializerSubscriberTest.java
64 issues
Line: 33
public class HalfSerializerSubscriberTest extends RxJavaTest {
@Test
public void utilityClass() {
TestHelper.checkUtilityClass(HalfSerializer.class);
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
Reported by PMD.
Line: 38
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnNext() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Subscriber[] a = { null };
Reported by PMD.
Line: 38
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnNext() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Subscriber[] a = { null };
Reported by PMD.
Line: 39
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnNext() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Subscriber[] a = { null };
Reported by PMD.
Line: 78
HalfSerializer.onNext(s, 1, wip, error);
ts.assertValue(1).assertNoErrors().assertNotComplete();
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnError() {
Reported by PMD.
Line: 78
HalfSerializer.onNext(s, 1, wip, error);
ts.assertValue(1).assertNoErrors().assertNotComplete();
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnError() {
Reported by PMD.
Line: 83
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnError() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Subscriber[] a = { null };
Reported by PMD.
Line: 127
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnComplete() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Subscriber[] a = { null };
Reported by PMD.
Line: 172
@Test
@SuppressUndeliverable
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantErrorOnError() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Subscriber[] a = { null };
Reported by PMD.
Line: 216
public void onNextOnCompleteRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final TestSubscriber<Integer> ts = new TestSubscriber<>();
ts.onSubscribe(new BooleanSubscription());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/SingleFlatMapObservableTest.java
64 issues
Line: 31
public class SingleFlatMapObservableTest extends RxJavaTest {
@Test
public void cancelMain() {
SingleSubject<Integer> ss = SingleSubject.create();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
.test();
Reported by PMD.
Line: 35
SingleSubject<Integer> ss = SingleSubject.create();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
.test();
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 35
SingleSubject<Integer> ss = SingleSubject.create();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
.test();
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 38
TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
.test();
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(ss.hasObservers());
Reported by PMD.
Line: 38
TestObserver<Integer> to = ss.flatMapObservable(Functions.justFunction(ps))
.test();
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(ss.hasObservers());
Reported by PMD.
Line: 39
.test();
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(ss.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 39
.test();
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(ss.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 41
assertTrue(ss.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(ss.hasObservers());
assertFalse(ps.hasObservers());
}
Reported by PMD.
Line: 43
to.dispose();
assertFalse(ss.hasObservers());
assertFalse(ps.hasObservers());
}
@Test
public void cancelOther() {
Reported by PMD.
Line: 43
to.dispose();
assertFalse(ss.hasObservers());
assertFalse(ps.hasObservers());
}
@Test
public void cancelOther() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorResumeNextViaFlowableTest.java
64 issues
Line: 78
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Reported by PMD.
Line: 129
System.out.println("running TestObservable thread");
for (String s : values) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("TestObservable onNext: " + s);
subscriber.onNext(s);
}
System.out.println("TestObservable onComplete");
Reported by PMD.
Line: 80
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Flowable<String> flowable = w.onErrorResumeWith(resume);
Reported by PMD.
Line: 119
@Override
public void subscribe(final Subscriber<? super String> subscriber) {
System.out.println("TestObservable subscribed to ...");
subscriber.onSubscribe(upstream);
t = new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 126
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("TestObservable onNext: " + s);
Reported by PMD.
Line: 131
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("TestObservable onNext: " + s);
subscriber.onNext(s);
}
System.out.println("TestObservable onComplete");
subscriber.onComplete();
} catch (Throwable e) {
Reported by PMD.
Line: 134
System.out.println("TestObservable onNext: " + s);
subscriber.onNext(s);
}
System.out.println("TestObservable onComplete");
subscriber.onComplete();
} catch (Throwable e) {
System.out.println("TestObservable onError: " + e);
subscriber.onError(e);
}
Reported by PMD.
Line: 137
System.out.println("TestObservable onComplete");
subscriber.onComplete();
} catch (Throwable e) {
System.out.println("TestObservable onError: " + e);
subscriber.onError(e);
}
}
});
Reported by PMD.
Line: 143
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 145
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
@Test
public void backpressure() {
Reported by PMD.