The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMaterialize.java
3 issues
Line: 32
}
static final class MaterializeObserver<T> implements Observer<T>, Disposable {
final Observer<? super Notification<T>> downstream;
Disposable upstream;
MaterializeObserver(Observer<? super Notification<T>> downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 34
static final class MaterializeObserver<T> implements Observer<T>, Disposable {
final Observer<? super Notification<T>> downstream;
Disposable upstream;
MaterializeObserver(Observer<? super Notification<T>> downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
public final class ObservableMaterialize<T> extends AbstractObservableWithUpstream<T, Notification<T>> {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream1HTckTest.java
3 issues
Line: 37
@Override
public Publisher<Integer> createFailedPublisher() {
Stream<Integer> stream = Stream.of(1);
stream.forEach(v -> { });
return Flowable.just(1).hide().flatMapStream(v -> stream);
}
}
Reported by PMD.
Line: 38
public Publisher<Integer> createFailedPublisher() {
Stream<Integer> stream = Stream.of(1);
stream.forEach(v -> { });
return Flowable.just(1).hide().flatMapStream(v -> stream);
}
}
Reported by PMD.
Line: 38
public Publisher<Integer> createFailedPublisher() {
Stream<Integer> stream = Stream.of(1);
stream.forEach(v -> { });
return Flowable.just(1).hide().flatMapStream(v -> stream);
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/FlatMapStream2HTckTest.java
3 issues
Line: 44
@Override
public Publisher<Integer> createFailedPublisher() {
Stream<Integer> stream = Stream.of(1);
stream.forEach(v -> { });
return Flowable.just(1).hide().flatMapStream(v -> stream);
}
}
Reported by PMD.
Line: 45
public Publisher<Integer> createFailedPublisher() {
Stream<Integer> stream = Stream.of(1);
stream.forEach(v -> { });
return Flowable.just(1).hide().flatMapStream(v -> stream);
}
}
Reported by PMD.
Line: 45
public Publisher<Integer> createFailedPublisher() {
Stream<Integer> stream = Stream.of(1);
stream.forEach(v -> { });
return Flowable.just(1).hide().flatMapStream(v -> stream);
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/observers/CallbackCompletableObserverTest.java
3 issues
Line: 29
public void emptyActionShouldReportNoCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.EMPTY_ACTION);
assertFalse(o.hasCustomOnError());
}
@Test
public void customOnErrorShouldReportCustomOnError() {
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.<Throwable>emptyConsumer(),
Reported by PMD.
Line: 37
CallbackCompletableObserver o = new CallbackCompletableObserver(Functions.<Throwable>emptyConsumer(),
Functions.EMPTY_ACTION);
assertTrue(o.hasCustomOnError());
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.observers;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.internal.functions.Functions;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/observers/ConsumerSingleObserverTest.java
3 issues
Line: 30
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<>(Functions.<Integer>emptyConsumer(),
Functions.ON_ERROR_MISSING);
assertFalse(o.hasCustomOnError());
}
@Test
public void customOnErrorShouldReportCustomOnError() {
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<>(Functions.<Integer>emptyConsumer(),
Reported by PMD.
Line: 38
ConsumerSingleObserver<Integer> o = new ConsumerSingleObserver<>(Functions.<Integer>emptyConsumer(),
Functions.<Throwable>emptyConsumer());
assertTrue(o.hasCustomOnError());
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.observers;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.internal.functions.Functions;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcatArrayDelayErrorTest.java
3 issues
Line: 38
.test()
.assertFailure(TestException.class);
verify(action1).run();
verify(action2).run();
}
}
Reported by PMD.
Line: 40
verify(action1).run();
verify(action2).run();
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.completable;
import static org.mockito.Mockito.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Action;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/completable/CapturingUncaughtExceptionHandler.java
3 issues
Line: 19
import java.util.concurrent.CountDownLatch;
public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);
@Override
public void uncaughtException(Thread t, Throwable e) {
Reported by PMD.
Line: 20
public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);
@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
Reported by PMD.
Line: 21
public final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
public int count;
public Throwable caught;
public CountDownLatch completed = new CountDownLatch(1);
@Override
public void uncaughtException(Thread t, Throwable e) {
count++;
caught = e;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleContainstTest.java
3 issues
Line: 25
public class SingleContainstTest extends RxJavaTest {
@Test
public void comparerThrows() {
Single.just(1)
.contains(2, new BiPredicate<Object, Object>() {
@Override
public boolean test(Object a, Object b) throws Exception {
throw new TestException();
Reported by PMD.
Line: 38
}
@Test
public void error() {
Single.error(new TestException())
.contains(2)
.test()
.assertFailure(TestException.class);
}
Reported by PMD.
Line: 18
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.BiPredicate;
public class SingleContainstTest extends RxJavaTest {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/AbstractFlowableWithUpstreamTest.java
3 issues
Line: 31
public void source() {
Flowable<Integer> f = Flowable.just(1);
assertSame(f, ((HasUpstreamPublisher<Integer>)f.map(Functions.<Integer>identity())).source());
}
}
Reported by PMD.
Line: 31
public void source() {
Flowable<Integer> f = Flowable.just(1);
assertSame(f, ((HasUpstreamPublisher<Integer>)f.map(Functions.<Integer>identity())).source());
}
}
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamPublisher;
public class AbstractFlowableWithUpstreamTest extends RxJavaTest {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/subscribers/DisposableSubscriber.java
3 issues
Line: 77
* @param <T> the received value type.
*/
public abstract class DisposableSubscriber<T> implements FlowableSubscriber<T>, Disposable {
final AtomicReference<Subscription> upstream = new AtomicReference<>();
@Override
public final void onSubscribe(Subscription s) {
if (EndConsumerHelper.setOnce(this.upstream, s, getClass())) {
onStart();
Reported by PMD.
Line: 90
* Called once the single upstream {@link Subscription} is set via {@link #onSubscribe(Subscription)}.
*/
protected void onStart() {
upstream.get().request(Long.MAX_VALUE);
}
/**
* Requests the specified amount from the upstream if its {@link Subscription} is set via
* onSubscribe already.
Reported by PMD.
Line: 102
* @param n the request amount, positive
*/
protected final void request(long n) {
upstream.get().request(n);
}
/**
* Cancels the Subscription set via {@link #onSubscribe(Subscription)} or makes sure a
* {@link Subscription} set asynchronously (later) is cancelled immediately.
Reported by PMD.