The following issues were found
src/test/java/io/reactivex/rxjava3/flowable/FlowableFuseableTest.java
31 issues
Line: 27
public class FlowableFuseableTest extends RxJavaTest {
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Flowable.range(1, 10)
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 38
}
@Test
public void syncArray() {
Flowable.fromArray(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 49
}
@Test
public void syncIterable() {
Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 51
@Test
public void syncIterable() {
Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 51
@Test
public void syncIterable() {
Flowable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testSubscriber(Long.MAX_VALUE, QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnLifecycleTest.java
31 issues
Line: 34
public class FlowableDoOnLifecycleTest extends RxJavaTest {
@Test
public void onSubscribeCrashed() {
Flowable.just(1)
.doOnLifecycle(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) throws Exception {
throw new TestException();
Reported by PMD.
Line: 47
}
@Test
public void doubleOnSubscribe() {
final int[] calls = { 0, 0 };
TestHelper.checkDoubleOnSubscribeFlowable(new Function<Flowable<Object>, Publisher<Object>>() {
@Override
public Publisher<Object> apply(Flowable<Object> f) throws Exception {
Reported by PMD.
Line: 68
}
});
assertEquals(2, calls[0]);
assertEquals(0, calls[1]);
}
@Test
public void dispose() {
Reported by PMD.
Line: 69
});
assertEquals(2, calls[0]);
assertEquals(0, calls[1]);
}
@Test
public void dispose() {
final int[] calls = { 0, 0 };
Reported by PMD.
Line: 73
}
@Test
public void dispose() {
final int[] calls = { 0, 0 };
TestHelper.checkDisposed(Flowable.just(1)
.doOnLifecycle(new Consumer<Subscription>() {
@Override
Reported by PMD.
Line: 90
})
);
assertEquals(1, calls[0]);
assertEquals(1, calls[1]);
}
@Test
public void requestCrashed() {
Reported by PMD.
Line: 91
);
assertEquals(1, calls[0]);
assertEquals(1, calls[1]);
}
@Test
public void requestCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 95
}
@Test
public void requestCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.doOnLifecycle(Functions.emptyConsumer(),
new LongConsumer() {
Reported by PMD.
Line: 117
}
@Test
public void cancelCrashed() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.just(1)
.doOnLifecycle(Functions.emptyConsumer(),
Functions.EMPTY_LONG_CONSUMER,
Reported by PMD.
Line: 162
.to(TestHelper.<Integer>testConsumer())
.assertFailureAndMessage(TestException.class, "First");
assertTrue(bs.isCancelled());
TestHelper.assertUndeliverable(errors, 0, TestException.class, "Second");
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableFuseableTest.java
31 issues
Line: 27
public class ObservableFuseableTest extends RxJavaTest {
@Test
public void syncRange() {
Observable.range(1, 10)
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Observable.range(1, 10)
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Observable.range(1, 10)
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Observable.range(1, 10)
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Observable.range(1, 10)
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 29
@Test
public void syncRange() {
Observable.range(1, 10)
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 38
}
@Test
public void syncArray() {
Observable.fromArray(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 49
}
@Test
public void syncIterable() {
Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Reported by PMD.
Line: 51
@Test
public void syncIterable() {
Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
Line: 51
@Test
public void syncIterable() {
Observable.fromIterable(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.to(TestHelper.<Integer>testConsumer(QueueFuseable.ANY, false))
.assertFusionMode(QueueFuseable.SYNC)
.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.assertNoErrors()
.assertComplete();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/DisposableSingleObserverTest.java
31 issues
Line: 32
static final class TestSingle<T> extends DisposableSingleObserver<T> {
int start;
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
Reported by PMD.
Line: 34
int start;
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
@Override
protected void onStart() {
Reported by PMD.
Line: 36
final List<T> values = new ArrayList<>();
final List<Throwable> errors = new ArrayList<>();
@Override
protected void onStart() {
super.onStart();
Reported by PMD.
Line: 58
}
@Test
public void normal() {
TestSingle<Integer> tc = new TestSingle<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
Reported by PMD.
Line: 61
public void normal() {
TestSingle<Integer> tc = new TestSingle<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Single.just(1).subscribe(tc);
Reported by PMD.
Line: 62
TestSingle<Integer> tc = new TestSingle<>();
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Single.just(1).subscribe(tc);
Reported by PMD.
Line: 63
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Single.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
Reported by PMD.
Line: 63
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Single.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
Reported by PMD.
Line: 64
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Single.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
assertEquals(1, tc.start);
Reported by PMD.
Line: 64
assertFalse(tc.isDisposed());
assertEquals(0, tc.start);
assertTrue(tc.values.isEmpty());
assertTrue(tc.errors.isEmpty());
Single.just(1).subscribe(tc);
assertFalse(tc.isDisposed());
assertEquals(1, tc.start);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGenerateTest.java
31 issues
Line: 30
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableGenerateTest extends RxJavaTest {
@Test
public void statefulBiconsumer() {
Flowable.generate(new Supplier<Object>() {
@Override
Reported by PMD.
Line: 33
public class FlowableGenerateTest extends RxJavaTest {
@Test
public void statefulBiconsumer() {
Flowable.generate(new Supplier<Object>() {
@Override
public Object get() throws Exception {
return 10;
}
Reported by PMD.
Line: 56
}
@Test
public void stateSupplierThrows() {
Flowable.generate(new Supplier<Object>() {
@Override
public Object get() throws Exception {
throw new TestException();
}
Reported by PMD.
Line: 73
}
@Test
public void generatorThrows() {
Flowable.generate(new Supplier<Object>() {
@Override
public Object get() throws Exception {
return 1;
}
Reported by PMD.
Line: 90
}
@Test
public void disposerThrows() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Flowable.generate(new Supplier<Object>() {
@Override
public Object get() throws Exception {
Reported by PMD.
Line: 119
}
@Test
public void dispose() {
TestHelper.checkDisposed(Flowable.generate(new Supplier<Object>() {
@Override
public Object get() throws Exception {
return 1;
}
Reported by PMD.
Line: 142
public void accept(Integer s, Emitter<Object> e) throws Exception {
try {
e.onError(null);
} catch (NullPointerException ex) {
call[0]++;
}
}
}, Functions.emptyConsumer())
.test()
Reported by PMD.
Line: 142
public void accept(Integer s, Emitter<Object> e) throws Exception {
try {
e.onError(null);
} catch (NullPointerException ex) {
call[0]++;
}
}
}, Functions.emptyConsumer())
.test()
Reported by PMD.
Line: 150
.test()
.assertFailure(NullPointerException.class);
assertEquals(0, call[0]);
}
@Test
public void badRequest() {
TestHelper.assertBadRequestReported(Flowable.generate(new Supplier<Object>() {
Reported by PMD.
Line: 154
}
@Test
public void badRequest() {
TestHelper.assertBadRequestReported(Flowable.generate(new Supplier<Object>() {
@Override
public Object get() throws Exception {
return 1;
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableSkipLastTimed.java
31 issues
Line: 25
import io.reactivex.rxjava3.internal.queue.SpscLinkedArrayQueue;
public final class ObservableSkipLastTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
Reported by PMD.
Line: 26
public final class ObservableSkipLastTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableSkipLastTimed(ObservableSource<T> source,
Reported by PMD.
Line: 27
public final class ObservableSkipLastTimed<T> extends AbstractObservableWithUpstream<T, T> {
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableSkipLastTimed(ObservableSource<T> source,
long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
Reported by PMD.
Line: 28
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableSkipLastTimed(ObservableSource<T> source,
long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
Reported by PMD.
Line: 29
final TimeUnit unit;
final Scheduler scheduler;
final int bufferSize;
final boolean delayError;
public ObservableSkipLastTimed(ObservableSource<T> source,
long time, TimeUnit unit, Scheduler scheduler, int bufferSize, boolean delayError) {
super(source);
this.time = time;
Reported by PMD.
Line: 46
source.subscribe(new SkipLastTimedObserver<>(t, time, unit, scheduler, bufferSize, delayError));
}
static final class SkipLastTimedObserver<T> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long time;
final TimeUnit unit;
Reported by PMD.
Line: 46
source.subscribe(new SkipLastTimedObserver<>(t, time, unit, scheduler, bufferSize, delayError));
}
static final class SkipLastTimedObserver<T> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long time;
final TimeUnit unit;
Reported by PMD.
Line: 49
static final class SkipLastTimedObserver<T> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Reported by PMD.
Line: 50
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Reported by PMD.
Line: 51
private static final long serialVersionUID = -5677354903406201275L;
final Observer<? super T> downstream;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
Disposable upstream;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/subjects/UnicastSubject.java
31 issues
Line: 143
* @param <T> the value type received and emitted by this Subject subclass
* @since 2.0
*/
public final class UnicastSubject<T> extends Subject<T> {
/** The queue that buffers the source events. */
final SpscLinkedArrayQueue<T> queue;
/** The single Observer. */
final AtomicReference<Observer<? super T>> downstream;
Reported by PMD.
Line: 143
* @param <T> the value type received and emitted by this Subject subclass
* @since 2.0
*/
public final class UnicastSubject<T> extends Subject<T> {
/** The queue that buffers the source events. */
final SpscLinkedArrayQueue<T> queue;
/** The single Observer. */
final AtomicReference<Observer<? super T>> downstream;
Reported by PMD.
Line: 143
* @param <T> the value type received and emitted by this Subject subclass
* @since 2.0
*/
public final class UnicastSubject<T> extends Subject<T> {
/** The queue that buffers the source events. */
final SpscLinkedArrayQueue<T> queue;
/** The single Observer. */
final AtomicReference<Observer<? super T>> downstream;
Reported by PMD.
Line: 145
*/
public final class UnicastSubject<T> extends Subject<T> {
/** The queue that buffers the source events. */
final SpscLinkedArrayQueue<T> queue;
/** The single Observer. */
final AtomicReference<Observer<? super T>> downstream;
/** The optional callback when the Subject gets cancelled or terminates. */
Reported by PMD.
Line: 148
final SpscLinkedArrayQueue<T> queue;
/** The single Observer. */
final AtomicReference<Observer<? super T>> downstream;
/** The optional callback when the Subject gets cancelled or terminates. */
final AtomicReference<Runnable> onTerminate;
/** deliver onNext events before error event. */
Reported by PMD.
Line: 151
final AtomicReference<Observer<? super T>> downstream;
/** The optional callback when the Subject gets cancelled or terminates. */
final AtomicReference<Runnable> onTerminate;
/** deliver onNext events before error event. */
final boolean delayError;
/** Indicates the single observer has cancelled. */
Reported by PMD.
Line: 154
final AtomicReference<Runnable> onTerminate;
/** deliver onNext events before error event. */
final boolean delayError;
/** Indicates the single observer has cancelled. */
volatile boolean disposed;
/** Indicates the source has terminated. */
Reported by PMD.
Line: 157
final boolean delayError;
/** Indicates the single observer has cancelled. */
volatile boolean disposed;
/** Indicates the source has terminated. */
volatile boolean done;
/**
* The terminal error if not null.
Reported by PMD.
Line: 160
volatile boolean disposed;
/** Indicates the source has terminated. */
volatile boolean done;
/**
* The terminal error if not null.
* Must be set before writing to done and read after done == true.
*/
Throwable error;
Reported by PMD.
Line: 165
* The terminal error if not null.
* Must be set before writing to done and read after done == true.
*/
Throwable error;
/** Set to 1 atomically for the first and only Subscriber. */
final AtomicBoolean once;
/** The wip counter and QueueDisposable surface. */
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInternalHelper.java
31 issues
Line: 29
/**
* Helper utility class to support Flowable with inner classes.
*/
public final class FlowableInternalHelper {
/** Utility class. */
private FlowableInternalHelper() {
throw new IllegalStateException("No instances!");
}
Reported by PMD.
Line: 37
}
static final class SimpleGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
final Consumer<Emitter<T>> consumer;
SimpleGenerator(Consumer<Emitter<T>> consumer) {
this.consumer = consumer;
}
Reported by PMD.
Line: 55
}
static final class SimpleBiGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
final BiConsumer<S, Emitter<T>> consumer;
SimpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
this.consumer = consumer;
}
Reported by PMD.
Line: 73
}
static final class ItemDelayFunction<T, U> implements Function<T, Publisher<T>> {
final Function<? super T, ? extends Publisher<U>> itemDelay;
ItemDelayFunction(Function<? super T, ? extends Publisher<U>> itemDelay) {
this.itemDelay = itemDelay;
}
Reported by PMD.
Line: 91
}
static final class SubscriberOnNext<T> implements Consumer<T> {
final Subscriber<T> subscriber;
SubscriberOnNext(Subscriber<T> subscriber) {
this.subscriber = subscriber;
}
Reported by PMD.
Line: 104
}
static final class SubscriberOnError<T> implements Consumer<Throwable> {
final Subscriber<T> subscriber;
SubscriberOnError(Subscriber<T> subscriber) {
this.subscriber = subscriber;
}
Reported by PMD.
Line: 117
}
static final class SubscriberOnComplete<T> implements Action {
final Subscriber<T> subscriber;
SubscriberOnComplete(Subscriber<T> subscriber) {
this.subscriber = subscriber;
}
Reported by PMD.
Line: 142
}
static final class FlatMapWithCombinerInner<U, R, T> implements Function<U, R> {
private final BiFunction<? super T, ? super U, ? extends R> combiner;
private final T t;
FlatMapWithCombinerInner(BiFunction<? super T, ? super U, ? extends R> combiner, T t) {
this.combiner = combiner;
this.t = t;
Reported by PMD.
Line: 143
static final class FlatMapWithCombinerInner<U, R, T> implements Function<U, R> {
private final BiFunction<? super T, ? super U, ? extends R> combiner;
private final T t;
FlatMapWithCombinerInner(BiFunction<? super T, ? super U, ? extends R> combiner, T t) {
this.combiner = combiner;
this.t = t;
}
Reported by PMD.
Line: 157
}
static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Publisher<R>> {
private final BiFunction<? super T, ? super U, ? extends R> combiner;
private final Function<? super T, ? extends Publisher<? extends U>> mapper;
FlatMapWithCombinerOuter(BiFunction<? super T, ? super U, ? extends R> combiner,
Function<? super T, ? extends Publisher<? extends U>> mapper) {
this.combiner = combiner;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleUnsubscribeOnTest.java
30 issues
Line: 33
public class SingleUnsubscribeOnTest extends RxJavaTest {
@Test
public void normal() throws Exception {
PublishProcessor<Integer> pp = PublishProcessor.create();
final String[] name = { null };
final CountDownLatch cdl = new CountDownLatch(1);
Reported by PMD.
Line: 33
public class SingleUnsubscribeOnTest extends RxJavaTest {
@Test
public void normal() throws Exception {
PublishProcessor<Integer> pp = PublishProcessor.create();
final String[] name = { null };
final CountDownLatch cdl = new CountDownLatch(1);
Reported by PMD.
Line: 43
pp.doOnCancel(new Action() {
@Override
public void run() throws Exception {
name[0] = Thread.currentThread().getName();
cdl.countDown();
}
})
.single(-99)
.unsubscribeOn(Schedulers.single())
Reported by PMD.
Line: 52
.test(true)
;
assertTrue(cdl.await(5, TimeUnit.SECONDS));
int times = 10;
while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
Reported by PMD.
Line: 56
int times = 10;
while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 56
int times = 10;
while (times-- > 0 && pp.hasSubscribers()) {
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
Reported by PMD.
Line: 60
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
assertNotEquals(Thread.currentThread().getName(), name[0]);
}
@Test
Reported by PMD.
Line: 60
Thread.sleep(100);
}
assertFalse(pp.hasSubscribers());
assertNotEquals(Thread.currentThread().getName(), name[0]);
}
@Test
Reported by PMD.
Line: 62
assertFalse(pp.hasSubscribers());
assertNotEquals(Thread.currentThread().getName(), name[0]);
}
@Test
public void just() {
Single.just(1)
Reported by PMD.
Line: 66
}
@Test
public void just() {
Single.just(1)
.unsubscribeOn(Schedulers.single())
.test()
.assertResult(1);
}
Reported by PMD.
src/main/java/io/reactivex/rxjava3/plugins/RxJavaPlugins.java
30 issues
Line: 365
* @see #setErrorHandler(Consumer)
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling">Error handling Wiki</a>
*/
public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;
if (error == null) {
error = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
} else {
Reported by PMD.
Line: 365
* @see #setErrorHandler(Consumer)
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0#error-handling">Error handling Wiki</a>
*/
public static void onError(@NonNull Throwable error) {
Consumer<? super Throwable> f = errorHandler;
if (error == null) {
error = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
} else {
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.plugins;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.*;
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.plugins;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.*;
Reported by PMD.
Line: 35
/**
* Utility class to inject handlers to certain standard RxJava operations.
*/
public final class RxJavaPlugins {
@Nullable
static volatile Consumer<? super Throwable> errorHandler;
@Nullable
static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler;
Reported by PMD.
Line: 35
/**
* Utility class to inject handlers to certain standard RxJava operations.
*/
public final class RxJavaPlugins {
@Nullable
static volatile Consumer<? super Throwable> errorHandler;
@Nullable
static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler;
Reported by PMD.
Line: 35
/**
* Utility class to inject handlers to certain standard RxJava operations.
*/
public final class RxJavaPlugins {
@Nullable
static volatile Consumer<? super Throwable> errorHandler;
@Nullable
static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler;
Reported by PMD.
Line: 35
/**
* Utility class to inject handlers to certain standard RxJava operations.
*/
public final class RxJavaPlugins {
@Nullable
static volatile Consumer<? super Throwable> errorHandler;
@Nullable
static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler;
Reported by PMD.
Line: 35
/**
* Utility class to inject handlers to certain standard RxJava operations.
*/
public final class RxJavaPlugins {
@Nullable
static volatile Consumer<? super Throwable> errorHandler;
@Nullable
static volatile Function<? super Runnable, ? extends Runnable> onScheduleHandler;
Reported by PMD.
Line: 66
@Nullable
static volatile Function<? super Scheduler, ? extends Scheduler> onNewThreadHandler;
@SuppressWarnings("rawtypes")
@Nullable
static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;
@SuppressWarnings("rawtypes")
@Nullable
Reported by PMD.