The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BufferUntilSubscriberTest.java
26 issues
Line: 28
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 28
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 40
final int NITERS = 250;
final CountDownLatch latch = new CountDownLatch(NITERS);
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishProcessor<Void> s = PublishProcessor.create();
final AtomicBoolean completed = new AtomicBoolean();
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
Reported by PMD.
Line: 42
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishProcessor<Void> s = PublishProcessor.create();
final AtomicBoolean completed = new AtomicBoolean();
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Flowable<Integer>, Publisher<Object>>() {
@Override
Reported by PMD.
Line: 46
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Flowable<Integer>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
Reported by PMD.
Line: 51
public Publisher<Object> apply(Flowable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) {
if (integer >= 5 && completed.compareAndSet(false, true)) {
s.onComplete();
}
Reported by PMD.
Line: 65
}
})
.toList()
.doOnSuccess(new Consumer<List<Object>>() {
@Override
public void accept(List<Object> integers) {
counter.incrementAndGet();
latch.countDown();
innerLatch.countDown();
Reported by PMD.
Line: 20
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.junit.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
Reported by PMD.
Line: 23
import org.junit.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
Line: 24
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBufferUntilSubscriberTest.java
26 issues
Line: 27
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class ObservableBufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 27
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class ObservableBufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 39
final int NITERS = 250;
final CountDownLatch latch = new CountDownLatch(NITERS);
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishSubject<Void> s = PublishSubject.create();
final AtomicBoolean completed = new AtomicBoolean();
Observable.fromArray(numbers)
.takeUntil(s)
.window(50)
Reported by PMD.
Line: 41
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishSubject<Void> s = PublishSubject.create();
final AtomicBoolean completed = new AtomicBoolean();
Observable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Observable<Integer>, Observable<Object>>() {
@Override
Reported by PMD.
Line: 45
Observable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Observable<Integer>, Observable<Object>>() {
@Override
public Observable<Object> apply(Observable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
Reported by PMD.
Line: 50
public Observable<Object> apply(Observable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) {
if (integer >= 5 && completed.compareAndSet(false, true)) {
s.onComplete();
}
Reported by PMD.
Line: 64
}
})
.toList()
.doOnSuccess(new Consumer<List<Object>>() {
@Override
public void accept(List<Object> integers) {
counter.incrementAndGet();
latch.countDown();
innerLatch.countDown();
Reported by PMD.
Line: 20
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
Reported by PMD.
Line: 22
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class ObservableBufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
Line: 23
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
public class ObservableBufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromTest.java
25 issues
Line: 31
public class ObservableFromTest extends RxJavaTest {
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
Reported by PMD.
Line: 31
public class ObservableFromTest extends RxJavaTest {
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
Reported by PMD.
Line: 32
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Reported by PMD.
Line: 32
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Reported by PMD.
Line: 32
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Reported by PMD.
Line: 32
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Reported by PMD.
Line: 32
@Test
public void fromFutureTimeout() throws Exception {
Observable.fromFuture(Observable.never()
.toFuture(), 100, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.io())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TimeoutException.class);
Reported by PMD.
Line: 41
}
@Test
public void fromPublisher() {
Observable.fromPublisher(Flowable.just(1))
.test()
.assertResult(1);
}
Reported by PMD.
Line: 42
@Test
public void fromPublisher() {
Observable.fromPublisher(Flowable.just(1))
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 42
@Test
public void fromPublisher() {
Observable.fromPublisher(Flowable.just(1))
.test()
.assertResult(1);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelReduceFullTest.java
25 issues
Line: 34
public class ParallelReduceFullTest extends RxJavaTest {
@Test
public void cancel() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = pp
.parallel()
.reduce(new BiFunction<Integer, Integer, Integer>() {
Reported by PMD.
Line: 47
})
.test();
assertTrue(pp.hasSubscribers());
ts.cancel();
assertFalse(pp.hasSubscribers());
}
Reported by PMD.
Line: 47
})
.test();
assertTrue(pp.hasSubscribers());
ts.cancel();
assertFalse(pp.hasSubscribers());
}
Reported by PMD.
Line: 51
ts.cancel();
assertFalse(pp.hasSubscribers());
}
@Test
public void error() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 51
ts.cancel();
assertFalse(pp.hasSubscribers());
}
@Test
public void error() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 70
.test()
.assertFailure(TestException.class);
assertTrue(errors.isEmpty());
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 77
}
@Test
public void error2() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
ParallelFlowable.fromArray(Flowable.<Integer>error(new IOException()), Flowable.<Integer>error(new TestException()))
.reduce(new BiFunction<Integer, Integer, Integer>() {
Reported by PMD.
Line: 98
}
@Test
public void empty() {
Flowable.<Integer>empty()
.parallel()
.reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer a, Integer b) throws Exception {
Reported by PMD.
Line: 112
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.reduce(new BiFunction<Object, Object, Object>() {
@Override
Reported by PMD.
Line: 119
.reduce(new BiFunction<Object, Object, Object>() {
@Override
public Object apply(Object a, Object b) throws Exception {
return "" + a + b;
}
})
.test()
.assertFailure(TestException.class);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelReduceFull.java
25 issues
Line: 106
current.compareAndSet(curr, null);
return curr;
}
return null;
}
}
@Override
public void cancel() {
Reported by PMD.
Line: 128
}
}
void innerComplete(T value) {
if (value != null) {
for (;;) {
SlotPair<T> sp = addValue(value);
if (sp != null) {
Reported by PMD.
Line: 37
*/
public final class ParallelReduceFull<T> extends Flowable<T> {
final ParallelFlowable<? extends T> source;
final BiFunction<T, T, T> reducer;
public ParallelReduceFull(ParallelFlowable<? extends T> source, BiFunction<T, T, T> reducer) {
this.source = source;
Reported by PMD.
Line: 39
final ParallelFlowable<? extends T> source;
final BiFunction<T, T, T> reducer;
public ParallelReduceFull(ParallelFlowable<? extends T> source, BiFunction<T, T, T> reducer) {
this.source = source;
this.reducer = reducer;
}
Reported by PMD.
Line: 58
private static final long serialVersionUID = -5370107872170712765L;
final ParallelReduceFullInnerSubscriber<T>[] subscribers;
final BiFunction<T, T, T> reducer;
final AtomicReference<SlotPair<T>> current = new AtomicReference<>();
Reported by PMD.
Line: 60
final ParallelReduceFullInnerSubscriber<T>[] subscribers;
final BiFunction<T, T, T> reducer;
final AtomicReference<SlotPair<T>> current = new AtomicReference<>();
final AtomicInteger remaining = new AtomicInteger();
Reported by PMD.
Line: 62
final BiFunction<T, T, T> reducer;
final AtomicReference<SlotPair<T>> current = new AtomicReference<>();
final AtomicInteger remaining = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
Reported by PMD.
Line: 64
final AtomicReference<SlotPair<T>> current = new AtomicReference<>();
final AtomicInteger remaining = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
ParallelReduceFullMainSubscriber(Subscriber<? super T> subscriber, int n, BiFunction<T, T, T> reducer) {
super(subscriber);
Reported by PMD.
Line: 66
final AtomicInteger remaining = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
ParallelReduceFullMainSubscriber(Subscriber<? super T> subscriber, int n, BiFunction<T, T, T> reducer) {
super(subscriber);
@SuppressWarnings("unchecked")
ParallelReduceFullInnerSubscriber<T>[] a = new ParallelReduceFullInnerSubscriber[n];
Reported by PMD.
Line: 85
SlotPair<T> curr = current.get();
if (curr == null) {
curr = new SlotPair<>();
if (!current.compareAndSet(null, curr)) {
continue;
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTimed.java
25 issues
Line: 25
import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue;
public final class ObservableTakeLastTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
Reported by PMD.
Line: 26
public final class ObservableTakeLastTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
Reported by PMD.
Line: 27
public final class ObservableTakeLastTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableTakeLastTimed(ObservableSource<T> source,
Reported by PMD.
Line: 28
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableTakeLastTimed(ObservableSource<T> source,
long count, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
Reported by PMD.
Line: 29
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableTakeLastTimed(ObservableSource<T> source,
long count, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
Reported by PMD.
Line: 30
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableTakeLastTimed(ObservableSource<T> source,
long count, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
this.count = count;
Reported by PMD.
Line: 52
extends AtomicBoolean implements Observer<T>, Disposable {
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
Reported by PMD.
Line: 53
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Reported by PMD.
Line: 54
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Reported by PMD.
Line: 55
final Observer<? super T> downstream;
final long count;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Disposable upstream;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/disposables/FutureDisposableTest.java
25 issues
Line: 28
public class FutureDisposableTest extends RxJavaTest {
@Test
public void normal() {
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Disposable d = Disposable.fromFuture(ft);
assertFalse(d.isDisposed());
d.dispose();
Reported by PMD.
Line: 31
public void normal() {
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Disposable d = Disposable.fromFuture(ft);
assertFalse(d.isDisposed());
d.dispose();
assertTrue(d.isDisposed());
Reported by PMD.
Line: 31
public void normal() {
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Disposable d = Disposable.fromFuture(ft);
assertFalse(d.isDisposed());
d.dispose();
assertTrue(d.isDisposed());
Reported by PMD.
Line: 33
Disposable d = Disposable.fromFuture(ft);
assertFalse(d.isDisposed());
d.dispose();
assertTrue(d.isDisposed());
d.dispose();
Reported by PMD.
Line: 35
d.dispose();
assertTrue(d.isDisposed());
d.dispose();
assertTrue(d.isDisposed());
Reported by PMD.
Line: 35
d.dispose();
assertTrue(d.isDisposed());
d.dispose();
assertTrue(d.isDisposed());
Reported by PMD.
Line: 37
assertTrue(d.isDisposed());
d.dispose();
assertTrue(d.isDisposed());
assertTrue(ft.isCancelled());
}
Reported by PMD.
Line: 39
d.dispose();
assertTrue(d.isDisposed());
assertTrue(ft.isCancelled());
}
@Test
Reported by PMD.
Line: 39
d.dispose();
assertTrue(d.isDisposed());
assertTrue(ft.isCancelled());
}
@Test
Reported by PMD.
Line: 41
assertTrue(d.isDisposed());
assertTrue(ft.isCancelled());
}
@Test
public void interruptible() {
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableReduceTests.java
24 issues
Line: 36
}
}).toObservable().blockingSingle();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjectsObservable() {
Reported by PMD.
Line: 58
}
}).toObservable();
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 77
}
}).toObservable();
assertNotNull(reduceResult2);
}
@Test
public void reduceInts() {
Observable<Integer> o = Observable.just(1, 2, 3);
Reported by PMD.
Line: 90
}
}).blockingGet();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjects() {
Reported by PMD.
Line: 112
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 131
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 140
* https://github.com/ReactiveX/RxJava/issues/360#issuecomment-24203016
*/
@Test
public void reduceCovariance() {
// must type it to <Movie>
Observable<Movie> horrorMovies = Observable.<Movie> just(new HorrorMovie());
libraryFunctionActingOnMovieObservables(horrorMovies);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.observable;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiFunction;
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.observable.ObservableCovarianceTest.*;
public class ObservableReduceTests extends RxJavaTest {
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.observable.ObservableCovarianceTest.*;
public class ObservableReduceTests extends RxJavaTest {
@Test
public void reduceIntsObservable() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/jdk8/ParallelMapOptional.java
24 issues
Line: 36
*/
public final class ParallelMapOptional<T, R> extends ParallelFlowable<R> {
final ParallelFlowable<T> source;
final Function<? super T, Optional<? extends R>> mapper;
public ParallelMapOptional(ParallelFlowable<T> source, Function<? super T, Optional<? extends R>> mapper) {
this.source = source;
Reported by PMD.
Line: 38
final ParallelFlowable<T> source;
final Function<? super T, Optional<? extends R>> mapper;
public ParallelMapOptional(ParallelFlowable<T> source, Function<? super T, Optional<? extends R>> mapper) {
this.source = source;
this.mapper = mapper;
}
Reported by PMD.
Line: 74
static final class ParallelMapSubscriber<T, R> implements ConditionalSubscriber<T>, Subscription {
final Subscriber<? super R> downstream;
final Function<? super T, Optional<? extends R>> mapper;
Subscription upstream;
Reported by PMD.
Line: 76
final Subscriber<? super R> downstream;
final Function<? super T, Optional<? extends R>> mapper;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 78
final Function<? super T, Optional<? extends R>> mapper;
Subscription upstream;
boolean done;
ParallelMapSubscriber(Subscriber<? super R> actual, Function<? super T, Optional<? extends R>> mapper) {
this.downstream = actual;
Reported by PMD.
Line: 80
Subscription upstream;
boolean done;
ParallelMapSubscriber(Subscriber<? super R> actual, Function<? super T, Optional<? extends R>> mapper) {
this.downstream = actual;
this.mapper = mapper;
}
Reported by PMD.
Line: 122
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Optional");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
return true;
}
Reported by PMD.
Line: 129
return true;
}
if (v.isPresent()) {
downstream.onNext(v.get());
return true;
}
return false;
}
Reported by PMD.
Line: 158
}
static final class ParallelMapConditionalSubscriber<T, R> implements ConditionalSubscriber<T>, Subscription {
final ConditionalSubscriber<? super R> downstream;
final Function<? super T, Optional<? extends R>> mapper;
Subscription upstream;
Reported by PMD.
Line: 160
final ConditionalSubscriber<? super R> downstream;
final Function<? super T, Optional<? extends R>> mapper;
Subscription upstream;
boolean done;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableReduceTests.java
24 issues
Line: 36
}
}).toFlowable().blockingSingle();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjectsFlowable() {
Reported by PMD.
Line: 58
}
}).toFlowable();
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 77
}
}).toFlowable();
assertNotNull(reduceResult2);
}
@Test
public void reduceInts() {
Flowable<Integer> f = Flowable.just(1, 2, 3);
Reported by PMD.
Line: 90
}
}).toFlowable().blockingSingle();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjects() {
Reported by PMD.
Line: 112
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 131
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 140
* https://github.com/ReactiveX/RxJava/issues/360#issuecomment-24203016
*/
@Test
public void reduceCovariance() {
// must type it to <Movie>
Flowable<Movie> horrorMovies = Flowable.<Movie> just(new HorrorMovie());
libraryFunctionActingOnMovieObservables(horrorMovies);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.flowable;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.flowable.FlowableCovarianceTest.*;
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.flowable.FlowableCovarianceTest.*;
import io.reactivex.rxjava3.functions.BiFunction;
public class FlowableReduceTests extends RxJavaTest {
Reported by PMD.
Line: 21
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.flowable.FlowableCovarianceTest.*;
import io.reactivex.rxjava3.functions.BiFunction;
public class FlowableReduceTests extends RxJavaTest {
@Test
Reported by PMD.