The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeEmptyTest.java
6 issues
Line: 26
public class MaybeEmptyTest extends RxJavaTest {
@Test
public void scalarSupplier() {
Maybe<Integer> m = Maybe.empty();
assertTrue(m.getClass().toString(), m instanceof ScalarSupplier);
assertNull(((ScalarSupplier<?>)m).get());
Reported by PMD.
Line: 29
public void scalarSupplier() {
Maybe<Integer> m = Maybe.empty();
assertTrue(m.getClass().toString(), m instanceof ScalarSupplier);
assertNull(((ScalarSupplier<?>)m).get());
}
}
Reported by PMD.
Line: 29
public void scalarSupplier() {
Maybe<Integer> m = Maybe.empty();
assertTrue(m.getClass().toString(), m instanceof ScalarSupplier);
assertNull(((ScalarSupplier<?>)m).get());
}
}
Reported by PMD.
Line: 31
assertTrue(m.getClass().toString(), m instanceof ScalarSupplier);
assertNull(((ScalarSupplier<?>)m).get());
}
}
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.operators.maybe;
import static org.junit.Assert.*;
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
Reported by PMD.
Line: 20
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.internal.fuseable.ScalarSupplier;
public class MaybeEmptyTest extends RxJavaTest {
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableEventStream.java
6 issues
Line: 53
}
static final class EventConsumer implements Consumer<Emitter<Event>> {
private final int numInstances;
private final String type;
EventConsumer(int numInstances, String type) {
this.numInstances = numInstances;
this.type = type;
Reported by PMD.
Line: 54
static final class EventConsumer implements Consumer<Emitter<Event>> {
private final int numInstances;
private final String type;
EventConsumer(int numInstances, String type) {
this.numInstances = numInstances;
this.type = type;
}
Reported by PMD.
Line: 68
// slow it down somewhat
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
s.onError(e);
}
}
}
Reported by PMD.
Line: 75
}
public static class Event {
public final String type;
public final String instanceId;
public final Map<String, Object> values;
/**
* Construct an event with the provided parameters.
Reported by PMD.
Line: 76
public static class Event {
public final String type;
public final String instanceId;
public final Map<String, Object> values;
/**
* Construct an event with the provided parameters.
* @param type the event type
Reported by PMD.
Line: 77
public static class Event {
public final String type;
public final String instanceId;
public final Map<String, Object> values;
/**
* Construct an event with the provided parameters.
* @param type the event type
* @param instanceId the instance identifier
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/ExceptionHelperTest.java
6 issues
Line: 28
public class ExceptionHelperTest extends RxJavaTest {
@Test
public void utilityClass() {
TestHelper.checkUtilityClass(ExceptionHelper.class);
}
@Test
public void addRace() {
Reported by PMD.
Line: 36
public void addRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final AtomicReference<Throwable> error = new AtomicReference<>();
final TestException ex = new TestException();
Runnable r = new Runnable() {
@Override
Reported by PMD.
Line: 38
final AtomicReference<Throwable> error = new AtomicReference<>();
final TestException ex = new TestException();
Runnable r = new Runnable() {
@Override
public void run() {
assertTrue(ExceptionHelper.addThrowable(error, ex));
Reported by PMD.
Line: 40
final TestException ex = new TestException();
Runnable r = new Runnable() {
@Override
public void run() {
assertTrue(ExceptionHelper.addThrowable(error, ex));
}
};
Reported by PMD.
Line: 43
Runnable r = new Runnable() {
@Override
public void run() {
assertTrue(ExceptionHelper.addThrowable(error, ex));
}
};
TestHelper.race(r, r);
}
Reported by PMD.
Line: 52
}
@Test(expected = InternalError.class)
public void throwIfThrowable() throws Exception {
ExceptionHelper.<Exception>throwIfThrowable(new InternalError());
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/single/SingleTimerTest.java
5 issues
Line: 29
public class SingleTimerTest extends RxJavaTest {
@Test
public void timer() {
final TestScheduler testScheduler = new TestScheduler();
final AtomicLong atomicLong = new AtomicLong();
Single.timer(2, TimeUnit.SECONDS, testScheduler).subscribe(new Consumer<Long>() {
@Override
Reported by PMD.
Line: 40
}
});
assertEquals(0, atomicLong.get());
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(0, atomicLong.get());
Reported by PMD.
Line: 44
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(0, atomicLong.get());
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(1, atomicLong.get());
}
Reported by PMD.
Line: 48
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(1, atomicLong.get());
}
}
Reported by PMD.
Line: 23
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.TestScheduler;
public class SingleTimerTest extends RxJavaTest {
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/tck/MulticastProcessorAsPublisherTckTest.java
5 issues
Line: 32
@Override
public Publisher<Integer> createPublisher(final long elements) {
final MulticastProcessor<Integer> mp = MulticastProcessor.create();
mp.start();
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
Reported by PMD.
Line: 45
return;
}
if (System.currentTimeMillis() - start > 200) {
return;
}
}
for (int i = 0; i < elements; i++) {
Reported by PMD.
Line: 53
for (int i = 0; i < elements; i++) {
while (!mp.offer(i)) {
Thread.yield();
if (System.currentTimeMillis() - start > 1000) {
return;
}
}
}
mp.onComplete();
Reported by PMD.
Line: 37
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
while (!mp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
Reported by PMD.
Line: 37
Schedulers.io().scheduleDirect(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
while (!mp.hasSubscribers()) {
try {
Thread.sleep(1);
} catch (InterruptedException ex) {
return;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTaskTest.java
5 issues
Line: 42
try {
task.run();
fail("Should have thrown!");
} catch (TestException expected) {
// expected
}
TestHelper.assertUndeliverable(errors, 0, TestException.class);
Reported by PMD.
Line: 31
@Test
public void runnableThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(new Runnable() {
@Override
public void run() {
throw new TestException();
Reported by PMD.
Line: 31
@Test
public void runnableThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(new Runnable() {
@Override
public void run() {
throw new TestException();
Reported by PMD.
Line: 33
public void runnableThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(new Runnable() {
@Override
public void run() {
throw new TestException();
}
}, true);
Reported by PMD.
Line: 33
public void runnableThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(new Runnable() {
@Override
public void run() {
throw new TestException();
}
}, true);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/maybe/MaybeTimerTest.java
5 issues
Line: 29
public class MaybeTimerTest extends RxJavaTest {
@Test
public void timer() {
final TestScheduler testScheduler = new TestScheduler();
final AtomicLong atomicLong = new AtomicLong();
Maybe.timer(2, TimeUnit.SECONDS, testScheduler).subscribe(new Consumer<Long>() {
@Override
Reported by PMD.
Line: 40
}
});
assertEquals(0, atomicLong.get());
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(0, atomicLong.get());
Reported by PMD.
Line: 44
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(0, atomicLong.get());
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(1, atomicLong.get());
}
Reported by PMD.
Line: 48
testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
assertEquals(1, atomicLong.get());
}
}
Reported by PMD.
Line: 23
import org.junit.Test;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.TestScheduler;
public class MaybeTimerTest extends RxJavaTest {
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/testsupport/TimesteppingScheduler.java
5 issues
Line: 45
@Override
public long now(TimeUnit unit) {
return TimesteppingScheduler.this.now(unit);
}
}
public long time;
Reported by PMD.
Line: 45
@Override
public long now(TimeUnit unit) {
return TimesteppingScheduler.this.now(unit);
}
}
public long time;
Reported by PMD.
Line: 49
}
}
public long time;
public boolean stepEnabled = true;
@Override
public Worker createWorker() {
Reported by PMD.
Line: 51
public long time;
public boolean stepEnabled = true;
@Override
public Worker createWorker() {
return new TimesteppingWorker();
}
Reported by PMD.
Line: 19
import java.util.concurrent.TimeUnit;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.*;
/**
* Basic scheduler that produces an ever increasing {@link #now(TimeUnit)} value.
* Use this scheduler only as a time source!
*/
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAutoConnect.java
5 issues
Line: 30
* @param <T> the value type of the chain
*/
public final class ObservableAutoConnect<T> extends Observable<T> {
final ConnectableObservable<? extends T> source;
final int numberOfObservers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public ObservableAutoConnect(ConnectableObservable<? extends T> source,
Reported by PMD.
Line: 31
*/
public final class ObservableAutoConnect<T> extends Observable<T> {
final ConnectableObservable<? extends T> source;
final int numberOfObservers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public ObservableAutoConnect(ConnectableObservable<? extends T> source,
int numberOfObservers,
Reported by PMD.
Line: 32
public final class ObservableAutoConnect<T> extends Observable<T> {
final ConnectableObservable<? extends T> source;
final int numberOfObservers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public ObservableAutoConnect(ConnectableObservable<? extends T> source,
int numberOfObservers,
Consumer<? super Disposable> connection) {
Reported by PMD.
Line: 33
final ConnectableObservable<? extends T> source;
final int numberOfObservers;
final Consumer<? super Disposable> connection;
final AtomicInteger clients;
public ObservableAutoConnect(ConnectableObservable<? extends T> source,
int numberOfObservers,
Consumer<? super Disposable> connection) {
this.source = source;
Reported by PMD.
Line: 18
import java.util.concurrent.atomic.AtomicInteger;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.observables.ConnectableObservable;
/**
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/observers/BlockingBaseObserver.java
5 issues
Line: 25
public abstract class BlockingBaseObserver<T> extends CountDownLatch
implements Observer<T>, Disposable {
T value;
Throwable error;
Disposable upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 26
implements Observer<T>, Disposable {
T value;
Throwable error;
Disposable upstream;
volatile boolean cancelled;
Reported by PMD.
Line: 28
T value;
Throwable error;
Disposable upstream;
volatile boolean cancelled;
public BlockingBaseObserver() {
super(1);
Reported by PMD.
Line: 30
Disposable upstream;
volatile boolean cancelled;
public BlockingBaseObserver() {
super(1);
}
Reported by PMD.
Line: 20
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.internal.util.*;
public abstract class BlockingBaseObserver<T> extends CountDownLatch
implements Observer<T>, Disposable {
T value;
Reported by PMD.