The following issues were found
src/test/java/io/reactivex/rxjava3/observable/ObservableSubscriberTest.java
28 issues
Line: 60
});
assertEquals(1, c.get());
}
@Test
public void onStartCalledOnceViaUnsafeSubscribe() {
final AtomicInteger c = new AtomicInteger();
Reported by PMD.
Line: 89
});
assertEquals(1, c.get());
}
@Test
public void onStartCalledOnceViaLift() {
final AtomicInteger c = new AtomicInteger();
Reported by PMD.
Line: 126
}).subscribe();
assertEquals(1, c.get());
}
@Test
public void subscribeConsumerConsumer() {
final List<Integer> list = new ArrayList<>();
Reported by PMD.
Line: 145
}
});
assertEquals(Arrays.asList(1), list);
}
@Test
public void subscribeConsumerConsumerWithError() {
final List<Integer> list = new ArrayList<>();
Reported by PMD.
Line: 164
}
});
assertEquals(Arrays.asList(100), list);
}
@Test
public void methodTestCancelled() {
PublishSubject<Integer> ps = PublishSubject.create();
Reported by PMD.
Line: 171
public void methodTestCancelled() {
PublishSubject<Integer> ps = PublishSubject.create();
ps.test(true);
assertFalse(ps.hasObservers());
}
@Test
Reported by PMD.
Line: 173
ps.test(true);
assertFalse(ps.hasObservers());
}
@Test
public void safeSubscriberAlreadySafe() {
TestObserver<Integer> to = new TestObserver<>();
Reported by PMD.
Line: 173
ps.test(true);
assertFalse(ps.hasObservers());
}
@Test
public void safeSubscriberAlreadySafe() {
TestObserver<Integer> to = new TestObserver<>();
Reported by PMD.
Line: 177
}
@Test
public void safeSubscriberAlreadySafe() {
TestObserver<Integer> to = new TestObserver<>();
Observable.just(1).safeSubscribe(new SafeObserver<>(to));
to.assertResult(1);
}
Reported by PMD.
Line: 188
public void methodTestNoCancel() {
PublishSubject<Integer> ps = PublishSubject.create();
ps.test(false);
assertTrue(ps.hasObservers());
}
@SuppressWarnings("rawtypes")
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelPeekTest.java
28 issues
Line: 34
public class ParallelPeekTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.doOnNext(Functions.emptyConsumer()));
}
@Test
Reported by PMD.
Line: 35
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.doOnNext(Functions.emptyConsumer()));
}
@Test
@SuppressUndeliverable
Reported by PMD.
Line: 35
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.doOnNext(Functions.emptyConsumer()));
}
@Test
@SuppressUndeliverable
Reported by PMD.
Line: 41
@Test
@SuppressUndeliverable
public void onSubscribeCrash() {
Flowable.range(1, 5)
.parallel()
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
Reported by PMD.
Line: 56
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.doOnNext(Functions.emptyConsumer())
.sequential()
Reported by PMD.
Line: 65
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 75
}
@Test
public void requestCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.range(1, 5)
.parallel()
Reported by PMD.
Line: 91
.test()
.assertResult(1, 2, 3, 4, 5);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
Reported by PMD.
Line: 102
}
@Test
public void cancelCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.<Integer>never()
.parallel()
Reported by PMD.
Line: 118
.test()
.cancel();
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableErrorHandlingTests.java
28 issues
Line: 55
@Override
public void onNext(Long args) {
throw new RuntimeException("forced failure");
}
};
o.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 91
@Override
public void onNext(Long args) {
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread())
.safeSubscribe(observer);
Reported by PMD.
Line: 42
@Override
public void onComplete() {
System.out.println("completed");
latch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 48
@Override
public void onError(Throwable e) {
System.out.println("error: " + e);
caughtError.set(e);
latch.countDown();
}
@Override
Reported by PMD.
Line: 78
@Override
public void onComplete() {
System.out.println("completed");
latch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 84
@Override
public void onError(Throwable e) {
System.out.println("error: " + e);
caughtError.set(e);
latch.countDown();
}
@Override
Reported by PMD.
Line: 58
throw new RuntimeException("forced failure");
}
};
o.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
Reported by PMD.
Line: 61
o.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
/**
* Test that an error from a user provided Observer.onNext is handled and emitted to the onError
* even when done across thread boundaries with observeOn.
Reported by PMD.
Line: 94
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread())
.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
Reported by PMD.
Line: 94
throw new RuntimeException("forced failure");
}
};
o.observeOn(Schedulers.newThread())
.safeSubscribe(observer);
latch.await(2000, TimeUnit.MILLISECONDS);
assertNotNull(caughtError.get());
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRefCount.java
28 issues
Line: 38
*/
public final class FlowableRefCount<T> extends Flowable<T> {
final ConnectableFlowable<T> source;
final int n;
final long timeout;
Reported by PMD.
Line: 40
final ConnectableFlowable<T> source;
final int n;
final long timeout;
final TimeUnit unit;
Reported by PMD.
Line: 42
final int n;
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
Reported by PMD.
Line: 42
final int n;
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
Reported by PMD.
Line: 44
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
RefConnection connection;
Reported by PMD.
Line: 46
final TimeUnit unit;
final Scheduler scheduler;
RefConnection connection;
public FlowableRefCount(ConnectableFlowable<T> source) {
this(source, 1, 0L, TimeUnit.NANOSECONDS, null);
Reported by PMD.
Line: 48
final Scheduler scheduler;
RefConnection connection;
public FlowableRefCount(ConnectableFlowable<T> source) {
this(source, 1, 0L, TimeUnit.NANOSECONDS, null);
}
Reported by PMD.
Line: 78
long c = conn.subscriberCount;
if (c == 0L && conn.timer != null) {
conn.timer.dispose();
}
conn.subscriberCount = c + 1;
if (!conn.connected && c + 1 == n) {
connect = true;
conn.connected = true;
Reported by PMD.
Line: 105
if (c != 0L || !rc.connected) {
return;
}
if (timeout == 0L) {
timeout(rc);
return;
}
sd = new SequentialDisposable();
rc.timer = sd;
Reported by PMD.
Line: 120
synchronized (this) {
if (connection == rc) {
if (rc.timer != null) {
rc.timer.dispose();
rc.timer = null;
}
if (--rc.subscriberCount == 0) {
connection = null;
source.reset();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableNullTests.java
28 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.observable;
import java.lang.reflect.*;
import java.util.*;
import java.util.concurrent.*;
Reported by PMD.
Line: 34
/**
* Verifies the operators handle null values properly by emitting/throwing NullPointerExceptions.
*/
public class ObservableNullTests extends RxJavaTest {
Observable<Integer> just1 = Observable.just(1);
//***********************************************************
// Static methods
Reported by PMD.
Line: 34
/**
* Verifies the operators handle null values properly by emitting/throwing NullPointerExceptions.
*/
public class ObservableNullTests extends RxJavaTest {
Observable<Integer> just1 = Observable.just(1);
//***********************************************************
// Static methods
Reported by PMD.
Line: 34
/**
* Verifies the operators handle null values properly by emitting/throwing NullPointerExceptions.
*/
public class ObservableNullTests extends RxJavaTest {
Observable<Integer> just1 = Observable.just(1);
//***********************************************************
// Static methods
Reported by PMD.
Line: 34
/**
* Verifies the operators handle null values properly by emitting/throwing NullPointerExceptions.
*/
public class ObservableNullTests extends RxJavaTest {
Observable<Integer> just1 = Observable.just(1);
//***********************************************************
// Static methods
Reported by PMD.
Line: 36
*/
public class ObservableNullTests extends RxJavaTest {
Observable<Integer> just1 = Observable.just(1);
//***********************************************************
// Static methods
//***********************************************************
Reported by PMD.
Line: 54
@Test
public void ambIterableOneIsNull() {
Observable.amb(Arrays.asList(Observable.never(), null))
.test()
.assertError(NullPointerException.class);
}
@Test(expected = NullPointerException.class)
Reported by PMD.
Line: 54
@Test
public void ambIterableOneIsNull() {
Observable.amb(Arrays.asList(Observable.never(), null))
.test()
.assertError(NullPointerException.class);
}
@Test(expected = NullPointerException.class)
Reported by PMD.
Line: 141
@Test(expected = NullPointerException.class)
public void concatIterableOneIsNull() {
Observable.concat(Arrays.asList(just1, null)).blockingLast();
}
@Test(expected = NullPointerException.class)
public void concatArrayOneIsNull() {
Observable.concatArray(just1, null).blockingLast();
Reported by PMD.
Line: 146
@Test(expected = NullPointerException.class)
public void concatArrayOneIsNull() {
Observable.concatArray(just1, null).blockingLast();
}
@Test(expected = NullPointerException.class)
public void deferFunctionReturnsNull() {
Observable.defer(new Supplier<Observable<Object>>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleDoAfterSuccessTest.java
28 issues
Line: 33
public class SingleDoAfterSuccessTest extends RxJavaTest {
final List<Integer> values = new ArrayList<>();
final Consumer<Integer> afterSuccess = new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
values.add(-e);
Reported by PMD.
Line: 35
final List<Integer> values = new ArrayList<>();
final Consumer<Integer> afterSuccess = new Consumer<Integer>() {
@Override
public void accept(Integer e) throws Exception {
values.add(-e);
}
};
Reported by PMD.
Line: 42
}
};
final TestObserver<Integer> to = new TestObserver<Integer>() {
@Override
public void onNext(Integer t) {
super.onNext(t);
SingleDoAfterSuccessTest.this.values.add(t);
}
Reported by PMD.
Line: 46
@Override
public void onNext(Integer t) {
super.onNext(t);
SingleDoAfterSuccessTest.this.values.add(t);
}
};
@Test
public void just() {
Reported by PMD.
Line: 46
@Override
public void onNext(Integer t) {
super.onNext(t);
SingleDoAfterSuccessTest.this.values.add(t);
}
};
@Test
public void just() {
Reported by PMD.
Line: 46
@Override
public void onNext(Integer t) {
super.onNext(t);
SingleDoAfterSuccessTest.this.values.add(t);
}
};
@Test
public void just() {
Reported by PMD.
Line: 52
@Test
public void just() {
Single.just(1)
.doAfterSuccess(afterSuccess)
.subscribeWith(to)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
Reported by PMD.
Line: 52
@Test
public void just() {
Single.just(1)
.doAfterSuccess(afterSuccess)
.subscribeWith(to)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
Reported by PMD.
Line: 52
@Test
public void just() {
Single.just(1)
.doAfterSuccess(afterSuccess)
.subscribeWith(to)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
Reported by PMD.
Line: 57
.subscribeWith(to)
.assertResult(1);
assertEquals(Arrays.asList(1, -1), values);
}
@Test
public void error() {
Single.<Integer>error(new TestException())
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelFilterTest.java
28 issues
Line: 32
public class ParallelFilterTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
@Test
Reported by PMD.
Line: 33
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
@Test
public void doubleFilter() {
Reported by PMD.
Line: 33
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.filter(Functions.alwaysTrue()));
}
@Test
public void doubleFilter() {
Reported by PMD.
Line: 38
}
@Test
public void doubleFilter() {
Flowable.range(1, 10)
.parallel()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
Reported by PMD.
Line: 59
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.filter(Functions.alwaysTrue())
.sequential()
Reported by PMD.
Line: 68
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 78
}
@Test
public void doubleError2() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.filter(Functions.alwaysTrue())
.filter(Functions.alwaysTrue())
Reported by PMD.
Line: 88
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 98
}
@Test
public void error() {
Flowable.error(new TestException())
.parallel()
.filter(Functions.alwaysTrue())
.sequential()
.test()
Reported by PMD.
Line: 108
}
@Test
public void predicateThrows() {
Flowable.just(1)
.parallel()
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer v) throws Exception {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableCovarianceTest.java
28 issues
Line: 77
.doOnNext(new Consumer<GroupedObservable<Object, Movie>>() {
@Override
public void accept(GroupedObservable<Object, Movie> g) {
System.out.println(g.getKey());
}
})
.flatMap(new Function<GroupedObservable<Object, Movie>, Observable<String>>() {
@Override
public Observable<String> apply(GroupedObservable<Object, Movie> g) {
Reported by PMD.
Line: 87
.doOnNext(new Consumer<Movie>() {
@Override
public void accept(Movie pv) {
System.out.println(pv);
}
})
.compose(new ObservableTransformer<Movie, Movie>() {
@Override
public Observable<Movie> apply(Observable<Movie> m) {
Reported by PMD.
Line: 39
* This won't compile if super/extends isn't done correctly on generics.
*/
@Test
public void covarianceOfFrom() {
Observable.<Movie> just(new HorrorMovie());
Observable.<Movie> fromIterable(new ArrayList<HorrorMovie>());
// Observable.<HorrorMovie>from(new Movie()); // may not compile
}
Reported by PMD.
Line: 46
}
@Test
public void sortedList() {
Comparator<Media> sortFunction = new Comparator<Media>() {
@Override
public int compare(Media t1, Media t2) {
return 1;
}
Reported by PMD.
Line: 109
to.assertTerminated();
to.assertNoErrors();
// System.out.println(ts.getOnNextEvents());
assertEquals(6, to.values().size());
}
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Reported by PMD.
Line: 109
to.assertTerminated();
to.assertNoErrors();
// System.out.println(ts.getOnNextEvents());
assertEquals(6, to.values().size());
}
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Reported by PMD.
Line: 112
assertEquals(6, to.values().size());
}
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
@Override
Reported by PMD.
Line: 114
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose() {
Observable<HorrorMovie> movie = Observable.just(new HorrorMovie());
Observable<Movie> movie2 = movie.compose(new ObservableTransformer<HorrorMovie, Movie>() {
@Override
public Observable<Movie> apply(Observable<HorrorMovie> t) {
return Observable.just(new Movie());
Reported by PMD.
Line: 126
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose2() {
Observable<Movie> movie = Observable.<Movie> just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
@Override
public Observable<HorrorMovie> apply(Observable<Movie> t) {
return Observable.just(new HorrorMovie());
Reported by PMD.
Line: 138
@SuppressWarnings("unused")
@Test
public void covarianceOfCompose3() {
Observable<Movie> movie = Observable.<Movie>just(new HorrorMovie());
Observable<HorrorMovie> movie2 = movie.compose(new ObservableTransformer<Movie, HorrorMovie>() {
@Override
public Observable<HorrorMovie> apply(Observable<Movie> t) {
return Observable.just(new HorrorMovie()).map(new Function<HorrorMovie, HorrorMovie>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeToFutureTest.java
27 issues
Line: 30
public class MaybeToFutureTest extends RxJavaTest {
@Test
public void success() throws Exception {
assertEquals((Integer)1, Maybe.just(1)
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void success() throws Exception {
assertEquals((Integer)1, Maybe.just(1)
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void success() throws Exception {
assertEquals((Integer)1, Maybe.just(1)
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void success() throws Exception {
assertEquals((Integer)1, Maybe.just(1)
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void success() throws Exception {
assertEquals((Integer)1, Maybe.just(1)
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 38
}
@Test
public void empty() throws Exception {
assertNull(Maybe.empty()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 39
@Test
public void empty() throws Exception {
assertNull(Maybe.empty()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 39
@Test
public void empty() throws Exception {
assertNull(Maybe.empty()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 39
@Test
public void empty() throws Exception {
assertNull(Maybe.empty()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 39
@Test
public void empty() throws Exception {
assertNull(Maybe.empty()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapSingle.java
27 issues
Line: 34
*/
public final class ObservableFlatMapSingle<T, R> extends AbstractObservableWithUpstream<T, R> {
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
public ObservableFlatMapSingle(ObservableSource<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper,
boolean delayError) {
Reported by PMD.
Line: 36
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final boolean delayErrors;
public ObservableFlatMapSingle(ObservableSource<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper,
boolean delayError) {
super(source);
this.mapper = mapper;
Reported by PMD.
Line: 50
source.subscribe(new FlatMapSingleObserver<>(observer, mapper, delayErrors));
}
static final class FlatMapSingleObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = 8600231336733376951L;
Reported by PMD.
Line: 50
source.subscribe(new FlatMapSingleObserver<>(observer, mapper, delayErrors));
}
static final class FlatMapSingleObserver<T, R>
extends AtomicInteger
implements Observer<T>, Disposable {
private static final long serialVersionUID = 8600231336733376951L;
Reported by PMD.
Line: 56
private static final long serialVersionUID = 8600231336733376951L;
final Observer<? super R> downstream;
final boolean delayErrors;
final CompositeDisposable set;
Reported by PMD.
Line: 58
final Observer<? super R> downstream;
final boolean delayErrors;
final CompositeDisposable set;
final AtomicInteger active;
Reported by PMD.
Line: 60
final boolean delayErrors;
final CompositeDisposable set;
final AtomicInteger active;
final AtomicThrowable errors;
Reported by PMD.
Line: 62
final CompositeDisposable set;
final AtomicInteger active;
final AtomicThrowable errors;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
Reported by PMD.
Line: 64
final AtomicInteger active;
final AtomicThrowable errors;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final AtomicReference<SpscLinkedArrayQueue<R>> queue;
Reported by PMD.
Line: 66
final AtomicThrowable errors;
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
final AtomicReference<SpscLinkedArrayQueue<R>> queue;
Disposable upstream;
Reported by PMD.