The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipIterableTest.java
156 issues
Line: 342
Consumer<String> printer = new Consumer<String>() {
@Override
public void accept(String pv) {
System.out.println(pv);
}
};
static final class SquareStr implements Function<Integer, String> {
final AtomicInteger counter = new AtomicInteger();
Reported by PMD.
Line: 351
@Override
public String apply(Integer t1) {
counter.incrementAndGet();
System.out.println("Omg I'm calculating so hard: " + t1 + "*" + t1 + "=" + (t1 * t1));
return " " + (t1 * t1);
}
}
@Test
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableZipIterableTest extends RxJavaTest {
BiFunction<String, String, String> concat2Strings;
PublishSubject<String> s1;
PublishSubject<String> s2;
Observable<String> zipped;
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableZipIterableTest extends RxJavaTest {
BiFunction<String, String, String> concat2Strings;
PublishSubject<String> s1;
PublishSubject<String> s2;
Observable<String> zipped;
Observer<String> observer;
Reported by PMD.
Line: 39
public class ObservableZipIterableTest extends RxJavaTest {
BiFunction<String, String, String> concat2Strings;
PublishSubject<String> s1;
PublishSubject<String> s2;
Observable<String> zipped;
Observer<String> observer;
InOrder inOrder;
Reported by PMD.
Line: 40
public class ObservableZipIterableTest extends RxJavaTest {
BiFunction<String, String, String> concat2Strings;
PublishSubject<String> s1;
PublishSubject<String> s2;
Observable<String> zipped;
Observer<String> observer;
InOrder inOrder;
Reported by PMD.
Line: 41
BiFunction<String, String, String> concat2Strings;
PublishSubject<String> s1;
PublishSubject<String> s2;
Observable<String> zipped;
Observer<String> observer;
InOrder inOrder;
@Before
Reported by PMD.
Line: 43
PublishSubject<String> s2;
Observable<String> zipped;
Observer<String> observer;
InOrder inOrder;
@Before
public void setUp() {
concat2Strings = new BiFunction<String, String, String>() {
Reported by PMD.
Line: 44
Observable<String> zipped;
Observer<String> observer;
InOrder inOrder;
@Before
public void setUp() {
concat2Strings = new BiFunction<String, String, String>() {
@Override
Reported by PMD.
Line: 65
zipped.subscribe(observer);
}
BiFunction<Object, Object, String> zipr2 = new BiFunction<Object, Object, String>() {
@Override
public String apply(Object t1, Object t2) {
return "" + t1 + t2;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCacheTest.java
155 issues
Line: 99
@Override
public void run() {
counter.incrementAndGet();
System.out.println("published observable being executed");
subscriber.onNext("one");
subscriber.onComplete();
}
}).start();
}
Reported by PMD.
Line: 115
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
// subscribe again
Reported by PMD.
Line: 125
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
Reported by PMD.
Line: 35
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableCacheTest extends RxJavaTest {
@Test
public void coldReplayNoBackpressure() {
FlowableCache<Integer> source = new FlowableCache<>(Flowable.range(0, 1000), 16);
assertFalse("Source is connected!", source.isConnected());
Reported by PMD.
Line: 37
public class FlowableCacheTest extends RxJavaTest {
@Test
public void coldReplayNoBackpressure() {
FlowableCache<Integer> source = new FlowableCache<>(Flowable.range(0, 1000), 16);
assertFalse("Source is connected!", source.isConnected());
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
Reported by PMD.
Line: 52
ts.assertNoErrors();
ts.assertTerminated();
List<Integer> onNextEvents = ts.values();
assertEquals(1000, onNextEvents.size());
for (int i = 0; i < 1000; i++) {
assertEquals((Integer)i, onNextEvents.get(i));
}
}
Reported by PMD.
Line: 52
ts.assertNoErrors();
ts.assertTerminated();
List<Integer> onNextEvents = ts.values();
assertEquals(1000, onNextEvents.size());
for (int i = 0; i < 1000; i++) {
assertEquals((Integer)i, onNextEvents.get(i));
}
}
Reported by PMD.
Line: 55
assertEquals(1000, onNextEvents.size());
for (int i = 0; i < 1000; i++) {
assertEquals((Integer)i, onNextEvents.get(i));
}
}
@Test
public void coldReplayBackpressure() {
Reported by PMD.
Line: 60
}
@Test
public void coldReplayBackpressure() {
FlowableCache<Integer> source = new FlowableCache<>(Flowable.range(0, 1000), 16);
assertFalse("Source is connected!", source.isConnected());
TestSubscriber<Integer> ts = new TestSubscriber<>(0L);
Reported by PMD.
Line: 76
ts.assertNoErrors();
ts.assertNotComplete();
List<Integer> onNextEvents = ts.values();
assertEquals(10, onNextEvents.size());
for (int i = 0; i < 10; i++) {
assertEquals((Integer)i, onNextEvents.get(i));
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeUntilTest.java
152 issues
Line: 29
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableTakeUntilTest extends RxJavaTest {
@Test
public void takeUntil() {
Disposable sSource = mock(Disposable.class);
Disposable sOther = mock(Disposable.class);
Reported by PMD.
Line: 39
TestObservable other = new TestObservable(sOther);
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source)
.takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
Reported by PMD.
Line: 41
Observer<String> result = TestHelper.mockObserver();
Observable<String> stringObservable = Observable.unsafeCreate(source)
.takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();
Reported by PMD.
Line: 42
Observable<String> stringObservable = Observable.unsafeCreate(source)
.takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();
other.sendOnCompleted();
Reported by PMD.
Line: 43
.takeUntil(Observable.unsafeCreate(other));
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();
other.sendOnCompleted();
Reported by PMD.
Line: 44
stringObservable.subscribe(result);
source.sendOnNext("one");
source.sendOnNext("two");
other.sendOnNext("three");
source.sendOnNext("four");
source.sendOnCompleted();
other.sendOnCompleted();
verify(result, times(1)).onNext("one");
Reported by PMD.
Line: 49
source.sendOnCompleted();
other.sendOnCompleted();
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(0)).onNext("four");
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose();
Reported by PMD.
Line: 50
other.sendOnCompleted();
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(0)).onNext("four");
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose();
Reported by PMD.
Line: 51
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(0)).onNext("four");
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose();
}
Reported by PMD.
Line: 52
verify(result, times(1)).onNext("one");
verify(result, times(1)).onNext("two");
verify(result, times(0)).onNext("three");
verify(result, times(0)).onNext("four");
verify(sSource, times(1)).dispose();
verify(sOther, times(1)).dispose();
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java
152 issues
Line: 746
// can't null out the head's value because of late replayers would see null
setFirst(next);
}
/* test */ final void removeSome(int n) {
Node head = get();
while (n > 0) {
head = head.get();
n--;
size--;
Reported by PMD.
Line: 765
* Arranges the given node is the new head from now on.
* @param n the Node instance to set as first
*/
final void setFirst(Node n) {
if (eagerTruncate) {
Node m = new Node(null, n.index);
m.lazySet(n.get());
n = m;
}
Reported by PMD.
Line: 34
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Timed;
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T> {
/** The source observable. */
final Flowable<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplaySubscriber<T>> current;
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
Reported by PMD.
Line: 36
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T> {
/** The source observable. */
final Flowable<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplaySubscriber<T>> current;
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
final Supplier<? extends ReplayBuffer<T>> bufferFactory;
Reported by PMD.
Line: 36
public final class FlowableReplay<T> extends ConnectableFlowable<T> implements HasUpstreamPublisher<T> {
/** The source observable. */
final Flowable<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplaySubscriber<T>> current;
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
final Supplier<? extends ReplayBuffer<T>> bufferFactory;
Reported by PMD.
Line: 38
/** The source observable. */
final Flowable<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplaySubscriber<T>> current;
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
final Supplier<? extends ReplayBuffer<T>> bufferFactory;
final Publisher<T> onSubscribe;
Reported by PMD.
Line: 40
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplaySubscriber<T>> current;
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
final Supplier<? extends ReplayBuffer<T>> bufferFactory;
final Publisher<T> onSubscribe;
@SuppressWarnings("rawtypes")
static final Supplier DEFAULT_UNBOUNDED_FACTORY = new DefaultUnboundedFactory();
Reported by PMD.
Line: 42
/** A factory that creates the appropriate buffer for the ReplaySubscriber. */
final Supplier<? extends ReplayBuffer<T>> bufferFactory;
final Publisher<T> onSubscribe;
@SuppressWarnings("rawtypes")
static final Supplier DEFAULT_UNBOUNDED_FACTORY = new DefaultUnboundedFactory();
/**
Reported by PMD.
Line: 68
* @param source the source Publisher to use
* @return the new ConnectableObservable instance
*/
@SuppressWarnings("unchecked")
public static <T> ConnectableFlowable<T> createFrom(Flowable<? extends T> source) {
return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
/**
Reported by PMD.
Line: 157
@Override
public void reset() {
ReplaySubscriber<T> conn = current.get();
if (conn != null && conn.isDisposed()) {
current.compareAndSet(conn, null);
}
}
@Override
Reported by PMD.
src/main/java/io/reactivex/rxjava3/processors/ReplayProcessor.java
152 issues
Line: 700
@Override
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
int s = size;
if (s == 0) {
if (array.length != 0) {
array[0] = null;
}
Reported by PMD.
Line: 935
@Override
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
int s = 0;
Node<T> h = head;
Node<T> h0 = h;
for (;;) {
Node<T> next = h0.get();
Reported by PMD.
Line: 1204
@Override
@SuppressWarnings("unchecked")
public T[] getValues(T[] array) {
TimedNode<T> h = getHead();
int s = size(h);
if (s == 0) {
if (array.length != 0) {
Reported by PMD.
Line: 1336
return size(getHead());
}
int size(TimedNode<T> h) {
int s = 0;
while (s != Integer.MAX_VALUE) {
TimedNode<T> next = h.get();
if (next == null) {
break;
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.processors;
import java.lang.reflect.Array;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.*;
Reported by PMD.
Line: 144
*
* @param <T> the value type
*/
public final class ReplayProcessor<@NonNull T> extends FlowableProcessor<T> {
/** An empty array to avoid allocation in getValues(). */
private static final Object[] EMPTY_ARRAY = new Object[0];
final ReplayBuffer<T> buffer;
Reported by PMD.
Line: 144
*
* @param <T> the value type
*/
public final class ReplayProcessor<@NonNull T> extends FlowableProcessor<T> {
/** An empty array to avoid allocation in getValues(). */
private static final Object[] EMPTY_ARRAY = new Object[0];
final ReplayBuffer<T> buffer;
Reported by PMD.
Line: 144
*
* @param <T> the value type
*/
public final class ReplayProcessor<@NonNull T> extends FlowableProcessor<T> {
/** An empty array to avoid allocation in getValues(). */
private static final Object[] EMPTY_ARRAY = new Object[0];
final ReplayBuffer<T> buffer;
Reported by PMD.
Line: 148
/** An empty array to avoid allocation in getValues(). */
private static final Object[] EMPTY_ARRAY = new Object[0];
final ReplayBuffer<T> buffer;
boolean done;
final AtomicReference<ReplaySubscription<T>[]> subscribers;
Reported by PMD.
Line: 150
final ReplayBuffer<T> buffer;
boolean done;
final AtomicReference<ReplaySubscription<T>[]> subscribers;
@SuppressWarnings("rawtypes")
static final ReplaySubscription[] EMPTY = new ReplaySubscription[0];
Reported by PMD.
src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerInterruptibleTest.java
152 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.schedulers;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.*;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ExecutorSchedulerInterruptibleTest extends AbstractSchedulerConcurrencyTests {
static final Executor executor = Executors.newFixedThreadPool(2, new RxThreadFactory("TestCustomPool"));
@Override
protected Scheduler getScheduler() {
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ExecutorSchedulerInterruptibleTest extends AbstractSchedulerConcurrencyTests {
static final Executor executor = Executors.newFixedThreadPool(2, new RxThreadFactory("TestCustomPool"));
@Override
protected Scheduler getScheduler() {
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ExecutorSchedulerInterruptibleTest extends AbstractSchedulerConcurrencyTests {
static final Executor executor = Executors.newFixedThreadPool(2, new RxThreadFactory("TestCustomPool"));
@Override
protected Scheduler getScheduler() {
Reported by PMD.
Line: 43
}
@Test
public final void handledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test
public void cancelledTaskRetention() throws InterruptedException {
Reported by PMD.
Line: 48
}
@Test
public void cancelledTaskRetention() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Scheduler s = Schedulers.from(exec, true);
try {
Scheduler.Worker w = s.createWorker();
try {
Reported by PMD.
Line: 56
try {
ExecutorSchedulerTest.cancelledRetention(w, false);
} finally {
w.dispose();
}
w = s.createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, true);
Reported by PMD.
Line: 63
try {
ExecutorSchedulerTest.cancelledRetention(w, true);
} finally {
w.dispose();
}
} finally {
exec.shutdownNow();
}
}
Reported by PMD.
Line: 72
/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
static final class TestExecutor implements Executor {
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
@Override
public void execute(Runnable command) {
queue.offer(command);
}
public void executeOne() {
Reported by PMD.
Line: 85
}
public void executeAll() {
Runnable r;
while ((r = queue.poll()) != null) {
r.run();
}
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAnyTest.java
149 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableAnyTest extends RxJavaTest {
@Test
public void anyWithTwoItems() {
Flowable<Integer> w = Flowable.just(1, 2);
Single<Boolean> single = w.any(new Predicate<Integer>() {
Reported by PMD.
Line: 52
single.subscribe(observer);
verify(observer, never()).onSuccess(false);
verify(observer, times(1)).onSuccess(true);
verify(observer, never()).onError(any(Throwable.class));
}
@Test
Reported by PMD.
Line: 53
single.subscribe(observer);
verify(observer, never()).onSuccess(false);
verify(observer, times(1)).onSuccess(true);
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void isEmptyWithTwoItems() {
Reported by PMD.
Line: 54
verify(observer, never()).onSuccess(false);
verify(observer, times(1)).onSuccess(true);
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void isEmptyWithTwoItems() {
Flowable<Integer> w = Flowable.just(1, 2);
Reported by PMD.
Line: 60
@Test
public void isEmptyWithTwoItems() {
Flowable<Integer> w = Flowable.just(1, 2);
Single<Boolean> single = w.isEmpty();
SingleObserver<Boolean> observer = TestHelper.mockSingleObserver();
single.subscribe(observer);
Reported by PMD.
Line: 64
SingleObserver<Boolean> observer = TestHelper.mockSingleObserver();
single.subscribe(observer);
verify(observer, never()).onSuccess(true);
verify(observer, times(1)).onSuccess(false);
verify(observer, never()).onError(any(Throwable.class));
}
Reported by PMD.
Line: 66
single.subscribe(observer);
verify(observer, never()).onSuccess(true);
verify(observer, times(1)).onSuccess(false);
verify(observer, never()).onError(any(Throwable.class));
}
@Test
Reported by PMD.
Line: 67
single.subscribe(observer);
verify(observer, never()).onSuccess(true);
verify(observer, times(1)).onSuccess(false);
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void anyWithOneItem() {
Reported by PMD.
Line: 68
verify(observer, never()).onSuccess(true);
verify(observer, times(1)).onSuccess(false);
verify(observer, never()).onError(any(Throwable.class));
}
@Test
public void anyWithOneItem() {
Flowable<Integer> w = Flowable.just(1);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java
149 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.*;
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableWindowWithStartEndFlowableTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
Reported by PMD.
Line: 39
public class FlowableWindowWithStartEndFlowableTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
scheduler = new TestScheduler();
Reported by PMD.
Line: 40
public class FlowableWindowWithStartEndFlowableTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
scheduler = new TestScheduler();
innerScheduler = scheduler.createWorker();
Reported by PMD.
Line: 49
}
@Test
public void flowableBasedOpenerAndCloser() {
final List<String> list = new ArrayList<>();
final List<List<String>> lists = new ArrayList<>();
Flowable<String> source = Flowable.unsafeCreate(new Publisher<String>() {
@Override
Reported by PMD.
Line: 91
};
Flowable<Flowable<String>> windowed = source.window(openings, closer);
windowed.subscribe(observeWindow(list, lists));
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
assertEquals(2, lists.size());
assertEquals(lists.get(0), list("two", "three"));
assertEquals(lists.get(1), list("five"));
Reported by PMD.
Line: 151
}
@Test
public void noUnsubscribeAndNoLeak() {
PublishProcessor<Integer> source = PublishProcessor.create();
PublishProcessor<Integer> open = PublishProcessor.create();
final PublishProcessor<Integer> close = PublishProcessor.create();
Reported by PMD.
Line: 173
})
.subscribe(ts);
open.onNext(1);
source.onNext(1);
assertTrue(open.hasSubscribers());
assertTrue(close.hasSubscribers());
Reported by PMD.
Line: 174
.subscribe(ts);
open.onNext(1);
source.onNext(1);
assertTrue(open.hasSubscribers());
assertTrue(close.hasSubscribers());
close.onNext(1);
Reported by PMD.
Line: 176
open.onNext(1);
source.onNext(1);
assertTrue(open.hasSubscribers());
assertTrue(close.hasSubscribers());
close.onNext(1);
assertFalse(close.hasSubscribers());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMergeDelayErrorTest.java
148 issues
Line: 151
o3.t.join();
o4.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
verify(stringObserver, times(1)).onNext("one");
verify(stringObserver, times(1)).onNext("two");
verify(stringObserver, times(1)).onNext("three");
Reported by PMD.
Line: 279
o1.t.join();
o2.t.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(2)).onNext("hello");
verify(stringObserver, times(1)).onComplete();
Reported by PMD.
Line: 359
boolean errorThrown = false;
for (String s : valuesToReturn) {
if (s == null) {
System.out.println("throwing exception");
observer.onError(new NullPointerException());
errorThrown = true;
// purposefully not returning here so it will continue calling onNext
// so that we also test that we handle bad sequences like this
} else {
Reported by PMD.
Line: 393
public void run() {
for (String s : valuesToReturn) {
if (s == null) {
System.out.println("throwing exception");
try {
Thread.sleep(100);
} catch (Throwable e) {
}
Reported by PMD.
Line: 405
observer.onNext(s);
}
}
System.out.println("subscription complete");
observer.onComplete();
}
});
t.start();
Reported by PMD.
Line: 468
TestObserverEx<String> to = new TestObserverEx<>(stringObserver);
Observable<String> m = Observable.mergeDelayError(parentObservable);
m.subscribe(to);
System.out.println("testErrorInParentObservableDelayed | " + i);
to.awaitDone(2000, TimeUnit.MILLISECONDS);
to.assertTerminated();
verify(stringObserver, times(2)).onNext("hello");
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.observers.DefaultObserver;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableMergeDelayErrorTest extends RxJavaTest {
Observer<String> stringObserver;
@Before
public void before() {
Reported by PMD.
Line: 35
public class ObservableMergeDelayErrorTest extends RxJavaTest {
Observer<String> stringObserver;
@Before
public void before() {
stringObserver = TestHelper.mockObserver();
}
Reported by PMD.
Line: 44
@Test
public void errorDelayed1() {
final Observable<String> o1 = Observable.unsafeCreate(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
final Observable<String> o2 = Observable.unsafeCreate(new TestErrorObservable("one", "two", "three"));
Observable<String> m = Observable.mergeDelayError(o1, o2);
m.subscribe(stringObserver);
Reported by PMD.
Line: 44
@Test
public void errorDelayed1() {
final Observable<String> o1 = Observable.unsafeCreate(new TestErrorObservable("four", null, "six")); // we expect to lose "six" from the source (and it should never be sent by the source since onError was called
final Observable<String> o2 = Observable.unsafeCreate(new TestErrorObservable("one", "two", "three"));
Observable<String> m = Observable.mergeDelayError(o1, o2);
m.subscribe(stringObserver);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAnyTest.java
147 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 34
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableAnyTest extends RxJavaTest {
@Test
public void anyWithTwoItemsObservable() {
Observable<Integer> w = Observable.just(1, 2);
Observable<Boolean> observable = w.any(new Predicate<Integer>() {
Reported by PMD.
Line: 50
observable.subscribe(observer);
verify(observer, never()).onNext(false);
verify(observer, times(1)).onNext(true);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 51
observable.subscribe(observer);
verify(observer, never()).onNext(false);
verify(observer, times(1)).onNext(true);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 52
verify(observer, never()).onNext(false);
verify(observer, times(1)).onNext(true);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void isEmptyWithTwoItemsObservable() {
Reported by PMD.
Line: 53
verify(observer, never()).onNext(false);
verify(observer, times(1)).onNext(true);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void isEmptyWithTwoItemsObservable() {
Observable<Integer> w = Observable.just(1, 2);
Reported by PMD.
Line: 59
@Test
public void isEmptyWithTwoItemsObservable() {
Observable<Integer> w = Observable.just(1, 2);
Observable<Boolean> observable = w.isEmpty().toObservable();
Observer<Boolean> observer = TestHelper.mockObserver();
observable.subscribe(observer);
Reported by PMD.
Line: 59
@Test
public void isEmptyWithTwoItemsObservable() {
Observable<Integer> w = Observable.just(1, 2);
Observable<Boolean> observable = w.isEmpty().toObservable();
Observer<Boolean> observer = TestHelper.mockObserver();
observable.subscribe(observer);
Reported by PMD.
Line: 63
Observer<Boolean> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, never()).onNext(true);
verify(observer, times(1)).onNext(false);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
Reported by PMD.
Line: 65
observable.subscribe(observer);
verify(observer, never()).onNext(true);
verify(observer, times(1)).onNext(false);
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.