The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSerializeTest.java
101 issues
Line: 268
}
subscriber.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestSingleThreadedObservable thread");
Reported by PMD.
Line: 282
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Reported by PMD.
Line: 325
if (s == null) {
System.out.println("TestMultiThreadedObservable onNext: null");
// force an error
throw npe;
} else {
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
// ignored
Reported by PMD.
Line: 352
// we are done spawning threads
threadPool.shutdown();
} catch (Throwable e) {
throw new RuntimeException(e);
}
// wait until all threads are done, then mark it as COMPLETED
try {
// wait for all the threads to finish
Reported by PMD.
Line: 360
// wait for all the threads to finish
threadPool.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscriber.onComplete();
}
});
System.out.println("starting TestMultiThreadedObservable thread");
Reported by PMD.
Line: 374
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class BusyObserver extends DefaultSubscriber<String> {
Reported by PMD.
Line: 106
w.serialize().subscribe(busyobserver);
onSubscribe.waitToFinish();
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// we can't know how many onNext calls will occur since they each run on a separate thread
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
// assertEquals(3, busyobserver.onNextCount.get());
assertTrue(busyobserver.onNextCount.get() < 4);
Reported by PMD.
Line: 152
w.serialize().subscribe(busyobserver);
onSubscribe.waitToFinish();
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not always be the full number of items since the error should (very often)
// stop it before it completes all 9
System.out.println("onNext count: " + busyobserver.onNextCount.get());
if (busyobserver.onNextCount.get() < 9) {
lessThan9 = true;
Reported by PMD.
Line: 155
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not always be the full number of items since the error should (very often)
// stop it before it completes all 9
System.out.println("onNext count: " + busyobserver.onNextCount.get());
if (busyobserver.onNextCount.get() < 9) {
lessThan9 = true;
}
assertTrue(busyobserver.onError);
// no onComplete because onError was invoked
Reported by PMD.
Line: 218
try {
f.get();
} catch (Throwable e) {
System.err.println("Error while waiting on future in CompletionThread");
}
}
}
/* send the event */
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableElementAtTest.java
100 issues
Line: 32
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableElementAtTest extends RxJavaTest {
@Test
public void elementAtObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
.intValue());
Reported by PMD.
Line: 36
@Test
public void elementAtObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
.intValue());
}
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
Reported by PMD.
Line: 36
@Test
public void elementAtObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
.intValue());
}
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
Reported by PMD.
Line: 36
@Test
public void elementAtObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
.intValue());
}
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
Reported by PMD.
Line: 36
@Test
public void elementAtObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
.intValue());
}
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
Reported by PMD.
Line: 36
@Test
public void elementAtObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1).toObservable().blockingSingle()
.intValue());
}
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
Reported by PMD.
Line: 42
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
}
@Test
public void elementAtOrDefaultObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());
Reported by PMD.
Line: 42
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
}
@Test
public void elementAtOrDefaultObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());
Reported by PMD.
Line: 42
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
}
@Test
public void elementAtOrDefaultObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());
Reported by PMD.
Line: 42
@Test
public void elementAtWithIndexOutOfBoundsObservable() {
assertEquals(-99, Observable.fromArray(1, 2).elementAt(2).toObservable().blockingSingle(-99).intValue());
}
@Test
public void elementAtOrDefaultObservable() {
assertEquals(2, Observable.fromArray(1, 2).elementAt(1, 0).toObservable().blockingSingle().intValue());
Reported by PMD.
src/main/java/io/reactivex/rxjava3/core/Observable.java
99 issues
Line: 13187
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
/**
* Operator implementations (both source and intermediate) should implement this method that
Reported by PMD.
Line: 13169
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.core;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
Reported by PMD.
Line: 102
* @see Flowable
* @see io.reactivex.rxjava3.observers.DisposableObserver
*/
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
* a termination notification.
* <p>
Reported by PMD.
Line: 102
* @see Flowable
* @see io.reactivex.rxjava3.observers.DisposableObserver
*/
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
* a termination notification.
* <p>
Reported by PMD.
Line: 102
* @see Flowable
* @see io.reactivex.rxjava3.observers.DisposableObserver
*/
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
* a termination notification.
* <p>
Reported by PMD.
Line: 102
* @see Flowable
* @see io.reactivex.rxjava3.observers.DisposableObserver
*/
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
* a termination notification.
* <p>
Reported by PMD.
Line: 102
* @see Flowable
* @see io.reactivex.rxjava3.observers.DisposableObserver
*/
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
* a termination notification.
* <p>
Reported by PMD.
Line: 102
* @see Flowable
* @see io.reactivex.rxjava3.observers.DisposableObserver
*/
public abstract class Observable<@NonNull T> implements ObservableSource<T> {
/**
* Mirrors the one {@link ObservableSource} in an {@link Iterable} of several {@code ObservableSource}s that first either emits an item or sends
* a termination notification.
* <p>
Reported by PMD.
Line: 134
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Observable<T> amb(@NonNull Iterable<@NonNull ? extends ObservableSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new ObservableAmb<>(null, sources));
}
/**
* Mirrors the one {@link ObservableSource} in an array of several {@code ObservableSource}s that first either emits an item or sends
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeTest.java
98 issues
Line: 232
}
observer.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestObservable thread");
Reported by PMD.
Line: 155
fail(e.getMessage());
}
System.out.println("TestObservable thread finished");
verify(observer).onSubscribe((Disposable)notNull());
verify(observer, times(1)).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, never()).onNext("three");
verify(observer, times(1)).onComplete();
Reported by PMD.
Line: 188
Disposable bs = Disposable.empty();
observer.onSubscribe(bs);
for (int i = 0; !bs.isDisposed(); i++) {
System.out.println("Emit: " + i);
count.incrementAndGet();
observer.onNext(i);
}
}
Reported by PMD.
Line: 198
@Override
public void accept(Integer t1) {
System.out.println("Receive: " + t1);
}
});
Reported by PMD.
Line: 219
@Override
public void subscribe(final Observer<? super String> observer) {
observer.onSubscribe(Disposable.empty());
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {
@Override
public void run() {
try {
Reported by PMD.
Line: 225
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
observer.onComplete();
Reported by PMD.
Line: 227
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
observer.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
Reported by PMD.
Line: 237
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 239
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
private static Observable<Long> INFINITE_OBSERVABLE = Observable.unsafeCreate(new ObservableSource<Long>() {
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableTakeTest extends RxJavaTest {
@Test
public void take1() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<String> take = w.take(2);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/schedulers/TestSchedulerTest.java
98 issues
Line: 51
inner.schedulePeriodically(new Runnable() {
@Override
public void run() {
System.out.println(scheduler.now(TimeUnit.MILLISECONDS));
try {
calledOp.apply(scheduler.now(TimeUnit.MILLISECONDS));
} catch (Throwable ex) {
ExceptionHelper.wrapOrThrow(ex);
}
Reported by PMD.
Line: 101
final Disposable subscription = inner.schedulePeriodically(new Runnable() {
@Override
public void run() {
System.out.println(scheduler.now(TimeUnit.MILLISECONDS));
try {
calledOp.apply(scheduler.now(TimeUnit.MILLISECONDS));
} catch (Throwable ex) {
ExceptionHelper.wrapOrThrow(ex);
}
Reported by PMD.
Line: 150
@Override
public void run() {
counter.incrementAndGet();
System.out.println("counter: " + counter.get());
inner.schedule(this);
}
});
inner.dispose();
Reported by PMD.
Line: 174
@Override
public void run() {
counter.incrementAndGet();
System.out.println("counter: " + counter.get());
inner.schedule(this);
}
});
subscription.dispose();
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.TestScheduler.*;
public class TestSchedulerTest extends RxJavaTest {
@SuppressWarnings("unchecked")
// mocking is unchecked, unfortunately
@Test
public final void periodicScheduling() throws Throwable {
Reported by PMD.
Line: 54
System.out.println(scheduler.now(TimeUnit.MILLISECONDS));
try {
calledOp.apply(scheduler.now(TimeUnit.MILLISECONDS));
} catch (Throwable ex) {
ExceptionHelper.wrapOrThrow(ex);
}
}
}, 1, 2, TimeUnit.SECONDS);
Reported by PMD.
Line: 60
}
}, 1, 2, TimeUnit.SECONDS);
verify(calledOp, never()).apply(anyLong());
InOrder inOrder = Mockito.inOrder(calledOp);
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).apply(anyLong());
Reported by PMD.
Line: 65
InOrder inOrder = Mockito.inOrder(calledOp);
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).apply(anyLong());
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).apply(1000L);
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 65
InOrder inOrder = Mockito.inOrder(calledOp);
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).apply(anyLong());
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).apply(1000L);
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 68
inOrder.verify(calledOp, never()).apply(anyLong());
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, times(1)).apply(1000L);
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
inOrder.verify(calledOp, never()).apply(3000L);
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSerializeTest.java
97 issues
Line: 268
}
observer.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestSingleThreadedObservable thread");
Reported by PMD.
Line: 282
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
Reported by PMD.
Line: 325
if (s == null) {
System.out.println("TestMultiThreadedObservable onNext: null");
// force an error
throw npe;
} else {
System.out.println("TestMultiThreadedObservable onNext: " + s);
}
observer.onNext(s);
// capture 'maxThreads'
Reported by PMD.
Line: 347
// we are done spawning threads
threadPool.shutdown();
} catch (Throwable e) {
throw new RuntimeException(e);
}
// wait until all threads are done, then mark it as COMPLETED
try {
// wait for all the threads to finish
Reported by PMD.
Line: 355
// wait for all the threads to finish
threadPool.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
observer.onComplete();
}
});
System.out.println("starting TestMultiThreadedObservable thread");
Reported by PMD.
Line: 369
try {
t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private static class BusyObserver extends DefaultObserver<String> {
Reported by PMD.
Line: 105
w.serialize().subscribe(busyobserver);
onSubscribe.waitToFinish();
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// we can't know how many onNext calls will occur since they each run on a separate thread
// that depends on thread scheduling so 0, 1, 2 and 3 are all valid options
// assertEquals(3, busyobserver.onNextCount.get());
assertTrue(busyobserver.onNextCount.get() < 4);
Reported by PMD.
Line: 151
w.serialize().subscribe(busyobserver);
onSubscribe.waitToFinish();
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not always be the full number of items since the error should (very often)
// stop it before it completes all 9
System.out.println("onNext count: " + busyobserver.onNextCount.get());
if (busyobserver.onNextCount.get() < 9) {
lessThan9 = true;
Reported by PMD.
Line: 154
System.out.println("maxConcurrentThreads: " + onSubscribe.maxConcurrentThreads.get());
// this should not always be the full number of items since the error should (very often)
// stop it before it completes all 9
System.out.println("onNext count: " + busyobserver.onNextCount.get());
if (busyobserver.onNextCount.get() < 9) {
lessThan9 = true;
}
assertTrue(busyobserver.onError);
// no onComplete because onError was invoked
Reported by PMD.
Line: 218
try {
f.get();
} catch (Throwable e) {
System.err.println("Error while waiting on future in CompletionThread");
}
}
}
/* send the event */
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapSingleTest.java
96 issues
Line: 34
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableFlatMapSingleTest extends RxJavaTest {
@Test
public void normal() {
Observable.range(1, 10)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
Reported by PMD.
Line: 37
public class ObservableFlatMapSingleTest extends RxJavaTest {
@Test
public void normal() {
Observable.range(1, 10)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(v);
Reported by PMD.
Line: 50
}
@Test
public void normalDelayError() {
Observable.range(1, 10)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(v);
Reported by PMD.
Line: 63
}
@Test
public void normalAsync() {
TestObserverEx<Integer> to = Observable.range(1, 10)
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(v).subscribeOn(Schedulers.computation());
Reported by PMD.
Line: 68
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(v).subscribeOn(Schedulers.computation());
}
})
.to(TestHelper.<Integer>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
Reported by PMD.
Line: 81
}
@Test
public void mapperThrowsObservable() {
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps
.flatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
Reported by PMD.
Line: 93
})
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
Reported by PMD.
Line: 93
})
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
Reported by PMD.
Line: 95
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
assertFalse(ps.hasObservers());
}
Reported by PMD.
Line: 99
to.assertFailure(TestException.class);
assertFalse(ps.hasObservers());
}
@Test
public void mapperReturnsNullObservable() {
PublishSubject<Integer> ps = PublishSubject.create();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowTimed.java
95 issues
Line: 453
}
}
UnicastSubject<T> createNewWindow(UnicastSubject<T> window) {
if (window != null) {
window.onComplete();
window = null;
}
Reported by PMD.
Line: 453
}
}
UnicastSubject<T> createNewWindow(UnicastSubject<T> window) {
if (window != null) {
window.onComplete();
window = null;
}
Reported by PMD.
Line: 31
import io.reactivex.rxjava3.subjects.UnicastSubject;
public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
Reported by PMD.
Line: 32
public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
Reported by PMD.
Line: 33
public final class ObservableWindowTimed<T> extends AbstractObservableWithUpstream<T, Observable<T>> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
Reported by PMD.
Line: 34
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public ObservableWindowTimed(Observable<T> source,
Reported by PMD.
Line: 35
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public ObservableWindowTimed(Observable<T> source,
long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
Reported by PMD.
Line: 36
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public ObservableWindowTimed(Observable<T> source,
long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
int bufferSize, boolean restartTimerOnMaxSize) {
Reported by PMD.
Line: 37
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public ObservableWindowTimed(Observable<T> source,
long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
int bufferSize, boolean restartTimerOnMaxSize) {
super(source);
Reported by PMD.
Line: 76
implements Observer<T>, Disposable {
private static final long serialVersionUID = 5724293814035355511L;
final Observer<? super Observable<T>> downstream;
final SimplePlainQueue<Object> queue;
final long timespan;
final TimeUnit unit;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferTest.java
95 issues
Line: 175
@Override
public void run() {
throw new RuntimeException();
}
};
@Test
public void nonFatalExceptionThrownByOnOverflowIsNotReportedByUpstream() {
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableOnBackpressureBufferTest extends RxJavaTest {
@Test
public void noBackpressureSupport() {
TestSubscriber<Long> ts = new TestSubscriber<>(0L);
// this will be ignored
Reported by PMD.
Line: 46
// this will be ignored
ts.request(100);
// we take 500 so it unsubscribes
infinite.take(500).subscribe(ts);
// it completely ignores the `request(100)` and we get 500
assertEquals(500, ts.values().size());
ts.assertNoErrors();
}
Reported by PMD.
Line: 48
// we take 500 so it unsubscribes
infinite.take(500).subscribe(ts);
// it completely ignores the `request(100)` and we get 500
assertEquals(500, ts.values().size());
ts.assertNoErrors();
}
@Test
public void fixBackpressureWithBuffer() throws InterruptedException {
Reported by PMD.
Line: 48
// we take 500 so it unsubscribes
infinite.take(500).subscribe(ts);
// it completely ignores the `request(100)` and we get 500
assertEquals(500, ts.values().size());
ts.assertNoErrors();
}
@Test
public void fixBackpressureWithBuffer() throws InterruptedException {
Reported by PMD.
Line: 53
}
@Test
public void fixBackpressureWithBuffer() throws InterruptedException {
final CountDownLatch l1 = new CountDownLatch(100);
final CountDownLatch l2 = new CountDownLatch(150);
TestSubscriber<Long> ts = new TestSubscriber<>(new DefaultSubscriber<Long>() {
@Override
Reported by PMD.
Line: 80
// this will be ignored
ts.request(100);
// we take 500 so it unsubscribes
infinite.subscribeOn(Schedulers.computation())
.onBackpressureBuffer()
.take(500)
.subscribe(ts);
// it completely ignores the `request(100)` and we get 500
Reported by PMD.
Line: 80
// this will be ignored
ts.request(100);
// we take 500 so it unsubscribes
infinite.subscribeOn(Schedulers.computation())
.onBackpressureBuffer()
.take(500)
.subscribe(ts);
// it completely ignores the `request(100)` and we get 500
Reported by PMD.
Line: 80
// this will be ignored
ts.request(100);
// we take 500 so it unsubscribes
infinite.subscribeOn(Schedulers.computation())
.onBackpressureBuffer()
.take(500)
.subscribe(ts);
// it completely ignores the `request(100)` and we get 500
Reported by PMD.
Line: 87
// it completely ignores the `request(100)` and we get 500
l1.await();
assertEquals(100, ts.values().size());
ts.request(50);
l2.await();
assertEquals(150, ts.values().size());
ts.request(350);
ts.awaitDone(5, TimeUnit.SECONDS);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromIterableTest.java
95 issues
Line: 37
import io.reactivex.rxjava3.observers.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableFromIterableTest extends RxJavaTest {
@Test
public void listIterable() {
Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));
Reported by PMD.
Line: 41
@Test
public void listIterable() {
Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));
Observer<String> observer = TestHelper.mockObserver();
o.subscribe(observer);
Reported by PMD.
Line: 41
@Test
public void listIterable() {
Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));
Observer<String> observer = TestHelper.mockObserver();
o.subscribe(observer);
Reported by PMD.
Line: 41
@Test
public void listIterable() {
Observable<String> o = Observable.fromIterable(Arrays.<String> asList("one", "two", "three"));
Observer<String> observer = TestHelper.mockObserver();
o.subscribe(observer);
Reported by PMD.
Line: 45
Observer<String> observer = TestHelper.mockObserver();
o.subscribe(observer);
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
Reported by PMD.
Line: 47
o.subscribe(observer);
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 48
o.subscribe(observer);
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 49
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
/**
Reported by PMD.
Line: 50
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
/**
* This tests the path that can not optimize based on size so must use setProducer.
Reported by PMD.
Line: 51
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
/**
* This tests the path that can not optimize based on size so must use setProducer.
*/
Reported by PMD.