The following issues were found:
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleDoOnTest.java
64 issues
Line: 31
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class SingleDoOnTest extends RxJavaTest {
@Test
public void doOnDispose() {
final int[] count = { 0 };
Reported by PMD.
Line: 44
}
}).test(true);
assertEquals(1, count[0]);
}
@Test
public void doOnError() {
final Object[] event = { null };
Reported by PMD.
Line: 73
}
}).test();
assertEquals(1, count[0]);
}
@Test
public void doOnSuccess() {
final Object[] event = { null };
Reported by PMD.
Line: 88
})
.test();
assertEquals(1, event[0]);
}
@Test
public void doOnSubscribeNormal() {
final int[] count = { 0 };
Reported by PMD.
Line: 104
.test()
.assertResult(1);
assertEquals(1, count[0]);
}
@Test
public void doOnSubscribeError() {
final int[] count = { 0 };
Reported by PMD.
Line: 120
.test()
.assertFailure(TestException.class);
assertEquals(1, count[0]);
}
@Test
public void doOnSubscribeJustCrash() {
Reported by PMD.
Line: 124
}
@Test
public void doOnSubscribeJustCrash() {
Single.just(1).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) throws Exception {
throw new TestException();
Reported by PMD.
Line: 137
}
@Test
public void doOnSubscribeErrorCrash() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Single.error(new TestException("Outer")).doOnSubscribe(new Consumer<Disposable>() {
@Override
Reported by PMD.
Line: 141
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
Single.error(new TestException("Outer")).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) throws Exception {
throw new TestException("Inner");
}
})
Reported by PMD.
Line: 144
Single.error(new TestException("Outer")).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) throws Exception {
throw new TestException("Inner");
}
})
.to(TestHelper.testConsumer())
.assertFailureAndMessage(TestException.class, "Inner");
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMapTest.java
63 issues
Line: 159
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
return s;
}
}).doOnError(new Consumer<Throwable>() {
Reported by PMD.
Line: 34
import io.reactivex.rxjava3.subjects.UnicastSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableMapTest extends RxJavaTest {
Observer<String> stringObserver;
Observer<String> stringObserver2;
static final BiFunction<String, Integer, String> APPEND_INDEX = new BiFunction<String, Integer, String>() {
Reported by PMD.
Line: 36
public class ObservableMapTest extends RxJavaTest {
Observer<String> stringObserver;
Observer<String> stringObserver2;
static final BiFunction<String, Integer, String> APPEND_INDEX = new BiFunction<String, Integer, String>() {
@Override
public String apply(String value, Integer index) {
Reported by PMD.
Line: 37
public class ObservableMapTest extends RxJavaTest {
Observer<String> stringObserver;
Observer<String> stringObserver2;
static final BiFunction<String, Integer, String> APPEND_INDEX = new BiFunction<String, Integer, String>() {
@Override
public String apply(String value, Integer index) {
return value + index;
Reported by PMD.
Line: 61
Observable<String> m = o.map(new Function<Map<String, String>, String>() {
@Override
public String apply(Map<String, String> map) {
return map.get("firstName");
}
});
m.subscribe(stringObserver);
Reported by PMD.
Line: 67
m.subscribe(stringObserver);
verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(1)).onNext("OneFirst");
verify(stringObserver, times(1)).onNext("TwoFirst");
verify(stringObserver, times(1)).onComplete();
}
Reported by PMD.
Line: 68
m.subscribe(stringObserver);
verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(1)).onNext("OneFirst");
verify(stringObserver, times(1)).onNext("TwoFirst");
verify(stringObserver, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 69
verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(1)).onNext("OneFirst");
verify(stringObserver, times(1)).onNext("TwoFirst");
verify(stringObserver, times(1)).onComplete();
}
@Test
public void mapMany() {
Reported by PMD.
Line: 70
verify(stringObserver, never()).onError(any(Throwable.class));
verify(stringObserver, times(1)).onNext("OneFirst");
verify(stringObserver, times(1)).onNext("TwoFirst");
verify(stringObserver, times(1)).onComplete();
}
@Test
public void mapMany() {
/* simulate a top-level async call which returns IDs */
Reported by PMD.
Line: 84
@Override
public Observable<String> apply(Integer id) {
/* simulate making a nested async call which creates another Observable */
Observable<Map<String, String>> subObservable = null;
if (id == 1) {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");
subObservable = Observable.just(m1, m2);
} else {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observers/ResourceCompletableObserverTest.java
63 issues
Line: 31
public class ResourceCompletableObserverTest extends RxJavaTest {
static final class TestResourceCompletableObserver extends ResourceCompletableObserver {
final List<Throwable> errors = new ArrayList<>();
int complete;
int start;
Reported by PMD.
Line: 33
static final class TestResourceCompletableObserver extends ResourceCompletableObserver {
final List<Throwable> errors = new ArrayList<>();
int complete;
int start;
@Override
protected void onStart() {
Reported by PMD.
Line: 35
int complete;
int start;
@Override
protected void onStart() {
super.onStart();
Reported by PMD.
Line: 66
}
@Test
public void addResources() {
TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
assertFalse(rco.isDisposed());
Disposable d = Disposable.empty();
Reported by PMD.
Line: 69
public void addResources() {
TestResourceCompletableObserver rco = new TestResourceCompletableObserver();
assertFalse(rco.isDisposed());
Disposable d = Disposable.empty();
rco.add(d);
Reported by PMD.
Line: 75
rco.add(d);
assertFalse(d.isDisposed());
rco.dispose();
assertTrue(rco.isDisposed());
Reported by PMD.
Line: 75
rco.add(d);
assertFalse(d.isDisposed());
rco.dispose();
assertTrue(rco.isDisposed());
Reported by PMD.
Line: 79
rco.dispose();
assertTrue(rco.isDisposed());
assertTrue(d.isDisposed());
rco.dispose();
Reported by PMD.
Line: 81
assertTrue(rco.isDisposed());
assertTrue(d.isDisposed());
rco.dispose();
assertTrue(rco.isDisposed());
Reported by PMD.
Line: 81
assertTrue(rco.isDisposed());
assertTrue(d.isDisposed());
rco.dispose();
assertTrue(rco.isDisposed());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/mixed/CompletableAndThenObservableTest.java
63 issues
Line: 29
public class CompletableAndThenObservableTest extends RxJavaTest {
@Test
public void cancelMain() {
CompletableSubject cs = CompletableSubject.create();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = cs.andThen(ps)
.test();
Reported by PMD.
Line: 33
CompletableSubject cs = CompletableSubject.create();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = cs.andThen(ps)
.test();
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 33
CompletableSubject cs = CompletableSubject.create();
PublishSubject<Integer> ps = PublishSubject.create();
TestObserver<Integer> to = cs.andThen(ps)
.test();
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 36
TestObserver<Integer> to = cs.andThen(ps)
.test();
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(cs.hasObservers());
Reported by PMD.
Line: 36
TestObserver<Integer> to = cs.andThen(ps)
.test();
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(cs.hasObservers());
Reported by PMD.
Line: 37
.test();
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(cs.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 37
.test();
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(cs.hasObservers());
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 39
assertTrue(cs.hasObservers());
assertFalse(ps.hasObservers());
to.dispose();
assertFalse(cs.hasObservers());
assertFalse(ps.hasObservers());
}
Reported by PMD.
Line: 41
to.dispose();
assertFalse(cs.hasObservers());
assertFalse(ps.hasObservers());
}
@Test
public void cancelOther() {
Reported by PMD.
Line: 41
to.dispose();
assertFalse(cs.hasObservers());
assertFalse(ps.hasObservers());
}
@Test
public void cancelOther() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java
63 issues
Line: 41
*/
public final class FlowableFlatMapStream<T, R> extends Flowable<R> {
final Flowable<T> source;
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 43
final Flowable<T> source;
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
public FlowableFlatMapStream(Flowable<T> source, Function<? super T, ? extends Stream<? extends R>> mapper, int prefetch) {
this.source = source;
Reported by PMD.
Line: 45
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
public FlowableFlatMapStream(Flowable<T> source, Function<? super T, ? extends Stream<? extends R>> mapper, int prefetch) {
this.source = source;
this.mapper = mapper;
this.prefetch = prefetch;
Reported by PMD.
Line: 63
if (t != null) {
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptySubscription.error(ex, s);
return;
}
Reported by PMD.
Line: 92
return new FlatMapStreamSubscriber<>(downstream, mapper, prefetch);
}
static final class FlatMapStreamSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5127032662980523968L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 92
return new FlatMapStreamSubscriber<>(downstream, mapper, prefetch);
}
static final class FlatMapStreamSubscriber<T, R> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -5127032662980523968L;
final Subscriber<? super R> downstream;
Reported by PMD.
Line: 97
private static final long serialVersionUID = -5127032662980523968L;
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
Reported by PMD.
Line: 99
final Subscriber<? super R> downstream;
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
final AtomicLong requested;
Reported by PMD.
Line: 101
final Function<? super T, ? extends Stream<? extends R>> mapper;
final int prefetch;
final AtomicLong requested;
SimpleQueue<T> queue;
Reported by PMD.
Line: 103
final int prefetch;
final AtomicLong requested;
SimpleQueue<T> queue;
Subscription upstream;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToListTest.java
63 issues
Line: 33
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableToListTest extends RxJavaTest {
@Test
public void listObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<List<String>> observable = w.toList().toObservable();
Reported by PMD.
Line: 37
@Test
public void listObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<List<String>> observable = w.toList().toObservable();
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
Reported by PMD.
Line: 37
@Test
public void listObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<List<String>> observable = w.toList().toObservable();
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
Reported by PMD.
Line: 37
@Test
public void listObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<List<String>> observable = w.toList().toObservable();
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
Reported by PMD.
Line: 38
@Test
public void listObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<List<String>> observable = w.toList().toObservable();
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(observer, Mockito.never()).onError(any(Throwable.class));
Reported by PMD.
Line: 38
@Test
public void listObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Observable<List<String>> observable = w.toList().toObservable();
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(observer, Mockito.never()).onError(any(Throwable.class));
Reported by PMD.
Line: 41
Observable<List<String>> observable = w.toList().toObservable();
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 42
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 43
Observer<List<String>> observer = TestHelper.mockObserver();
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void listViaObservableObservable() {
Reported by PMD.
Line: 44
observable.subscribe(observer);
verify(observer, times(1)).onNext(Arrays.asList("one", "two", "three"));
verify(observer, Mockito.never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void listViaObservableObservable() {
Observable<String> w = Observable.fromIterable(Arrays.asList("one", "two", "three"));
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMapScheduler.java
63 issues
Line: 32
public final class ObservableConcatMapScheduler<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
Reported by PMD.
Line: 34
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
final Scheduler scheduler;
Reported by PMD.
Line: 36
final int bufferSize;
final ErrorMode delayErrors;
final Scheduler scheduler;
public ObservableConcatMapScheduler(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors, Scheduler scheduler) {
Reported by PMD.
Line: 38
final ErrorMode delayErrors;
final Scheduler scheduler;
public ObservableConcatMapScheduler(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors, Scheduler scheduler) {
super(source);
this.mapper = mapper;
Reported by PMD.
Line: 62
static final class ConcatMapObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
final Scheduler.Worker worker;
Reported by PMD.
Line: 63
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Reported by PMD.
Line: 64
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Reported by PMD.
Line: 65
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Disposable upstream;
Reported by PMD.
Line: 66
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Disposable upstream;
Reported by PMD.
Line: 68
final int bufferSize;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
Disposable upstream;
volatile boolean active;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableConcatMap.java
63 issues
Line: 31
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
Reported by PMD.
Line: 32
public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors) {
Reported by PMD.
Line: 34
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final int bufferSize;
final ErrorMode delayErrors;
public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
int bufferSize, ErrorMode delayErrors) {
super(source);
this.mapper = mapper;
Reported by PMD.
Line: 59
}
}
static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
Reported by PMD.
Line: 59
}
}
static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
Reported by PMD.
Line: 62
static final class SourceObserver<T, U> extends AtomicInteger implements Observer<T>, Disposable {
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
SimpleQueue<T> queue;
Reported by PMD.
Line: 63
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
SimpleQueue<T> queue;
Reported by PMD.
Line: 64
private static final long serialVersionUID = 8828587559905699186L;
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Reported by PMD.
Line: 65
final Observer<? super U> downstream;
final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
final InnerObserver<U> inner;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
Reported by PMD.
Line: 67
final InnerObserver<U> inner;
final int bufferSize;
SimpleQueue<T> queue;
Disposable upstream;
volatile boolean active;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/parallel/ParallelJoin.java
63 issues
Line: 37
*/
public final class ParallelJoin<T> extends Flowable<T> {
final ParallelFlowable<? extends T> source;
final int prefetch;
final boolean delayErrors;
Reported by PMD.
Line: 39
final ParallelFlowable<? extends T> source;
final int prefetch;
final boolean delayErrors;
public ParallelJoin(ParallelFlowable<? extends T> source, int prefetch, boolean delayErrors) {
this.source = source;
Reported by PMD.
Line: 41
final int prefetch;
final boolean delayErrors;
public ParallelJoin(ParallelFlowable<? extends T> source, int prefetch, boolean delayErrors) {
this.source = source;
this.prefetch = prefetch;
this.delayErrors = delayErrors;
Reported by PMD.
Line: 66
private static final long serialVersionUID = 3100232009247827843L;
final Subscriber<? super T> downstream;
final JoinInnerSubscriber<T>[] subscribers;
final AtomicThrowable errors = new AtomicThrowable();
Reported by PMD.
Line: 68
final Subscriber<? super T> downstream;
final JoinInnerSubscriber<T>[] subscribers;
final AtomicThrowable errors = new AtomicThrowable();
final AtomicLong requested = new AtomicLong();
Reported by PMD.
Line: 70
final JoinInnerSubscriber<T>[] subscribers;
final AtomicThrowable errors = new AtomicThrowable();
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
Reported by PMD.
Line: 72
final AtomicThrowable errors = new AtomicThrowable();
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
final AtomicInteger done = new AtomicInteger();
Reported by PMD.
Line: 74
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;
final AtomicInteger done = new AtomicInteger();
JoinSubscriptionBase(Subscriber<? super T> actual, int n, int prefetch) {
this.downstream = actual;
Reported by PMD.
Line: 76
volatile boolean cancelled;
final AtomicInteger done = new AtomicInteger();
JoinSubscriptionBase(Subscriber<? super T> actual, int n, int prefetch) {
this.downstream = actual;
@SuppressWarnings("unchecked")
JoinInnerSubscriber<T>[] a = new JoinInnerSubscriber[n];
Reported by PMD.
Line: 120
void cleanup() {
for (JoinInnerSubscriber<T> s : subscribers) {
s.queue = null;
}
}
abstract void onNext(JoinInnerSubscriber<T> inner, T value);
Reported by PMD.
src/jmh/java/io/reactivex/rxjava3/core/MemoryPerf.java
62 issues
Line: 95
checkMemory(item, name, typeLib, 1000000);
}
static <U> void checkMemory(Callable<U> item, String name, String typeLib, int n) throws Exception {
// make sure classes are initialized
item.call();
Object[] array = new Object[n];
Reported by PMD.
Line: 102
Object[] array = new Object[n];
Thread.sleep(100);
System.gc();
Thread.sleep(100);
long before = memoryUse();
for (int i = 0; i < n; i++) {
Reported by PMD.
Line: 112
}
Thread.sleep(100);
System.gc();
Thread.sleep(100);
long after = memoryUse();
double use = Math.max(0.0, (after - before) / 1024.0 / 1024.0);
Reported by PMD.
Line: 119
double use = Math.max(0.0, (after - before) / 1024.0 / 1024.0);
System.out.print(name);
System.out.print(" ");
System.out.print(typeLib);
System.out.print(" thrpt ");
System.out.print(n);
System.out.printf(" %.3f 0.000 MB%n", use);
Reported by PMD.
Line: 120
double use = Math.max(0.0, (after - before) / 1024.0 / 1024.0);
System.out.print(name);
System.out.print(" ");
System.out.print(typeLib);
System.out.print(" thrpt ");
System.out.print(n);
System.out.printf(" %.3f 0.000 MB%n", use);
Reported by PMD.
Line: 121
System.out.print(name);
System.out.print(" ");
System.out.print(typeLib);
System.out.print(" thrpt ");
System.out.print(n);
System.out.printf(" %.3f 0.000 MB%n", use);
if (array.hashCode() == 1) {
Reported by PMD.
Line: 122
System.out.print(name);
System.out.print(" ");
System.out.print(typeLib);
System.out.print(" thrpt ");
System.out.print(n);
System.out.printf(" %.3f 0.000 MB%n", use);
if (array.hashCode() == 1) {
System.out.print("");
Reported by PMD.
Line: 123
System.out.print(" ");
System.out.print(typeLib);
System.out.print(" thrpt ");
System.out.print(n);
System.out.printf(" %.3f 0.000 MB%n", use);
if (array.hashCode() == 1) {
System.out.print("");
}
Reported by PMD.
Line: 124
System.out.print(typeLib);
System.out.print(" thrpt ");
System.out.print(n);
System.out.printf(" %.3f 0.000 MB%n", use);
if (array.hashCode() == 1) {
System.out.print("");
}
Reported by PMD.
Line: 127
System.out.printf(" %.3f 0.000 MB%n", use);
if (array.hashCode() == 1) {
System.out.print("");
}
array = null;
item = null;
Reported by PMD.