The following issues were found
src/test/java/io/reactivex/rxjava3/observable/ObservableReduceTests.java
24 issues
Line: 36
}
}).toObservable().blockingSingle();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjectsObservable() {
Reported by PMD.
Line: 58
}
}).toObservable();
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 77
}
}).toObservable();
assertNotNull(reduceResult2);
}
@Test
public void reduceInts() {
Observable<Integer> o = Observable.just(1, 2, 3);
Reported by PMD.
Line: 90
}
}).blockingGet();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjects() {
Reported by PMD.
Line: 112
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 131
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 140
* https://github.com/ReactiveX/RxJava/issues/360#issuecomment-24203016
*/
@Test
public void reduceCovariance() {
// must type it to <Movie>
Observable<Movie> horrorMovies = Observable.<Movie> just(new HorrorMovie());
libraryFunctionActingOnMovieObservables(horrorMovies);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.observable;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiFunction;
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.observable.ObservableCovarianceTest.*;
public class ObservableReduceTests extends RxJavaTest {
Reported by PMD.
Line: 22
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.observable.ObservableCovarianceTest.*;
public class ObservableReduceTests extends RxJavaTest {
@Test
public void reduceIntsObservable() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromRunnableTest.java
24 issues
Line: 42
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
}
@Test
public void fromRunnableTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 46
}
@Test
public void fromRunnableTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Runnable run = new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 56
}
};
Completable.fromRunnable(run)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 56
}
};
Completable.fromRunnable(run)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 60
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Completable.fromRunnable(run)
.test()
.assertResult();
Reported by PMD.
Line: 62
assertEquals(1, atomicInteger.get());
Completable.fromRunnable(run)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 62
assertEquals(1, atomicInteger.get());
Completable.fromRunnable(run)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 66
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
@Test
public void fromRunnableInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 70
}
@Test
public void fromRunnableInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Completable completable = Completable.fromRunnable(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 80
}
});
assertEquals(0, atomicInteger.get());
completable
.test()
.assertResult();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromActionTest.java
24 issues
Line: 43
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
}
@Test
public void fromActionTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 47
}
@Test
public void fromActionTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Action run = new Action() {
@Override
public void run() throws Exception {
Reported by PMD.
Line: 57
}
};
Completable.fromAction(run)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 57
}
};
Completable.fromAction(run)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 61
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Completable.fromAction(run)
.test()
.assertResult();
Reported by PMD.
Line: 63
assertEquals(1, atomicInteger.get());
Completable.fromAction(run)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 63
assertEquals(1, atomicInteger.get());
Completable.fromAction(run)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 67
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
@Test
public void fromActionInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 71
}
@Test
public void fromActionInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Completable completable = Completable.fromAction(new Action() {
@Override
public void run() throws Exception {
Reported by PMD.
Line: 81
}
});
assertEquals(0, atomicInteger.get());
completable
.test()
.assertResult();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableReduceTests.java
24 issues
Line: 36
}
}).toFlowable().blockingSingle();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjectsFlowable() {
Reported by PMD.
Line: 58
}
}).toFlowable();
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 77
}
}).toFlowable();
assertNotNull(reduceResult2);
}
@Test
public void reduceInts() {
Flowable<Integer> f = Flowable.just(1, 2, 3);
Reported by PMD.
Line: 90
}
}).toFlowable().blockingSingle();
assertEquals(6, value);
}
@SuppressWarnings("unused")
@Test
public void reduceWithObjects() {
Reported by PMD.
Line: 112
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 131
}
});
assertNotNull(reduceResult2);
}
/**
* Reduce consumes and produces T so can't do covariance.
*
Reported by PMD.
Line: 140
* https://github.com/ReactiveX/RxJava/issues/360#issuecomment-24203016
*/
@Test
public void reduceCovariance() {
// must type it to <Movie>
Flowable<Movie> horrorMovies = Flowable.<Movie> just(new HorrorMovie());
libraryFunctionActingOnMovieObservables(horrorMovies);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.flowable;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.flowable.FlowableCovarianceTest.*;
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.flowable.FlowableCovarianceTest.*;
import io.reactivex.rxjava3.functions.BiFunction;
public class FlowableReduceTests extends RxJavaTest {
Reported by PMD.
Line: 21
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.flowable.FlowableCovarianceTest.*;
import io.reactivex.rxjava3.functions.BiFunction;
public class FlowableReduceTests extends RxJavaTest {
@Test
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelMap.java
24 issues
Line: 46
}
@Override
public void subscribe(Subscriber<? super R>[] subscribers) {
subscribers = RxJavaPlugins.onSubscribe(this, subscribers);
if (!validate(subscribers)) {
return;
}
Reported by PMD.
Line: 36
*/
public final class ParallelMap<T, R> extends ParallelFlowable<R> {
final ParallelFlowable<T> source;
final Function<? super T, ? extends R> mapper;
public ParallelMap(ParallelFlowable<T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
Reported by PMD.
Line: 38
final ParallelFlowable<T> source;
final Function<? super T, ? extends R> mapper;
public ParallelMap(ParallelFlowable<T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
Reported by PMD.
Line: 76
static final class ParallelMapSubscriber<T, R> implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super R> downstream;
final Function<? super T, ? extends R> mapper;
Subscription upstream;
Reported by PMD.
Line: 78
final Subscriber<? super R> downstream;
final Function<? super T, ? extends R> mapper;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 80
final Function<? super T, ? extends R> mapper;
Subscription upstream;
boolean done;
ParallelMapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
this.downstream = actual;
Reported by PMD.
Line: 82
Subscription upstream;
boolean done;
ParallelMapSubscriber(Subscriber<? super R> actual, Function<? super T, ? extends R> mapper) {
this.downstream = actual;
this.mapper = mapper;
}
Reported by PMD.
Line: 117
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancel();
onError(ex);
return;
}
Reported by PMD.
Line: 149
}
static final class ParallelMapConditionalSubscriber<T, R> implements ConditionalSubscriber<T>, Subscription {
final ConditionalSubscriber<? super R> downstream;
final Function<? super T, ? extends R> mapper;
Subscription upstream;
Reported by PMD.
Line: 151
final ConditionalSubscriber<? super R> downstream;
final Function<? super T, ? extends R> mapper;
Subscription upstream;
boolean done;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableZipTests.java
24 issues
Line: 61
@Override
public void accept(Object pv) {
synchronized (pv) {
System.out.println(pv);
}
}
});
System.out.println("**** finished");
Reported by PMD.
Line: 66
}
});
System.out.println("**** finished");
Thread.sleep(200); // make sure the event streams receive their interrupt
}
/**
Reported by PMD.
Line: 104
Observable<Object> result = Observable.zip(observables, new Function<Object[], Object>() {
@Override
public Object apply(Object[] args) {
System.out.println("received: " + args);
Assert.assertEquals("No argument should have been passed", 0, args.length);
return invoked;
}
});
Reported by PMD.
Line: 123
Consumer<Result> action = new Consumer<Result>() {
@Override
public void accept(Result t1) {
System.out.println("Result: " + t1);
}
};
Consumer<ExtendedResult> extendedAction = new Consumer<ExtendedResult>() {
@Override
Reported by PMD.
Line: 130
Consumer<ExtendedResult> extendedAction = new Consumer<ExtendedResult>() {
@Override
public void accept(ExtendedResult t1) {
System.out.println("Result: " + t1);
}
};
@Test
public void zipWithDelayError() {
Reported by PMD.
Line: 32
public class ObservableZipTests extends RxJavaTest {
@Test
public void zipObservableOfObservables() throws Exception {
ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
.groupBy(new Function<Event, String>() {
@Override
public String apply(Event e) {
return e.instanceId;
Reported by PMD.
Line: 79
Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Reported by PMD.
Line: 79
Observable<HorrorMovie> horrors = Observable.just(new HorrorMovie());
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Reported by PMD.
Line: 80
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
Reported by PMD.
Line: 80
Observable<CoolRating> ratings = Observable.just(new CoolRating());
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(extendedAction);
Observable.<Media, Rating, Result> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Media, Rating, ExtendedResult> zip(horrors, ratings, combine).blockingForEach(action);
Observable.<Movie, CoolRating, Result> zip(horrors, ratings, combine);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSubscribeOnTest.java
24 issues
Line: 33
public class ObservableSubscribeOnTest extends RxJavaTest {
@Test
public void issue813() throws InterruptedException {
// https://github.com/ReactiveX/RxJava/issues/813
final CountDownLatch scheduled = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(1);
Reported by PMD.
Line: 57
}
observer.onComplete();
} catch (Throwable e) {
observer.onError(e);
} finally {
doneLatch.countDown();
}
}
Reported by PMD.
Line: 76
}
@Test
public void onError() {
TestObserverEx<String> to = new TestObserverEx<>();
Observable.unsafeCreate(new ObservableSource<String>() {
@Override
public void subscribe(Observer<? super String> observer) {
Reported by PMD.
Line: 92
}
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
public SlowScheduler() {
this(Schedulers.computation(), 2, TimeUnit.SECONDS);
Reported by PMD.
Line: 93
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
public SlowScheduler() {
this(Schedulers.computation(), 2, TimeUnit.SECONDS);
}
Reported by PMD.
Line: 94
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
public SlowScheduler() {
this(Schedulers.computation(), 2, TimeUnit.SECONDS);
}
Reported by PMD.
Line: 114
private final class SlowInner extends Worker {
private final Scheduler.Worker actualInner;
private SlowInner(Worker actual) {
this.actualInner = actual;
}
Reported by PMD.
Line: 140
@Override
public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit delayUnit) {
TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
return actualInner.schedule(action, t, common);
}
}
Reported by PMD.
Line: 140
@Override
public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit delayUnit) {
TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
return actualInner.schedule(action, t, common);
}
}
Reported by PMD.
Line: 170
to.dispose();
Thread.sleep(200); // give time for the loop to continue
to.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertEquals(10, count.get());
}
@Test
public void cancelBeforeActualSubscribe() {
TestScheduler test = new TestScheduler();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/subscribers/QueueDrainSubscriber.java
24 issues
Line: 35
* @param <U> the value type in the queue
* @param <V> the value type the child subscriber accepts
*/
public abstract class QueueDrainSubscriber<T, U, V> extends QueueDrainSubscriberPad4 implements FlowableSubscriber<T>, QueueDrain<U, V> {
protected final Subscriber<? super V> downstream;
protected final SimplePlainQueue<U> queue;
Reported by PMD.
Line: 37
*/
public abstract class QueueDrainSubscriber<T, U, V> extends QueueDrainSubscriberPad4 implements FlowableSubscriber<T>, QueueDrain<U, V> {
protected final Subscriber<? super V> downstream;
protected final SimplePlainQueue<U> queue;
protected volatile boolean cancelled;
Reported by PMD.
Line: 39
protected final Subscriber<? super V> downstream;
protected final SimplePlainQueue<U> queue;
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
Reported by PMD.
Line: 41
protected final SimplePlainQueue<U> queue;
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
public QueueDrainSubscriber(Subscriber<? super V> actual, SimplePlainQueue<U> queue) {
Reported by PMD.
Line: 41
protected final SimplePlainQueue<U> queue;
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
public QueueDrainSubscriber(Subscriber<? super V> actual, SimplePlainQueue<U> queue) {
Reported by PMD.
Line: 43
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
public QueueDrainSubscriber(Subscriber<? super V> actual, SimplePlainQueue<U> queue) {
this.downstream = actual;
this.queue = queue;
Reported by PMD.
Line: 43
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
public QueueDrainSubscriber(Subscriber<? super V> actual, SimplePlainQueue<U> queue) {
this.downstream = actual;
this.queue = queue;
Reported by PMD.
Line: 44
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
public QueueDrainSubscriber(Subscriber<? super V> actual, SimplePlainQueue<U> queue) {
this.downstream = actual;
this.queue = queue;
}
Reported by PMD.
Line: 44
protected volatile boolean cancelled;
protected volatile boolean done;
protected Throwable error;
public QueueDrainSubscriber(Subscriber<? super V> actual, SimplePlainQueue<U> queue) {
this.downstream = actual;
this.queue = queue;
}
Reported by PMD.
Line: 76
if (fastEnter()) {
long r = requested.get();
if (r != 0L) {
if (accept(s, value)) {
if (r != Long.MAX_VALUE) {
produced(1);
}
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDoOnEach.java
23 issues
Line: 105
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
Reported by PMD.
Line: 24
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Reported by PMD.
Line: 25
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Reported by PMD.
Line: 26
public final class ObservableDoOnEach<T> extends AbstractObservableWithUpstream<T, T> {
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Reported by PMD.
Line: 27
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
public ObservableDoOnEach(ObservableSource<T> source, Consumer<? super T> onNext,
Consumer<? super Throwable> onError,
Action onComplete,
Action onAfterTerminate) {
Reported by PMD.
Line: 46
}
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
Reported by PMD.
Line: 47
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
Disposable upstream;
Reported by PMD.
Line: 47
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
Disposable upstream;
Reported by PMD.
Line: 48
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
Disposable upstream;
Reported by PMD.
Line: 48
static final class DoOnEachObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> downstream;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Action onAfterTerminate;
Disposable upstream;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFilterTest.java
23 issues
Line: 39
@Override
public boolean test(String t1) {
return t1.equals("two");
}
});
Observer<String> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 39
@Override
public boolean test(String t1) {
return t1.equals("two");
}
});
Observer<String> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 47
observable.subscribe(observer);
verify(observer, Mockito.never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 48
observable.subscribe(observer);
verify(observer, Mockito.never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 49
verify(observer, Mockito.never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 50
verify(observer, Mockito.never()).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void dispose() {
Reported by PMD.
Line: 51
verify(observer, times(1)).onNext("two");
verify(observer, Mockito.never()).onNext("three");
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.range(1, 5).filter(Functions.alwaysTrue()));
Reported by PMD.
Line: 55
}
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.range(1, 5).filter(Functions.alwaysTrue()));
}
@Test
public void doubleOnSubscribe() {
Reported by PMD.
Line: 56
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.range(1, 5).filter(Functions.alwaysTrue()));
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeObservable(new Function<Observable<Object>, ObservableSource<Object>>() {
Reported by PMD.
Line: 60
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeObservable(new Function<Observable<Object>, ObservableSource<Object>>() {
@Override
public ObservableSource<Object> apply(Observable<Object> o) throws Exception {
return o.filter(Functions.alwaysTrue());
}
Reported by PMD.