The following issues were found
src/test/java/io/reactivex/rxjava3/tck/MergeWithMaybeTckTest.java
4 issues
Line: 27
@Override
public Publisher<Long> createPublisher(long elements) {
if (elements == 0) {
return Flowable.<Long>empty()
.mergeWith(Maybe.<Long>empty());
}
return
Flowable.rangeLong(1, elements - 1)
.mergeWith(Maybe.just(elements))
Reported by PMD.
Line: 27
@Override
public Publisher<Long> createPublisher(long elements) {
if (elements == 0) {
return Flowable.<Long>empty()
.mergeWith(Maybe.<Long>empty());
}
return
Flowable.rangeLong(1, elements - 1)
.mergeWith(Maybe.just(elements))
Reported by PMD.
Line: 31
.mergeWith(Maybe.<Long>empty());
}
return
Flowable.rangeLong(1, elements - 1)
.mergeWith(Maybe.just(elements))
;
}
}
Reported by PMD.
Line: 19
import org.reactivestreams.Publisher;
import org.testng.annotations.Test;
import io.reactivex.rxjava3.core.*;
@Test
public class MergeWithMaybeTckTest extends BaseTck<Long> {
@Override
Reported by PMD.
src/test/java/io/reactivex/rxjava3/tck/BehaviorProcessorAsPublisherTckTest.java
4 issues
Line: 44
return;
}
if (System.currentTimeMillis() - start > 200) {
return;
}
}
for (int i = 0; i < elements; i++) {
Reported by PMD.
Line: 52
for (int i = 0; i < elements; i++) {
while (!pp.offer(i)) {
Thread.yield();
if (System.currentTimeMillis() - start > 1000) {
return;
}
}
}
pp.onComplete();
Reported by PMD.
Line: 36
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
Reported by PMD.
Line: 36
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
while (!pp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subscribers/DefaultSubscriberTest.java
4 issues
Line: 28
static final class RequestEarly extends DefaultSubscriber<Integer> {
final List<Object> events = new ArrayList<>();
RequestEarly() {
request(5);
}
Reported by PMD.
Line: 59
public void requestUpfront() {
RequestEarly sub = new RequestEarly();
Flowable.range(1, 10).subscribe(sub);
assertEquals(Collections.emptyList(), sub.events);
}
}
Reported by PMD.
Line: 61
Flowable.range(1, 10).subscribe(sub);
assertEquals(Collections.emptyList(), sub.events);
}
}
Reported by PMD.
Line: 22
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
public class DefaultSubscriberTest extends RxJavaTest {
static final class RequestEarly extends DefaultSubscriber<Integer> {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAutoConnect.java
4 issues
Line: 32
* @param <T> the value type of the chain
*/
public final class FlowableAutoConnect<T> extends Flowable<T> {
final ConnectableFlowable<? extends T> source;
final int numberOfSubscribers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public FlowableAutoConnect(ConnectableFlowable<? extends T> source,
Reported by PMD.
Line: 33
*/
public final class FlowableAutoConnect<T> extends Flowable<T> {
final ConnectableFlowable<? extends T> source;
final int numberOfSubscribers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public FlowableAutoConnect(ConnectableFlowable<? extends T> source,
int numberOfSubscribers,
Reported by PMD.
Line: 34
public final class FlowableAutoConnect<T> extends Flowable<T> {
final ConnectableFlowable<? extends T> source;
final int numberOfSubscribers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public FlowableAutoConnect(ConnectableFlowable<? extends T> source,
int numberOfSubscribers,
Consumer<? super Disposable> connection) {
Reported by PMD.
Line: 35
final ConnectableFlowable<? extends T> source;
final int numberOfSubscribers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public FlowableAutoConnect(ConnectableFlowable<? extends T> source,
int numberOfSubscribers,
Consumer<? super Disposable> connection) {
this.source = source;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableError.java
4 issues
Line: 23
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
public final class ObservableError<T> extends Observable<T> {
final Supplier<? extends Throwable> errorSupplier;
public ObservableError(Supplier<? extends Throwable> errorSupplier) {
this.errorSupplier = errorSupplier;
}
@Override
Reported by PMD.
Line: 33
Throwable error;
try {
error = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
error = t;
}
EmptyDisposable.error(error, observer);
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
Reported by PMD.
Line: 32
public void subscribeActual(Observer<? super T> observer) {
Throwable error;
try {
error = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
error = t;
}
EmptyDisposable.error(error, observer);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeIgnoreElementCompletable.java
4 issues
Line: 29
*/
public final class MaybeIgnoreElementCompletable<T> extends Completable implements FuseToMaybe<T> {
final MaybeSource<T> source;
public MaybeIgnoreElementCompletable(MaybeSource<T> source) {
this.source = source;
}
Reported by PMD.
Line: 47
static final class IgnoreMaybeObserver<T> implements MaybeObserver<T>, Disposable {
final CompletableObserver downstream;
Disposable upstream;
IgnoreMaybeObserver(CompletableObserver downstream) {
this.downstream = downstream;
Reported by PMD.
Line: 49
final CompletableObserver downstream;
Disposable upstream;
IgnoreMaybeObserver(CompletableObserver downstream) {
this.downstream = downstream;
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
import io.reactivex.rxjava3.internal.fuseable.FuseToMaybe;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDoOnLifecycle.java
4 issues
Line: 22
import io.reactivex.rxjava3.internal.observers.DisposableLambdaObserver;
public final class ObservableDoOnLifecycle<T> extends AbstractObservableWithUpstream<T, T> {
private final Consumer<? super Disposable> onSubscribe;
private final Action onDispose;
public ObservableDoOnLifecycle(Observable<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
super(upstream);
Reported by PMD.
Line: 23
public final class ObservableDoOnLifecycle<T> extends AbstractObservableWithUpstream<T, T> {
private final Consumer<? super Disposable> onSubscribe;
private final Action onDispose;
public ObservableDoOnLifecycle(Observable<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
super(upstream);
this.onSubscribe = onSubscribe;
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.observers.DisposableLambdaObserver;
public final class ObservableDoOnLifecycle<T> extends AbstractObservableWithUpstream<T, T> {
Reported by PMD.
Line: 18
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.observers.DisposableLambdaObserver;
public final class ObservableDoOnLifecycle<T> extends AbstractObservableWithUpstream<T, T> {
private final Consumer<? super Disposable> onSubscribe;
private final Action onDispose;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeErrorCallable.java
4 issues
Line: 29
*/
public final class MaybeErrorCallable<T> extends Maybe<T> {
final Supplier<? extends Throwable> errorSupplier;
public MaybeErrorCallable(Supplier<? extends Throwable> errorSupplier) {
this.errorSupplier = errorSupplier;
}
Reported by PMD.
Line: 42
try {
ex = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
} catch (Throwable ex1) {
Exceptions.throwIfFatal(ex1);
ex = ex1;
}
observer.onError(ex);
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
Reported by PMD.
Line: 41
Throwable ex;
try {
ex = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
} catch (Throwable ex1) {
Exceptions.throwIfFatal(ex1);
ex = ex1;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/single/SingleError.java
4 issues
Line: 24
public final class SingleError<T> extends Single<T> {
final Supplier<? extends Throwable> errorSupplier;
public SingleError(Supplier<? extends Throwable> errorSupplier) {
this.errorSupplier = errorSupplier;
}
Reported by PMD.
Line: 36
try {
error = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
error = e;
}
EmptyDisposable.error(error, observer);
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.single;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
Reported by PMD.
Line: 35
Throwable error;
try {
error = ExceptionHelper.nullCheck(errorSupplier.get(), "Supplier returned a null Throwable.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
error = e;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromSupplier.java
4 issues
Line: 30
*/
public final class ObservableFromSupplier<T> extends Observable<T> implements Supplier<T> {
final Supplier<? extends T> supplier;
public ObservableFromSupplier(Supplier<? extends T> supplier) {
this.supplier = supplier;
}
Reported by PMD.
Line: 46
T value;
try {
value = ExceptionHelper.nullCheck(supplier.get(), "Supplier returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
observer.onError(e);
} else {
RxJavaPlugins.onError(e);
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.observable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.observers.DeferredScalarDisposable;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
Reported by PMD.
Line: 45
}
T value;
try {
value = ExceptionHelper.nullCheck(supplier.get(), "Supplier returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (!d.isDisposed()) {
observer.onError(e);
} else {
Reported by PMD.