The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorReturnTest.java
49 issues
Line: 82
@Override
public String apply(Throwable e) {
capturedException.set(e);
throw new RuntimeException("exception from function");
}
});
Observer<String> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 116
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Reported by PMD.
Line: 203
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
}
}
Reported by PMD.
Line: 118
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {
Reported by PMD.
Line: 192
@Override
public void subscribe(final Observer<? super String> observer) {
observer.onSubscribe(Disposable.empty());
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {
@Override
public void run() {
try {
Reported by PMD.
Line: 198
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
Reported by PMD.
Line: 200
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
Reported by PMD.
Line: 210
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 212
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
@Test
public void returnItem() {
Reported by PMD.
Line: 38
@Test
public void resumeNext() {
TestObservable f = new TestObservable("one");
Observable<String> w = Observable.unsafeCreate(f);
final AtomicReference<Throwable> capturedException = new AtomicReference<>();
Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeUntilPredicateTest.java
48 issues
Line: 34
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableTakeUntilPredicateTest extends RxJavaTest {
@Test
public void takeEmpty() {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
Flowable.empty().takeUntil(new Predicate<Object>() {
Reported by PMD.
Line: 46
}
}).subscribe(subscriber);
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
Reported by PMD.
Line: 47
}).subscribe(subscriber);
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
public void takeAll() {
Reported by PMD.
Line: 48
verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
public void takeAll() {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 62
}
}).subscribe(subscriber);
verify(subscriber).onNext(1);
verify(subscriber).onNext(2);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
Reported by PMD.
Line: 63
}).subscribe(subscriber);
verify(subscriber).onNext(1);
verify(subscriber).onNext(2);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
Reported by PMD.
Line: 64
verify(subscriber).onNext(1);
verify(subscriber).onNext(2);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
public void takeFirst() {
Reported by PMD.
Line: 65
verify(subscriber).onNext(1);
verify(subscriber).onNext(2);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
public void takeFirst() {
Subscriber<Object> subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 79
}
}).subscribe(subscriber);
verify(subscriber).onNext(1);
verify(subscriber, never()).onNext(2);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
Reported by PMD.
Line: 80
}).subscribe(subscriber);
verify(subscriber).onNext(1);
verify(subscriber, never()).onNext(2);
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber).onComplete();
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSwitchOnNextTest.java
48 issues
Line: 29
public class MaybeSwitchOnNextTest extends RxJavaTest {
@Test
public void normal() {
Maybe.switchOnNext(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
Reported by PMD.
Line: 30
@Test
public void normal() {
Maybe.switchOnNext(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
}
Reported by PMD.
Line: 30
@Test
public void normal() {
Maybe.switchOnNext(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
}
Reported by PMD.
Line: 31
@Test
public void normal() {
Maybe.switchOnNext(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
}
return Maybe.empty();
Reported by PMD.
Line: 44
}
@Test
public void normalDelayError() {
Maybe.switchOnNextDelayError(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
Reported by PMD.
Line: 45
@Test
public void normalDelayError() {
Maybe.switchOnNextDelayError(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
}
Reported by PMD.
Line: 45
@Test
public void normalDelayError() {
Maybe.switchOnNextDelayError(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
}
Reported by PMD.
Line: 46
@Test
public void normalDelayError() {
Maybe.switchOnNextDelayError(
Flowable.range(1, 10)
.map(v -> {
if (v % 2 == 0) {
return Maybe.just(v);
}
return Maybe.empty();
Reported by PMD.
Line: 59
}
@Test
public void noDelaySwitch() {
PublishProcessor<Maybe<Integer>> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = Maybe.switchOnNext(pp).test();
assertTrue(pp.hasSubscribers());
Reported by PMD.
Line: 62
public void noDelaySwitch() {
PublishProcessor<Maybe<Integer>> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = Maybe.switchOnNext(pp).test();
assertTrue(pp.hasSubscribers());
ts.assertEmpty();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelRunOn.java
48 issues
Line: 52
}
@Override
public void subscribe(Subscriber<? super T>[] subscribers) {
subscribers = RxJavaPlugins.onSubscribe(this, subscribers);
if (!validate(subscribers)) {
return;
}
Reported by PMD.
Line: 38
* @param <T> the value type
*/
public final class ParallelRunOn<T> extends ParallelFlowable<T> {
final ParallelFlowable<? extends T> source;
final Scheduler scheduler;
final int prefetch;
Reported by PMD.
Line: 40
public final class ParallelRunOn<T> extends ParallelFlowable<T> {
final ParallelFlowable<? extends T> source;
final Scheduler scheduler;
final int prefetch;
public ParallelRunOn(ParallelFlowable<? extends T> parent,
Scheduler scheduler, int prefetch) {
Reported by PMD.
Line: 42
final Scheduler scheduler;
final int prefetch;
public ParallelRunOn(ParallelFlowable<? extends T> parent,
Scheduler scheduler, int prefetch) {
this.source = parent;
this.scheduler = scheduler;
Reported by PMD.
Line: 91
final class MultiWorkerCallback implements WorkerCallback {
final Subscriber<? super T>[] subscribers;
final Subscriber<T>[] parents;
MultiWorkerCallback(Subscriber<? super T>[] subscribers,
Subscriber<T>[] parents) {
Reported by PMD.
Line: 93
final Subscriber<? super T>[] subscribers;
final Subscriber<T>[] parents;
MultiWorkerCallback(Subscriber<? super T>[] subscribers,
Subscriber<T>[] parents) {
this.subscribers = subscribers;
this.parents = parents;
Reported by PMD.
Line: 95
final Subscriber<T>[] parents;
MultiWorkerCallback(Subscriber<? super T>[] subscribers,
Subscriber<T>[] parents) {
this.subscribers = subscribers;
this.parents = parents;
}
Reported by PMD.
Line: 96
final Subscriber<T>[] parents;
MultiWorkerCallback(Subscriber<? super T>[] subscribers,
Subscriber<T>[] parents) {
this.subscribers = subscribers;
this.parents = parents;
}
@Override
Reported by PMD.
Line: 117
private static final long serialVersionUID = 9222303586456402150L;
final int prefetch;
final int limit;
final SpscArrayQueue<T> queue;
Reported by PMD.
Line: 119
final int prefetch;
final int limit;
final SpscArrayQueue<T> queue;
final Worker worker;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSwitchMap.java
48 issues
Line: 30
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableSwitchMap<T, R> extends AbstractObservableWithUpstream<T, R> {
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
public ObservableSwitchMap(ObservableSource<T> source,
Reported by PMD.
Line: 31
public final class ObservableSwitchMap<T, R> extends AbstractObservableWithUpstream<T, R> {
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
public ObservableSwitchMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize,
Reported by PMD.
Line: 33
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
public ObservableSwitchMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize,
boolean delayErrors) {
super(source);
Reported by PMD.
Line: 54
source.subscribe(new SwitchMapObserver<>(t, mapper, bufferSize, delayErrors));
}
static final class SwitchMapObserver<T, R> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = -3491074160481096299L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
Reported by PMD.
Line: 54
source.subscribe(new SwitchMapObserver<>(t, mapper, bufferSize, delayErrors));
}
static final class SwitchMapObserver<T, R> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = -3491074160481096299L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
Reported by PMD.
Line: 57
static final class SwitchMapObserver<T, R> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = -3491074160481096299L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
Reported by PMD.
Line: 58
private static final long serialVersionUID = -3491074160481096299L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
final AtomicThrowable errors;
Reported by PMD.
Line: 59
private static final long serialVersionUID = -3491074160481096299L;
final Observer<? super R> downstream;
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
final AtomicThrowable errors;
Reported by PMD.
Line: 61
final Function<? super T, ? extends ObservableSource<? extends R>> mapper;
final int bufferSize;
final boolean delayErrors;
final AtomicThrowable errors;
volatile boolean done;
Reported by PMD.
Line: 63
final boolean delayErrors;
final AtomicThrowable errors;
volatile boolean done;
volatile boolean cancelled;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/OperatorsUseInterfaces.java
48 issues
Line: 165
public void checkSelf() {
try {
checkClass(OperatorsUseInterfaces.class);
throw new RuntimeException("Should have failed");
} catch (AssertionError expected) {
assertTrue(expected.toString(), expected.toString().contains("method1"));
assertTrue(expected.toString(), expected.toString().contains("method2"));
assertTrue(expected.toString(), expected.toString().contains("method3"));
assertTrue(expected.toString(), expected.toString().contains("method4"));
Reported by PMD.
Line: 123
continue;
}
}
break;
}
}
pidx++;
}
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.validators;
import static org.junit.Assert.*;
import java.lang.reflect.*;
import java.util.*;
Reported by PMD.
Line: 34
* Verify that an operator method uses base interfaces as its direct input or
* has lambdas returning base interfaces.
*/
public class OperatorsUseInterfaces {
@Test
public void checkFlowable() {
checkClass(Flowable.class);
}
Reported by PMD.
Line: 34
* Verify that an operator method uses base interfaces as its direct input or
* has lambdas returning base interfaces.
*/
public class OperatorsUseInterfaces {
@Test
public void checkFlowable() {
checkClass(Flowable.class);
}
Reported by PMD.
Line: 34
* Verify that an operator method uses base interfaces as its direct input or
* has lambdas returning base interfaces.
*/
public class OperatorsUseInterfaces {
@Test
public void checkFlowable() {
checkClass(Flowable.class);
}
Reported by PMD.
Line: 37
public class OperatorsUseInterfaces {
@Test
public void checkFlowable() {
checkClass(Flowable.class);
}
@Test
public void checkObservable() {
Reported by PMD.
Line: 42
}
@Test
public void checkObservable() {
checkClass(Observable.class);
}
@Test
public void checkMaybe() {
Reported by PMD.
Line: 47
}
@Test
public void checkMaybe() {
checkClass(Maybe.class);
}
@Test
public void checkSingle() {
Reported by PMD.
Line: 52
}
@Test
public void checkSingle() {
checkClass(Single.class);
}
@Test
public void checkCompletable() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSwitchOnNextTest.java
48 issues
Line: 29
public class SingleSwitchOnNextTest extends RxJavaTest {
@Test
public void normal() {
Single.switchOnNext(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
Reported by PMD.
Line: 30
@Test
public void normal() {
Single.switchOnNext(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
}
Reported by PMD.
Line: 30
@Test
public void normal() {
Single.switchOnNext(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
}
Reported by PMD.
Line: 31
@Test
public void normal() {
Single.switchOnNext(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
}
return Single.just(10 + v);
Reported by PMD.
Line: 44
}
@Test
public void normalDelayError() {
Single.switchOnNextDelayError(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
Reported by PMD.
Line: 45
@Test
public void normalDelayError() {
Single.switchOnNextDelayError(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
}
Reported by PMD.
Line: 45
@Test
public void normalDelayError() {
Single.switchOnNextDelayError(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
}
Reported by PMD.
Line: 46
@Test
public void normalDelayError() {
Single.switchOnNextDelayError(
Flowable.range(1, 5)
.map(v -> {
if (v % 2 == 0) {
return Single.just(v);
}
return Single.just(10 + v);
Reported by PMD.
Line: 59
}
@Test
public void noDelaySwitch() {
PublishProcessor<Single<Integer>> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = Single.switchOnNext(pp).test();
assertTrue(pp.hasSubscribers());
Reported by PMD.
Line: 62
public void noDelaySwitch() {
PublishProcessor<Single<Integer>> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = Single.switchOnNext(pp).test();
assertTrue(pp.hasSubscribers());
ts.assertEmpty();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundary.java
48 issues
Line: 30
import io.reactivex.rxjava3.subscribers.DisposableSubscriber;
public final class FlowableWindowBoundary<T, B> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final Publisher<B> other;
final int capacityHint;
public FlowableWindowBoundary(Flowable<T> source, Publisher<B> other, int capacityHint) {
super(source);
this.other = other;
Reported by PMD.
Line: 31
public final class FlowableWindowBoundary<T, B> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final Publisher<B> other;
final int capacityHint;
public FlowableWindowBoundary(Flowable<T> source, Publisher<B> other, int capacityHint) {
super(source);
this.other = other;
this.capacityHint = capacityHint;
Reported by PMD.
Line: 52
source.subscribe(parent);
}
static final class WindowBoundaryMainSubscriber<T, B>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 2233020065421370272L;
Reported by PMD.
Line: 52
source.subscribe(parent);
}
static final class WindowBoundaryMainSubscriber<T, B>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 2233020065421370272L;
Reported by PMD.
Line: 54
static final class WindowBoundaryMainSubscriber<T, B>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, Runnable {
private static final long serialVersionUID = 2233020065421370272L;
final Subscriber<? super Flowable<T>> downstream;
Reported by PMD.
Line: 58
private static final long serialVersionUID = 2233020065421370272L;
final Subscriber<? super Flowable<T>> downstream;
final int capacityHint;
final WindowBoundaryInnerSubscriber<T, B> boundarySubscriber;
Reported by PMD.
Line: 60
final Subscriber<? super Flowable<T>> downstream;
final int capacityHint;
final WindowBoundaryInnerSubscriber<T, B> boundarySubscriber;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 62
final int capacityHint;
final WindowBoundaryInnerSubscriber<T, B> boundarySubscriber;
final AtomicReference<Subscription> upstream;
final AtomicInteger windows;
Reported by PMD.
Line: 64
final WindowBoundaryInnerSubscriber<T, B> boundarySubscriber;
final AtomicReference<Subscription> upstream;
final AtomicInteger windows;
final MpscLinkedQueue<Object> queue;
Reported by PMD.
Line: 66
final AtomicReference<Subscription> upstream;
final AtomicInteger windows;
final MpscLinkedQueue<Object> queue;
final AtomicThrowable errors;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java
48 issues
Line: 33
import io.reactivex.rxjava3.subjects.UnicastSubject;
public final class ObservableWindowBoundarySelector<T, B, V> extends AbstractObservableWithUpstream<T, Observable<T>> {
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
final int bufferSize;
public ObservableWindowBoundarySelector(
ObservableSource<T> source,
Reported by PMD.
Line: 34
public final class ObservableWindowBoundarySelector<T, B, V> extends AbstractObservableWithUpstream<T, Observable<T>> {
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
final int bufferSize;
public ObservableWindowBoundarySelector(
ObservableSource<T> source,
ObservableSource<B> open, Function<? super B, ? extends ObservableSource<V>> closingIndicator,
Reported by PMD.
Line: 35
public final class ObservableWindowBoundarySelector<T, B, V> extends AbstractObservableWithUpstream<T, Observable<T>> {
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
final int bufferSize;
public ObservableWindowBoundarySelector(
ObservableSource<T> source,
ObservableSource<B> open, Function<? super B, ? extends ObservableSource<V>> closingIndicator,
int bufferSize) {
Reported by PMD.
Line: 53
t, open, closingIndicator, bufferSize));
}
static final class WindowBoundaryMainObserver<T, B, V>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
Reported by PMD.
Line: 53
t, open, closingIndicator, bufferSize));
}
static final class WindowBoundaryMainObserver<T, B, V>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
Reported by PMD.
Line: 53
t, open, closingIndicator, bufferSize));
}
static final class WindowBoundaryMainObserver<T, B, V>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
Reported by PMD.
Line: 55
static final class WindowBoundaryMainObserver<T, B, V>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
Reported by PMD.
Line: 58
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
final int bufferSize;
final CompositeDisposable resources;
Reported by PMD.
Line: 59
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
final int bufferSize;
final CompositeDisposable resources;
final WindowStartObserver<B> startObserver;
Reported by PMD.
Line: 59
private static final long serialVersionUID = 8646217640096099753L;
final Observer<? super Observable<T>> downstream;
final ObservableSource<B> open;
final Function<? super B, ? extends ObservableSource<V>> closingIndicator;
final int bufferSize;
final CompositeDisposable resources;
final WindowStartObserver<B> startObserver;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableConversionTest.java
48 issues
Line: 154
@Override
public void onComplete() {
System.out.println("Complete");
}
@Override
public void onError(Throwable e) {
System.out.println("error: " + e.getMessage());
Reported by PMD.
Line: 159
@Override
public void onError(Throwable e) {
System.out.println("error: " + e.getMessage());
e.printStackTrace();
}
@Override
public void onNext(String t) {
Reported by PMD.
Line: 165
@Override
public void onNext(String t) {
System.out.println(t);
}
});
List<Object> crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()});
Reported by PMD.
Line: 175
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object pv) {
System.out.println(pv);
}
})
.to(new ConvertToCylonDetector<>())
.beep(new Predicate<Object>() {
@Override
Reported by PMD.
Line: 256
Integer i = queue.poll();
if (i != null) {
x++;
System.out.println(x + " item: " + i);
}
}
Assert.assertNull(thrown.get());
}
}
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
public class FlowableConversionTest extends RxJavaTest {
public static class Cylon { }
public static class Jail {
Object cylon;
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
public class FlowableConversionTest extends RxJavaTest {
public static class Cylon { }
public static class Jail {
Object cylon;
Reported by PMD.
Line: 36
public static class Cylon { }
public static class Jail {
Object cylon;
Jail(Object cylon) {
this.cylon = cylon;
}
}
Reported by PMD.
Line: 44
}
public static class CylonDetectorObservable<T> {
protected Publisher<T> onSubscribe;
public static <T> CylonDetectorObservable<T> create(Publisher<T> onSubscribe) {
return new CylonDetectorObservable<>(onSubscribe);
}
Reported by PMD.
Line: 65
public <O> O x(Function<Publisher<T>, O> operator) {
try {
return operator.apply(onSubscribe);
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
public <R> CylonDetectorObservable<? extends R> compose(Function<CylonDetectorObservable<? super T>, CylonDetectorObservable<? extends R>> transformer) {
Reported by PMD.