The following issues were found:
src/test/java/io/reactivex/rxjava3/validators/TextualAorAn.java
62 issues
Line: 57
if (u.getName().endsWith(".java")) {
List<String> lines = new ArrayList<>();
BufferedReader in = new BufferedReader(new FileReader(u));
try {
for (;;) {
String line = in.readLine();
if (line == null) {
break;
Reported by PMD.
Line: 83
}
if (fail.length() != 0) {
System.out.println(fail);
throw new AssertionError(fail.toString());
}
}
static void processFile(StringBuilder b, List<String> lines, String className, String fileName) {
Reported by PMD.
Line: 26
/**
* Adds license header to java files.
*/
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
Reported by PMD.
Line: 26
/**
* Adds license header to java files.
*/
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
Reported by PMD.
Line: 29
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
return;
}
Reported by PMD.
Line: 29
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
return;
}
Reported by PMD.
Line: 29
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
return;
}
Reported by PMD.
Line: 29
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
return;
}
Reported by PMD.
Line: 29
public class TextualAorAn {
@Test
public void checkFiles() throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
return;
}
Reported by PMD.
Line: 37
Queue<File> dirs = new ArrayDeque<>();
File parent = f.getParentFile().getParentFile();
dirs.offer(parent);
// dirs.offer(new File(parent.getAbsolutePath().replace('\\', '/').replace("src/main/java", "src/perf/java")));
// dirs.offer(new File(parent.getAbsolutePath().replace('\\', '/').replace("src/main/java", "src/test/java")));
StringBuilder fail = new StringBuilder();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnErrorReturnTest.java
62 issues
Line: 84
@Override
public String apply(Throwable e) {
capturedException.set(e);
throw new RuntimeException("exception from function");
}
});
Subscriber<String> subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 118
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Reported by PMD.
Line: 205
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
subscriber.onError(e);
}
}
Reported by PMD.
Line: 120
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Flowable<String> flowable = w.onErrorReturn(new Function<Throwable, String>() {
Reported by PMD.
Line: 194
@Override
public void subscribe(final Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
System.out.println("TestFlowable subscribed to ...");
t = new Thread(new Runnable() {
@Override
public void run() {
try {
Reported by PMD.
Line: 200
@Override
public void run() {
try {
System.out.println("running TestFlowable thread");
for (String s : values) {
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
throw new RuntimeException("Forced Failure");
Reported by PMD.
Line: 202
try {
System.out.println("running TestFlowable thread");
for (String s : values) {
System.out.println("TestFlowable onNext: " + s);
subscriber.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
subscriber.onError(e);
Reported by PMD.
Line: 212
}
});
System.out.println("starting TestFlowable thread");
t.start();
System.out.println("done starting TestFlowable thread");
}
}
Reported by PMD.
Line: 214
});
System.out.println("starting TestFlowable thread");
t.start();
System.out.println("done starting TestFlowable thread");
}
}
@Test
public void normalBackpressure() {
Reported by PMD.
Line: 40
@Test
public void resumeNext() {
TestFlowable f = new TestFlowable("one");
Flowable<String> w = Flowable.unsafeCreate(f);
final AtomicReference<Throwable> capturedException = new AtomicReference<>();
Flowable<String> flowable = w.onErrorReturn(new Function<Throwable, String>() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSkipTest.java
62 issues
Line: 32
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSkipTest extends RxJavaTest {
@Test(expected = IllegalArgumentException.class)
public void skipNegativeElements() {
Flowable<String> skip = Flowable.just("one", "two", "three").skip(-99);
Reported by PMD.
Line: 37
@Test(expected = IllegalArgumentException.class)
public void skipNegativeElements() {
Flowable<String> skip = Flowable.just("one", "two", "three").skip(-99);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
Reported by PMD.
Line: 37
@Test(expected = IllegalArgumentException.class)
public void skipNegativeElements() {
Flowable<String> skip = Flowable.just("one", "two", "three").skip(-99);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
Reported by PMD.
Line: 37
@Test(expected = IllegalArgumentException.class)
public void skipNegativeElements() {
Flowable<String> skip = Flowable.just("one", "two", "three").skip(-99);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
Reported by PMD.
Line: 37
@Test(expected = IllegalArgumentException.class)
public void skipNegativeElements() {
Flowable<String> skip = Flowable.just("one", "two", "three").skip(-99);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
Reported by PMD.
Line: 40
Flowable<String> skip = Flowable.just("one", "two", "three").skip(-99);
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
verify(subscriber, times(1)).onNext("three");
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
Reported by PMD.
Line: 41
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
verify(subscriber, times(1)).onNext("three");
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
Reported by PMD.
Line: 42
Subscriber<String> subscriber = TestHelper.mockSubscriber();
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
verify(subscriber, times(1)).onNext("three");
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
Reported by PMD.
Line: 43
skip.subscribe(subscriber);
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
verify(subscriber, times(1)).onNext("three");
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 44
verify(subscriber, times(1)).onNext("one");
verify(subscriber, times(1)).onNext("two");
verify(subscriber, times(1)).onNext("three");
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
}
@Test
public void skipZeroElements() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEager.java
61 issues
Line: 32
public final class FlowableConcatMapEager<T, R> extends AbstractFlowableWithUpstream<T, R> {
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int maxConcurrency;
final int prefetch;
Reported by PMD.
Line: 34
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int maxConcurrency;
final int prefetch;
final ErrorMode errorMode;
Reported by PMD.
Line: 36
final int maxConcurrency;
final int prefetch;
final ErrorMode errorMode;
public FlowableConcatMapEager(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
Reported by PMD.
Line: 38
final int prefetch;
final ErrorMode errorMode;
public FlowableConcatMapEager(Flowable<T> source,
Function<? super T, ? extends Publisher<? extends R>> mapper,
int maxConcurrency,
int prefetch,
Reported by PMD.
Line: 58
s, mapper, maxConcurrency, prefetch, errorMode));
}
static final class ConcatMapEagerDelayErrorSubscriber<T, R>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, InnerQueuedSubscriberSupport<R> {
private static final long serialVersionUID = -4255299542215038287L;
Reported by PMD.
Line: 58
s, mapper, maxConcurrency, prefetch, errorMode));
}
static final class ConcatMapEagerDelayErrorSubscriber<T, R>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, InnerQueuedSubscriberSupport<R> {
private static final long serialVersionUID = -4255299542215038287L;
Reported by PMD.
Line: 58
s, mapper, maxConcurrency, prefetch, errorMode));
}
static final class ConcatMapEagerDelayErrorSubscriber<T, R>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, InnerQueuedSubscriberSupport<R> {
private static final long serialVersionUID = -4255299542215038287L;
Reported by PMD.
Line: 60
static final class ConcatMapEagerDelayErrorSubscriber<T, R>
extends AtomicInteger
implements FlowableSubscriber<T>, Subscription, InnerQueuedSubscriberSupport<R> {
private static final long serialVersionUID = -4255299542215038287L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 64
private static final long serialVersionUID = -4255299542215038287L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int maxConcurrency;
Reported by PMD.
Line: 66
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Publisher<? extends R>> mapper;
final int maxConcurrency;
final int prefetch;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/HalfSerializerObserverTest.java
61 issues
Line: 32
public class HalfSerializerObserverTest extends RxJavaTest {
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnNext() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Observer[] a = { null };
Reported by PMD.
Line: 32
public class HalfSerializerObserverTest extends RxJavaTest {
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnNext() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Observer[] a = { null };
Reported by PMD.
Line: 33
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnNext() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Observer[] a = { null };
Reported by PMD.
Line: 72
HalfSerializer.onNext(observer, 1, wip, error);
to.assertValue(1).assertNoErrors().assertNotComplete();
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnError() {
Reported by PMD.
Line: 72
HalfSerializer.onNext(observer, 1, wip, error);
to.assertValue(1).assertNoErrors().assertNotComplete();
}
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnError() {
Reported by PMD.
Line: 77
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnError() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Observer[] a = { null };
Reported by PMD.
Line: 121
@Test
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantOnNextOnComplete() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Observer[] a = { null };
Reported by PMD.
Line: 166
@Test
@SuppressUndeliverable
@SuppressWarnings({ "rawtypes", "unchecked" })
public void reentrantErrorOnError() {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final Observer[] a = { null };
Reported by PMD.
Line: 210
public void onNextOnCompleteRace() {
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final TestObserver<Integer> to = new TestObserver<>();
to.onSubscribe(Disposable.empty());
Reported by PMD.
Line: 211
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
final AtomicInteger wip = new AtomicInteger();
final AtomicThrowable error = new AtomicThrowable();
final TestObserver<Integer> to = new TestObserver<>();
to.onSubscribe(Disposable.empty());
Runnable r1 = new Runnable() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoin.java
61 issues
Line: 34
public final class FlowableGroupJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends AbstractFlowableWithUpstream<TLeft, R> {
final Publisher<? extends TRight> other;
final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
Reported by PMD.
Line: 36
final Publisher<? extends TRight> other;
final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super Flowable<TRight>, ? extends R> resultSelector;
Reported by PMD.
Line: 38
final Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super Flowable<TRight>, ? extends R> resultSelector;
public FlowableGroupJoin(
Flowable<TLeft> source,
Reported by PMD.
Line: 40
final Function<? super TRight, ? extends Publisher<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super Flowable<TRight>, ? extends R> resultSelector;
public FlowableGroupJoin(
Flowable<TLeft> source,
Publisher<? extends TRight> other,
Function<? super TLeft, ? extends Publisher<TLeftEnd>> leftEnd,
Reported by PMD.
Line: 64
s.onSubscribe(parent);
LeftRightSubscriber left = new LeftRightSubscriber(parent, true);
parent.disposables.add(left);
LeftRightSubscriber right = new LeftRightSubscriber(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
Reported by PMD.
Line: 66
LeftRightSubscriber left = new LeftRightSubscriber(parent, true);
parent.disposables.add(left);
LeftRightSubscriber right = new LeftRightSubscriber(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
}
Reported by PMD.
Line: 85
void innerCloseError(Throwable ex);
}
static final class GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Subscription, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 85
void innerCloseError(Throwable ex);
}
static final class GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Subscription, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 86
}
static final class GroupJoinSubscription<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Subscription, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 90
private static final long serialVersionUID = -6071216598687999801L;
final Subscriber<? super R> downstream;
final AtomicLong requested;
final SpscLinkedArrayQueue<Object> queue;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTimed.java
60 issues
Line: 36
public final class FlowableBufferTimed<T, U extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, U> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
Reported by PMD.
Line: 37
public final class FlowableBufferTimed<T, U extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, U> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
final boolean restartTimerOnMaxSize;
Reported by PMD.
Line: 38
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
final boolean restartTimerOnMaxSize;
Reported by PMD.
Line: 39
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
final boolean restartTimerOnMaxSize;
public FlowableBufferTimed(Flowable<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Supplier<U> bufferSupplier, int maxSize,
Reported by PMD.
Line: 40
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
final boolean restartTimerOnMaxSize;
public FlowableBufferTimed(Flowable<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Supplier<U> bufferSupplier, int maxSize,
boolean restartTimerOnMaxSize) {
Reported by PMD.
Line: 41
final TimeUnit unit;
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
final boolean restartTimerOnMaxSize;
public FlowableBufferTimed(Flowable<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Supplier<U> bufferSupplier, int maxSize,
boolean restartTimerOnMaxSize) {
super(source);
Reported by PMD.
Line: 42
final Scheduler scheduler;
final Supplier<U> bufferSupplier;
final int maxSize;
final boolean restartTimerOnMaxSize;
public FlowableBufferTimed(Flowable<T> source, long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, Supplier<U> bufferSupplier, int maxSize,
boolean restartTimerOnMaxSize) {
super(source);
this.timespan = timespan;
Reported by PMD.
Line: 83
static final class BufferExactUnboundedSubscriber<T, U extends Collection<? super T>>
extends QueueDrainSubscriber<T, U, U> implements Subscription, Runnable, Disposable {
final Supplier<U> bufferSupplier;
final long timespan;
final TimeUnit unit;
final Scheduler scheduler;
Subscription upstream;
Reported by PMD.
Line: 84
static final class BufferExactUnboundedSubscriber<T, U extends Collection<? super T>>
extends QueueDrainSubscriber<T, U, U> implements Subscription, Runnable, Disposable {
final Supplier<U> bufferSupplier;
final long timespan;
final TimeUnit unit;
final Scheduler scheduler;
Subscription upstream;
Reported by PMD.
Line: 85
extends QueueDrainSubscriber<T, U, U> implements Subscription, Runnable, Disposable {
final Supplier<U> bufferSupplier;
final long timespan;
final TimeUnit unit;
final Scheduler scheduler;
Subscription upstream;
U buffer;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/InternalWrongNaming.java
60 issues
Line: 86
static List<String> readFile(File u) throws Exception {
List<String> lines = new ArrayList<>();
BufferedReader in = new BufferedReader(new FileReader(u));
try {
for (;;) {
String line = in.readLine();
if (line == null) {
break;
Reported by PMD.
Line: 73
}
if (fail.length() != 0) {
System.out.println(fail);
System.out.println();
System.out.println("Total: " + count);
throw new AssertionError(fail.toString());
}
Reported by PMD.
Line: 75
if (fail.length() != 0) {
System.out.println(fail);
System.out.println();
System.out.println("Total: " + count);
throw new AssertionError(fail.toString());
}
}
Reported by PMD.
Line: 76
System.out.println(fail);
System.out.println();
System.out.println("Total: " + count);
throw new AssertionError(fail.toString());
}
}
}
Reported by PMD.
Line: 26
/**
* Adds license header to java files.
*/
public class InternalWrongNaming {
static void checkInternalOperatorNaming(String baseClassName, String consumerClassName, String... ignore) throws Exception {
File f = TestHelper.findSource(baseClassName);
if (f == null) {
return;
Reported by PMD.
Line: 26
/**
* Adds license header to java files.
*/
public class InternalWrongNaming {
static void checkInternalOperatorNaming(String baseClassName, String consumerClassName, String... ignore) throws Exception {
File f = TestHelper.findSource(baseClassName);
if (f == null) {
return;
Reported by PMD.
Line: 26
/**
* Adds license header to java files.
*/
public class InternalWrongNaming {
static void checkInternalOperatorNaming(String baseClassName, String consumerClassName, String... ignore) throws Exception {
File f = TestHelper.findSource(baseClassName);
if (f == null) {
return;
Reported by PMD.
Line: 28
*/
public class InternalWrongNaming {
static void checkInternalOperatorNaming(String baseClassName, String consumerClassName, String... ignore) throws Exception {
File f = TestHelper.findSource(baseClassName);
if (f == null) {
return;
}
Reported by PMD.
Line: 28
*/
public class InternalWrongNaming {
static void checkInternalOperatorNaming(String baseClassName, String consumerClassName, String... ignore) throws Exception {
File f = TestHelper.findSource(baseClassName);
if (f == null) {
return;
}
Reported by PMD.
Line: 28
*/
public class InternalWrongNaming {
static void checkInternalOperatorNaming(String baseClassName, String consumerClassName, String... ignore) throws Exception {
File f = TestHelper.findSource(baseClassName);
if (f == null) {
return;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDematerializeTest.java
60 issues
Line: 147
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>(subscriber);
dematerialize.subscribe(ts);
System.out.println(ts.errors());
verify(subscriber, never()).onError(any(Throwable.class));
verify(subscriber, times(1)).onComplete();
verify(subscriber, times(0)).onNext(any(Integer.class));
}
Reported by PMD.
Line: 32
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableDematerializeTest extends RxJavaTest {
@Test
public void simpleSelector() {
Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
Reported by PMD.
Line: 36
@Test
public void simpleSelector() {
Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
dematerialize.subscribe(subscriber);
Reported by PMD.
Line: 37
@Test
public void simpleSelector() {
Flowable<Notification<Integer>> notifications = Flowable.just(1, 2).materialize();
Flowable<Integer> dematerialize = notifications.dematerialize(Functions.<Notification<Integer>>identity());
Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
dematerialize.subscribe(subscriber);
Reported by PMD.
Line: 41
Subscriber<Integer> subscriber = TestHelper.mockSubscriber();
dematerialize.subscribe(subscriber);
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
Reported by PMD.
Line: 43
dematerialize.subscribe(subscriber);
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
Reported by PMD.
Line: 44
dematerialize.subscribe(subscriber);
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
Reported by PMD.
Line: 45
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
public void selectorCrash() {
Reported by PMD.
Line: 46
verify(subscriber, times(1)).onNext(1);
verify(subscriber, times(1)).onNext(2);
verify(subscriber, times(1)).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
public void selectorCrash() {
Flowable.just(1, 2)
Reported by PMD.
Line: 50
}
@Test
public void selectorCrash() {
Flowable.just(1, 2)
.materialize()
.dematerialize(new Function<Notification<Integer>, Notification<Object>>() {
@Override
public Notification<Object> apply(Notification<Integer> v) throws Exception {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java
60 issues
Line: 133
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final class MulticastProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicInteger wip;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 133
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final class MulticastProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicInteger wip;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 133
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final class MulticastProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicInteger wip;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 133
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final class MulticastProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicInteger wip;
final AtomicReference<Subscription> upstream;
Reported by PMD.
Line: 135
@SchedulerSupport(SchedulerSupport.NONE)
public final class MulticastProcessor<@NonNull T> extends FlowableProcessor<T> {
final AtomicInteger wip;
final AtomicReference<Subscription> upstream;
final AtomicReference<MulticastSubscription<T>[]> subscribers;
Reported by PMD.
Line: 137
final AtomicInteger wip;
final AtomicReference<Subscription> upstream;
final AtomicReference<MulticastSubscription<T>[]> subscribers;
final int bufferSize;
Reported by PMD.
Line: 139
final AtomicReference<Subscription> upstream;
final AtomicReference<MulticastSubscription<T>[]> subscribers;
final int bufferSize;
final int limit;
Reported by PMD.
Line: 141
final AtomicReference<MulticastSubscription<T>[]> subscribers;
final int bufferSize;
final int limit;
final boolean refcount;
Reported by PMD.
Line: 143
final int bufferSize;
final int limit;
final boolean refcount;
volatile SimpleQueue<T> queue;
Reported by PMD.
Line: 145
final int limit;
final boolean refcount;
volatile SimpleQueue<T> queue;
volatile boolean done;
volatile Throwable error;
Reported by PMD.