The following issues were found:
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorComplete.java
8 issues
Line: 30
*/
public final class MaybeOnErrorComplete<T> extends AbstractMaybeWithUpstream<T, T> {
final Predicate<? super Throwable> predicate;
public MaybeOnErrorComplete(MaybeSource<T> source,
Predicate<? super Throwable> predicate) {
super(source);
this.predicate = predicate;
Reported by PMD.
Line: 46
public static final class OnErrorCompleteMultiObserver<T>
implements MaybeObserver<T>, SingleObserver<T>, Disposable {
final MaybeObserver<? super T> downstream;
final Predicate<? super Throwable> predicate;
Disposable upstream;
Reported by PMD.
Line: 48
final MaybeObserver<? super T> downstream;
final Predicate<? super Throwable> predicate;
Disposable upstream;
public OnErrorCompleteMultiObserver(MaybeObserver<? super T> actual, Predicate<? super Throwable> predicate) {
this.downstream = actual;
Reported by PMD.
Line: 50
final Predicate<? super Throwable> predicate;
Disposable upstream;
public OnErrorCompleteMultiObserver(MaybeObserver<? super T> actual, Predicate<? super Throwable> predicate) {
this.downstream = actual;
this.predicate = predicate;
}
Reported by PMD.
Line: 77
try {
b = predicate.test(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
return;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
/**
* Emits an onComplete if the source emits an onError and the predicate returns true for
Reported by PMD.
Line: 76
boolean b;
try {
b = predicate.test(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
return;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/exceptions/OnErrorNotImplementedExceptionTest.java
8 issues
Line: 27
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class OnErrorNotImplementedExceptionTest extends RxJavaTest {
List<Throwable> errors;
@Before
public void before() {
Reported by PMD.
Line: 29
public class OnErrorNotImplementedExceptionTest extends RxJavaTest {
List<Throwable> errors;
@Before
public void before() {
errors = TestHelper.trackPluginErrors();
}
Reported by PMD.
Line: 40
public void after() {
RxJavaPlugins.reset();
assertFalse("" + errors, errors.isEmpty());
TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class);
Throwable c = errors.get(0).getCause();
assertTrue("" + c, c instanceof TestException);
}
Reported by PMD.
Line: 42
assertFalse("" + errors, errors.isEmpty());
TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class);
Throwable c = errors.get(0).getCause();
assertTrue("" + c, c instanceof TestException);
}
@Test
public void flowableSubscribe0() {
Reported by PMD.
Line: 43
assertFalse("" + errors, errors.isEmpty());
TestHelper.assertError(errors, 0, OnErrorNotImplementedException.class);
Throwable c = errors.get(0).getCause();
assertTrue("" + c, c instanceof TestException);
}
@Test
public void flowableSubscribe0() {
Flowable.error(new TestException())
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.exceptions;
import static org.junit.Assert.*;
import java.util.List;
import org.junit.*;
Reported by PMD.
Line: 20
import java.util.List;
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
Reported by PMD.
Line: 22
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class OnErrorNotImplementedExceptionTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFinallyTest.java
8 issues
Line: 27
public class ObservableFinallyTest extends RxJavaTest {
private Action aAction0;
private Observer<String> observer;
// mocking has to be unchecked, unfortunately
@Before
public void before() {
Reported by PMD.
Line: 28
public class ObservableFinallyTest extends RxJavaTest {
private Action aAction0;
private Observer<String> observer;
// mocking has to be unchecked, unfortunately
@Before
public void before() {
aAction0 = mock(Action.class);
Reported by PMD.
Line: 38
}
private void checkActionCalled(Observable<String> input) {
input.doAfterTerminate(aAction0).subscribe(observer);
try {
verify(aAction0, times(1)).run();
} catch (Throwable e) {
throw ExceptionHelper.wrapOrThrow(e);
}
Reported by PMD.
Line: 40
private void checkActionCalled(Observable<String> input) {
input.doAfterTerminate(aAction0).subscribe(observer);
try {
verify(aAction0, times(1)).run();
} catch (Throwable e) {
throw ExceptionHelper.wrapOrThrow(e);
}
}
Reported by PMD.
Line: 41
input.doAfterTerminate(aAction0).subscribe(observer);
try {
verify(aAction0, times(1)).run();
} catch (Throwable e) {
throw ExceptionHelper.wrapOrThrow(e);
}
}
@Test
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import static org.mockito.Mockito.*;
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Action;
Reported by PMD.
Line: 18
import static org.mockito.Mockito.*;
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.testsupport.TestHelper;
Reported by PMD.
Line: 20
import org.junit.*;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableFinallyTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromObservableTest.java
8 issues
Line: 22
public class CompletableFromObservableTest extends RxJavaTest {
@Test
public void fromObservable() {
Completable.fromObservable(Observable.just(1))
.test()
.assertResult();
}
Reported by PMD.
Line: 23
public class CompletableFromObservableTest extends RxJavaTest {
@Test
public void fromObservable() {
Completable.fromObservable(Observable.just(1))
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 23
public class CompletableFromObservableTest extends RxJavaTest {
@Test
public void fromObservable() {
Completable.fromObservable(Observable.just(1))
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 29
}
@Test
public void fromObservableEmpty() {
Completable.fromObservable(Observable.empty())
.test()
.assertResult();
}
Reported by PMD.
Line: 30
@Test
public void fromObservableEmpty() {
Completable.fromObservable(Observable.empty())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 30
@Test
public void fromObservableEmpty() {
Completable.fromObservable(Observable.empty())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 36
}
@Test
public void fromObservableError() {
Completable.fromObservable(Observable.error(new UnsupportedOperationException()))
.test()
.assertFailure(UnsupportedOperationException.class);
}
}
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
public class CompletableFromObservableTest extends RxJavaTest {
@Test
public void fromObservable() {
Completable.fromObservable(Observable.just(1))
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableFromMaybeTest.java
8 issues
Line: 22
public class CompletableFromMaybeTest extends RxJavaTest {
@Test
public void fromMaybe() {
Completable.fromMaybe(Maybe.just(1))
.test()
.assertResult();
}
Reported by PMD.
Line: 23
public class CompletableFromMaybeTest extends RxJavaTest {
@Test
public void fromMaybe() {
Completable.fromMaybe(Maybe.just(1))
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 23
public class CompletableFromMaybeTest extends RxJavaTest {
@Test
public void fromMaybe() {
Completable.fromMaybe(Maybe.just(1))
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 29
}
@Test
public void fromMaybeEmpty() {
Completable.fromMaybe(Maybe.<Integer>empty())
.test()
.assertResult();
}
Reported by PMD.
Line: 30
@Test
public void fromMaybeEmpty() {
Completable.fromMaybe(Maybe.<Integer>empty())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 30
@Test
public void fromMaybeEmpty() {
Completable.fromMaybe(Maybe.<Integer>empty())
.test()
.assertResult();
}
@Test
Reported by PMD.
Line: 36
}
@Test
public void fromMaybeError() {
Completable.fromMaybe(Maybe.error(new UnsupportedOperationException()))
.test()
.assertFailure(UnsupportedOperationException.class);
}
}
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
public class CompletableFromMaybeTest extends RxJavaTest {
@Test
public void fromMaybe() {
Completable.fromMaybe(Maybe.just(1))
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableResourceWrapperTest.java
8 issues
Line: 29
public class ObservableResourceWrapperTest extends RxJavaTest {
@Test
public void disposed() {
TestObserver<Object> to = new TestObserver<>();
ObserverResourceWrapper<Object> orw = new ObserverResourceWrapper<>(to);
Disposable d = Disposable.empty();
Reported by PMD.
Line: 37
orw.onSubscribe(d);
assertFalse(orw.isDisposed());
orw.dispose();
assertTrue(orw.isDisposed());
}
Reported by PMD.
Line: 41
orw.dispose();
assertTrue(orw.isDisposed());
}
@Test
public void doubleOnSubscribe() {
TestObserver<Object> to = new TestObserver<>();
Reported by PMD.
Line: 45
}
@Test
public void doubleOnSubscribe() {
TestObserver<Object> to = new TestObserver<>();
ObserverResourceWrapper<Object> orw = new ObserverResourceWrapper<>(to);
TestHelper.doubleOnSubscribe(orw);
}
Reported by PMD.
Line: 66
orw.onError(new TestException());
assertTrue(d1.isDisposed());
to.assertFailure(TestException.class);
}
}
Reported by PMD.
Line: 66
orw.onError(new TestException());
assertTrue(d1.isDisposed());
to.assertFailure(TestException.class);
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.disposables.*;
Reported by PMD.
Line: 21
import org.junit.Test;
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableResourceWrapperTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromObservableTest.java
8 issues
Line: 24
public class FlowableFromObservableTest extends RxJavaTest {
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.just(1).toFlowable(BackpressureStrategy.MISSING));
}
@Test
public void error() {
Reported by PMD.
Line: 25
public class FlowableFromObservableTest extends RxJavaTest {
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.just(1).toFlowable(BackpressureStrategy.MISSING));
}
@Test
public void error() {
Observable.error(new TestException())
Reported by PMD.
Line: 29
}
@Test
public void error() {
Observable.error(new TestException())
.toFlowable(BackpressureStrategy.MISSING)
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 37
}
@Test
public void all() {
for (BackpressureStrategy mode : BackpressureStrategy.values()) {
Flowable.fromObservable(Observable.range(1, 5), mode)
.test()
.withTag("mode: " + mode)
.assertResult(1, 2, 3, 4, 5);
Reported by PMD.
Line: 39
@Test
public void all() {
for (BackpressureStrategy mode : BackpressureStrategy.values()) {
Flowable.fromObservable(Observable.range(1, 5), mode)
.test()
.withTag("mode: " + mode)
.assertResult(1, 2, 3, 4, 5);
}
}
Reported by PMD.
Line: 39
@Test
public void all() {
for (BackpressureStrategy mode : BackpressureStrategy.values()) {
Flowable.fromObservable(Observable.range(1, 5), mode)
.test()
.withTag("mode: " + mode)
.assertResult(1, 2, 3, 4, 5);
}
}
Reported by PMD.
Line: 39
@Test
public void all() {
for (BackpressureStrategy mode : BackpressureStrategy.values()) {
Flowable.fromObservable(Observable.range(1, 5), mode)
.test()
.withTag("mode: " + mode)
.assertResult(1, 2, 3, 4, 5);
}
}
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableFromObservableTest extends RxJavaTest {
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableIntervalTest.java
8 issues
Line: 29
public class ObservableIntervalTest extends RxJavaTest {
@Test
public void dispose() {
TestHelper.checkDisposed(Observable.interval(1, TimeUnit.MILLISECONDS, new TestScheduler()));
}
@Test
public void cancel() {
Reported by PMD.
Line: 34
}
@Test
public void cancel() {
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
.take(10)
.test()
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 35
@Test
public void cancel() {
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
.take(10)
.test()
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 35
@Test
public void cancel() {
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
.take(10)
.test()
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 35
@Test
public void cancel() {
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.trampoline())
.take(10)
.test()
.assertResult(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
}
Reported by PMD.
Line: 42
}
@Test
public void cancelledOnRun() {
TestObserver<Long> to = new TestObserver<>();
IntervalObserver is = new IntervalObserver(to);
to.onSubscribe(is);
is.dispose();
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableInterval.IntervalObserver;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
Reported by PMD.
Line: 23
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableInterval.IntervalObserver;
import io.reactivex.rxjava3.observers.TestObserver;
import io.reactivex.rxjava3.schedulers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableIntervalTest extends RxJavaTest {
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatDelayErrorTest.java
8 issues
Line: 41
.test()
.assertFailure(TestException.class);
verify(action1).run();
verify(action2).run();
}
@Test
Reported by PMD.
Line: 43
verify(action1).run();
verify(action2).run();
}
@Test
public void normalPublisher() throws Throwable {
Action action1 = mock(Action.class);
Reported by PMD.
Line: 59
.test()
.assertFailure(TestException.class);
verify(action1).run();
verify(action2).run();
}
@Test
Reported by PMD.
Line: 61
verify(action1).run();
verify(action2).run();
}
@Test
public void normalPublisherPrefetch() throws Throwable {
Action action1 = mock(Action.class);
Reported by PMD.
Line: 77
.test()
.assertFailure(TestException.class);
verify(action1).run();
verify(action2).run();
}
}
Reported by PMD.
Line: 79
verify(action1).run();
verify(action2).run();
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.completable;
import static org.mockito.Mockito.*;
import java.util.Arrays;
import org.junit.Test;
Reported by PMD.
Line: 22
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Action;
public class CompletableConcatDelayErrorTest {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/processors/SerializedProcessor.java
8 issues
Line: 30
*/
/* public */ final class SerializedProcessor<T> extends FlowableProcessor<T> {
/** The actual subscriber to serialize Subscriber calls to. */
final FlowableProcessor<T> actual;
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
Reported by PMD.
Line: 32
/** The actual subscriber to serialize Subscriber calls to. */
final FlowableProcessor<T> actual;
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
Reported by PMD.
Line: 34
/** Indicates an emission is going on, guarded by this. */
boolean emitting;
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
/**
* Constructor that wraps an actual subject.
Reported by PMD.
Line: 36
/** If not null, it holds the missed NotificationLite events. */
AppendOnlyLinkedArrayList<Object> queue;
/** Indicates a terminal event has been received and all further events will be dropped. */
volatile boolean done;
/**
* Constructor that wraps an actual subject.
* @param actual the subject wrapped
*/
Reported by PMD.
Line: 173
emitting = false;
return;
}
queue = null;
}
q.accept(actual);
}
}
Reported by PMD.
Line: 176
queue = null;
}
q.accept(actual);
}
}
@Override
public boolean hasSubscribers() {
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.processors;
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 19
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.Nullable;
import io.reactivex.rxjava3.internal.util.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
/**
* Serializes calls to the Subscriber methods.
* <p>All other Publisher and Subject methods are thread-safe by design.
Reported by PMD.