The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeTimeoutPublisherTest.java
119 issues
Line: 32
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.testsupport.*;
public class MaybeTimeoutPublisherTest extends RxJavaTest {
@Test
public void mainError() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
Reported by PMD.
Line: 35
public class MaybeTimeoutPublisherTest extends RxJavaTest {
@Test
public void mainError() {
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
Reported by PMD.
Line: 39
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
Reported by PMD.
Line: 39
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
Reported by PMD.
Line: 39
PublishProcessor<Integer> pp1 = PublishProcessor.create();
PublishProcessor<Integer> pp2 = PublishProcessor.create();
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
Reported by PMD.
Line: 41
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
Reported by PMD.
Line: 41
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
Reported by PMD.
Line: 42
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
assertFalse(pp2.hasSubscribers());
Reported by PMD.
Line: 42
TestObserver<Integer> to = pp1.singleElement().timeout(pp2).test();
assertTrue(pp1.hasSubscribers());
assertTrue(pp2.hasSubscribers());
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
assertFalse(pp2.hasSubscribers());
Reported by PMD.
Line: 46
pp1.onError(new TestException());
assertFalse(pp1.hasSubscribers());
assertFalse(pp2.hasSubscribers());
to.assertFailure(TestException.class);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subscribers/SafeSubscriberTest.java
119 issues
Line: 148
@Override
public void cancel() {
// going to do nothing to pretend I'm a bad Observable that keeps allowing events to be sent
System.out.println("==> SynchronizeTest unsubscribe that does nothing!");
}
@Override
public void request(long n) {
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.subscribers;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.*;
public class SafeSubscriberTest extends RxJavaTest {
/**
* Ensure onNext can not be called after onError.
*/
@Test
Reported by PMD.
Line: 46
Subscriber<String> w = TestHelper.mockSubscriber();
st.subscribe(new SafeSubscriber<>(new TestSubscriber<>(w)));
t.sendOnNext("one");
t.sendOnError(new RuntimeException("bad"));
t.sendOnNext("two");
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
Reported by PMD.
Line: 48
t.sendOnNext("one");
t.sendOnError(new RuntimeException("bad"));
t.sendOnNext("two");
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
verify(w, Mockito.never()).onNext("two");
}
Reported by PMD.
Line: 50
t.sendOnError(new RuntimeException("bad"));
t.sendOnNext("two");
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
verify(w, Mockito.never()).onNext("two");
}
/**
Reported by PMD.
Line: 51
t.sendOnNext("two");
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
verify(w, Mockito.never()).onNext("two");
}
/**
* Ensure onComplete can not be called after onError.
Reported by PMD.
Line: 52
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
verify(w, Mockito.never()).onNext("two");
}
/**
* Ensure onComplete can not be called after onError.
*/
Reported by PMD.
Line: 71
t.sendOnError(new RuntimeException("bad"));
t.sendOnCompleted();
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
verify(w, Mockito.never()).onComplete();
}
/**
Reported by PMD.
Line: 72
t.sendOnCompleted();
verify(w, times(1)).onNext("one");
verify(w, times(1)).onError(any(Throwable.class));
verify(w, Mockito.never()).onComplete();
}
/**
* Ensure onNext can not be called after onComplete.
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTakeTest.java
118 issues
Line: 264
}
subscriber.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
System.out.println("starting TestFlowable thread");
Reported by PMD.
Line: 185
fail(e.getMessage());
}
System.out.println("TestFlowable thread finished");
verify(subscriber).onSubscribe((Subscription)notNull());
verify(subscriber, times(1)).onNext("one");
verify(subscriber, never()).onNext("two");
verify(subscriber, never()).onNext("three");
verify(subscriber, times(1)).onComplete();
Reported by PMD.
Line: 220
BooleanSubscription bs = new BooleanSubscription();
s.onSubscribe(bs);
for (int i = 0; !bs.isCancelled(); i++) {
System.out.println("Emit: " + i);
count.incrementAndGet();
s.onNext(i);
}
}
Reported by PMD.
Line: 230
@Override
public void accept(Integer t1) {
System.out.println("Receive: " + t1);
}
});
Reported by PMD.
Line: 251
@Override
public void subscribe(final Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
System.out.println("TestFlowable subscribed to ...");
t = new Thread(new Runnable() {
@Override
public void run() {
try {
Reported by PMD.
Line: 257
@Override
public void run() {
try {
System.out.println("running TestFlowable thread");
for (String s : values) {
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
subscriber.onComplete();
Reported by PMD.
Line: 259
try {
System.out.println("running TestFlowable thread");
for (String s : values) {
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
subscriber.onComplete();
} catch (Throwable e) {
throw new RuntimeException(e);
Reported by PMD.
Line: 269
}
});
System.out.println("starting TestFlowable thread");
t.start();
System.out.println("done starting TestFlowable thread");
}
}
Reported by PMD.
Line: 271
});
System.out.println("starting TestFlowable thread");
t.start();
System.out.println("done starting TestFlowable thread");
}
}
private static Flowable<Long> INFINITE_OBSERVABLE = Flowable.unsafeCreate(new Publisher<Long>() {
Reported by PMD.
Line: 393
.doOnRequest(new LongConsumer() {
@Override
public void accept(long n) {
System.out.println(n);
requests.addAndGet(n);
}})
//
.take(2)
//
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java
117 issues
Line: 32
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class FlowableFlatMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends Publisher<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public FlowableFlatMap(Flowable<T> source,
Reported by PMD.
Line: 33
public final class FlowableFlatMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends Publisher<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public FlowableFlatMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends U>> mapper,
Reported by PMD.
Line: 34
public final class FlowableFlatMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends Publisher<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public FlowableFlatMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
Reported by PMD.
Line: 35
final Function<? super T, ? extends Publisher<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
public FlowableFlatMap(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
Reported by PMD.
Line: 61
return new MergeSubscriber<>(s, mapper, delayErrors, maxConcurrency, bufferSize);
}
static final class MergeSubscriber<T, U> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -2117620485640801370L;
final Subscriber<? super U> downstream;
final Function<? super T, ? extends Publisher<? extends U>> mapper;
Reported by PMD.
Line: 61
return new MergeSubscriber<>(s, mapper, delayErrors, maxConcurrency, bufferSize);
}
static final class MergeSubscriber<T, U> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -2117620485640801370L;
final Subscriber<? super U> downstream;
final Function<? super T, ? extends Publisher<? extends U>> mapper;
Reported by PMD.
Line: 61
return new MergeSubscriber<>(s, mapper, delayErrors, maxConcurrency, bufferSize);
}
static final class MergeSubscriber<T, U> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -2117620485640801370L;
final Subscriber<? super U> downstream;
final Function<? super T, ? extends Publisher<? extends U>> mapper;
Reported by PMD.
Line: 61
return new MergeSubscriber<>(s, mapper, delayErrors, maxConcurrency, bufferSize);
}
static final class MergeSubscriber<T, U> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -2117620485640801370L;
final Subscriber<? super U> downstream;
final Function<? super T, ? extends Publisher<? extends U>> mapper;
Reported by PMD.
Line: 61
return new MergeSubscriber<>(s, mapper, delayErrors, maxConcurrency, bufferSize);
}
static final class MergeSubscriber<T, U> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -2117620485640801370L;
final Subscriber<? super U> downstream;
final Function<? super T, ? extends Publisher<? extends U>> mapper;
Reported by PMD.
Line: 65
private static final long serialVersionUID = -2117620485640801370L;
final Subscriber<? super U> downstream;
final Function<? super T, ? extends Publisher<? extends U>> mapper;
final boolean delayErrors;
final int maxConcurrency;
final int bufferSize;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDebounceTest.java
115 issues
Line: 40
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableDebounceTest extends RxJavaTest {
private TestScheduler scheduler;
private Observer<String> observer;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 42
public class ObservableDebounceTest extends RxJavaTest {
private TestScheduler scheduler;
private Observer<String> observer;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
Reported by PMD.
Line: 43
public class ObservableDebounceTest extends RxJavaTest {
private TestScheduler scheduler;
private Observer<String> observer;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
scheduler = new TestScheduler();
Reported by PMD.
Line: 44
private TestScheduler scheduler;
private Observer<String> observer;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
scheduler = new TestScheduler();
observer = TestHelper.mockObserver();
Reported by PMD.
Line: 67
});
Observable<String> sampled = source.debounce(400, TimeUnit.MILLISECONDS, scheduler);
sampled.subscribe(observer);
scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
InOrder inOrder = inOrder(observer);
// must go to 800 since it must be 400 after when two is sent, which is at 400
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 73
InOrder inOrder = inOrder(observer);
// must go to 800 since it must be 400 after when two is sent, which is at 400
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
Reported by PMD.
Line: 73
InOrder inOrder = inOrder(observer);
// must go to 800 since it must be 400 after when two is sent, which is at 400
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
Reported by PMD.
Line: 75
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
public void debounceNeverEmits() {
Reported by PMD.
Line: 75
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
public void debounceNeverEmits() {
Reported by PMD.
Line: 76
inOrder.verify(observer, times(1)).onNext("two");
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
@Test
public void debounceNeverEmits() {
Observable<String> source = Observable.unsafeCreate(new ObservableSource<String>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/FlowableConcatMapSingleTest.java
115 issues
Line: 38
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableConcatMapSingleTest extends RxJavaTest {
@Test
public void simple() {
Flowable.range(1, 5)
.concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
Reported by PMD.
Line: 41
public class FlowableConcatMapSingleTest extends RxJavaTest {
@Test
public void simple() {
Flowable.range(1, 5)
.concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v)
throws Exception {
Reported by PMD.
Line: 55
}
@Test
public void simpleLongPrefetch() {
Flowable.range(1, 1024)
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
Reported by PMD.
Line: 56
@Test
public void simpleLongPrefetch() {
Flowable.range(1, 1024)
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 56
@Test
public void simpleLongPrefetch() {
Flowable.range(1, 1024)
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 56
@Test
public void simpleLongPrefetch() {
Flowable.range(1, 1024)
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 56
@Test
public void simpleLongPrefetch() {
Flowable.range(1, 1024)
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 56
@Test
public void simpleLongPrefetch() {
Flowable.range(1, 1024)
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 65
}
@Test
public void simpleLongPrefetchHidden() {
Flowable.range(1, 1024).hide()
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
Reported by PMD.
Line: 66
@Test
public void simpleLongPrefetchHidden() {
Flowable.range(1, 1024).hide()
.concatMapSingle(Single::just, 32)
.test()
.assertValueCount(1024)
.assertNoErrors()
.assertComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapMaybeTest.java
114 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.*;
Reported by PMD.
Line: 34
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableFlatMapMaybeTest extends RxJavaTest {
@Test
public void normal() {
Observable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
Reported by PMD.
Line: 37
public class ObservableFlatMapMaybeTest extends RxJavaTest {
@Test
public void normal() {
Observable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v);
Reported by PMD.
Line: 50
}
@Test
public void normalEmpty() {
Observable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.empty();
Reported by PMD.
Line: 63
}
@Test
public void normalDelayError() {
Observable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v);
Reported by PMD.
Line: 76
}
@Test
public void normalAsync() {
TestObserverEx<Integer> to = Observable.range(1, 10)
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v).subscribeOn(Schedulers.computation());
Reported by PMD.
Line: 81
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
public MaybeSource<Integer> apply(Integer v) throws Exception {
return Maybe.just(v).subscribeOn(Schedulers.computation());
}
})
.to(TestHelper.<Integer>testConsumer())
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
Reported by PMD.
Line: 94
}
@Test
public void mapperThrowsObservable() {
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
@Override
Reported by PMD.
Line: 106
})
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
Reported by PMD.
Line: 106
})
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java
113 issues
Line: 631
}
}
/* test */ final void removeSome(int n) {
Node head = get();
while (n > 0) {
head = head.get();
n--;
size--;
Reported by PMD.
Line: 650
* Arranges the given node is the new head from now on.
* @param n the Node instance to set as first
*/
final void setFirst(Node n) {
if (eagerTruncate) {
Node m = new Node(null);
m.lazySet(n.get());
n = m;
}
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.schedulers.Timed;
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplayObserver<T>> current;
/** A factory that creates the appropriate buffer for the ReplayObserver. */
Reported by PMD.
Line: 35
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplayObserver<T>> current;
/** A factory that creates the appropriate buffer for the ReplayObserver. */
final BufferSupplier<T> bufferFactory;
Reported by PMD.
Line: 35
public final class ObservableReplay<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplayObserver<T>> current;
/** A factory that creates the appropriate buffer for the ReplayObserver. */
final BufferSupplier<T> bufferFactory;
Reported by PMD.
Line: 37
/** The source observable. */
final ObservableSource<T> source;
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplayObserver<T>> current;
/** A factory that creates the appropriate buffer for the ReplayObserver. */
final BufferSupplier<T> bufferFactory;
final ObservableSource<T> onSubscribe;
Reported by PMD.
Line: 39
/** Holds the current subscriber that is, will be or just was subscribed to the source observable. */
final AtomicReference<ReplayObserver<T>> current;
/** A factory that creates the appropriate buffer for the ReplayObserver. */
final BufferSupplier<T> bufferFactory;
final ObservableSource<T> onSubscribe;
interface BufferSupplier<T> {
ReplayBuffer<T> call();
Reported by PMD.
Line: 41
/** A factory that creates the appropriate buffer for the ReplayObserver. */
final BufferSupplier<T> bufferFactory;
final ObservableSource<T> onSubscribe;
interface BufferSupplier<T> {
ReplayBuffer<T> call();
}
Reported by PMD.
Line: 71
* @param source the source observable
* @return the new ConnectableObservable instance
*/
@SuppressWarnings("unchecked")
public static <T> ConnectableObservable<T> createFrom(ObservableSource<? extends T> source) {
return create(source, DEFAULT_UNBOUNDED_FACTORY);
}
/**
Reported by PMD.
Line: 155
@Override
public void reset() {
ReplayObserver<T> conn = current.get();
if (conn != null && conn.isDisposed()) {
current.compareAndSet(conn, null);
}
}
@Override
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapCompletableTest.java
113 issues
Line: 421
try {
assertNull(qd.poll());
} catch (Throwable ex) {
throw new RuntimeException(ex);
}
assertTrue(qd.isEmpty());
qd.clear();
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import java.util.List;
import java.util.concurrent.*;
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableFlatMapCompletableTest extends RxJavaTest {
@Test
public void normalObservable() {
Observable.range(1, 10)
.flatMapCompletable(new Function<Integer, CompletableSource>() {
Reported by PMD.
Line: 36
public class ObservableFlatMapCompletableTest extends RxJavaTest {
@Test
public void normalObservable() {
Observable.range(1, 10)
.flatMapCompletable(new Function<Integer, CompletableSource>() {
@Override
public CompletableSource apply(Integer v) throws Exception {
return Completable.complete();
Reported by PMD.
Line: 49
}
@Test
public void mapperThrowsObservable() {
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = ps
.flatMapCompletable(new Function<Integer, CompletableSource>() {
@Override
Reported by PMD.
Line: 61
}).<Integer>toObservable()
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
Reported by PMD.
Line: 61
}).<Integer>toObservable()
.test();
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
Reported by PMD.
Line: 63
assertTrue(ps.hasObservers());
ps.onNext(1);
to.assertFailure(TestException.class);
assertFalse(ps.hasObservers());
}
Reported by PMD.
Line: 67
to.assertFailure(TestException.class);
assertFalse(ps.hasObservers());
}
@Test
public void mapperReturnsNullObservable() {
PublishSubject<Integer> ps = PublishSubject.create();
Reported by PMD.
Line: 67
to.assertFailure(TestException.class);
assertFalse(ps.hasObservers());
}
@Test
public void mapperReturnsNullObservable() {
PublishSubject<Integer> ps = PublishSubject.create();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeCacheTest.java
113 issues
Line: 30
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class MaybeCacheTest extends RxJavaTest {
@Test
public void offlineSuccess() {
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
Reported by PMD.
Line: 34
@Test
public void offlineSuccess() {
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
source.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void offlineSuccess() {
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
source.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void offlineSuccess() {
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
source.test()
.assertResult(1);
}
Reported by PMD.
Line: 35
@Test
public void offlineSuccess() {
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
source.test()
.assertResult(1);
}
Reported by PMD.
Line: 37
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
source.test()
.assertResult(1);
}
@Test
public void offlineError() {
Reported by PMD.
Line: 37
Maybe<Integer> source = Maybe.just(1).cache();
assertEquals(1, source.blockingGet().intValue());
source.test()
.assertResult(1);
}
@Test
public void offlineError() {
Reported by PMD.
Line: 47
try {
source.blockingGet();
fail("Should have thrown");
} catch (TestException ex) {
// expected
}
source.test()
Reported by PMD.
Line: 48
try {
source.blockingGet();
fail("Should have thrown");
} catch (TestException ex) {
// expected
}
source.test()
.assertFailure(TestException.class);
Reported by PMD.
Line: 52
// expected
}
source.test()
.assertFailure(TestException.class);
}
@Test
public void offlineComplete() {
Reported by PMD.