The following issues were found
src/test/java/io/reactivex/rxjava3/internal/subscribers/StrictSubscriberTest.java
35 issues
Line: 52
@Override
public void onComplete() {
list.add("Done");
}
};
new Flowable<Object>() {
@Override
Reported by PMD.
Line: 68
}
static final class SubscriberWrapper<T> implements Subscriber<T> {
final TestSubscriberEx<T> tester;
SubscriberWrapper(TestSubscriberEx<T> tester) {
this.tester = tester;
}
Reported by PMD.
Line: 96
}
@Test
public void normalOnNext() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
SubscriberWrapper<Integer> wrapper = new SubscriberWrapper<>(ts);
Flowable.range(1, 5).subscribe(wrapper);
Reported by PMD.
Line: 100
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
SubscriberWrapper<Integer> wrapper = new SubscriberWrapper<>(ts);
Flowable.range(1, 5).subscribe(wrapper);
ts.assertResult(1, 2, 3, 4, 5);
}
@Test
Reported by PMD.
Line: 106
}
@Test
public void normalOnNextBackpressured() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0);
SubscriberWrapper<Integer> wrapper = new SubscriberWrapper<>(ts);
Flowable.range(1, 5).subscribe(wrapper);
Reported by PMD.
Line: 110
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(0);
SubscriberWrapper<Integer> wrapper = new SubscriberWrapper<>(ts);
Flowable.range(1, 5).subscribe(wrapper);
ts.assertEmpty()
.requestMore(1)
.assertValue(1)
.requestMore(2)
Reported by PMD.
Line: 112
Flowable.range(1, 5).subscribe(wrapper);
ts.assertEmpty()
.requestMore(1)
.assertValue(1)
.requestMore(2)
.assertValues(1, 2, 3)
.requestMore(2)
Reported by PMD.
Line: 112
Flowable.range(1, 5).subscribe(wrapper);
ts.assertEmpty()
.requestMore(1)
.assertValue(1)
.requestMore(2)
.assertValues(1, 2, 3)
.requestMore(2)
Reported by PMD.
Line: 112
Flowable.range(1, 5).subscribe(wrapper);
ts.assertEmpty()
.requestMore(1)
.assertValue(1)
.requestMore(2)
.assertValues(1, 2, 3)
.requestMore(2)
Reported by PMD.
Line: 112
Flowable.range(1, 5).subscribe(wrapper);
ts.assertEmpty()
.requestMore(1)
.assertValue(1)
.requestMore(2)
.assertValues(1, 2, 3)
.requestMore(2)
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeUnsubscribeOnTest.java
35 issues
Line: 33
public class MaybeUnsubscribeOnTest extends RxJavaTest {
@Test
public void normal() throws Exception {
PublishProcessor<Integer> pp = PublishProcessor.create();
final String[] name = { null };
final CountDownLatch cdl = new CountDownLatch(1);
Reported by PMD.
Line: 33
public class MaybeUnsubscribeOnTest extends RxJavaTest {
@Test
public void normal() throws Exception {
PublishProcessor<Integer> pp = PublishProcessor.create();
final String[] name = { null };
final CountDownLatch cdl = new CountDownLatch(1);
Reported by PMD.
Line: 43
pp.doOnCancel(new Action() {
@Override
public void run() throws Exception {
name[0] = Thread.currentThread().getName();
cdl.countDown();
}
})
.singleElement()
.unsubscribeOn(Schedulers.single())
Reported by PMD.
Line: 52
.test(true)
;
assertTrue(cdl.await(5, TimeUnit.SECONDS));
int times = 10;
while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
Reported by PMD.
Line: 56
int times = 10;
while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 56
int times = 10;
while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 60
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
assertNotEquals(Thread.currentThread().getName(), name[0]);
}
@Test
Reported by PMD.
Line: 60
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
assertNotEquals(Thread.currentThread().getName(), name[0]);
}
@Test
Reported by PMD.
Line: 62
assertFalse(pp.hasSubscribers());
assertNotEquals(Thread.currentThread().getName(), name[0]);
}
@Test
public void just() {
Maybe.just(1)
Reported by PMD.
Line: 66
}
@Test
public void just() {
Maybe.just(1)
.unsubscribeOn(Schedulers.single())
.test()
.assertResult(1);
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java
35 issues
Line: 30
import io.reactivex.rxjava3.observables.GroupedObservable;
public final class ObservableGroupBy<T, K, V> extends AbstractObservableWithUpstream<T, GroupedObservable<K, V>> {
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
public ObservableGroupBy(ObservableSource<T> source,
Reported by PMD.
Line: 31
public final class ObservableGroupBy<T, K, V> extends AbstractObservableWithUpstream<T, GroupedObservable<K, V>> {
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
public ObservableGroupBy(ObservableSource<T> source,
Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
Reported by PMD.
Line: 32
public final class ObservableGroupBy<T, K, V> extends AbstractObservableWithUpstream<T, GroupedObservable<K, V>> {
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
public ObservableGroupBy(ObservableSource<T> source,
Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
int bufferSize, boolean delayError) {
Reported by PMD.
Line: 33
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
public ObservableGroupBy(ObservableSource<T> source,
Function<? super T, ? extends K> keySelector, Function<? super T, ? extends V> valueSelector,
int bufferSize, boolean delayError) {
super(source);
Reported by PMD.
Line: 54
private static final long serialVersionUID = -3688291656102519502L;
final Observer<? super GroupedObservable<K, V>> downstream;
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
Reported by PMD.
Line: 55
private static final long serialVersionUID = -3688291656102519502L;
final Observer<? super GroupedObservable<K, V>> downstream;
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
Reported by PMD.
Line: 56
final Observer<? super GroupedObservable<K, V>> downstream;
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
static final Object NULL_KEY = new Object();
Reported by PMD.
Line: 57
final Observer<? super GroupedObservable<K, V>> downstream;
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
static final Object NULL_KEY = new Object();
Reported by PMD.
Line: 58
final Function<? super T, ? extends K> keySelector;
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
static final Object NULL_KEY = new Object();
Disposable upstream;
Reported by PMD.
Line: 59
final Function<? super T, ? extends V> valueSelector;
final int bufferSize;
final boolean delayError;
final Map<Object, GroupedUnicast<K, V>> groups;
static final Object NULL_KEY = new Object();
Disposable upstream;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDoOnUnsubscribeTest.java
34 issues
Line: 34
public class ObservableDoOnUnsubscribeTest extends RxJavaTest {
@Test
public void doOnUnsubscribe() throws Exception {
int subCount = 3;
final CountDownLatch upperLatch = new CountDownLatch(subCount);
final CountDownLatch lowerLatch = new CountDownLatch(subCount);
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
Reported by PMD.
Line: 34
public class ObservableDoOnUnsubscribeTest extends RxJavaTest {
@Test
public void doOnUnsubscribe() throws Exception {
int subCount = 3;
final CountDownLatch upperLatch = new CountDownLatch(subCount);
final CountDownLatch lowerLatch = new CountDownLatch(subCount);
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
Reported by PMD.
Line: 75
List<TestObserver<Long>> subscribers = new ArrayList<>();
for (int i = 0; i < subCount; ++i) {
TestObserver<Long> observer = new TestObserver<>();
subscriptions.add(observer);
longs.subscribe(observer);
subscribers.add(observer);
}
Reported by PMD.
Line: 83
onNextLatch.await();
for (int i = 0; i < subCount; ++i) {
subscriptions.get(i).dispose();
// Test that unsubscribe() method is not affected in any way
// FIXME no longer valid
// subscribers.get(i).assertUnsubscribed();
}
Reported by PMD.
Line: 96
}
@Test
public void doOnUnSubscribeWorksWithRefCount() throws Exception {
int subCount = 3;
final CountDownLatch upperLatch = new CountDownLatch(1);
final CountDownLatch lowerLatch = new CountDownLatch(1);
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
Reported by PMD.
Line: 96
}
@Test
public void doOnUnSubscribeWorksWithRefCount() throws Exception {
int subCount = 3;
final CountDownLatch upperLatch = new CountDownLatch(1);
final CountDownLatch lowerLatch = new CountDownLatch(1);
final CountDownLatch onNextLatch = new CountDownLatch(subCount);
Reported by PMD.
Line: 138
List<TestObserver<Long>> subscribers = new ArrayList<>();
for (int i = 0; i < subCount; ++i) {
TestObserver<Long> observer = new TestObserver<>();
longs.subscribe(observer);
subscriptions.add(observer);
subscribers.add(observer);
}
Reported by PMD.
Line: 146
onNextLatch.await();
for (int i = 0; i < subCount; ++i) {
subscriptions.get(i).dispose();
// Test that unsubscribe() method is not affected in any way
// FIXME no longer valid
// subscribers.get(i).assertUnsubscribed();
}
Reported by PMD.
Line: 175
.subscribe()
.dispose();
assertEquals(1, disposeCalled.get());
}
}
Reported by PMD.
Line: 27
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
public class ObservableDoOnUnsubscribeTest extends RxJavaTest {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/ObservableConcatMapSingle.java
34 issues
Line: 38
*/
public final class ObservableConcatMapSingle<T, R> extends Observable<R> {
final ObservableSource<T> source;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ErrorMode errorMode;
Reported by PMD.
Line: 40
final ObservableSource<T> source;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ErrorMode errorMode;
final int prefetch;
Reported by PMD.
Line: 42
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ErrorMode errorMode;
final int prefetch;
public ObservableConcatMapSingle(ObservableSource<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper,
Reported by PMD.
Line: 44
final ErrorMode errorMode;
final int prefetch;
public ObservableConcatMapSingle(ObservableSource<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper,
ErrorMode errorMode, int prefetch) {
this.source = source;
Reported by PMD.
Line: 62
}
}
static final class ConcatMapSingleMainObserver<T, R>
extends ConcatMapXMainObserver<T> {
private static final long serialVersionUID = -9140123220065488293L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 62
}
}
static final class ConcatMapSingleMainObserver<T, R>
extends ConcatMapXMainObserver<T> {
private static final long serialVersionUID = -9140123220065488293L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 67
private static final long serialVersionUID = -9140123220065488293L;
final Observer<? super R> downstream;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ConcatMapSingleObserver<R> inner;
Reported by PMD.
Line: 69
final Observer<? super R> downstream;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ConcatMapSingleObserver<R> inner;
R item;
Reported by PMD.
Line: 71
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final ConcatMapSingleObserver<R> inner;
R item;
volatile int state;
Reported by PMD.
Line: 73
final ConcatMapSingleObserver<R> inner;
R item;
volatile int state;
/** No inner SingleSource is running. */
static final int STATE_INACTIVE = 0;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelFilterTry.java
34 issues
Line: 48
}
@Override
public void subscribe(Subscriber<? super T>[] subscribers) {
subscribers = RxJavaPlugins.onSubscribe(this, subscribers);
if (!validate(subscribers)) {
return;
}
Reported by PMD.
Line: 170
downstream.onNext(t);
return true;
}
return false;
}
}
return false;
}
Reported by PMD.
Line: 255
}
}
return b && downstream.tryOnNext(t);
}
}
return false;
}
Reported by PMD.
Line: 34
*/
public final class ParallelFilterTry<T> extends ParallelFlowable<T> {
final ParallelFlowable<T> source;
final Predicate<? super T> predicate;
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
Reported by PMD.
Line: 36
final ParallelFlowable<T> source;
final Predicate<? super T> predicate;
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
public ParallelFilterTry(ParallelFlowable<T> source, Predicate<? super T> predicate,
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Reported by PMD.
Line: 38
final Predicate<? super T> predicate;
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
public ParallelFilterTry(ParallelFlowable<T> source, Predicate<? super T> predicate,
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
this.source = source;
this.predicate = predicate;
Reported by PMD.
Line: 77
}
abstract static class BaseFilterSubscriber<T> implements ConditionalSubscriber<T>, Subscription {
final Predicate<? super T> predicate;
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
Subscription upstream;
Reported by PMD.
Line: 79
abstract static class BaseFilterSubscriber<T> implements ConditionalSubscriber<T>, Subscription {
final Predicate<? super T> predicate;
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 81
final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
Subscription upstream;
boolean done;
BaseFilterSubscriber(Predicate<? super T> predicate, BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
this.predicate = predicate;
Reported by PMD.
Line: 83
Subscription upstream;
boolean done;
BaseFilterSubscriber(Predicate<? super T> predicate, BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
this.predicate = predicate;
this.errorHandler = errorHandler;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapSingle.java
34 issues
Line: 41
*/
public final class FlowableSwitchMapSingle<T, R> extends Flowable<R> {
final Flowable<T> source;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
Reported by PMD.
Line: 43
final Flowable<T> source;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
public FlowableSwitchMapSingle(Flowable<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper,
Reported by PMD.
Line: 45
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
public FlowableSwitchMapSingle(Flowable<T> source,
Function<? super T, ? extends SingleSource<? extends R>> mapper,
boolean delayErrors) {
this.source = source;
Reported by PMD.
Line: 60
source.subscribe(new SwitchMapSingleSubscriber<>(s, mapper, delayErrors));
}
static final class SwitchMapSingleSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5402190102429853762L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 60
source.subscribe(new SwitchMapSingleSubscriber<>(s, mapper, delayErrors));
}
static final class SwitchMapSingleSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5402190102429853762L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 65
private static final long serialVersionUID = -5402190102429853762L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
Reported by PMD.
Line: 67
final Subscriber<? super R> downstream;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
final AtomicThrowable errors;
Reported by PMD.
Line: 69
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
final AtomicThrowable errors;
final AtomicLong requested;
Reported by PMD.
Line: 71
final boolean delayErrors;
final AtomicThrowable errors;
final AtomicLong requested;
final AtomicReference<SwitchMapSingleObserver<R>> inner;
Reported by PMD.
Line: 73
final AtomicThrowable errors;
final AtomicLong requested;
final AtomicReference<SwitchMapSingleObserver<R>> inner;
static final SwitchMapSingleObserver<Object> INNER_DISPOSED =
new SwitchMapSingleObserver<>(null);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableSwitchMapMaybe.java
34 issues
Line: 41
*/
public final class FlowableSwitchMapMaybe<T, R> extends Flowable<R> {
final Flowable<T> source;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
Reported by PMD.
Line: 43
final Flowable<T> source;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
public FlowableSwitchMapMaybe(Flowable<T> source,
Function<? super T, ? extends MaybeSource<? extends R>> mapper,
Reported by PMD.
Line: 45
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
public FlowableSwitchMapMaybe(Flowable<T> source,
Function<? super T, ? extends MaybeSource<? extends R>> mapper,
boolean delayErrors) {
this.source = source;
Reported by PMD.
Line: 60
source.subscribe(new SwitchMapMaybeSubscriber<>(s, mapper, delayErrors));
}
static final class SwitchMapMaybeSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5402190102429853762L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 60
source.subscribe(new SwitchMapMaybeSubscriber<>(s, mapper, delayErrors));
}
static final class SwitchMapMaybeSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5402190102429853762L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 65
private static final long serialVersionUID = -5402190102429853762L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
Reported by PMD.
Line: 67
final Subscriber<? super R> downstream;
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
final AtomicThrowable errors;
Reported by PMD.
Line: 69
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
final boolean delayErrors;
final AtomicThrowable errors;
final AtomicLong requested;
Reported by PMD.
Line: 71
final boolean delayErrors;
final AtomicThrowable errors;
final AtomicLong requested;
final AtomicReference<SwitchMapMaybeObserver<R>> inner;
Reported by PMD.
Line: 73
final AtomicThrowable errors;
final AtomicLong requested;
final AtomicReference<SwitchMapMaybeObserver<R>> inner;
static final SwitchMapMaybeObserver<Object> INNER_DISPOSED =
new SwitchMapMaybeObserver<>(null);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java
34 issues
Line: 163
* @param <T>
* the type of item expected to be observed and emitted by the Processor
*/
public final class BehaviorProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicReference<BehaviorSubscription<T>[]> subscribers;
static final Object[] EMPTY_ARRAY = new Object[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
Line: 163
* @param <T>
* the type of item expected to be observed and emitted by the Processor
*/
public final class BehaviorProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicReference<BehaviorSubscription<T>[]> subscribers;
static final Object[] EMPTY_ARRAY = new Object[0];
@SuppressWarnings("rawtypes")
Reported by PMD.
Line: 164
* the type of item expected to be observed and emitted by the Processor
*/
public final class BehaviorProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicReference<BehaviorSubscription<T>[]> subscribers;
static final Object[] EMPTY_ARRAY = new Object[0];
@SuppressWarnings("rawtypes")
static final BehaviorSubscription[] EMPTY = new BehaviorSubscription[0];
Reported by PMD.
Line: 174
@SuppressWarnings("rawtypes")
static final BehaviorSubscription[] TERMINATED = new BehaviorSubscription[0];
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Object> value;
Reported by PMD.
Line: 175
static final BehaviorSubscription[] TERMINATED = new BehaviorSubscription[0];
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Object> value;
final AtomicReference<Throwable> terminalEvent;
Reported by PMD.
Line: 176
final ReadWriteLock lock;
final Lock readLock;
final Lock writeLock;
final AtomicReference<Object> value;
final AtomicReference<Throwable> terminalEvent;
Reported by PMD.
Line: 180
final AtomicReference<Object> value;
final AtomicReference<Throwable> terminalEvent;
long index;
/**
* Creates a {@link BehaviorProcessor} without a default item.
Reported by PMD.
Line: 182
final AtomicReference<Throwable> terminalEvent;
long index;
/**
* Creates a {@link BehaviorProcessor} without a default item.
*
* @param <T>
Reported by PMD.
Line: 220
* Constructs an empty BehaviorProcessor.
* @since 2.0
*/
@SuppressWarnings("unchecked")
BehaviorProcessor() {
this.value = new AtomicReference<>();
this.lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
Reported by PMD.
Line: 346
@Override
@CheckReturnValue
public boolean hasSubscribers() {
return subscribers.get().length != 0;
}
@CheckReturnValue
/* test support*/ int subscriberCount() {
return subscribers.get().length;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/DisposableMaybeObserverTest.java
34 issues
Line: 32
static final class TestMaybe<T> extends DisposableMaybeObserver<T> {
int start;
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
Reported by PMD.
Line: 34
int start;
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
int complete;
Reported by PMD.
Line: 36
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
int complete;
@Override
protected void onStart() {
Reported by PMD.
Line: 38
final List<Throwable> errors = new ArrayList<>();
int complete;
@Override
protected void onStart() {
super.onStart();
Reported by PMD.
Line: 64
}
@Test
public void normal() {
TestMaybe<Integer> tc = new TestMaybe<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
Reported by PMD.
Line: 67
public void normal() {
TestMaybe<Integer> tc = new TestMaybe<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
assertEquals(0, tc.complete);
Reported by PMD.
Line: 68
TestMaybe<Integer> tc = new TestMaybe<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
assertEquals(0, tc.complete);
Maybe.just(1).subscribe(tc);
Reported by PMD.
Line: 69
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
assertEquals(0, tc.complete);
Maybe.just(1).subscribe(tc);
Reported by PMD.
Line: 69
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
assertEquals(0, tc.complete);
Maybe.just(1).subscribe(tc);
Reported by PMD.
Line: 70
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
assertEquals(0, tc.complete);
Maybe.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
Reported by PMD.