The following issues were found:
src/main/java/io/reactivex/rxjava3/internal/util/OpenHashSet.java
14 issues
Line: 107
}
}
boolean removeEntry(int pos, T[] a, int m) {
size--;
int last;
int slot;
T curr;
Reported by PMD.
Line: 107
}
}
boolean removeEntry(int pos, T[] a, int m) {
size--;
int last;
int slot;
T curr;
Reported by PMD.
Line: 30
public final class OpenHashSet<T> {
private static final int INT_PHI = 0x9E3779B9;
final float loadFactor;
int mask;
int size;
int maxSize;
T[] keys;
Reported by PMD.
Line: 31
private static final int INT_PHI = 0x9E3779B9;
final float loadFactor;
int mask;
int size;
int maxSize;
T[] keys;
public OpenHashSet() {
Reported by PMD.
Line: 32
final float loadFactor;
int mask;
int size;
int maxSize;
T[] keys;
public OpenHashSet() {
this(16, 0.75f);
Reported by PMD.
Line: 32
final float loadFactor;
int mask;
int size;
int maxSize;
T[] keys;
public OpenHashSet() {
this(16, 0.75f);
Reported by PMD.
Line: 33
final float loadFactor;
int mask;
int size;
int maxSize;
T[] keys;
public OpenHashSet() {
this(16, 0.75f);
}
Reported by PMD.
Line: 34
int mask;
int size;
int maxSize;
T[] keys;
public OpenHashSet() {
this(16, 0.75f);
}
Reported by PMD.
Line: 34
int mask;
int size;
int maxSize;
T[] keys;
public OpenHashSet() {
this(16, 0.75f);
}
Reported by PMD.
Line: 79
}
}
a[pos] = value;
if (++size >= maxSize) {
rehash();
}
return true;
}
public boolean remove(T value) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/processors/AsyncProcessor.java
14 issues
Line: 117
* </code></pre>
* @param <T> the value type
*/
public final class AsyncProcessor<@NonNull T> extends FlowableProcessor<T> {
@SuppressWarnings("rawtypes")
static final AsyncSubscription[] EMPTY = new AsyncSubscription[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
Line: 125
@SuppressWarnings("rawtypes")
static final AsyncSubscription[] TERMINATED = new AsyncSubscription[0];
final AtomicReference<AsyncSubscription<T>[]> subscribers;
/** Write before updating subscribers, read after reading subscribers as TERMINATED. */
Throwable error;
/** Write before updating subscribers, read after reading subscribers as TERMINATED. */
Reported by PMD.
Line: 128
final AtomicReference<AsyncSubscription<T>[]> subscribers;
/** Write before updating subscribers, read after reading subscribers as TERMINATED. */
Throwable error;
/** Write before updating subscribers, read after reading subscribers as TERMINATED. */
T value;
/**
Reported by PMD.
Line: 131
Throwable error;
/** Write before updating subscribers, read after reading subscribers as TERMINATED. */
T value;
/**
* Creates a new AsyncProcessor.
* @param <T> the value type to be received and emitted
* @return the new AsyncProcessor instance
Reported by PMD.
Line: 148
* Constructs an AsyncProcessor.
* @since 2.0
*/
@SuppressWarnings("unchecked")
AsyncProcessor() {
this.subscribers = new AtomicReference<>(EMPTY);
}
@Override
Reported by PMD.
Line: 180
RxJavaPlugins.onError(t);
return;
}
value = null;
error = t;
for (AsyncSubscription<T> as : subscribers.getAndSet(TERMINATED)) {
as.onError(t);
}
}
Reported by PMD.
Line: 209
@Override
@CheckReturnValue
public boolean hasSubscribers() {
return subscribers.get().length != 0;
}
@Override
@CheckReturnValue
public boolean hasThrowable() {
Reported by PMD.
Line: 269
int n = a.length;
@SuppressWarnings("unchecked")
AsyncSubscription<T>[] b = new AsyncSubscription[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = ps;
if (subscribers.compareAndSet(a, b)) {
return true;
Reported by PMD.
Line: 306
AsyncSubscription<T>[] b;
if (n == 1) {
b = EMPTY;
} else {
b = new AsyncSubscription[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
Reported by PMD.
Line: 309
if (n == 1) {
b = EMPTY;
} else {
b = new AsyncSubscription[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
if (subscribers.compareAndSet(a, b)) {
return;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/tck/RefCountProcessor.java
14 issues
Line: 31
* @param <T> the upstream and downstream value type
* @since 2.1.8
*/
/* public */final class RefCountProcessor<T> extends FlowableProcessor<T> implements Subscription {
final FlowableProcessor<T> actual;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 33
*/
/* public */final class RefCountProcessor<T> extends FlowableProcessor<T> implements Subscription {
final FlowableProcessor<T> actual;
final AtomicReference<Subscription> upstream;
final AtomicReference<RefCountSubscriber<T>[]> subscribers;
Reported by PMD.
Line: 35
final FlowableProcessor<T> actual;
final AtomicReference<Subscription> upstream;
final AtomicReference<RefCountSubscriber<T>[]> subscribers;
@SuppressWarnings("rawtypes")
static final RefCountSubscriber[] EMPTY = new RefCountSubscriber[0];
Reported by PMD.
Line: 37
final AtomicReference<Subscription> upstream;
final AtomicReference<RefCountSubscriber<T>[]> subscribers;
@SuppressWarnings("rawtypes")
static final RefCountSubscriber[] EMPTY = new RefCountSubscriber[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
Line: 113
@Override
public void request(long n) {
upstream.get().request(n);
}
boolean add(RefCountSubscriber<T> rcs) {
for (;;) {
RefCountSubscriber<T>[] a = subscribers.get();
Reported by PMD.
Line: 124
}
int n = a.length;
@SuppressWarnings("unchecked")
RefCountSubscriber<T>[] b = new RefCountSubscriber[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = rcs;
if (subscribers.compareAndSet(a, b)) {
return true;
}
Reported by PMD.
Line: 155
}
RefCountSubscriber<T>[] b;
if (n == 1) {
b = TERMINATED;
} else {
b = new RefCountSubscriber[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
Reported by PMD.
Line: 158
if (n == 1) {
b = TERMINATED;
} else {
b = new RefCountSubscriber[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
if (subscribers.compareAndSet(a, b)) {
if (b == TERMINATED) {
Reported by PMD.
Line: 175
private static final long serialVersionUID = -4317488092687530631L;
final Subscriber<? super T> downstream;
final RefCountProcessor<T> parent;
Subscription upstream;
Reported by PMD.
Line: 177
final Subscriber<? super T> downstream;
final RefCountProcessor<T> parent;
Subscription upstream;
RefCountSubscriber(Subscriber<? super T> actual, RefCountProcessor<T> parent) {
this.downstream = actual;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/testsupport/TestObserverEx.java
14 issues
Line: 115
}
@Override
public void onNext(T t) {
if (!checkSubscriptionOnce) {
checkSubscriptionOnce = true;
if (upstream.get() == null) {
errors.add(new IllegalStateException("onSubscribe not called in proper order"));
}
Reported by PMD.
Line: 35
*/
public class TestObserverEx<T>
extends BaseTestConsumerEx<T, TestObserverEx<T>>
implements Observer<T>, Disposable, MaybeObserver<T>, SingleObserver<T>, CompletableObserver {
/** The actual observer to forward events to. */
private final Observer<? super T> downstream;
/** Holds the current subscription if any. */
private final AtomicReference<Disposable> upstream = new AtomicReference<>();
Reported by PMD.
Line: 37
extends BaseTestConsumerEx<T, TestObserverEx<T>>
implements Observer<T>, Disposable, MaybeObserver<T>, SingleObserver<T>, CompletableObserver {
/** The actual observer to forward events to. */
private final Observer<? super T> downstream;
/** Holds the current subscription if any. */
private final AtomicReference<Disposable> upstream = new AtomicReference<>();
private QueueDisposable<T> qd;
Reported by PMD.
Line: 40
private final Observer<? super T> downstream;
/** Holds the current subscription if any. */
private final AtomicReference<Disposable> upstream = new AtomicReference<>();
private QueueDisposable<T> qd;
/**
* Constructs a non-forwarding TestObserver.
Reported by PMD.
Line: 42
/** Holds the current subscription if any. */
private final AtomicReference<Disposable> upstream = new AtomicReference<>();
private QueueDisposable<T> qd;
/**
* Constructs a non-forwarding TestObserver.
*/
public TestObserverEx() {
Reported by PMD.
Line: 86
}
if (initialFusionMode != 0) {
if (d instanceof QueueDisposable) {
qd = (QueueDisposable<T>)d;
int m = qd.requestFusion(initialFusionMode);
establishedFusionMode = m;
Reported by PMD.
Line: 92
int m = qd.requestFusion(initialFusionMode);
establishedFusionMode = m;
if (m == QueueFuseable.SYNC) {
checkSubscriptionOnce = true;
lastThread = Thread.currentThread();
try {
T t;
while ((t = qd.poll()) != null) {
Reported by PMD.
Line: 97
lastThread = Thread.currentThread();
try {
T t;
while ((t = qd.poll()) != null) {
values.add(t);
}
completions++;
upstream.lazySet(DisposableHelper.DISPOSED);
Reported by PMD.
Line: 103
completions++;
upstream.lazySet(DisposableHelper.DISPOSED);
} catch (Throwable ex) {
errors.add(ex);
}
return;
}
}
Reported by PMD.
Line: 127
if (establishedFusionMode == QueueFuseable.ASYNC) {
try {
while ((t = qd.poll()) != null) {
values.add(t);
}
} catch (Throwable ex) {
errors.add(ex);
qd.dispose();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java
14 issues
Line: 44
.blockingForEach(new Consumer<GroupedFlowable<Object, Event>>() {
@Override
public void accept(GroupedFlowable<Object, Event> v) {
System.out.println(v);
v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream
}
});
System.out.println("**** finished");
Reported by PMD.
Line: 49
}
});
System.out.println("**** finished");
}
@Test
public void takeUnsubscribesOnFlatMapOfGroupBy() {
Flowable.merge(
Reported by PMD.
Line: 80
.blockingForEach(new Consumer<Object>() {
@Override
public void accept(Object v) {
System.out.println(v);
}
});
System.out.println("**** finished");
}
Reported by PMD.
Line: 84
}
});
System.out.println("**** finished");
}
@Test
public void groupsCompleteAsSoonAsMainCompletes() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Reported by PMD.
Line: 28
public class FlowableGroupByTests extends RxJavaTest {
@Test
public void takeUnsubscribesOnGroupBy() {
Flowable.merge(
FlowableEventStream.getEventStream("HTTP-ClusterA", 50),
FlowableEventStream.getEventStream("HTTP-ClusterB", 20)
)
// group by type (2 clusters)
Reported by PMD.
Line: 45
@Override
public void accept(GroupedFlowable<Object, Event> v) {
System.out.println(v);
v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream
}
});
System.out.println("**** finished");
}
Reported by PMD.
Line: 53
}
@Test
public void takeUnsubscribesOnFlatMapOfGroupBy() {
Flowable.merge(
FlowableEventStream.getEventStream("HTTP-ClusterA", 50),
FlowableEventStream.getEventStream("HTTP-ClusterB", 20)
)
// group by type (2 clusters)
Reported by PMD.
Line: 71
return g.map(new Function<Event, Object>() {
@Override
public Object apply(Event event) {
return event.instanceId + " - " + event.values.get("count200");
}
});
}
})
.take(20)
Reported by PMD.
Line: 88
}
@Test
public void groupsCompleteAsSoonAsMainCompletes() {
TestSubscriber<Integer> ts = TestSubscriber.create();
Flowable.range(0, 20)
.groupBy(new Function<Integer, Integer>() {
@Override
Reported by PMD.
Line: 108
// Behavior change: this now counts as group abandonment because concatMap
// doesn't subscribe to the 2nd+ emitted groups immediately
ts.assertValues(
0, 5, 10, 15, // First group is okay
// any other group gets abandoned so we get 16 one-element group
1, 2, 3, 4, 6, 7, 8, 9, 11, 12, 13, 14, 16, 17, 18, 19
);
ts.assertComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/Burst.java
14 issues
Line: 34
*/
public final class Burst<T> extends Flowable<T> {
final List<T> items;
final Throwable error;
Burst(Throwable error, List<T> items) {
if (items.isEmpty()) {
throw new IllegalArgumentException("items cannot be empty");
Reported by PMD.
Line: 34
*/
public final class Burst<T> extends Flowable<T> {
final List<T> items;
final Throwable error;
Burst(Throwable error, List<T> items) {
if (items.isEmpty()) {
throw new IllegalArgumentException("items cannot be empty");
Reported by PMD.
Line: 35
public final class Burst<T> extends Flowable<T> {
final List<T> items;
final Throwable error;
Burst(Throwable error, List<T> items) {
if (items.isEmpty()) {
throw new IllegalArgumentException("items cannot be empty");
}
Reported by PMD.
Line: 66
}
final class BurstSubscription implements Subscription {
private final Subscriber<? super T> subscriber;
final Queue<T> q = new ConcurrentLinkedQueue<>(items);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
BurstSubscription(Subscriber<? super T> subscriber) {
Reported by PMD.
Line: 67
final class BurstSubscription implements Subscription {
private final Subscriber<? super T> subscriber;
final Queue<T> q = new ConcurrentLinkedQueue<>(items);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
BurstSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
Reported by PMD.
Line: 68
final class BurstSubscription implements Subscription {
private final Subscriber<? super T> subscriber;
final Queue<T> q = new ConcurrentLinkedQueue<>(items);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
BurstSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}
Reported by PMD.
Line: 69
private final Subscriber<? super T> subscriber;
final Queue<T> q = new ConcurrentLinkedQueue<>(items);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
BurstSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}
Reported by PMD.
Line: 84
if (SubscriptionHelper.validate(n)) {
// just for testing, don't care about perf
// so no attempt made to reduce volatile reads
if (BackpressureHelper.add(requested, n) == 0) {
if (q.isEmpty()) {
return;
}
while (!q.isEmpty() && requested.get() > 0) {
T item = q.poll();
Reported by PMD.
Line: 85
// just for testing, don't care about perf
// so no attempt made to reduce volatile reads
if (BackpressureHelper.add(requested, n) == 0) {
if (q.isEmpty()) {
return;
}
while (!q.isEmpty() && requested.get() > 0) {
T item = q.poll();
requested.decrementAndGet();
Reported by PMD.
Line: 94
subscriber.onNext(item);
}
if (q.isEmpty()) {
if (error != null) {
subscriber.onError(error);
} else {
subscriber.onComplete();
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromPubisherTest.java
14 issues
Line: 24
public class MaybeFromPubisherTest extends RxJavaTest {
@Test
public void empty() {
Maybe.fromPublisher(Flowable.empty().hide())
.test()
.assertResult();
}
Reported by PMD.
Line: 25
@Test
public void empty() {
Maybe.fromPublisher(Flowable.empty().hide())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 25
@Test
public void empty() {
Maybe.fromPublisher(Flowable.empty().hide())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 25
@Test
public void empty() {
Maybe.fromPublisher(Flowable.empty().hide())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 31
}
@Test
public void just() {
Maybe.fromPublisher(Flowable.just(1).hide())
.test()
.assertResult(1);
}
Reported by PMD.
Line: 32
@Test
public void just() {
Maybe.fromPublisher(Flowable.just(1).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 32
@Test
public void just() {
Maybe.fromPublisher(Flowable.just(1).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 32
@Test
public void just() {
Maybe.fromPublisher(Flowable.just(1).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 38
}
@Test
public void range() {
Maybe.fromPublisher(Flowable.range(1, 5).hide())
.test()
.assertResult(1);
}
Reported by PMD.
Line: 39
@Test
public void range() {
Maybe.fromPublisher(Flowable.range(1, 5).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromObservableTest.java
14 issues
Line: 24
public class MaybeFromObservableTest extends RxJavaTest {
@Test
public void empty() {
Maybe.fromObservable(Observable.empty().hide())
.test()
.assertResult();
}
Reported by PMD.
Line: 25
@Test
public void empty() {
Maybe.fromObservable(Observable.empty().hide())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 25
@Test
public void empty() {
Maybe.fromObservable(Observable.empty().hide())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 25
@Test
public void empty() {
Maybe.fromObservable(Observable.empty().hide())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 31
}
@Test
public void just() {
Maybe.fromObservable(Observable.just(1).hide())
.test()
.assertResult(1);
}
Reported by PMD.
Line: 32
@Test
public void just() {
Maybe.fromObservable(Observable.just(1).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 32
@Test
public void just() {
Maybe.fromObservable(Observable.just(1).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 32
@Test
public void just() {
Maybe.fromObservable(Observable.just(1).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 38
}
@Test
public void range() {
Maybe.fromObservable(Observable.range(1, 5).hide())
.test()
.assertResult(1);
}
Reported by PMD.
Line: 39
@Test
public void range() {
Maybe.fromObservable(Observable.range(1, 5).hide())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeSafeSubscribeTest.java
14 issues
Line: 34
public class MaybeSafeSubscribeTest {
@Test
public void normalSuccess() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
Maybe.just(1)
Reported by PMD.
Line: 36
@Test
public void normalSuccess() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
Maybe.just(1)
.safeSubscribe(consumer);
Reported by PMD.
Line: 47
order.verify(consumer).onSuccess(1);
order.verifyNoMoreInteractions();
assertTrue("" + errors, errors.isEmpty());
});
}
@Test
public void normalError() throws Throwable {
Reported by PMD.
Line: 52
}
@Test
public void normalError() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
Maybe.<Integer>error(new TestException())
Reported by PMD.
Line: 65
order.verify(consumer).onError(any(TestException.class));
order.verifyNoMoreInteractions();
assertTrue("" + errors, errors.isEmpty());
});
}
@Test
public void normalEmpty() throws Throwable {
Reported by PMD.
Line: 70
}
@Test
public void normalEmpty() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
Maybe.<Integer>empty()
Reported by PMD.
Line: 86
}
@Test
public void onSubscribeCrash() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
doThrow(new TestException()).when(consumer).onSubscribe(any());
Reported by PMD.
Line: 110
order.verify(consumer).onSubscribe(any(Disposable.class));
order.verifyNoMoreInteractions();
assertTrue(d.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
TestHelper.assertUndeliverable(errors, 1, IOException.class);
});
}
Reported by PMD.
Line: 118
}
@Test
public void onSuccessCrash() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
doThrow(new TestException()).when(consumer).onSuccess(any());
Reported by PMD.
Line: 143
}
@Test
public void onErrorCrash() throws Throwable {
TestHelper.withErrorTracking(errors -> {
@SuppressWarnings("unchecked")
MaybeObserver<Integer> consumer = mock(MaybeObserver.class);
doThrow(new TestException()).when(consumer).onError(any());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableMergeIterableTest.java
13 issues
Line: 33
public class CompletableMergeIterableTest extends RxJavaTest {
@Test
public void errorRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
final PublishSubject<Integer> ps1 = PublishSubject.create();
final PublishSubject<Integer> ps2 = PublishSubject.create();
Reported by PMD.
Line: 40
final PublishSubject<Integer> ps1 = PublishSubject.create();
final PublishSubject<Integer> ps2 = PublishSubject.create();
TestObserver<Void> to = Completable.merge(
Arrays.asList(ps1.ignoreElements(), ps2.ignoreElements())).test();
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
Reported by PMD.
Line: 41
final PublishSubject<Integer> ps2 = PublishSubject.create();
TestObserver<Void> to = Completable.merge(
Arrays.asList(ps1.ignoreElements(), ps2.ignoreElements())).test();
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
Reported by PMD.
Line: 41
final PublishSubject<Integer> ps2 = PublishSubject.create();
TestObserver<Void> to = Completable.merge(
Arrays.asList(ps1.ignoreElements(), ps2.ignoreElements())).test();
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
Reported by PMD.
Line: 43
TestObserver<Void> to = Completable.merge(
Arrays.asList(ps1.ignoreElements(), ps2.ignoreElements())).test();
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
public void run() {
ps1.onError(ex);
Reported by PMD.
Line: 45
final TestException ex = new TestException();
Runnable r1 = new Runnable() {
@Override
public void run() {
ps1.onError(ex);
}
};
Reported by PMD.
Line: 51
ps1.onError(ex);
}
};
Runnable r2 = new Runnable() {
@Override
public void run() {
ps2.onError(ex);
}
};
Reported by PMD.
Line: 60
TestHelper.race(r1, r2);
to.assertFailure(TestException.class);
if (!errors.isEmpty()) {
TestHelper.assertUndeliverable(errors, 0, TestException.class);
}
} finally {
Reported by PMD.
Line: 62
to.assertFailure(TestException.class);
if (!errors.isEmpty()) {
TestHelper.assertUndeliverable(errors, 0, TestException.class);
}
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
Line: 72
}
@Test
public void cancelAfterHasNext() {
final TestObserver<Void> to = new TestObserver<>();
Completable.merge(new Iterable<Completable>() {
@Override
public Iterator<Completable> iterator() {
Reported by PMD.