The following issues were found
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableMostRecentTest.java
21 issues
Line: 31
public class BlockingFlowableMostRecentTest extends RxJavaTest {
@Test
public void mostRecent() {
FlowableProcessor<String> s = PublishProcessor.create();
Iterator<String> it = s.blockingMostRecent("default").iterator();
assertTrue(it.hasNext());
Reported by PMD.
Line: 34
public void mostRecent() {
FlowableProcessor<String> s = PublishProcessor.create();
Iterator<String> it = s.blockingMostRecent("default").iterator();
assertTrue(it.hasNext());
assertEquals("default", it.next());
assertEquals("default", it.next());
Reported by PMD.
Line: 34
public void mostRecent() {
FlowableProcessor<String> s = PublishProcessor.create();
Iterator<String> it = s.blockingMostRecent("default").iterator();
assertTrue(it.hasNext());
assertEquals("default", it.next());
assertEquals("default", it.next());
Reported by PMD.
Line: 34
public void mostRecent() {
FlowableProcessor<String> s = PublishProcessor.create();
Iterator<String> it = s.blockingMostRecent("default").iterator();
assertTrue(it.hasNext());
assertEquals("default", it.next());
assertEquals("default", it.next());
Reported by PMD.
Line: 40
assertEquals("default", it.next());
assertEquals("default", it.next());
s.onNext("one");
assertTrue(it.hasNext());
assertEquals("one", it.next());
assertEquals("one", it.next());
s.onNext("two");
Reported by PMD.
Line: 45
assertEquals("one", it.next());
assertEquals("one", it.next());
s.onNext("two");
assertTrue(it.hasNext());
assertEquals("two", it.next());
assertEquals("two", it.next());
s.onComplete();
Reported by PMD.
Line: 50
assertEquals("two", it.next());
assertEquals("two", it.next());
s.onComplete();
assertFalse(it.hasNext());
}
@Test(expected = TestException.class)
Reported by PMD.
Line: 59
public void mostRecentWithException() {
FlowableProcessor<String> s = PublishProcessor.create();
Iterator<String> it = s.blockingMostRecent("default").iterator();
assertTrue(it.hasNext());
assertEquals("default", it.next());
assertEquals("default", it.next());
Reported by PMD.
Line: 59
public void mostRecentWithException() {
FlowableProcessor<String> s = PublishProcessor.create();
Iterator<String> it = s.blockingMostRecent("default").iterator();
assertTrue(it.hasNext());
assertEquals("default", it.next());
assertEquals("default", it.next());
Reported by PMD.
Line: 74
@Test
public void singleSourceManyIterators() {
TestScheduler scheduler = new TestScheduler();
Flowable<Long> source = Flowable.interval(1, TimeUnit.SECONDS, scheduler).take(10);
Iterable<Long> iter = source.blockingMostRecent(-1L);
for (int j = 0; j < 3; j++) {
Iterator<Long> it = iter.iterator();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/schedulers/ComputationScheduler.java
21 issues
Line: 45
static final PoolWorker SHUTDOWN_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx3.computation-priority";
static {
Reported by PMD.
Line: 46
static final PoolWorker SHUTDOWN_WORKER;
final ThreadFactory threadFactory;
final AtomicReference<FixedSchedulerPool> pool;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_COMPUTATION_PRIORITY = "rx3.computation-priority";
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
Reported by PMD.
Line: 70
}
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
Reported by PMD.
Line: 72
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
Reported by PMD.
Line: 73
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
Reported by PMD.
Line: 80
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
public PoolWorker getEventLoop() {
int c = cores;
Reported by PMD.
Line: 109
} else {
int index = (int)n % c;
for (int i = 0; i < number; i++) {
callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
if (++index == c) {
index = 0;
}
}
n = index;
Reported by PMD.
Line: 110
int index = (int)n % c;
for (int i = 0; i < number; i++) {
callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
if (++index == c) {
index = 0;
}
}
n = index;
}
Reported by PMD.
Line: 143
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
}
@Override
public void createWorkers(int number, WorkerCallback callback) {
ObjectHelper.verifyPositive(number, "number > 0 required");
Reported by PMD.
Line: 149
@Override
public void createWorkers(int number, WorkerCallback callback) {
ObjectHelper.verifyPositive(number, "number > 0 required");
pool.get().createWorkers(number, callback);
}
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipArray.java
21 issues
Line: 28
public final class MaybeZipArray<T, R> extends Maybe<R> {
final MaybeSource<? extends T>[] sources;
final Function<? super Object[], ? extends R> zipper;
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
Reported by PMD.
Line: 30
final MaybeSource<? extends T>[] sources;
final Function<? super Object[], ? extends R> zipper;
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
Reported by PMD.
Line: 32
final Function<? super Object[], ? extends R> zipper;
public MaybeZipArray(MaybeSource<? extends T>[] sources, Function<? super Object[], ? extends R> zipper) {
this.sources = sources;
this.zipper = zipper;
}
@Override
Reported by PMD.
Line: 42
MaybeSource<? extends T>[] sources = this.sources;
int n = sources.length;
if (n == 1) {
sources[0].subscribe(new MaybeMap.MapMaybeObserver<>(observer, new SingletonArrayFunc()));
return;
}
ZipCoordinator<T, R> parent = new ZipCoordinator<>(observer, n, zipper);
Reported by PMD.
Line: 59
MaybeSource<? extends T> source = sources[i];
if (source == null) {
parent.innerError(new NullPointerException("One of the sources is null"), i);
return;
}
source.subscribe(parent.observers[i]);
}
}
Reported by PMD.
Line: 62
parent.innerError(new NullPointerException("One of the sources is null"), i);
return;
}
source.subscribe(parent.observers[i]);
}
}
static final class ZipCoordinator<T, R> extends AtomicInteger implements Disposable {
Reported by PMD.
Line: 70
private static final long serialVersionUID = -5556924161382950569L;
final MaybeObserver<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipMaybeObserver<T>[] observers;
Reported by PMD.
Line: 72
final MaybeObserver<? super R> downstream;
final Function<? super Object[], ? extends R> zipper;
final ZipMaybeObserver<T>[] observers;
Object[] values;
Reported by PMD.
Line: 74
final Function<? super Object[], ? extends R> zipper;
final ZipMaybeObserver<T>[] observers;
Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
Reported by PMD.
Line: 76
final ZipMaybeObserver<T>[] observers;
Object[] values;
@SuppressWarnings("unchecked")
ZipCoordinator(MaybeObserver<? super R> observer, int n, Function<? super Object[], ? extends R> zipper) {
super(n);
this.downstream = observer;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSkipLastTimed.java
21 issues
Line: 27
import io.reactivex.rxjava3.internal.util.BackpressureHelper;
public final class FlowableSkipLastTimed<T> extends AbstractFlowableWithUpstream<T, T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
Reported by PMD.
Line: 28
public final class FlowableSkipLastTimed<T> extends AbstractFlowableWithUpstream<T, T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public FlowableSkipLastTimed(Flowable<T> source, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
Reported by PMD.
Line: 29
public final class FlowableSkipLastTimed<T> extends AbstractFlowableWithUpstream<T, T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public FlowableSkipLastTimed(Flowable<T> source, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
Reported by PMD.
Line: 30
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public FlowableSkipLastTimed(Flowable<T> source, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
this.time = time;
Reported by PMD.
Line: 31
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public FlowableSkipLastTimed(Flowable<T> source, long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
this.time = time;
this.unit = unit;
Reported by PMD.
Line: 50
static final class SkipLastTimedSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5677354903406201275L;
final Subscriber<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Reported by PMD.
Line: 51
private static final long serialVersionUID = -5677354903406201275L;
final Subscriber<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Reported by PMD.
Line: 52
private static final long serialVersionUID = -5677354903406201275L;
final Subscriber<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Subscription upstream;
Reported by PMD.
Line: 53
final Subscriber<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Subscription upstream;
Reported by PMD.
Line: 54
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Subscription upstream;
final AtomicLong requested = new AtomicLong();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/schedulers/TrampolineScheduler.java
21 issues
Line: 30
* after the current unit of work is completed.
*/
public final class TrampolineScheduler extends Scheduler {
private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();
public static TrampolineScheduler instance() {
return INSTANCE;
}
Reported by PMD.
Line: 48
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run) {
RxJavaPlugins.onSchedule(run).run();
return EmptyDisposable.INSTANCE;
}
@NonNull
@Override
Reported by PMD.
Line: 57
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
try {
unit.sleep(delay);
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
}
return EmptyDisposable.INSTANCE;
Reported by PMD.
Line: 59
unit.sleep(delay);
RxJavaPlugins.onSchedule(run).run();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
RxJavaPlugins.onError(ex);
}
return EmptyDisposable.INSTANCE;
}
Reported by PMD.
Line: 66
}
static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<>();
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
Reported by PMD.
Line: 68
static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<>();
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
Reported by PMD.
Line: 70
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
@NonNull
@Override
Reported by PMD.
Line: 72
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action) {
return enqueue(action, now(TimeUnit.MILLISECONDS));
Reported by PMD.
Line: 108
break;
}
if (!polled.disposed) {
polled.run.run();
}
}
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
Reported by PMD.
Line: 135
}
final class AppendToQueueTask implements Runnable {
final TimedRunnable timedRunnable;
AppendToQueueTask(TimedRunnable timedRunnable) {
this.timedRunnable = timedRunnable;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRepeatWhen.java
21 issues
Line: 29
import io.reactivex.rxjava3.subscribers.SerializedSubscriber;
public final class FlowableRepeatWhen<T> extends AbstractFlowableWithUpstream<T, T> {
final Function<? super Flowable<Object>, ? extends Publisher<?>> handler;
public FlowableRepeatWhen(Flowable<T> source,
Function<? super Flowable<Object>, ? extends Publisher<?>> handler) {
super(source);
this.handler = handler;
Reported by PMD.
Line: 42
SerializedSubscriber<T> z = new SerializedSubscriber<>(s);
FlowableProcessor<Object> processor = UnicastProcessor.create(8).toSerialized();
Publisher<?> when;
try {
when = Objects.requireNonNull(handler.apply(processor), "handler returned a null Publisher");
Reported by PMD.
Line: 48
try {
when = Objects.requireNonNull(handler.apply(processor), "handler returned a null Publisher");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 62
s.onSubscribe(subscriber);
when.subscribe(receiver);
receiver.onNext(0);
}
static final class WhenReceiver<T, U>
Reported by PMD.
Line: 73
private static final long serialVersionUID = 2827772011130406689L;
final Publisher<T> source;
final AtomicReference<Subscription> upstream;
final AtomicLong requested;
Reported by PMD.
Line: 75
final Publisher<T> source;
final AtomicReference<Subscription> upstream;
final AtomicLong requested;
WhenSourceSubscriber<T, U> subscriber;
Reported by PMD.
Line: 77
final AtomicReference<Subscription> upstream;
final AtomicLong requested;
WhenSourceSubscriber<T, U> subscriber;
WhenReceiver(Publisher<T> source) {
this.source = source;
Reported by PMD.
Line: 79
final AtomicLong requested;
WhenSourceSubscriber<T, U> subscriber;
WhenReceiver(Publisher<T> source) {
this.source = source;
this.upstream = new AtomicReference<>();
this.requested = new AtomicLong();
Reported by PMD.
Line: 112
@Override
public void onError(Throwable t) {
subscriber.cancel();
subscriber.downstream.onError(t);
}
@Override
public void onComplete() {
subscriber.cancel();
Reported by PMD.
Line: 118
@Override
public void onComplete() {
subscriber.cancel();
subscriber.downstream.onComplete();
}
@Override
public void request(long n) {
SubscriptionHelper.deferredRequest(upstream, requested, n);
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableUsing.java
21 issues
Line: 149
@SuppressWarnings("unchecked")
@Override
public void onError(Throwable e) {
upstream = DisposableHelper.DISPOSED;
if (eager) {
Object resource = getAndSet(this);
if (resource != this) {
try {
Reported by PMD.
Line: 28
public final class CompletableUsing<R> extends Completable {
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Reported by PMD.
Line: 29
public final class CompletableUsing<R> extends Completable {
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction, Consumer<? super R> disposer,
Reported by PMD.
Line: 30
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction, Consumer<? super R> disposer,
boolean eager) {
Reported by PMD.
Line: 31
final Supplier<R> resourceSupplier;
final Function<? super R, ? extends CompletableSource> completableFunction;
final Consumer<? super R> disposer;
final boolean eager;
public CompletableUsing(Supplier<R> resourceSupplier,
Function<? super R, ? extends CompletableSource> completableFunction, Consumer<? super R> disposer,
boolean eager) {
this.resourceSupplier = resourceSupplier;
Reported by PMD.
Line: 48
try {
resource = resourceSupplier.get();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
Reported by PMD.
Line: 58
try {
source = Objects.requireNonNull(completableFunction.apply(resource), "The completableFunction returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
if (eager) {
try {
disposer.accept(resource);
} catch (Throwable exc) {
Reported by PMD.
Line: 63
if (eager) {
try {
disposer.accept(resource);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
EmptyDisposable.error(new CompositeException(ex, exc), observer);
return;
}
}
Reported by PMD.
Line: 75
if (!eager) {
try {
disposer.accept(resource);
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
RxJavaPlugins.onError(exc);
}
}
return;
Reported by PMD.
Line: 92
private static final long serialVersionUID = -674404550052917487L;
final CompletableObserver downstream;
final Consumer<? super R> disposer;
final boolean eager;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java
21 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.parallel;
import java.util.*;
import java.util.stream.*;
import org.reactivestreams.*;
Reported by PMD.
Line: 43
* @param <T> the value type
* @since 2.2
*/
public abstract class ParallelFlowable<@NonNull T> {
/**
* Subscribes an array of {@link Subscriber}s to this {@code ParallelFlowable} and triggers
* the execution chain for all 'rails'.
* <dl>
Reported by PMD.
Line: 43
* @param <T> the value type
* @since 2.2
*/
public abstract class ParallelFlowable<@NonNull T> {
/**
* Subscribes an array of {@link Subscriber}s to this {@code ParallelFlowable} and triggers
* the execution chain for all 'rails'.
* <dl>
Reported by PMD.
Line: 43
* @param <T> the value type
* @since 2.2
*/
public abstract class ParallelFlowable<@NonNull T> {
/**
* Subscribes an array of {@link Subscriber}s to this {@code ParallelFlowable} and triggers
* the execution chain for all 'rails'.
* <dl>
Reported by PMD.
Line: 114
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <@NonNull T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source) {
return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize());
}
/**
* Take a {@link Publisher} and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
* <dl>
Reported by PMD.
Line: 171
int parallelism, int prefetch) {
Objects.requireNonNull(source, "source is null");
ObjectHelper.verifyPositive(parallelism, "parallelism");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelFromPublisher<>(source, parallelism, prefetch));
}
/**
Reported by PMD.
Line: 197
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <@NonNull R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ParallelMap<>(this, mapper));
}
/**
* Maps the source values on each 'rail' to another value and
Reported by PMD.
Line: 228
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <@NonNull R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTry<>(this, mapper, errorHandler));
}
/**
* Maps the source values on each 'rail' to another value and
Reported by PMD.
Line: 1106
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull R> R to(@NonNull ParallelFlowableConverter<T, R> converter) {
return Objects.requireNonNull(converter, "converter is null").apply(this);
}
/**
* Allows composing operators, in assembly time, on top of this {@code ParallelFlowable}
* and returns another {@code ParallelFlowable} with composed features.
Reported by PMD.
Line: 1130
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final <@NonNull U> ParallelFlowable<U> compose(@NonNull ParallelTransformer<T, U> composer) {
return RxJavaPlugins.onAssembly(Objects.requireNonNull(composer, "composer is null").apply(this));
}
/**
* Generates and flattens {@link Publisher}s on each 'rail'.
* <p>
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounce.java
20 issues
Line: 32
import io.reactivex.rxjava3.subscribers.*;
public final class FlowableDebounce<T, U> extends AbstractFlowableWithUpstream<T, T> {
final Function<? super T, ? extends Publisher<U>> debounceSelector;
public FlowableDebounce(Flowable<T> source, Function<? super T, ? extends Publisher<U>> debounceSelector) {
super(source);
this.debounceSelector = debounceSelector;
}
Reported by PMD.
Line: 48
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = 6725975399620862591L;
final Subscriber<? super T> downstream;
final Function<? super T, ? extends Publisher<U>> debounceSelector;
Subscription upstream;
final AtomicReference<Disposable> debouncer = new AtomicReference<>();
Reported by PMD.
Line: 49
private static final long serialVersionUID = 6725975399620862591L;
final Subscriber<? super T> downstream;
final Function<? super T, ? extends Publisher<U>> debounceSelector;
Subscription upstream;
final AtomicReference<Disposable> debouncer = new AtomicReference<>();
Reported by PMD.
Line: 51
final Subscriber<? super T> downstream;
final Function<? super T, ? extends Publisher<U>> debounceSelector;
Subscription upstream;
final AtomicReference<Disposable> debouncer = new AtomicReference<>();
volatile long index;
Reported by PMD.
Line: 53
Subscription upstream;
final AtomicReference<Disposable> debouncer = new AtomicReference<>();
volatile long index;
boolean done;
Reported by PMD.
Line: 55
final AtomicReference<Disposable> debouncer = new AtomicReference<>();
volatile long index;
boolean done;
DebounceSubscriber(Subscriber<? super T> actual,
Function<? super T, ? extends Publisher<U>> debounceSelector) {
Reported by PMD.
Line: 57
volatile long index;
boolean done;
DebounceSubscriber(Subscriber<? super T> actual,
Function<? super T, ? extends Publisher<U>> debounceSelector) {
this.downstream = actual;
this.debounceSelector = debounceSelector;
Reported by PMD.
Line: 92
try {
p = Objects.requireNonNull(debounceSelector.apply(t), "The publisher supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
cancel();
downstream.onError(e);
return;
}
Reported by PMD.
Line: 123
@SuppressWarnings("unchecked")
DebounceInnerSubscriber<T, U> dis = (DebounceInnerSubscriber<T, U>)d;
if (dis != null) {
dis.emit();
}
DisposableHelper.dispose(debouncer);
downstream.onComplete();
}
}
Reported by PMD.
Line: 146
void emit(long idx, T value) {
if (idx == index) {
long r = get();
if (r != 0L) {
downstream.onNext(value);
BackpressureHelper.produced(this, 1);
} else {
cancel();
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/subscribers/BoundedSubscriber.java
20 issues
Line: 33
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
Reported by PMD.
Line: 33
implements FlowableSubscriber<T>, Subscription, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
Reported by PMD.
Line: 34
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
Reported by PMD.
Line: 34
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
Reported by PMD.
Line: 35
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
final int limit;
Reported by PMD.
Line: 35
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
final int limit;
Reported by PMD.
Line: 36
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
final int limit;
Reported by PMD.
Line: 36
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
final int limit;
Reported by PMD.
Line: 38
final Action onComplete;
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
final int limit;
public BoundedSubscriber(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe, int bufferSize) {
Reported by PMD.
Line: 39
final Consumer<? super Subscription> onSubscribe;
final int bufferSize;
int consumed;
final int limit;
public BoundedSubscriber(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe, int bufferSize) {
super();
Reported by PMD.