The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToXTest.java
29 issues
Line: 25
public class ObservableToXTest extends RxJavaTest {
@Test
public void toFlowableBuffer() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.BUFFER)
.test(2L)
.assertValues(1, 2)
.assertNoErrors()
Reported by PMD.
Line: 26
@Test
public void toFlowableBuffer() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.BUFFER)
.test(2L)
.assertValues(1, 2)
.assertNoErrors()
.assertNotComplete();
Reported by PMD.
Line: 26
@Test
public void toFlowableBuffer() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.BUFFER)
.test(2L)
.assertValues(1, 2)
.assertNoErrors()
.assertNotComplete();
Reported by PMD.
Line: 26
@Test
public void toFlowableBuffer() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.BUFFER)
.test(2L)
.assertValues(1, 2)
.assertNoErrors()
.assertNotComplete();
Reported by PMD.
Line: 26
@Test
public void toFlowableBuffer() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.BUFFER)
.test(2L)
.assertValues(1, 2)
.assertNoErrors()
.assertNotComplete();
Reported by PMD.
Line: 26
@Test
public void toFlowableBuffer() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.BUFFER)
.test(2L)
.assertValues(1, 2)
.assertNoErrors()
.assertNotComplete();
Reported by PMD.
Line: 35
}
@Test
public void toFlowableDrop() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.DROP)
.test(1)
.assertResult(1);
}
Reported by PMD.
Line: 36
@Test
public void toFlowableDrop() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.DROP)
.test(1)
.assertResult(1);
}
Reported by PMD.
Line: 36
@Test
public void toFlowableDrop() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.DROP)
.test(1)
.assertResult(1);
}
Reported by PMD.
Line: 36
@Test
public void toFlowableDrop() {
Observable.range(1, 5)
.toFlowable(BackpressureStrategy.DROP)
.test(1)
.assertResult(1);
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java
29 issues
Line: 150
* @param <T>
* the type of item expected to be observed by the Subject
*/
public final class BehaviorSubject<T> extends Subject<T> {
final AtomicReference<Object> value;
final AtomicReference<BehaviorDisposable<T>[]> observers;
Reported by PMD.
Line: 154
final AtomicReference<Object> value;
final AtomicReference<BehaviorDisposable<T>[]> observers;
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
Line: 161
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Throwable> terminalEvent;
Reported by PMD.
Line: 162
@SuppressWarnings("rawtypes")
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Throwable> terminalEvent;
long index;
Reported by PMD.
Line: 163
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Throwable> terminalEvent;
long index;
Reported by PMD.
Line: 165
final Lock readLock;
final Lock writeLock;
final AtomicReference<Throwable> terminalEvent;
long index;
/**
* Creates a {@link BehaviorSubject} without a default item.
Reported by PMD.
Line: 167
final AtomicReference<Throwable> terminalEvent;
long index;
/**
* Creates a {@link BehaviorSubject} without a default item.
*
* @param <T>
Reported by PMD.
Line: 206
* @param defaultValue the initial value, not null (verified)
* @since 2.0
*/
@SuppressWarnings("unchecked")
BehaviorSubject(T defaultValue) {
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.observers = new AtomicReference<>(EMPTY);
Reported by PMD.
Line: 284
@Override
@CheckReturnValue
public boolean hasObservers() {
return observers.get().length != 0;
}
@CheckReturnValue
/* test support*/ int subscriberCount() {
return observers.get().length;
Reported by PMD.
Line: 289
@CheckReturnValue
/* test support*/ int subscriberCount() {
return observers.get().length;
}
@Override
@Nullable
@CheckReturnValue
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelMapTest.java
29 issues
Line: 35
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ParallelMapTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.map(Functions.identity()));
Reported by PMD.
Line: 38
public class ParallelMapTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.map(Functions.identity()));
}
@Test
Reported by PMD.
Line: 39
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.map(Functions.identity()));
}
@Test
public void doubleFilter() {
Reported by PMD.
Line: 39
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.map(Functions.identity()));
}
@Test
public void doubleFilter() {
Reported by PMD.
Line: 44
}
@Test
public void doubleFilter() {
Flowable.range(1, 10)
.parallel()
.map(Functions.<Integer>identity())
.filter(new Predicate<Integer>() {
@Override
Reported by PMD.
Line: 66
}
@Test
public void doubleFilterAsync() {
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(Functions.<Integer>identity())
.filter(new Predicate<Integer>() {
Reported by PMD.
Line: 90
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.map(Functions.<Object>identity())
.sequential()
Reported by PMD.
Line: 99
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 109
}
@Test
public void doubleError2() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.map(Functions.<Object>identity())
.filter(Functions.alwaysTrue())
Reported by PMD.
Line: 119
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableZipTests.java
29 issues
Line: 61
@Override
public void accept(HashMap<String, String> v) {
synchronized (v) {
System.out.println(v);
}
}
});
System.out.println("**** finished");
Reported by PMD.
Line: 66
}
});
System.out.println("**** finished");
}
/**
* This won't compile if super/extends isn't done correctly on generics.
*/
Reported by PMD.
Line: 102
Flowable<Object> result = Flowable.zip(observables, new Function<Object[], Object>() {
@Override
public Object apply(Object[] args) {
System.out.println("received: " + args);
assertEquals("No argument should have been passed", 0, args.length);
return invoked;
}
});
Reported by PMD.
Line: 121
Consumer<Result> action = new Consumer<Result>() {
@Override
public void accept(Result t1) {
System.out.println("Result: " + t1);
}
};
Consumer<ExtendedResult> extendedAction = new Consumer<ExtendedResult>() {
@Override
Reported by PMD.
Line: 128
Consumer<ExtendedResult> extendedAction = new Consumer<ExtendedResult>() {
@Override
public void accept(ExtendedResult t1) {
System.out.println("Result: " + t1);
}
};
@Test
public void zipWithDelayError() {
Reported by PMD.
Line: 32
public class FlowableZipTests extends RxJavaTest {
@Test
public void zipObservableOfObservables() {
FlowableEventStream.getEventStream("HTTP-ClusterB", 20)
.groupBy(new Function<Event, String>() {
@Override
public String apply(Event e) {
return e.instanceId;
Reported by PMD.
Line: 73
* This won't compile if super/extends isn't done correctly on generics.
*/
@Test
public void covarianceOfZip() {
Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Reported by PMD.
Line: 77
Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Flowable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Reported by PMD.
Line: 77
Flowable<HorrorMovie> horrors = Flowable.just(new HorrorMovie());
Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Flowable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Reported by PMD.
Line: 78
Flowable<CoolRating> ratings = Flowable.just(new CoolRating());
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Flowable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Flowable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatIterableTest.java
29 issues
Line: 34
public class MaybeConcatIterableTest extends RxJavaTest {
@Test
public void take() {
Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2), Maybe.just(3)))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void take() {
Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2), Maybe.just(3)))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void take() {
Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2), Maybe.just(3)))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void take() {
Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.just(2), Maybe.just(3)))
.take(1)
.test()
.assertResult(1);
}
Reported by PMD.
Line: 42
}
@Test
public void iteratorThrows() {
Maybe.concat(new Iterable<MaybeSource<Object>>() {
@Override
public Iterator<MaybeSource<Object>> iterator() {
throw new TestException("iterator()");
}
Reported by PMD.
Line: 54
}
@Test
public void error() {
Maybe.concat(Arrays.asList(Maybe.just(1), Maybe.<Integer>error(new TestException()), Maybe.just(3)))
.test()
.assertFailure(TestException.class, 1);
}
Reported by PMD.
Line: 61
}
@Test
public void successCancelRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestSubscriber<Integer> ts = Maybe.concat(Arrays.asList(pp.singleElement()))
Reported by PMD.
Line: 66
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestSubscriber<Integer> ts = Maybe.concat(Arrays.asList(pp.singleElement()))
.test();
pp.onNext(1);
Runnable r1 = new Runnable() {
Reported by PMD.
Line: 66
final PublishProcessor<Integer> pp = PublishProcessor.create();
final TestSubscriber<Integer> ts = Maybe.concat(Arrays.asList(pp.singleElement()))
.test();
pp.onNext(1);
Runnable r1 = new Runnable() {
Reported by PMD.
Line: 69
final TestSubscriber<Integer> ts = Maybe.concat(Arrays.asList(pp.singleElement()))
.test();
pp.onNext(1);
Runnable r1 = new Runnable() {
@Override
public void run() {
ts.cancel();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/exceptions/CompositeException.java
29 issues
Line: 35
* If you invoke {@link #getCause()}, it will lazily create the causal chain but will stop if it finds any
* Throwable in the chain that it has already seen.
*/
public final class CompositeException extends RuntimeException {
private static final long serialVersionUID = 3026362227162912146L;
private final List<Throwable> exceptions;
private final String message;
Reported by PMD.
Line: 35
* If you invoke {@link #getCause()}, it will lazily create the causal chain but will stop if it finds any
* Throwable in the chain that it has already seen.
*/
public final class CompositeException extends RuntimeException {
private static final long serialVersionUID = 3026362227162912146L;
private final List<Throwable> exceptions;
private final String message;
Reported by PMD.
Line: 41
private final List<Throwable> exceptions;
private final String message;
private Throwable cause;
/**
* Constructs a CompositeException with the given array of Throwables as the
* list of suppressed exceptions.
* @param exceptions the Throwables to have as initially suppressed exceptions
Reported by PMD.
Line: 107
public synchronized Throwable getCause() { // NOPMD
if (cause == null) {
String separator = System.getProperty("line.separator");
if (exceptions.size() > 1) {
Map<Throwable, Boolean> seenCauses = new IdentityHashMap<>();
StringBuilder aggregateMessage = new StringBuilder();
aggregateMessage.append("Multiple exceptions (").append(exceptions.size()).append(")").append(separator);
Reported by PMD.
Line: 120
aggregateMessage.append(" ");
}
aggregateMessage.append("|-- ");
aggregateMessage.append(inner.getClass().getCanonicalName()).append(": ");
String innerMessage = inner.getMessage();
if (innerMessage != null && innerMessage.contains(separator)) {
aggregateMessage.append(separator);
for (String line : innerMessage.split(separator)) {
for (int i = 0; i < depth + 2; i++) {
Reported by PMD.
Line: 122
aggregateMessage.append("|-- ");
aggregateMessage.append(inner.getClass().getCanonicalName()).append(": ");
String innerMessage = inner.getMessage();
if (innerMessage != null && innerMessage.contains(separator)) {
aggregateMessage.append(separator);
for (String line : innerMessage.split(separator)) {
for (int i = 0; i < depth + 2; i++) {
aggregateMessage.append(" ");
}
Reported by PMD.
Line: 124
String innerMessage = inner.getMessage();
if (innerMessage != null && innerMessage.contains(separator)) {
aggregateMessage.append(separator);
for (String line : innerMessage.split(separator)) {
for (int i = 0; i < depth + 2; i++) {
aggregateMessage.append(" ");
}
aggregateMessage.append(line).append(separator);
}
Reported by PMD.
Line: 146
if (!seenCauses.containsKey(inner)) {
seenCauses.put(inner, true);
inner = inner.getCause();
depth++;
} else {
inner = inner.getCause();
if (inner != null) {
for (int i = 0; i < depth + 2; i++) {
Reported by PMD.
Line: 146
if (!seenCauses.containsKey(inner)) {
seenCauses.put(inner, true);
inner = inner.getCause();
depth++;
} else {
inner = inner.getCause();
if (inner != null) {
for (int i = 0; i < depth + 2; i++) {
Reported by PMD.
Line: 149
inner = inner.getCause();
depth++;
} else {
inner = inner.getCause();
if (inner != null) {
for (int i = 0; i < depth + 2; i++) {
aggregateMessage.append(" ");
}
aggregateMessage.append("|-- ");
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java
29 issues
Line: 30
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class CompletableConcat extends Completable {
final Publisher<? extends CompletableSource> sources;
final int prefetch;
public CompletableConcat(Publisher<? extends CompletableSource> sources, int prefetch) {
this.sources = sources;
this.prefetch = prefetch;
Reported by PMD.
Line: 31
public final class CompletableConcat extends Completable {
final Publisher<? extends CompletableSource> sources;
final int prefetch;
public CompletableConcat(Publisher<? extends CompletableSource> sources, int prefetch) {
this.sources = sources;
this.prefetch = prefetch;
}
Reported by PMD.
Line: 48
implements FlowableSubscriber<CompletableSource>, Disposable {
private static final long serialVersionUID = 9032184911934499404L;
final CompletableObserver downstream;
final int prefetch;
final int limit;
Reported by PMD.
Line: 50
final CompletableObserver downstream;
final int prefetch;
final int limit;
final ConcatInnerObserver inner;
Reported by PMD.
Line: 52
final int prefetch;
final int limit;
final ConcatInnerObserver inner;
final AtomicBoolean once;
Reported by PMD.
Line: 54
final int limit;
final ConcatInnerObserver inner;
final AtomicBoolean once;
int sourceFused;
Reported by PMD.
Line: 56
final ConcatInnerObserver inner;
final AtomicBoolean once;
int sourceFused;
int consumed;
Reported by PMD.
Line: 58
final AtomicBoolean once;
int sourceFused;
int consumed;
SimpleQueue<CompletableSource> queue;
Reported by PMD.
Line: 60
int sourceFused;
int consumed;
SimpleQueue<CompletableSource> queue;
Subscription upstream;
Reported by PMD.
Line: 62
int consumed;
SimpleQueue<CompletableSource> queue;
Subscription upstream;
volatile boolean done;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/ParallelFlatMapStreamTest.java
29 issues
Line: 26
public class ParallelFlatMapStreamTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.flatMapStream(v -> Stream.of(1, 2, 3)));
}
@Test
Reported by PMD.
Line: 27
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.flatMapStream(v -> Stream.of(1, 2, 3)));
}
@Test
public void normal() {
Reported by PMD.
Line: 27
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.flatMapStream(v -> Stream.of(1, 2, 3)));
}
@Test
public void normal() {
Reported by PMD.
Line: 32
}
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
Reported by PMD.
Line: 34
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 34
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 34
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 34
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 34
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 34
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapStream(v -> Stream.of(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTimeoutTimed.java
29 issues
Line: 27
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final ObservableSource<? extends T> other;
public ObservableTimeoutTimed(Observable<T> source,
Reported by PMD.
Line: 28
public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final ObservableSource<? extends T> other;
public ObservableTimeoutTimed(Observable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource<? extends T> other) {
Reported by PMD.
Line: 29
public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final ObservableSource<? extends T> other;
public ObservableTimeoutTimed(Observable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource<? extends T> other) {
super(source);
Reported by PMD.
Line: 30
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final ObservableSource<? extends T> other;
public ObservableTimeoutTimed(Observable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource<? extends T> other) {
super(source);
this.timeout = timeout;
Reported by PMD.
Line: 61
private static final long serialVersionUID = 3764492702657003550L;
final Observer<? super T> downstream;
final long timeout;
final TimeUnit unit;
Reported by PMD.
Line: 63
final Observer<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
Reported by PMD.
Line: 65
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final SequentialDisposable task;
Reported by PMD.
Line: 67
final TimeUnit unit;
final Scheduler.Worker worker;
final SequentialDisposable task;
final AtomicReference<Disposable> upstream;
Reported by PMD.
Line: 69
final Scheduler.Worker worker;
final SequentialDisposable task;
final AtomicReference<Disposable> upstream;
TimeoutObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Scheduler.Worker worker) {
this.downstream = actual;
Reported by PMD.
Line: 71
final SequentialDisposable task;
final AtomicReference<Disposable> upstream;
TimeoutObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Scheduler.Worker worker) {
this.downstream = actual;
this.timeout = timeout;
this.unit = unit;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelFlatMapIterableTest.java
29 issues
Line: 25
public class ParallelFlatMapIterableTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.flatMapIterable(v -> Arrays.asList(1, 2, 3)));
}
@Test
Reported by PMD.
Line: 26
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.flatMapIterable(v -> Arrays.asList(1, 2, 3)));
}
@Test
public void normal() {
Reported by PMD.
Line: 26
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.flatMapIterable(v -> Arrays.asList(1, 2, 3)));
}
@Test
public void normal() {
Reported by PMD.
Line: 31
}
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
Reported by PMD.
Line: 33
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 33
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 33
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 33
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 33
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.
Line: 33
@Test
public void normal() {
for (int i = 1; i < 32; i++) {
Flowable.range(1, 1000)
.parallel(i)
.flatMapIterable(v -> Arrays.asList(v, v + 1))
.sequential()
.test()
.withTag("Parallelism: " + i)
Reported by PMD.