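The following issues were reported by PMD (grouped by file; each entry gives the reported line number followed by the surrounding source excerpt):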
src/test/java/io/reactivex/rxjava3/observable/ObservableCovarianceTest.java
28 issues
Line: 77
.doOnNext(new Consumer<GroupedObservable<Object, Movie>>() {
@Override
public void accept(GroupedObservable<Object, Movie> g) {
System.out.println(g.getKey());
}
})
.flatMap(new Function<GroupedObservable<Object, Movie>, Observable<String>>() {
@Override
public Observable<String> apply(GroupedObservable<Object, Movie> g) {
Reported by PMD.
Line: 87
.doOnNext(new Consumer<Movie>() {
@Override
public void accept(Movie pv) {
System.out.println(pv);
}
})
.compose(new ObservableTransformer<Movie, Movie>() {
@Override
public Observable<Movie> apply(Observable<Movie> m) {
Reported by PMD.
Line: 39
* This won't compile if super/extends isn't done correctly on generics.
*/
@Test
public void covarianceOfFrom() {
Observable.<Movie> just(new HorrorMovie());
Observable.<Movie> fromIterable(new ArrayList<HorrorMovie>());
// Observable.<HorrorMovie>from(new Movie()); // may not compile
}
Reported by PMD.
Line: 46
}
@Test
public void sortedList() {
Comparator<Media> sortFunction = new Comparator<Media>() {
@Override
public int compare(Media t1, Media t2) {
return 1;
}
Reported by PMD.
Line: 109
to.assertTerminated();
to.assertNoErrors();
// System.out.println(ts.getOnNextEvents());
assertEquals(6, to.values().size());
}
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Reported by PMD.
Line: 109
to.assertTerminated();
to.assertNoErrors();
// System.out.println(ts.getOnNextEvents());
assertEquals(6, to.values().size());
}
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Reported by PMD.
Line: 112
assertEquals(6, to.values().size());
}
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
@Override
Reported by PMD.
Line: 114
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
@Override
public Observable<Movie> apply(Observable<HorrorMovie> t) {
return Observable.just(new Movie());
Reported by PMD.
Line: 126
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose2() {
Observable<Movie> movie = Observable.<Movie> just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
@Override
public Observable<HorrorMovie> apply(Observable<Movie> t) {
return Observable.just(new HorrorMovie());
Reported by PMD.
Line: 138
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose3() {
Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
@Override
public Observable<HorrorMovie> apply(Observable<Movie> t) {
return Observable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticastTest.java
28 issues
Line: 34
public class FlowablePublishMulticastTest extends RxJavaTest {
@Test
public void asyncFusedInput() {
MulticastProcessor<Integer> mp = new MulticastProcessor<>(128, true);
UnicastProcessor<Integer> up = UnicastProcessor.create();
up.subscribe(mp);
Reported by PMD.
Line: 39
UnicastProcessor<Integer> up = UnicastProcessor.create();
up.subscribe(mp);
TestSubscriber<Integer> ts1 = mp.test();
TestSubscriber<Integer> ts2 = mp.take(1).test();
up.onNext(1);
Reported by PMD.
Line: 42
up.subscribe(mp);
TestSubscriber<Integer> ts1 = mp.test();
TestSubscriber<Integer> ts2 = mp.take(1).test();
up.onNext(1);
up.onNext(2);
up.onComplete();
Reported by PMD.
Line: 44
TestSubscriber<Integer> ts1 = mp.test();
TestSubscriber<Integer> ts2 = mp.take(1).test();
up.onNext(1);
up.onNext(2);
up.onComplete();
ts1.assertResult(1, 2);
ts2.assertResult(1);
Reported by PMD.
Line: 45
TestSubscriber<Integer> ts2 = mp.take(1).test();
up.onNext(1);
up.onNext(2);
up.onComplete();
ts1.assertResult(1, 2);
ts2.assertResult(1);
}
Reported by PMD.
Line: 46
up.onNext(1);
up.onNext(2);
up.onComplete();
ts1.assertResult(1, 2);
ts2.assertResult(1);
}
Reported by PMD.
Line: 48
up.onNext(2);
up.onComplete();
ts1.assertResult(1, 2);
ts2.assertResult(1);
}
@Test
public void fusionRejectedInput() {
Reported by PMD.
Line: 49
up.onComplete();
ts1.assertResult(1, 2);
ts2.assertResult(1);
}
@Test
public void fusionRejectedInput() {
MulticastProcessor<Integer> mp = new MulticastProcessor<>(128, true);
Reported by PMD.
Line: 53
}
@Test
public void fusionRejectedInput() {
MulticastProcessor<Integer> mp = new MulticastProcessor<>(128, true);
mp.onSubscribe(new QueueSubscription<Integer>() {
@Override
Reported by PMD.
Line: 102
mp.onNext(2);
mp.onComplete();
ts.assertResult(1, 2);
}
@Test
public void addRemoveRace() {
for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableErrorHandlingTests.java
28 issues
Line: 55
@Override
public void onNext(Long args) {
throw new RuntimeException("forced failure");
}
};
o.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 91
@Override
public void onNext(Long args) {
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread())
.safeSubscribe(observer);
Reported by PMD.
Line: 42
@Override
public void onComplete() {
System.out.println("completed");
latch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 48
@Override
public void onError(Throwable e) {
System.out.println("error: " + e);
caughtError.set(e);
latch.countDown();
}
@Override
Reported by PMD.
Line: 78
@Override
public void onComplete() {
System.out.println("completed");
latch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 84
@Override
public void onError(Throwable e) {
System.out.println("error: " + e);
caughtError.set(e);
latch.countDown();
}
@Override
Reported by PMD.
Line: 58
throw new RuntimeException("forced failure");
}
};
o.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
Reported by PMD.
Line: 61
o.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
/**
* Test that an error from a user provided Observer.onNext is handled and emitted to the onError
* even when done across thread boundaries with observeOn.
Reported by PMD.
Line: 94
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread())
.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
Reported by PMD.
Line: 94
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread())
.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBuffer.java
28 issues
Line: 28
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
public final class ObservableBuffer<T, U extends Collection<? super T>> extends AbstractObservableWithUpstream<T, U> {
final int count;
final int skip;
final Supplier<U> bufferSupplier;
public ObservableBuffer(ObservableSource<T> source, int count, int skip, Supplier<U> bufferSupplier) {
super(source);
Reported by PMD.
Line: 29
public final class ObservableBuffer<T, U extends Collection<? super T>> extends AbstractObservableWithUpstream<T, U> {
final int count;
final int skip;
final Supplier<U> bufferSupplier;
public ObservableBuffer(ObservableSource<T> source, int count, int skip, Supplier<U> bufferSupplier) {
super(source);
this.count = count;
Reported by PMD.
Line: 30
public final class ObservableBuffer<T, U extends Collection<? super T>> extends AbstractObservableWithUpstream<T, U> {
final int count;
final int skip;
final Supplier<U> bufferSupplier;
public ObservableBuffer(ObservableSource<T> source, int count, int skip, Supplier<U> bufferSupplier) {
super(source);
this.count = count;
this.skip = skip;
Reported by PMD.
Line: 52
}
static final class BufferExactObserver<T, U extends Collection<? super T>> implements Observer<T>, Disposable {
final Observer<? super U> downstream;
final int count;
final Supplier<U> bufferSupplier;
U buffer;
int size;
Reported by PMD.
Line: 53
static final class BufferExactObserver<T, U extends Collection<? super T>> implements Observer<T>, Disposable {
final Observer<? super U> downstream;
final int count;
final Supplier<U> bufferSupplier;
U buffer;
int size;
Reported by PMD.
Line: 54
static final class BufferExactObserver<T, U extends Collection<? super T>> implements Observer<T>, Disposable {
final Observer<? super U> downstream;
final int count;
final Supplier<U> bufferSupplier;
U buffer;
int size;
Disposable upstream;
Reported by PMD.
Line: 55
final Observer<? super U> downstream;
final int count;
final Supplier<U> bufferSupplier;
U buffer;
int size;
Disposable upstream;
Reported by PMD.
Line: 57
final Supplier<U> bufferSupplier;
U buffer;
int size;
Disposable upstream;
BufferExactObserver(Observer<? super U> actual, int count, Supplier<U> bufferSupplier) {
this.downstream = actual;
Reported by PMD.
Line: 59
int size;
Disposable upstream;
BufferExactObserver(Observer<? super U> actual, int count, Supplier<U> bufferSupplier) {
this.downstream = actual;
this.count = count;
this.bufferSupplier = bufferSupplier;
Reported by PMD.
Line: 71
U b;
try {
b = Objects.requireNonNull(bufferSupplier.get(), "Empty buffer supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
buffer = null;
if (upstream == null) {
EmptyDisposable.error(t, downstream);
} else {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelPeekTest.java
28 issues
Line: 34
public class ParallelPeekTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.doOnNext(Functions.emptyConsumer()));
}
@Test
Reported by PMD.
Line: 35
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.doOnNext(Functions.emptyConsumer()));
}
@Test
@SuppressUndeliverable
Reported by PMD.
Line: 35
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.doOnNext(Functions.emptyConsumer()));
}
@Test
@SuppressUndeliverable
Reported by PMD.
Line: 41
@Test
@SuppressUndeliverable
public void onSubscribeCrash() {
Flowable.range(1, 5)
.parallel()
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
Reported by PMD.
Line: 56
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.doOnNext(Functions.emptyConsumer())
.sequential()
Reported by PMD.
Line: 65
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 75
}
@Test
public void requestCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.range(1, 5)
.parallel()
Reported by PMD.
Line: 91
.test()
.assertResult(1, 2, 3, 4, 5);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
Reported by PMD.
Line: 102
}
@Test
public void cancelCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.<Integer>never()
.parallel()
Reported by PMD.
Line: 118
.test()
.cancel();
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromCallableTest.java
28 issues
Line: 50
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
}
@Test
public void fromCallableTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 54
}
@Test
public void fromCallableTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws Exception {
Reported by PMD.
Line: 65
}
};
Completable.fromCallable(callable)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 65
}
};
Completable.fromCallable(callable)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 69
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Completable.fromCallable(callable)
.test()
.assertResult();
Reported by PMD.
Line: 71
assertEquals(1, atomicInteger.get());
Completable.fromCallable(callable)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 71
assertEquals(1, atomicInteger.get());
Completable.fromCallable(callable)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 75
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
@Test
public void fromCallableInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 79
}
@Test
public void fromCallableInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Completable completable = Completable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
Reported by PMD.
Line: 90
}
});
assertEquals(0, atomicInteger.get());
completable
.test()
.assertResult();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapCompletable.java
28 issues
Line: 36
*/
public final class FlowableConcatMapCompletable<T> extends Completable {
final Flowable<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
final ErrorMode errorMode;
Reported by PMD.
Line: 38
final Flowable<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
final ErrorMode errorMode;
final int prefetch;
Reported by PMD.
Line: 40
final Function<? super T, ? extends CompletableSource> mapper;
final ErrorMode errorMode;
final int prefetch;
public FlowableConcatMapCompletable(Flowable<T> source,
Function<? super T, ? extends CompletableSource> mapper,
Reported by PMD.
Line: 42
final ErrorMode errorMode;
final int prefetch;
public FlowableConcatMapCompletable(Flowable<T> source,
Function<? super T, ? extends CompletableSource> mapper,
ErrorMode errorMode,
int prefetch) {
Reported by PMD.
Line: 59
source.subscribe(new ConcatMapCompletableObserver<>(observer, mapper, errorMode, prefetch));
}
static final class ConcatMapCompletableObserver<T>
extends ConcatMapXMainSubscriber<T>
implements Disposable {
private static final long serialVersionUID = 3610901111000061034L;
Reported by PMD.
Line: 59
source.subscribe(new ConcatMapCompletableObserver<>(observer, mapper, errorMode, prefetch));
}
static final class ConcatMapCompletableObserver<T>
extends ConcatMapXMainSubscriber<T>
implements Disposable {
private static final long serialVersionUID = 3610901111000061034L;
Reported by PMD.
Line: 65
private static final long serialVersionUID = 3610901111000061034L;
final CompletableObserver downstream;
final Function<? super T, ? extends CompletableSource> mapper;
final ConcatMapInnerObserver inner;
Reported by PMD.
Line: 67
final CompletableObserver downstream;
final Function<? super T, ? extends CompletableSource> mapper;
final ConcatMapInnerObserver inner;
volatile boolean active;
Reported by PMD.
Line: 69
final Function<? super T, ? extends CompletableSource> mapper;
final ConcatMapInnerObserver inner;
volatile boolean active;
int consumed;
Reported by PMD.
Line: 71
final ConcatMapInnerObserver inner;
volatile boolean active;
int consumed;
ConcatMapCompletableObserver(CompletableObserver downstream,
Function<? super T, ? extends CompletableSource> mapper,
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatArray.java
28 issues
Line: 27
public final class FlowableConcatArray<T> extends Flowable<T> {
final Publisher<? extends T>[] sources;
final boolean delayError;
public FlowableConcatArray(Publisher<? extends T>[] sources, boolean delayError) {
this.sources = sources;
Reported by PMD.
Line: 29
final Publisher<? extends T>[] sources;
final boolean delayError;
public FlowableConcatArray(Publisher<? extends T>[] sources, boolean delayError) {
this.sources = sources;
this.delayError = delayError;
}
Reported by PMD.
Line: 31
final boolean delayError;
public FlowableConcatArray(Publisher<? extends T>[] sources, boolean delayError) {
this.sources = sources;
this.delayError = delayError;
}
@Override
Reported by PMD.
Line: 44
parent.onComplete();
}
static final class ConcatArraySubscriber<T> extends SubscriptionArbiter implements FlowableSubscriber<T> {
private static final long serialVersionUID = -8158322871608889516L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 44
parent.onComplete();
}
static final class ConcatArraySubscriber<T> extends SubscriptionArbiter implements FlowableSubscriber<T> {
private static final long serialVersionUID = -8158322871608889516L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 48
private static final long serialVersionUID = -8158322871608889516L;
final Subscriber<? super T> downstream;
final Publisher<? extends T>[] sources;
final boolean delayError;
Reported by PMD.
Line: 50
final Subscriber<? super T> downstream;
final Publisher<? extends T>[] sources;
final boolean delayError;
final AtomicInteger wip;
Reported by PMD.
Line: 52
final Publisher<? extends T>[] sources;
final boolean delayError;
final AtomicInteger wip;
int index;
Reported by PMD.
Line: 54
final boolean delayError;
final AtomicInteger wip;
int index;
List<Throwable> errors;
Reported by PMD.
Line: 56
final AtomicInteger wip;
int index;
List<Throwable> errors;
long produced;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/DisposableCompletableObserverTest.java
28 issues
Line: 32
static final class TestCompletable extends DisposableCompletableObserver {
int start;
int complete;
final List<Throwable> errors = new ArrayList<>();
Reported by PMD.
Line: 34
int start;
int complete;
final List<Throwable> errors = new ArrayList<>();
@Override
protected void onStart() {
Reported by PMD.
Line: 36
int complete;
final List<Throwable> errors = new ArrayList<>();
@Override
protected void onStart() {
super.onStart();
Reported by PMD.
Line: 58
}
@Test
public void normal() {
TestCompletable tc = new TestCompletable();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertEquals(0, tc.complete);
Reported by PMD.
Line: 61
public void normal() {
TestCompletable tc = new TestCompletable();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertEquals(0, tc.complete);
assertTrue(tc.errors.isEmpty());
Completable.complete().subscribe(tc);
Reported by PMD.
Line: 62
TestCompletable tc = new TestCompletable();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertEquals(0, tc.complete);
assertTrue(tc.errors.isEmpty());
Completable.complete().subscribe(tc);
Reported by PMD.
Line: 63
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertEquals(0, tc.complete);
assertTrue(tc.errors.isEmpty());
Completable.complete().subscribe(tc);
assertFalse(tc.isDisposed());
Reported by PMD.
Line: 64
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertEquals(0, tc.complete);
assertTrue(tc.errors.isEmpty());
Completable.complete().subscribe(tc);
assertFalse(tc.isDisposed());
assertEquals(1, tc.start);
Reported by PMD.
Line: 64
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertEquals(0, tc.complete);
assertTrue(tc.errors.isEmpty());
Completable.complete().subscribe(tc);
assertFalse(tc.isDisposed());
assertEquals(1, tc.start);
Reported by PMD.
Line: 66
assertEquals(0, tc.complete);
assertTrue(tc.errors.isEmpty());
Completable.complete().subscribe(tc);
assertFalse(tc.isDisposed());
assertEquals(1, tc.start);
assertEquals(1, tc.complete);
assertTrue(tc.errors.isEmpty());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelFilterTest.java
28 issues
Line: 32
public class ParallelFilterTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
@Test
Reported by PMD.
Line: 33
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
@Test
public void doubleFilter() {
Reported by PMD.
Line: 33
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
@Test
public void doubleFilter() {
Reported by PMD.
Line: 38
}
@Test
public void doubleFilter() {
Flowable.range(1, 10)
.parallel()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
Reported by PMD.
Line: 59
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.filter(Functions.alwaysTrue())
.sequential()
Reported by PMD.
Line: 68
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 78
}
@Test
public void doubleError2() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.filter(Functions.alwaysTrue())
.filter(Functions.alwaysTrue())
Reported by PMD.
Line: 88
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 98
}
@Test
public void error() {
Flowable.error(new TestException())
.parallel()
.filter(Functions.alwaysTrue())
.sequential()
.test()
Reported by PMD.
Line: 108
}
@Test
public void predicateThrows() {
Flowable.just(1)
.parallel()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
Reported by PMD.