The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUsing.java
22 issues
Line: 107
}
@Override
public void onError(Throwable t) {
if (eager) {
if (compareAndSet(false, true)) {
try {
disposer.accept(resource);
} catch (Throwable e) {
Reported by PMD.
Line: 27
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableUsing<T, D> extends Observable<T> {
final Supplier<? extends D> resourceSupplier;
final Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier;
final Consumer<? super D> disposer;
final boolean eager;
public ObservableUsing(Supplier<? extends D> resourceSupplier,
Reported by PMD.
Line: 28
public final class ObservableUsing<T, D> extends Observable<T> {
final Supplier<? extends D> resourceSupplier;
final Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier;
final Consumer<? super D> disposer;
final boolean eager;
public ObservableUsing(Supplier<? extends D> resourceSupplier,
Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
Reported by PMD.
Line: 29
public final class ObservableUsing<T, D> extends Observable<T> {
final Supplier<? extends D> resourceSupplier;
final Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier;
final Consumer<? super D> disposer;
final boolean eager;
public ObservableUsing(Supplier<? extends D> resourceSupplier,
Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
Consumer<? super D> disposer,
Reported by PMD.
Line: 30
final Supplier<? extends D> resourceSupplier;
final Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier;
final Consumer<? super D> disposer;
final boolean eager;
public ObservableUsing(Supplier<? extends D> resourceSupplier,
Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
Consumer<? super D> disposer,
boolean eager) {
Reported by PMD.
Line: 48
try {
resource = resourceSupplier.get();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
Reported by PMD.
Line: 57
ObservableSource<? extends T> source;
try {
source = Objects.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
disposer.accept(resource);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Reported by PMD.
Line: 61
Exceptions.throwIfFatal(e);
try {
disposer.accept(resource);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(new CompositeException(e, ex), observer);
return;
}
EmptyDisposable.error(e, observer);
Reported by PMD.
Line: 72
UsingObserver<T, D> us = new UsingObserver<>(observer, resource, disposer, eager);
source.subscribe(us);
}
static final class UsingObserver<T, D> extends AtomicBoolean implements Observer<T>, Disposable {
private static final long serialVersionUID = 5904473792286235046L;
Reported by PMD.
Line: 79
private static final long serialVersionUID = 5904473792286235046L;
final Observer<? super T> downstream;
final D resource;
final Consumer<? super D> disposer;
final boolean eager;
Disposable upstream;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableToFutureTest.java
22 issues
Line: 30
public class CompletableToFutureTest extends RxJavaTest {
@Test
public void empty() throws Exception {
assertNull(Completable.complete()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void empty() throws Exception {
assertNull(Completable.complete()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void empty() throws Exception {
assertNull(Completable.complete()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void empty() throws Exception {
assertNull(Completable.complete()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 31
@Test
public void empty() throws Exception {
assertNull(Completable.complete()
.subscribeOn(Schedulers.computation())
.toFuture()
.get());
}
Reported by PMD.
Line: 47
fail("Should have thrown!");
} catch (ExecutionException ex) {
assertTrue("" + ex.getCause(), ex.getCause() instanceof TestException);
}
}
@Test
public void cancel() {
Reported by PMD.
Line: 52
}
@Test
public void cancel() {
CompletableSubject cs = CompletableSubject.create();
Future<Void> f = cs.toFuture();
assertTrue(cs.hasObservers());
Reported by PMD.
Line: 55
public void cancel() {
CompletableSubject cs = CompletableSubject.create();
Future<Void> f = cs.toFuture();
assertTrue(cs.hasObservers());
f.cancel(true);
Reported by PMD.
Line: 57
Future<Void> f = cs.toFuture();
assertTrue(cs.hasObservers());
f.cancel(true);
assertFalse(cs.hasObservers());
}
Reported by PMD.
Line: 57
Future<Void> f = cs.toFuture();
assertTrue(cs.hasObservers());
f.cancel(true);
assertFalse(cs.hasObservers());
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleCacheTest.java
22 issues
Line: 27
public class SingleCacheTest extends RxJavaTest {
@Test
public void cancelImmediately() {
PublishProcessor<Integer> pp = PublishProcessor.create();
Single<Integer> cached = pp.single(-99).cache();
TestObserver<Integer> to = cached.test(true);
Reported by PMD.
Line: 30
public void cancelImmediately() {
PublishProcessor<Integer> pp = PublishProcessor.create();
Single<Integer> cached = pp.single(-99).cache();
TestObserver<Integer> to = cached.test(true);
pp.onNext(1);
pp.onComplete();
Reported by PMD.
Line: 30
public void cancelImmediately() {
PublishProcessor<Integer> pp = PublishProcessor.create();
Single<Integer> cached = pp.single(-99).cache();
TestObserver<Integer> to = cached.test(true);
pp.onNext(1);
pp.onComplete();
Reported by PMD.
Line: 32
Single<Integer> cached = pp.single(-99).cache();
TestObserver<Integer> to = cached.test(true);
pp.onNext(1);
pp.onComplete();
to.assertEmpty();
Reported by PMD.
Line: 34
TestObserver<Integer> to = cached.test(true);
pp.onNext(1);
pp.onComplete();
to.assertEmpty();
cached.test().assertResult(1);
Reported by PMD.
Line: 35
TestObserver<Integer> to = cached.test(true);
pp.onNext(1);
pp.onComplete();
to.assertEmpty();
cached.test().assertResult(1);
}
Reported by PMD.
Line: 37
pp.onNext(1);
pp.onComplete();
to.assertEmpty();
cached.test().assertResult(1);
}
@Test
Reported by PMD.
Line: 39
to.assertEmpty();
cached.test().assertResult(1);
}
@Test
public void addRemoveRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
Reported by PMD.
Line: 39
to.assertEmpty();
cached.test().assertResult(1);
}
@Test
public void addRemoveRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
Reported by PMD.
Line: 43
}
@Test
public void addRemoveRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
PublishProcessor<Integer> pp = PublishProcessor.create();
final Single<Integer> cached = pp.single(-99).cache();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/schedulers/SchedulerWorkerTest.java
21 issues
Line: 98
Thread.sleep(150);
System.out.println("Runs: " + times.size());
for (int i = 0; i < times.size() - 1 ; i++) {
long diff = times.get(i + 1) - times.get(i);
System.out.println("Diff #" + i + ": " + diff);
assertTrue("" + i + ":" + diff, diff < 150 && diff > 50);
Reported by PMD.
Line: 102
for (int i = 0; i < times.size() - 1 ; i++) {
long diff = times.get(i + 1) - times.get(i);
System.out.println("Diff #" + i + ": " + diff);
assertTrue("" + i + ":" + diff, diff < 150 && diff > 50);
}
assertTrue("Too few invocations: " + times.size(), times.size() > 2);
Reported by PMD.
Line: 140
Thread.sleep(150);
System.out.println("Runs: " + times.size());
assertTrue(times.size() > 2);
for (int i = 0; i < times.size() - 1 ; i++) {
long diff = times.get(i + 1) - times.get(i);
Reported by PMD.
Line: 146
for (int i = 0; i < times.size() - 1 ; i++) {
long diff = times.get(i + 1) - times.get(i);
System.out.println("Diff #" + i + ": " + diff);
assertTrue("Diff out of range: " + diff, diff < 250 && diff > 50);
}
} finally {
w.dispose();
Reported by PMD.
Line: 30
public class SchedulerWorkerTest extends RxJavaTest {
static final class CustomDriftScheduler extends Scheduler {
public volatile long drift;
@NonNull
@Override
public Worker createWorker() {
final Worker w = Schedulers.computation().createWorker();
return new Worker() {
Reported by PMD.
Line: 34
@NonNull
@Override
public Worker createWorker() {
final Worker w = Schedulers.computation().createWorker();
return new Worker() {
@Override
public void dispose() {
w.dispose();
Reported by PMD.
Line: 73
}
@Test
public void currentTimeDriftBackwards() throws Exception {
CustomDriftScheduler s = new CustomDriftScheduler();
Scheduler.Worker w = s.createWorker();
try {
Reported by PMD.
Line: 73
}
@Test
public void currentTimeDriftBackwards() throws Exception {
CustomDriftScheduler s = new CustomDriftScheduler();
Scheduler.Worker w = s.createWorker();
try {
Reported by PMD.
Line: 90
Thread.sleep(150);
s.drift = -TimeUnit.SECONDS.toNanos(1) - Scheduler.clockDriftTolerance();
Thread.sleep(400);
d.dispose();
Reported by PMD.
Line: 103
for (int i = 0; i < times.size() - 1 ; i++) {
long diff = times.get(i + 1) - times.get(i);
System.out.println("Diff #" + i + ": " + diff);
assertTrue("" + i + ":" + diff, diff < 150 && diff > 50);
}
assertTrue("Too few invocations: " + times.size(), times.size() > 2);
} finally {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableThrottleLastTests.java
21 issues
Line: 31
public class ObservableThrottleLastTests extends RxJavaTest {
@Test
public void throttle() {
Observer<Integer> observer = TestHelper.mockObserver();
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
Reported by PMD.
Line: 36
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
Reported by PMD.
Line: 36
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
Reported by PMD.
Line: 40
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
Reported by PMD.
Line: 41
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 43
o.onNext(1); // skip
o.onNext(2); // deliver
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
Reported by PMD.
Line: 45
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // skip
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
Reported by PMD.
Line: 47
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onComplete();
Reported by PMD.
Line: 48
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onComplete();
Reported by PMD.
Line: 50
o.onNext(5); // skip
o.onNext(6); // deliver
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onComplete();
InOrder inOrder = inOrder(observer);
inOrder.verify(observer).onNext(2);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipArray.java
21 issues
Line: 28
public final class MaybeZipArray<T, R> extends Maybe<R> {
final MaybeSource<? extends T>[] sources;
final Function<? super Object[], ? extends R> zipper;
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
Reported by PMD.
Line: 30
final MaybeSource<? extends T>[] sources;
final Function<? super Object[], ? extends R> zipper;
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
Reported by PMD.
Line: 32
final Function<? super Object[], ? extends R> zipper;
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
@Override
Reported by PMD.
Line: 42
MaybeSource<? extends T>[] sources = this.sources;
int n = sources.length;
if (n == 1) {
sources[0].subscribe(new MaybeMap.MapMaybeObserver<>(observer, new SingletonArrayFunc()));
return;
}
ZipCoordinator<T, R> parent = new ZipCoordinator<>(observer, n, zipper);
Reported by PMD.
Line: 59
MaybeSource<? extends T> source = sources[i];
if (source == null) {
parent.innerError(new NullPointerException("One of the sources is null"), i);
return;
}
source.subscribe(parent.observers[i]);
}
}
Reported by PMD.
Line: 62
parent.innerError(new NullPointerException("One of the sources is null"), i);
return;
}
source.subscribe(parent.observers[i]);
}
}
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
Reported by PMD.
Line: 70
private static final long serialVersionUID = -5556924161382950569L;
final MaybeObserver<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipMaybeObserver<T>[] observers;
Reported by PMD.
Line: 72
final MaybeObserver<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipMaybeObserver<T>[] observers;
Object[] values;
Reported by PMD.
Line: 74
final Function<? super Object[], ? extends R> zipper;
final ZipMaybeObserver<T>[] observers;
Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
Reported by PMD.
Line: 76
final ZipMaybeObserver<T>[] observers;
Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
super(n);
this.downstream = observer;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableUsing.java
21 issues
Line: 149
@SuppressWarnings("unchecked")
@Override
public void onError(Throwable e) {
upstream = DisposableHelper.DISPOSED;
if (eager) {
Object resource = getAndSet(this);
if (resource != this) {
try {
Reported by PMD.
Line: 28
public final class CompletableUsing<R> extends Completable {
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Reported by PMD.
Line: 29
public final class CompletableUsing<R> extends Completable {
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction, Consumer<? super R> disposer,
Reported by PMD.
Line: 30
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction, Consumer<? super R> disposer,
boolean eager) {
Reported by PMD.
Line: 31
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction, Consumer<? super R> disposer,
boolean eager) {
this.resourceSupplier = resourceSupplier;
Reported by PMD.
Line: 48
try {
resource = resourceSupplier.get();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
Line: 58
try {
source = Objects.requireNonNull(completableFunction.apply(resource), "The completableFunction returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (eager) {
try {
disposer.accept(resource);
} catch (Throwable exc) {
Reported by PMD.
Line: 63
if (eager) {
try {
disposer.accept(resource);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
EmptyDisposable.error(new CompositeException(ex, exc), observer);
return;
}
}
Reported by PMD.
Line: 75
if (!eager) {
try {
disposer.accept(resource);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
RxJavaPlugins.onError(exc);
}
}
return;
Reported by PMD.
Line: 92
private static final long serialVersionUID = -674404550052917487L;
final CompletableObserver downstream;
final Consumer<? super R> disposer;
final boolean eager;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapBiSelectorTest.java
21 issues
Line: 27
import io.reactivex.rxjava3.subjects.SingleSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class SingleFlatMapBiSelectorTest extends RxJavaTest {
BiFunction<Integer, Integer, String> stringCombine() {
return new BiFunction<Integer, Integer, String>() {
@Override
public String apply(Integer a, Integer b) throws Exception {
Reported by PMD.
Line: 39
}
@Test
public void normal() {
Single.just(1)
.flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(2);
Reported by PMD.
Line: 66
.test()
.assertFailure(TestException.class);
assertEquals(0, call[0]);
}
@Test
public void justWithError() {
final int[] call = { 0 };
Reported by PMD.
Line: 84
.test()
.assertFailure(TestException.class);
assertEquals(1, call[0]);
}
@Test
public void dispose() {
TestHelper.checkDisposed(SingleSubject.create()
Reported by PMD.
Line: 88
}
@Test
public void dispose() {
TestHelper.checkDisposed(SingleSubject.create()
.flatMap(new Function<Object, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Object v) throws Exception {
return Single.just(1);
Reported by PMD.
Line: 104
}
@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Object>, SingleSource<Object>>() {
@Override
public SingleSource<Object> apply(Single<Object> v) throws Exception {
return v.flatMap(new Function<Object, SingleSource<Integer>>() {
@Override
Reported by PMD.
Line: 124
}
@Test
public void mapperThrows() {
Single.just(1)
.flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
throw new TestException();
Reported by PMD.
Line: 137
}
@Test
public void mapperReturnsNull() {
Single.just(1)
.flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return null;
Reported by PMD.
Line: 150
}
@Test
public void resultSelectorThrows() {
Single.just(1)
.flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(2);
Reported by PMD.
Line: 168
}
@Test
public void resultSelectorReturnsNull() {
Single.just(1)
.flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(Integer v) throws Exception {
return Single.just(2);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/schedulers/TrampolineScheduler.java
21 issues
Line: 30
* after the current unit of work is completed.
*/
public final class TrampolineScheduler extends Scheduler {
private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();
public static TrampolineScheduler instance() {
return INSTANCE;
}
Reported by PMD.
Line: 48
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run) {
RxJavaPlugins.onSchedule(run).run();
return EmptyDisposable.INSTANCE;
}
@NonNull
@Override
Reported by PMD.
Line: 57
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
try {
unit.sleep(delay);
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
}
return EmptyDisposable.INSTANCE;
Reported by PMD.
Line: 59
unit.sleep(delay);
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
}
return EmptyDisposable.INSTANCE;
}
Reported by PMD.
Line: 66
}
static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<>();
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
Reported by PMD.
Line: 68
static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<>();
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
Reported by PMD.
Line: 70
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
@NonNull
@Override
Reported by PMD.
Line: 72
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action) {
return enqueue(action, now(TimeUnit.MILLISECONDS));
Reported by PMD.
Line: 108
break;
}
if (!polled.disposed) {
polled.run.run();
}
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
Reported by PMD.
Line: 135
}
final class AppendToQueueTask implements Runnable {
final TimedRunnable timedRunnable;
AppendToQueueTask(TimedRunnable timedRunnable) {
this.timedRunnable = timedRunnable;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/schedulers/ComputationScheduler.java
21 issues
Line: 45
static final PoolWorker SHUTDOWN_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx3.computation-priority";
static {
Reported by PMD.
Line: 46
static final PoolWorker SHUTDOWN_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx3.computation-priority";
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
Reported by PMD.
Line: 70
}
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
Reported by PMD.
Line: 72
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
Reported by PMD.
Line: 73
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
Reported by PMD.
Line: 80
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
public PoolWorker getEventLoop() {
int c = cores;
Reported by PMD.
Line: 109
} else {
int index = (int)n % c;
for (int i = 0; i < number; i++) {
callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
if (++index == c) {
index = 0;
}
}
n = index;
Reported by PMD.
Line: 110
int index = (int)n % c;
for (int i = 0; i < number; i++) {
callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
if (++index == c) {
index = 0;
}
}
n = index;
}
Reported by PMD.
Line: 143
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}
@Override
public void createWorkers(int number, WorkerCallback callback) {
ObjectHelper.verifyPositive(number, "number > 0 required");
Reported by PMD.
Line: 149
@Override
public void createWorkers(int number, WorkerCallback callback) {
ObjectHelper.verifyPositive(number, "number > 0 required");
pool.get().createWorkers(number, callback);
}
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
Reported by PMD.