The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java
26 issues
Line: 25
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
Reported by PMD.
Line: 26
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
Reported by PMD.
Line: 28
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
@Override
Reported by PMD.
Line: 43
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (p == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Reported by PMD.
Line: 47
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
Reported by PMD.
Line: 53
}
sources[count++] = p;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
} else {
Reported by PMD.
Line: 66
EmptyDisposable.complete(observer);
return;
} else
if (count == 1) {
sources[0].subscribe(observer);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<>(observer, count);
Reported by PMD.
Line: 76
}
static final class AmbCoordinator<T> implements Disposable {
final Observer<? super T> downstream;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 77
static final class AmbCoordinator<T> implements Disposable {
final Observer<? super T> downstream;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
AmbCoordinator(Observer<? super T> actual, int count) {
Reported by PMD.
Line: 79
final Observer<? super T> downstream;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
AmbCoordinator(Observer<? super T> actual, int count) {
this.downstream = actual;
this.observers = new AmbInnerObserver[count];
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/BaseTypeParser.java
26 issues
Line: 22
/**
* Parses the java file of a reactive base type to allow discovering Javadoc mistakes algorithmically.
*/
public final class BaseTypeParser {
private BaseTypeParser() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 22
/**
* Parses the java file of a reactive base type to allow discovering Javadoc mistakes algorithmically.
*/
public final class BaseTypeParser {
private BaseTypeParser() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 29
}
public static class RxMethod {
public String signature;
public String backpressureKind;
public String schedulerKind;
Reported by PMD.
Line: 31
public static class RxMethod {
public String signature;
public String backpressureKind;
public String schedulerKind;
public String javadoc;
Reported by PMD.
Line: 33
public String backpressureKind;
public String schedulerKind;
public String javadoc;
public String backpressureDocumentation;
Reported by PMD.
Line: 35
public String schedulerKind;
public String javadoc;
public String backpressureDocumentation;
public String schedulerDocumentation;
Reported by PMD.
Line: 37
public String javadoc;
public String backpressureDocumentation;
public String schedulerDocumentation;
public int javadocLine;
Reported by PMD.
Line: 39
public String backpressureDocumentation;
public String schedulerDocumentation;
public int javadocLine;
public int methodLine;
Reported by PMD.
Line: 41
public String schedulerDocumentation;
public int javadocLine;
public int methodLine;
public int backpressureDocLine;
Reported by PMD.
Line: 43
public int javadocLine;
public int methodLine;
public int backpressureDocLine;
public int schedulerDocLine;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDelaySubscriptionTest.java
26 issues
Line: 47
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
assertEquals(1, counter.get());
}
@Test
public void error() {
final AtomicInteger counter = new AtomicInteger();
Reported by PMD.
Line: 67
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
assertEquals(1, counter.get());
}
@Test
public void disposeBeforeTime() {
TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 85
.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
TestObserver<Void> to = result.test();
to.assertEmpty();
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.dispose();
Reported by PMD.
Line: 89
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.dispose();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertEmpty();
Reported by PMD.
Line: 93
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertEmpty();
assertEquals(0, counter.get());
}
@Test
Reported by PMD.
Line: 95
to.assertEmpty();
assertEquals(0, counter.get());
}
@Test
public void timestep() {
TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 114
TestObserver<Void> to = result.test();
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
assertEquals(1, counter.get());
}
Reported by PMD.
Line: 116
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
assertEquals(1, counter.get());
}
@Test
Reported by PMD.
Line: 118
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
assertEquals(1, counter.get());
}
@Test
public void timestepError() {
TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 140
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertFailure(TestException.class);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BufferUntilSubscriberTest.java
26 issues
Line: 28
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 28
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 40
final int NITERS = 250;
final CountDownLatch latch = new CountDownLatch(NITERS);
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishProcessor<Void> s = PublishProcessor.create();
final AtomicBoolean completed = new AtomicBoolean();
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
Reported by PMD.
Line: 42
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishProcessor<Void> s = PublishProcessor.create();
final AtomicBoolean completed = new AtomicBoolean();
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Flowable<Integer>, Publisher<Object>>() {
@Override
Reported by PMD.
Line: 46
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Flowable<Integer>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
Reported by PMD.
Line: 51
public Publisher<Object> apply(Flowable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) {
if (integer >= 5 && completed.compareAndSet(false, true)) {
s.onComplete();
}
Reported by PMD.
Line: 65
}
})
.toList()
.doOnSuccess(new Consumer<List<Object>>() {
@Override
public void accept(List<Object> integers) {
counter.incrementAndGet();
latch.countDown();
innerLatch.countDown();
Reported by PMD.
Line: 20
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.junit.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
Reported by PMD.
Line: 23
import org.junit.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
Line: 24
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybePeek.java
26 issues
Line: 137
onErrorInner(e);
}
void onErrorInner(Throwable e) {
try {
parent.onErrorCall.accept(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
e = new CompositeException(e, ex);
Reported by PMD.
Line: 30
*/
public final class MaybePeek<T> extends AbstractMaybeWithUpstream<T, T> {
final Consumer<? super Disposable> onSubscribeCall;
final Consumer<? super T> onSuccessCall;
final Consumer<? super Throwable> onErrorCall;
Reported by PMD.
Line: 32
final Consumer<? super Disposable> onSubscribeCall;
final Consumer<? super T> onSuccessCall;
final Consumer<? super Throwable> onErrorCall;
final Action onCompleteCall;
Reported by PMD.
Line: 34
final Consumer<? super T> onSuccessCall;
final Consumer<? super Throwable> onErrorCall;
final Action onCompleteCall;
final Action onAfterTerminate;
Reported by PMD.
Line: 36
final Consumer<? super Throwable> onErrorCall;
final Action onCompleteCall;
final Action onAfterTerminate;
final Action onDisposeCall;
Reported by PMD.
Line: 38
final Action onCompleteCall;
final Action onAfterTerminate;
final Action onDisposeCall;
public MaybePeek(MaybeSource<T> source, Consumer<? super Disposable> onSubscribeCall,
Consumer<? super T> onSuccessCall, Consumer<? super Throwable> onErrorCall, Action onCompleteCall,
Reported by PMD.
Line: 40
final Action onAfterTerminate;
final Action onDisposeCall;
public MaybePeek(MaybeSource<T> source, Consumer<? super Disposable> onSubscribeCall,
Consumer<? super T> onSuccessCall, Consumer<? super Throwable> onErrorCall, Action onCompleteCall,
Action onAfterTerminate, Action onDispose) {
super(source);
Reported by PMD.
Line: 60
}
static final class MaybePeekObserver<T> implements MaybeObserver<T>, Disposable {
final MaybeObserver<? super T> downstream;
final MaybePeek<T> parent;
Disposable upstream;
Reported by PMD.
Line: 62
static final class MaybePeekObserver<T> implements MaybeObserver<T>, Disposable {
final MaybeObserver<? super T> downstream;
final MaybePeek<T> parent;
Disposable upstream;
MaybePeekObserver(MaybeObserver<? super T> actual, MaybePeek<T> parent) {
this.downstream = actual;
Reported by PMD.
Line: 64
final MaybePeek<T> parent;
Disposable upstream;
MaybePeekObserver(MaybeObserver<? super T> actual, MaybePeek<T> parent) {
this.downstream = actual;
this.parent = parent;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableThrottleWithTimeoutTests.java
26 issues
Line: 31
public class ObservableThrottleWithTimeoutTests extends RxJavaTest {
@Test
public void throttle() {
Observer<Integer> observer = TestHelper.mockObserver();
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleWithTimeout(500, TimeUnit.MILLISECONDS, s)
Reported by PMD.
Line: 36
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleWithTimeout(500, TimeUnit.MILLISECONDS, s)
.subscribe(observer);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
Reported by PMD.
Line: 36
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleWithTimeout(500, TimeUnit.MILLISECONDS, s)
.subscribe(observer);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
Reported by PMD.
Line: 41
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
Reported by PMD.
Line: 42
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 44
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5)
Reported by PMD.
Line: 46
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5)
s.advanceTimeTo(1300, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
Reported by PMD.
Line: 48
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5)
s.advanceTimeTo(1300, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1800, TimeUnit.MILLISECONDS);
o.onComplete();
Reported by PMD.
Line: 49
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5)
s.advanceTimeTo(1300, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1800, TimeUnit.MILLISECONDS);
o.onComplete();
Reported by PMD.
Line: 51
o.onNext(5); // skip
o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5)
s.advanceTimeTo(1300, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1800, TimeUnit.MILLISECONDS);
o.onComplete();
InOrder inOrder = inOrder(observer);
inOrder.verify(observer).onNext(2);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/observers/SingleConsumersTest.java
26 issues
Line: 51
public class SingleConsumersTest implements Consumer<Object> {
final CompositeDisposable composite = new CompositeDisposable();
final SingleSubject<Integer> processor = SingleSubject.create();
final List<Object> events = new ArrayList<>();
Reported by PMD.
Line: 53
final CompositeDisposable composite = new CompositeDisposable();
final SingleSubject<Integer> processor = SingleSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void accept(Object t) throws Exception {
Reported by PMD.
Line: 55
final SingleSubject<Integer> processor = SingleSubject.create();
final List<Object> events = new ArrayList<>();
@Override
public void accept(Object t) throws Exception {
events.add(t);
}
Reported by PMD.
Line: 68
}
@Test
public void onSuccessNormal() {
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
Reported by PMD.
Line: 72
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 72
Disposable d = subscribeAutoDispose(processor, composite, this, Functions.ON_ERROR_MISSING);
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
Reported by PMD.
Line: 74
assertFalse(d.getClass().toString(), ((LambdaConsumerIntrospection)d).hasCustomOnError());
assertTrue(composite.size() > 0);
assertTrue(events.toString(), events.isEmpty());
processor.onSuccess(1);
Reported by PMD.
Line: 80
processor.onSuccess(1);
assertEquals(0, composite.size());
assertEquals(Arrays.<Object>asList(1), events);
}
Reported by PMD.
Line: 82
assertEquals(0, composite.size());
assertEquals(Arrays.<Object>asList(1), events);
}
@Test
public void onErrorNormal() {
Reported by PMD.
Line: 87
}
@Test
public void onErrorNormal() {
subscribeAutoDispose(processor, composite, this, this);
assertTrue(composite.size() > 0);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeConcatIterable.java
26 issues
Line: 36
*/
public final class MaybeConcatIterable<T> extends Flowable<T> {
final Iterable<? extends MaybeSource<? extends T>> sources;
public MaybeConcatIterable(Iterable<? extends MaybeSource<? extends T>> sources) {
this.sources = sources;
}
Reported by PMD.
Line: 49
try {
it = Objects.requireNonNull(sources.iterator(), "The sources Iterable returned a null Iterator");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 60
parent.drain();
}
static final class ConcatMaybeObserver<T>
extends AtomicInteger
implements MaybeObserver<T>, Subscription {
private static final long serialVersionUID = 3520831347801429610L;
Reported by PMD.
Line: 60
parent.drain();
}
static final class ConcatMaybeObserver<T>
extends AtomicInteger
implements MaybeObserver<T>, Subscription {
private static final long serialVersionUID = 3520831347801429610L;
Reported by PMD.
Line: 66
private static final long serialVersionUID = 3520831347801429610L;
final Subscriber<? super T> downstream;
final AtomicLong requested;
final AtomicReference<Object> current;
Reported by PMD.
Line: 68
final Subscriber<? super T> downstream;
final AtomicLong requested;
final AtomicReference<Object> current;
final SequentialDisposable disposables;
Reported by PMD.
Line: 70
final AtomicLong requested;
final AtomicReference<Object> current;
final SequentialDisposable disposables;
final Iterator<? extends MaybeSource<? extends T>> sources;
Reported by PMD.
Line: 72
final AtomicReference<Object> current;
final SequentialDisposable disposables;
final Iterator<? extends MaybeSource<? extends T>> sources;
long produced;
Reported by PMD.
Line: 74
final SequentialDisposable disposables;
final Iterator<? extends MaybeSource<? extends T>> sources;
long produced;
ConcatMaybeObserver(Subscriber<? super T> actual, Iterator<? extends MaybeSource<? extends T>> sources) {
this.downstream = actual;
Reported by PMD.
Line: 76
final Iterator<? extends MaybeSource<? extends T>> sources;
long produced;
ConcatMaybeObserver(Subscriber<? super T> actual, Iterator<? extends MaybeSource<? extends T>> sources) {
this.downstream = actual;
this.sources = sources;
this.requested = new AtomicLong();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeCountTest.java
26 issues
Line: 31
public class MaybeCountTest extends RxJavaTest {
@Test
public void one() {
Maybe.just(1).count().test().assertResult(1L);
}
@Test
public void empty() {
Reported by PMD.
Line: 32
@Test
public void one() {
Maybe.just(1).count().test().assertResult(1L);
}
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
Reported by PMD.
Line: 32
@Test
public void one() {
Maybe.just(1).count().test().assertResult(1L);
}
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
Reported by PMD.
Line: 32
@Test
public void one() {
Maybe.just(1).count().test().assertResult(1L);
}
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
Reported by PMD.
Line: 36
}
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
}
@Test
public void error() {
Reported by PMD.
Line: 37
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
}
@Test
public void error() {
Maybe.error(new TestException()).count().test().assertFailure(TestException.class);
Reported by PMD.
Line: 37
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
}
@Test
public void error() {
Maybe.error(new TestException()).count().test().assertFailure(TestException.class);
Reported by PMD.
Line: 37
@Test
public void empty() {
Maybe.empty().count().test().assertResult(0L);
}
@Test
public void error() {
Maybe.error(new TestException()).count().test().assertFailure(TestException.class);
Reported by PMD.
Line: 41
}
@Test
public void error() {
Maybe.error(new TestException()).count().test().assertFailure(TestException.class);
}
@Test
public void dispose() {
Reported by PMD.
Line: 46
}
@Test
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestObserver<Long> to = pp.singleElement().count().test();
assertTrue(pp.hasSubscribers());
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmb.java
26 issues
Line: 29
*
* @param <T> the value type emitted
*/
public final class MaybeAmb<T> extends Maybe<T> {
private final MaybeSource<? extends T>[] sources;
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
this.sources = sources;
Reported by PMD.
Line: 29
*
* @param <T> the value type emitted
*/
public final class MaybeAmb<T> extends Maybe<T> {
private final MaybeSource<? extends T>[] sources;
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
this.sources = sources;
Reported by PMD.
Line: 30
* @param <T> the value type emitted
*/
public final class MaybeAmb<T> extends Maybe<T> {
private final MaybeSource<? extends T>[] sources;
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
Reported by PMD.
Line: 31
*/
public final class MaybeAmb<T> extends Maybe<T> {
private final MaybeSource<? extends T>[] sources;
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
Reported by PMD.
Line: 33
private final MaybeSource<? extends T>[] sources;
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
@Override
Reported by PMD.
Line: 40
@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(MaybeObserver<? super T> observer) {
MaybeSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new MaybeSource[8];
try {
Reported by PMD.
Line: 40
@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(MaybeObserver<? super T> observer) {
MaybeSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new MaybeSource[8];
try {
Reported by PMD.
Line: 40
@Override
@SuppressWarnings("unchecked")
protected void subscribeActual(MaybeObserver<? super T> observer) {
MaybeSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new MaybeSource[8];
try {
Reported by PMD.
Line: 48
try {
for (MaybeSource<? extends T> element : sourcesIterable) {
if (element == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
MaybeSource<? extends T>[] b = new MaybeSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Reported by PMD.
Line: 52
return;
}
if (count == sources.length) {
MaybeSource<? extends T>[] b = new MaybeSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = element;
}
Reported by PMD.