The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromSupplierTest.java
27 issues
Line: 51
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
}
@Test
public void fromSupplierTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 55
}
@Test
public void fromSupplierTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Supplier<Object> supplier = new Supplier<Object>() {
@Override
public Object get() throws Exception {
Reported by PMD.
Line: 66
}
};
Completable.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 66
}
};
Completable.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 70
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Completable.fromSupplier(supplier)
.test()
.assertResult();
Reported by PMD.
Line: 72
assertEquals(1, atomicInteger.get());
Completable.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 72
assertEquals(1, atomicInteger.get());
Completable.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 76
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
@Test
public void fromSupplierInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 80
}
@Test
public void fromSupplierInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Completable completable = Completable.fromSupplier(new Supplier<Object>() {
@Override
public Object get() throws Exception {
Reported by PMD.
Line: 91
}
});
assertEquals(0, atomicInteger.get());
completable
.test()
.assertResult();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableThrottleLatest.java
27 issues
Line: 36
*/
public final class ObservableThrottleLatest<T> extends AbstractObservableWithUpstream<T, T> {
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
Reported by PMD.
Line: 38
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
Reported by PMD.
Line: 40
final TimeUnit unit;
final Scheduler scheduler;
final boolean emitLast;
public ObservableThrottleLatest(Observable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler,
Reported by PMD.
Line: 42
final Scheduler scheduler;
final boolean emitLast;
public ObservableThrottleLatest(Observable<T> source,
long timeout, TimeUnit unit, Scheduler scheduler,
boolean emitLast) {
super(source);
Reported by PMD.
Line: 59
source.subscribe(new ThrottleLatestObserver<>(observer, timeout, unit, scheduler.createWorker(), emitLast));
}
static final class ThrottleLatestObserver<T>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = -8296689127439125014L;
Reported by PMD.
Line: 59
source.subscribe(new ThrottleLatestObserver<>(observer, timeout, unit, scheduler.createWorker(), emitLast));
}
static final class ThrottleLatestObserver<T>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = -8296689127439125014L;
Reported by PMD.
Line: 65
private static final long serialVersionUID = -8296689127439125014L;
final Observer<? super T> downstream;
final long timeout;
final TimeUnit unit;
Reported by PMD.
Line: 67
final Observer<? super T> downstream;
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
Reported by PMD.
Line: 69
final long timeout;
final TimeUnit unit;
final Scheduler.Worker worker;
final boolean emitLast;
Reported by PMD.
Line: 71
final TimeUnit unit;
final Scheduler.Worker worker;
final boolean emitLast;
final AtomicReference<T> latest;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCountTest.java
27 issues
Line: 25
public class FlowableCountTest extends RxJavaTest {
@Test
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
Reported by PMD.
Line: 25
public class FlowableCountTest extends RxJavaTest {
@Test
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
Reported by PMD.
Line: 25
public class FlowableCountTest extends RxJavaTest {
@Test
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
Reported by PMD.
Line: 25
public class FlowableCountTest extends RxJavaTest {
@Test
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
Reported by PMD.
Line: 27
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
}
Reported by PMD.
Line: 27
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
}
Reported by PMD.
Line: 27
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
}
Reported by PMD.
Line: 27
public void simpleFlowable() {
Assert.assertEquals(0, Flowable.empty().count().toFlowable().blockingLast().intValue());
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
}
Reported by PMD.
Line: 29
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
}
@Test
public void simple() {
Reported by PMD.
Line: 29
Assert.assertEquals(1, Flowable.just(1).count().toFlowable().blockingLast().intValue());
Assert.assertEquals(10, Flowable.range(1, 10).count().toFlowable().blockingLast().intValue());
}
@Test
public void simple() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRefCount.java
27 issues
Line: 35
*/
public final class ObservableRefCount<T> extends Observable<T> {
final ConnectableObservable<T> source;
final int n;
final long timeout;
Reported by PMD.
Line: 37
final ConnectableObservable<T> source;
final int n;
final long timeout;
final TimeUnit unit;
Reported by PMD.
Line: 39
final int n;
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
Reported by PMD.
Line: 39
final int n;
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
Reported by PMD.
Line: 41
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
RefConnection connection;
Reported by PMD.
Line: 43
final TimeUnit unit;
final Scheduler scheduler;
RefConnection connection;
public ObservableRefCount(ConnectableObservable<T> source) {
this(source, 1, 0L, TimeUnit.NANOSECONDS, null);
Reported by PMD.
Line: 45
final Scheduler scheduler;
RefConnection connection;
public ObservableRefCount(ConnectableObservable<T> source) {
this(source, 1, 0L, TimeUnit.NANOSECONDS, null);
}
Reported by PMD.
Line: 75
long c = conn.subscriberCount;
if (c == 0L && conn.timer != null) {
conn.timer.dispose();
}
conn.subscriberCount = c + 1;
if (!conn.connected && c + 1 == n) {
connect = true;
conn.connected = true;
Reported by PMD.
Line: 102
if (c != 0L || !rc.connected) {
return;
}
if (timeout == 0L) {
timeout(rc);
return;
}
sd = new SequentialDisposable();
rc.timer = sd;
Reported by PMD.
Line: 117
synchronized (this) {
if (connection == rc) {
if (rc.timer != null) {
rc.timer.dispose();
rc.timer = null;
}
if (--rc.subscriberCount == 0) {
connection = null;
source.reset();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableToSingleTest.java
27 issues
Line: 29
@Test
public void justSingleItemObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Flowable.just("Hello World!").single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertResult("Hello World!");
}
Reported by PMD.
Line: 30
public void justSingleItemObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Flowable.just("Hello World!").single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertResult("Hello World!");
}
@Test
Reported by PMD.
Line: 30
public void justSingleItemObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Flowable.just("Hello World!").single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertResult("Hello World!");
}
@Test
Reported by PMD.
Line: 32
Single<String> single = Flowable.just("Hello World!").single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertResult("Hello World!");
}
@Test
public void errorObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Reported by PMD.
Line: 39
public void errorObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
IllegalArgumentException error = new IllegalArgumentException("Error");
Single<String> single = Flowable.<String>error(error).single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertError(error);
}
Reported by PMD.
Line: 39
public void errorObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
IllegalArgumentException error = new IllegalArgumentException("Error");
Single<String> single = Flowable.<String>error(error).single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertError(error);
}
Reported by PMD.
Line: 40
TestSubscriber<String> subscriber = TestSubscriber.create();
IllegalArgumentException error = new IllegalArgumentException("Error");
Single<String> single = Flowable.<String>error(error).single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertError(error);
}
@Test
Reported by PMD.
Line: 40
TestSubscriber<String> subscriber = TestSubscriber.create();
IllegalArgumentException error = new IllegalArgumentException("Error");
Single<String> single = Flowable.<String>error(error).single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertError(error);
}
@Test
Reported by PMD.
Line: 42
Single<String> single = Flowable.<String>error(error).single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertError(error);
}
@Test
public void justTwoEmissionsObservableThrowsError() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Reported by PMD.
Line: 48
@Test
public void justTwoEmissionsObservableThrowsError() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Flowable.just("First", "Second").single("");
single.toFlowable().subscribe(subscriber);
subscriber.assertError(IllegalArgumentException.class);
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/processors/UnicastProcessor.java
27 issues
Line: 150
* @param <T> the value type received and emitted by this Processor subclass
* @since 2.0
*/
public final class UnicastProcessor<@NonNull T> extends FlowableProcessor<T> {
final SpscLinkedArrayQueue<T> queue;
final AtomicReference<Runnable> onTerminate;
Reported by PMD.
Line: 152
*/
public final class UnicastProcessor<@NonNull T> extends FlowableProcessor<T> {
final SpscLinkedArrayQueue<T> queue;
final AtomicReference<Runnable> onTerminate;
final boolean delayError;
Reported by PMD.
Line: 154
final SpscLinkedArrayQueue<T> queue;
final AtomicReference<Runnable> onTerminate;
final boolean delayError;
volatile boolean done;
Reported by PMD.
Line: 156
final AtomicReference<Runnable> onTerminate;
final boolean delayError;
volatile boolean done;
Throwable error;
Reported by PMD.
Line: 158
final boolean delayError;
volatile boolean done;
Throwable error;
final AtomicReference<Subscriber<? super T>> downstream;
Reported by PMD.
Line: 160
volatile boolean done;
Throwable error;
final AtomicReference<Subscriber<? super T>> downstream;
volatile boolean cancelled;
Reported by PMD.
Line: 162
Throwable error;
final AtomicReference<Subscriber<? super T>> downstream;
volatile boolean cancelled;
final AtomicBoolean once;
Reported by PMD.
Line: 164
final AtomicReference<Subscriber<? super T>> downstream;
volatile boolean cancelled;
final AtomicBoolean once;
final BasicIntQueueSubscription<T> wip;
Reported by PMD.
Line: 166
volatile boolean cancelled;
final AtomicBoolean once;
final BasicIntQueueSubscription<T> wip;
final AtomicLong requested;
Reported by PMD.
Line: 168
final AtomicBoolean once;
final BasicIntQueueSubscription<T> wip;
final AtomicLong requested;
boolean enableOperatorFusion;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatWithMaybeTest.java
27 issues
Line: 29
public class FlowableConcatWithMaybeTest extends RxJavaTest {
@Test
public void normalEmpty() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Maybe.<Integer>fromAction(new Action() {
@Override
Reported by PMD.
Line: 45
}
@Test
public void normalNonEmpty() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.subscribe(ts);
Reported by PMD.
Line: 48
public void normalNonEmpty() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.subscribe(ts);
ts.assertResult(1, 2, 3, 4, 5, 100);
}
Reported by PMD.
Line: 48
public void normalNonEmpty() {
final TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.subscribe(ts);
ts.assertResult(1, 2, 3, 4, 5, 100);
}
Reported by PMD.
Line: 56
}
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
Reported by PMD.
Line: 57
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 57
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 57
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 57
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
Line: 57
@Test
public void backpressure() {
Flowable.range(1, 5)
.concatWith(Maybe.just(100))
.test(0)
.assertEmpty()
.requestMore(3)
.assertValues(1, 2, 3)
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableDelaySubscriptionTest.java
26 issues
Line: 47
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
assertEquals(1, counter.get());
}
@Test
public void error() {
final AtomicInteger counter = new AtomicInteger();
Reported by PMD.
Line: 67
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class);
assertEquals(1, counter.get());
}
@Test
public void disposeBeforeTime() {
TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 85
.delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);
TestObserver<Void> to = result.test();
to.assertEmpty();
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.dispose();
Reported by PMD.
Line: 89
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.dispose();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertEmpty();
Reported by PMD.
Line: 93
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertEmpty();
assertEquals(0, counter.get());
}
@Test
Reported by PMD.
Line: 95
to.assertEmpty();
assertEquals(0, counter.get());
}
@Test
public void timestep() {
TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 114
TestObserver<Void> to = result.test();
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
assertEquals(1, counter.get());
}
Reported by PMD.
Line: 116
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
assertEquals(1, counter.get());
}
@Test
Reported by PMD.
Line: 118
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertResult();
assertEquals(1, counter.get());
}
@Test
public void timestepError() {
TestScheduler scheduler = new TestScheduler();
Reported by PMD.
Line: 140
scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS);
to.assertEmpty();
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
to.assertFailure(TestException.class);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BufferUntilSubscriberTest.java
26 issues
Line: 28
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 28
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
@Test
public void issue1677() throws InterruptedException {
final AtomicLong counter = new AtomicLong();
final Integer[] numbers = new Integer[5000];
Reported by PMD.
Line: 40
final int NITERS = 250;
final CountDownLatch latch = new CountDownLatch(NITERS);
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishProcessor<Void> s = PublishProcessor.create();
final AtomicBoolean completed = new AtomicBoolean();
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
Reported by PMD.
Line: 42
for (int iters = 0; iters < NITERS; iters++) {
final CountDownLatch innerLatch = new CountDownLatch(1);
final PublishProcessor<Void> s = PublishProcessor.create();
final AtomicBoolean completed = new AtomicBoolean();
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Flowable<Integer>, Publisher<Object>>() {
@Override
Reported by PMD.
Line: 46
Flowable.fromArray(numbers)
.takeUntil(s)
.window(50)
.flatMap(new Function<Flowable<Integer>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
Reported by PMD.
Line: 51
public Publisher<Object> apply(Flowable<Integer> integerObservable) {
return integerObservable
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer integer) {
if (integer >= 5 && completed.compareAndSet(false, true)) {
s.onComplete();
}
Reported by PMD.
Line: 65
}
})
.toList()
.doOnSuccess(new Consumer<List<Object>>() {
@Override
public void accept(List<Object> integers) {
counter.incrementAndGet();
latch.countDown();
innerLatch.countDown();
Reported by PMD.
Line: 20
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.junit.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
Reported by PMD.
Line: 23
import org.junit.*;
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
Line: 24
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
public class BufferUntilSubscriberTest extends RxJavaTest {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java
26 issues
Line: 25
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
Reported by PMD.
Line: 26
public final class ObservableAmb<T> extends Observable<T> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
Reported by PMD.
Line: 28
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
public ObservableAmb(ObservableSource<? extends T>[] sources, Iterable<? extends ObservableSource<? extends T>> sourcesIterable) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
}
@Override
Reported by PMD.
Line: 43
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (p == null) {
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
Reported by PMD.
Line: 47
return;
}
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
Reported by PMD.
Line: 53
}
sources[count++] = p;
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
} else {
Reported by PMD.
Line: 66
EmptyDisposable.complete(observer);
return;
} else
if (count == 1) {
sources[0].subscribe(observer);
return;
}
AmbCoordinator<T> ac = new AmbCoordinator<>(observer, count);
Reported by PMD.
Line: 76
}
static final class AmbCoordinator<T> implements Disposable {
final Observer<? super T> downstream;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 77
static final class AmbCoordinator<T> implements Disposable {
final Observer<? super T> downstream;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
AmbCoordinator(Observer<? super T> actual, int count) {
Reported by PMD.
Line: 79
final Observer<? super T> downstream;
final AmbInnerObserver<T>[] observers;
final AtomicInteger winner = new AtomicInteger();
@SuppressWarnings("unchecked")
AmbCoordinator(Observer<? super T> actual, int count) {
this.downstream = actual;
this.observers = new AmbInnerObserver[count];
Reported by PMD.