The following issues were found
src/main/java/io/reactivex/rxjava3/subscribers/TestSubscriber.java
7 issues
Line: 38
*/
public class TestSubscriber<T>
extends BaseTestConsumer<T, TestSubscriber<T>>
implements FlowableSubscriber<T>, Subscription {
/** The actual subscriber to forward events to. */
private final Subscriber<? super T> downstream;
/** Makes sure the incoming Subscriptions get cancelled immediately. */
private volatile boolean cancelled;
Reported by PMD.
Line: 40
extends BaseTestConsumer<T, TestSubscriber<T>>
implements FlowableSubscriber<T>, Subscription {
/** The actual subscriber to forward events to. */
private final Subscriber<? super T> downstream;
/** Makes sure the incoming Subscriptions get cancelled immediately. */
private volatile boolean cancelled;
/** Holds the current subscription if any. */
Reported by PMD.
Line: 43
private final Subscriber<? super T> downstream;
/** Makes sure the incoming Subscriptions get cancelled immediately. */
private volatile boolean cancelled;
/** Holds the current subscription if any. */
private final AtomicReference<Subscription> upstream;
/** Holds the requested amount until a subscription arrives. */
Reported by PMD.
Line: 46
private volatile boolean cancelled;
/** Holds the current subscription if any. */
private final AtomicReference<Subscription> upstream;
/** Holds the requested amount until a subscription arrives. */
private final AtomicLong missedRequested;
/**
Reported by PMD.
Line: 49
private final AtomicReference<Subscription> upstream;
/** Holds the requested amount until a subscription arrives. */
private final AtomicLong missedRequested;
/**
* Creates a {@code TestSubscriber} with {@link Long#MAX_VALUE} initial request amount.
* @param <T> the value type
* @return the new {@code TestSubscriber} instance.
Reported by PMD.
Line: 145
downstream.onSubscribe(s);
long mr = missedRequested.getAndSet(0L);
if (mr != 0L) {
s.request(mr);
}
onStart();
}
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.*;
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
import io.reactivex.rxjava3.observers.BaseTestConsumer;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/subjects/SerializedSubject.java
7 issues
Line: 31
*/
/* public */ final class SerializedSubject<T> extends Subject<T> implements NonThrowingPredicate<Object> {
/** The actual subscriber to serialize Subscriber calls to. */
final Subject<T> actual;
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
Reported by PMD.
Line: 33
/** The actual subscriber to serialize Subscriber calls to. */
final Subject<T> actual;
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
Reported by PMD.
Line: 35
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
/**
* Constructor that wraps an actual subject.
Reported by PMD.
Line: 37
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
/**
* Constructor that wraps an actual subject.
* @param actual the subject wrapped
*/
Reported by PMD.
Line: 174
emitting = false;
return;
}
queue = null;
}
q.forEachWhile(this);
}
}
Reported by PMD.
Line: 176
}
queue = null;
}
q.forEachWhile(this);
}
}
@Override
public boolean test(Object o) {
Reported by PMD.
Line: 19
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.internal.util.AppendOnlyLinkedArrayList.NonThrowingPredicate;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* Serializes calls to the Observer methods.
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleMapTest.java
7 issues
Line: 59
Single.just(1).map(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
throw new RuntimeException("something went terribly wrong!");
}
})
.to(TestHelper.<SingleSource<Integer>>testConsumer())
.assertNoValues()
.assertError(RuntimeException.class)
Reported by PMD.
Line: 25
public class SingleMapTest extends RxJavaTest {
@Test
public void mapValue() {
Single.just(1).map(new Function<Integer, Integer>() {
@Override
public Integer apply(final Integer integer) throws Exception {
if (integer == 1) {
return 2;
Reported by PMD.
Line: 29
Single.just(1).map(new Function<Integer, Integer>() {
@Override
public Integer apply(final Integer integer) throws Exception {
if (integer == 1) {
return 2;
}
return 1;
}
Reported by PMD.
Line: 41
}
@Test
public void mapValueNull() {
Single.just(1).map(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return null;
}
Reported by PMD.
Line: 55
}
@Test
public void mapValueErrorThrown() {
Single.just(1).map(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
throw new RuntimeException("something went terribly wrong!");
}
Reported by PMD.
Line: 69
}
@Test
public void mapError() {
RuntimeException exception = new RuntimeException("test");
Single.error(exception).map(new Function<Object, Object>() {
@Override
public Object apply(final Object integer) throws Exception {
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleMapTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleSubscribeOnTest.java
7 issues
Line: 39
try {
TestScheduler scheduler = new TestScheduler();
TestObserver<Integer> to = Single.just(1)
.subscribeOn(scheduler)
.test();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Reported by PMD.
Line: 39
try {
TestScheduler scheduler = new TestScheduler();
TestObserver<Integer> to = Single.just(1)
.subscribeOn(scheduler)
.test();
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
Reported by PMD.
Line: 45
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
to.assertResult(1);
assertTrue(list.toString(), list.isEmpty());
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
Line: 54
}
@Test
public void dispose() {
TestHelper.checkDisposed(PublishSubject.create().singleOrError().subscribeOn(new TestScheduler()));
}
@Test
public void error() {
Reported by PMD.
Line: 59
}
@Test
public void error() {
Single.error(new TestException())
.subscribeOn(Schedulers.single())
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
Reported by PMD.
Line: 23
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.*;
import io.reactivex.rxjava3.subjects.PublishSubject;
Reported by PMD.
Line: 27
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.*;
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleSubscribeOnTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/schedulers/RxThreadFactoryTest.java
7 issues
Line: 26
public class RxThreadFactoryTest extends RxJavaTest {
@Test
public void normal() {
RxThreadFactory tf = new RxThreadFactory("Test", 1);
assertEquals("RxThreadFactory[Test]", tf.toString());
Thread t = tf.newThread(Functions.EMPTY_RUNNABLE);
Reported by PMD.
Line: 29
public void normal() {
RxThreadFactory tf = new RxThreadFactory("Test", 1);
assertEquals("RxThreadFactory[Test]", tf.toString());
Thread t = tf.newThread(Functions.EMPTY_RUNNABLE);
assertTrue(t.isDaemon());
assertEquals(1, t.getPriority());
Reported by PMD.
Line: 33
Thread t = tf.newThread(Functions.EMPTY_RUNNABLE);
assertTrue(t.isDaemon());
assertEquals(1, t.getPriority());
}
}
Reported by PMD.
Line: 33
Thread t = tf.newThread(Functions.EMPTY_RUNNABLE);
assertTrue(t.isDaemon());
assertEquals(1, t.getPriority());
}
}
Reported by PMD.
Line: 34
Thread t = tf.newThread(Functions.EMPTY_RUNNABLE);
assertTrue(t.isDaemon());
assertEquals(1, t.getPriority());
}
}
Reported by PMD.
Line: 34
Thread t = tf.newThread(Functions.EMPTY_RUNNABLE);
assertTrue(t.isDaemon());
assertEquals(1, t.getPriority());
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.schedulers;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.internal.functions.Functions;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/schedulers/ComputationSchedulerInternalTest.java
7 issues
Line: 25
public class ComputationSchedulerInternalTest extends RxJavaTest {
@Test
public void capPoolSize() {
assertEquals(8, ComputationScheduler.cap(8, -1));
assertEquals(8, ComputationScheduler.cap(8, 0));
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
Reported by PMD.
Line: 26
@Test
public void capPoolSize() {
assertEquals(8, ComputationScheduler.cap(8, -1));
assertEquals(8, ComputationScheduler.cap(8, 0));
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
assertEquals(8, ComputationScheduler.cap(8, 16));
Reported by PMD.
Line: 27
@Test
public void capPoolSize() {
assertEquals(8, ComputationScheduler.cap(8, -1));
assertEquals(8, ComputationScheduler.cap(8, 0));
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
assertEquals(8, ComputationScheduler.cap(8, 16));
}
Reported by PMD.
Line: 28
public void capPoolSize() {
assertEquals(8, ComputationScheduler.cap(8, -1));
assertEquals(8, ComputationScheduler.cap(8, 0));
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
assertEquals(8, ComputationScheduler.cap(8, 16));
}
}
Reported by PMD.
Line: 29
assertEquals(8, ComputationScheduler.cap(8, -1));
assertEquals(8, ComputationScheduler.cap(8, 0));
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
assertEquals(8, ComputationScheduler.cap(8, 16));
}
}
Reported by PMD.
Line: 30
assertEquals(8, ComputationScheduler.cap(8, 0));
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
assertEquals(8, ComputationScheduler.cap(8, 16));
}
}
Reported by PMD.
Line: 31
assertEquals(4, ComputationScheduler.cap(8, 4));
assertEquals(8, ComputationScheduler.cap(8, 8));
assertEquals(8, ComputationScheduler.cap(8, 9));
assertEquals(8, ComputationScheduler.cap(8, 16));
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/tck/BaseTck.java
7 issues
Line: 80
}
static final class FiniteRange implements Iterable<Long> {
final long end;
FiniteRange(long end) {
this.end = end;
}
@Override
Reported by PMD.
Line: 91
}
static final class FiniteRangeIterator implements Iterator<Long> {
final long end;
long count;
FiniteRangeIterator(long end) {
this.end = end;
}
Reported by PMD.
Line: 92
static final class FiniteRangeIterator implements Iterator<Long> {
final long end;
long count;
FiniteRangeIterator(long end) {
this.end = end;
}
Reported by PMD.
Line: 127
}
static final class InfiniteRangeIterator implements Iterator<Long> {
long count;
@Override
public boolean hasNext() {
return true;
}
Reported by PMD.
Line: 19
import java.util.*;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.*;
import org.testng.annotations.Test;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.exceptions.TestException;
Reported by PMD.
Line: 72
* @return the array
*/
protected Long[] array(long elements) {
Long[] a = new Long[(int)elements];
for (int i = 0; i < elements; i++) {
a[i] = (long)i;
}
return a;
}
Reported by PMD.
Line: 74
protected Long[] array(long elements) {
Long[] a = new Long[(int)elements];
for (int i = 0; i < elements; i++) {
a[i] = (long)i;
}
return a;
}
static final class FiniteRange implements Iterable<Long> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorComplete.java
7 issues
Line: 23
public final class CompletableOnErrorComplete extends Completable {
final CompletableSource source;
final Predicate<? super Throwable> predicate;
public CompletableOnErrorComplete(CompletableSource source, Predicate<? super Throwable> predicate) {
this.source = source;
Reported by PMD.
Line: 25
final CompletableSource source;
final Predicate<? super Throwable> predicate;
public CompletableOnErrorComplete(CompletableSource source, Predicate<? super Throwable> predicate) {
this.source = source;
this.predicate = predicate;
}
Reported by PMD.
Line: 40
final class OnError implements CompletableObserver {
private final CompletableObserver downstream;
OnError(CompletableObserver observer) {
this.downstream = observer;
}
Reported by PMD.
Line: 57
try {
b = predicate.test(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.completable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Predicate;
public final class CompletableOnErrorComplete extends Completable {
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Predicate;
public final class CompletableOnErrorComplete extends Completable {
final CompletableSource source;
Reported by PMD.
Line: 56
boolean b;
try {
b = predicate.test(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
return;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeContains.java
7 issues
Line: 30
*/
public final class MaybeContains<T> extends Single<Boolean> implements HasUpstreamMaybeSource<T> {
final MaybeSource<T> source;
final Object value;
public MaybeContains(MaybeSource<T> source, Object value) {
this.source = source;
Reported by PMD.
Line: 30
*/
public final class MaybeContains<T> extends Single<Boolean> implements HasUpstreamMaybeSource<T> {
final MaybeSource<T> source;
final Object value;
public MaybeContains(MaybeSource<T> source, Object value) {
this.source = source;
Reported by PMD.
Line: 32
final MaybeSource<T> source;
final Object value;
public MaybeContains(MaybeSource<T> source, Object value) {
this.source = source;
this.value = value;
}
Reported by PMD.
Line: 51
static final class ContainsMaybeObserver implements MaybeObserver<Object>, Disposable {
final SingleObserver<? super Boolean> downstream;
final Object value;
Disposable upstream;
Reported by PMD.
Line: 53
final SingleObserver<? super Boolean> downstream;
final Object value;
Disposable upstream;
ContainsMaybeObserver(SingleObserver<? super Boolean> actual, Object value) {
this.downstream = actual;
Reported by PMD.
Line: 55
final Object value;
Disposable upstream;
ContainsMaybeObserver(SingleObserver<? super Boolean> actual, Object value) {
this.downstream = actual;
this.value = value;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamMaybeSource;
import java.util.Objects;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapCompletable.java
7 issues
Line: 31
*/
public final class SingleFlatMapCompletable<T> extends Completable {
final SingleSource<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
public SingleFlatMapCompletable(SingleSource<T> source, Function<? super T, ? extends CompletableSource> mapper) {
this.source = source;
Reported by PMD.
Line: 33
final SingleSource<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
public SingleFlatMapCompletable(SingleSource<T> source, Function<? super T, ? extends CompletableSource> mapper) {
this.source = source;
this.mapper = mapper;
}
Reported by PMD.
Line: 53
private static final long serialVersionUID = -2177128922851101253L;
final CompletableObserver downstream;
final Function<? super T, ? extends CompletableSource> mapper;
FlatMapCompletableObserver(CompletableObserver actual,
Function<? super T, ? extends CompletableSource> mapper) {
Reported by PMD.
Line: 55
final CompletableObserver downstream;
final Function<? super T, ? extends CompletableSource> mapper;
FlatMapCompletableObserver(CompletableObserver actual,
Function<? super T, ? extends CompletableSource> mapper) {
this.downstream = actual;
this.mapper = mapper;
Reported by PMD.
Line: 84
try {
cs = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
return;
}
Reported by PMD.
Line: 19
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 83
CompletableSource cs;
try {
cs = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
return;
}
Reported by PMD.