The following issues were found
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTake.java
13 issues
Line: 26
public final class FlowableTake<T> extends AbstractFlowableWithUpstream<T, T> {
final long n;
public FlowableTake(Flowable<T> source, long n) {
super(source);
this.n = n;
}
Reported by PMD.
Line: 44
private static final long serialVersionUID = 2288246011222124525L;
final Subscriber<? super T> downstream;
long remaining;
Subscription upstream;
Reported by PMD.
Line: 46
final Subscriber<? super T> downstream;
long remaining;
Subscription upstream;
TakeSubscriber(Subscriber<? super T> actual, long remaining) {
this.downstream = actual;
Reported by PMD.
Line: 48
long remaining;
Subscription upstream;
TakeSubscriber(Subscriber<? super T> actual, long remaining) {
this.downstream = actual;
this.remaining = remaining;
lazySet(remaining);
Reported by PMD.
Line: 59
@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
if (remaining == 0L) {
s.cancel();
EmptySubscription.complete(downstream);
} else {
this.upstream = s;
downstream.onSubscribe(this);
Reported by PMD.
Line: 72
@Override
public void onNext(T t) {
long r = remaining;
if (r > 0L) {
remaining = --r;
downstream.onNext(t);
if (r == 0L) {
upstream.cancel();
downstream.onComplete();
Reported by PMD.
Line: 75
if (r > 0L) {
remaining = --r;
downstream.onNext(t);
if (r == 0L) {
upstream.cancel();
downstream.onComplete();
}
}
}
Reported by PMD.
Line: 84
@Override
public void onError(Throwable t) {
if (remaining > 0L) {
remaining = 0L;
downstream.onError(t);
} else {
RxJavaPlugins.onError(t);
}
Reported by PMD.
Line: 94
@Override
public void onComplete() {
if (remaining > 0L) {
remaining = 0L;
downstream.onComplete();
}
}
Reported by PMD.
Line: 105
if (SubscriptionHelper.validate(n)) {
for (;;) {
long r = get();
if (r == 0L) {
break;
}
long toRequest = Math.min(r, n);
long u = r - toRequest;
if (compareAndSet(r, u)) {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/subscribers/ForEachWhileSubscriber.java
13 issues
Line: 33
private static final long serialVersionUID = -4403180040475402120L;
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
Reported by PMD.
Line: 33
private static final long serialVersionUID = -4403180040475402120L;
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
Reported by PMD.
Line: 35
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
Reported by PMD.
Line: 35
final Predicate<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
Reported by PMD.
Line: 37
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
public ForEachWhileSubscriber(Predicate<? super T> onNext,
Consumer<? super Throwable> onError, Action onComplete) {
Reported by PMD.
Line: 37
final Consumer<? super Throwable> onError;
final Action onComplete;
boolean done;
public ForEachWhileSubscriber(Predicate<? super T> onNext,
Consumer<? super Throwable> onError, Action onComplete) {
Reported by PMD.
Line: 39
final Action onComplete;
boolean done;
public ForEachWhileSubscriber(Predicate<? super T> onNext,
Consumer<? super Throwable> onError, Action onComplete) {
this.onNext = onNext;
this.onError = onError;
Reported by PMD.
Line: 62
boolean b;
try {
b = onNext.test(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
onError(ex);
return;
}
Reported by PMD.
Line: 84
done = true;
try {
onError.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(new CompositeException(t, ex));
}
}
Reported by PMD.
Line: 98
done = true;
try {
onComplete.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subjects/SubjectTest.java
13 issues
Line: 34
try {
p.onNext(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
Reported by PMD.
Line: 34
try {
p.onNext(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
Reported by PMD.
Line: 35
p.onNext(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
Reported by PMD.
Line: 38
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
@Test
public void onErrorNull() {
Subject<T> p = create();
Reported by PMD.
Line: 38
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
@Test
public void onErrorNull() {
Subject<T> p = create();
Reported by PMD.
Line: 38
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
@Test
public void onErrorNull() {
Subject<T> p = create();
Reported by PMD.
Line: 48
try {
p.onError(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
Reported by PMD.
Line: 48
try {
p.onError(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
Reported by PMD.
Line: 49
p.onError(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
}
Reported by PMD.
Line: 52
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().dispose();
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/processors/FlowableProcessorTest.java
13 issues
Line: 34
try {
p.onNext(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
Reported by PMD.
Line: 34
try {
p.onNext(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
Reported by PMD.
Line: 35
p.onNext(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
Reported by PMD.
Line: 38
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
@Test
public void onErrorNull() {
FlowableProcessor<T> p = create();
Reported by PMD.
Line: 38
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
@Test
public void onErrorNull() {
FlowableProcessor<T> p = create();
Reported by PMD.
Line: 38
assertEquals(ExceptionHelper.nullWarning("onNext called with a null value."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
@Test
public void onErrorNull() {
FlowableProcessor<T> p = create();
Reported by PMD.
Line: 48
try {
p.onError(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
Reported by PMD.
Line: 48
try {
p.onError(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
Reported by PMD.
Line: 49
p.onError(null);
fail("No NullPointerException thrown");
} catch (NullPointerException ex) {
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
}
Reported by PMD.
Line: 52
assertEquals(ExceptionHelper.nullWarning("onError called with a null Throwable."), ex.getMessage());
}
p.test().assertEmpty().cancel();
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/subscribers/BlockingSubscriberTest.java
13 issues
Line: 31
public class BlockingSubscriberTest extends RxJavaTest {
@Test
public void doubleOnSubscribe() {
TestHelper.doubleOnSubscribe(new BlockingSubscriber<Integer>(new ArrayDeque<>()));
}
@Test
public void cancel() {
Reported by PMD.
Line: 36
}
@Test
public void cancel() {
BlockingSubscriber<Integer> bq = new BlockingSubscriber<>(new ArrayDeque<>());
assertFalse(bq.isCancelled());
bq.cancel();
Reported by PMD.
Line: 39
public void cancel() {
BlockingSubscriber<Integer> bq = new BlockingSubscriber<>(new ArrayDeque<>());
assertFalse(bq.isCancelled());
bq.cancel();
assertTrue(bq.isCancelled());
Reported by PMD.
Line: 43
bq.cancel();
assertTrue(bq.isCancelled());
bq.cancel();
assertTrue(bq.isCancelled());
}
Reported by PMD.
Line: 47
bq.cancel();
assertTrue(bq.isCancelled());
}
@Test
public void blockingFirstDoubleOnSubscribe() {
TestHelper.doubleOnSubscribe(new BlockingFirstSubscriber<Integer>());
Reported by PMD.
Line: 51
}
@Test
public void blockingFirstDoubleOnSubscribe() {
TestHelper.doubleOnSubscribe(new BlockingFirstSubscriber<Integer>());
}
@Test
public void blockingFirstTimeout() {
Reported by PMD.
Line: 59
public void blockingFirstTimeout() {
BlockingFirstSubscriber<Integer> bf = new BlockingFirstSubscriber<>();
Thread.currentThread().interrupt();
try {
bf.blockingGet();
fail("Should have thrown!");
} catch (RuntimeException ex) {
Reported by PMD.
Line: 64
try {
bf.blockingGet();
fail("Should have thrown!");
} catch (RuntimeException ex) {
assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException);
}
}
@Test
Reported by PMD.
Line: 75
bf.onSubscribe(new BooleanSubscription());
Thread.currentThread().interrupt();
try {
bf.blockingGet();
fail("Should have thrown!");
} catch (RuntimeException ex) {
Reported by PMD.
Line: 80
try {
bf.blockingGet();
fail("Should have thrown!");
} catch (RuntimeException ex) {
assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException);
}
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/maybe/MaybeCreateTest.java
13 issues
Line: 48
}
}).test().assertResult(1);
assertTrue(d.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 48
}
}).test().assertResult(1);
assertTrue(d.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
}
Reported by PMD.
Line: 56
}
@Test
public void basicWithCancellable() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
final Disposable d1 = Disposable.empty();
final Disposable d2 = Disposable.empty();
Reported by PMD.
Line: 80
}
}).test().assertResult(1);
assertTrue(d1.isDisposed());
assertTrue(d2.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 80
}
}).test().assertResult(1);
assertTrue(d1.isDisposed());
assertTrue(d2.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 81
}).test().assertResult(1);
assertTrue(d1.isDisposed());
assertTrue(d2.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
Line: 81
}).test().assertResult(1);
assertTrue(d1.isDisposed());
assertTrue(d2.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
Line: 106
}
}).test().assertFailure(TestException.class);
assertTrue(d.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
Line: 106
}
}).test().assertFailure(TestException.class);
assertTrue(d.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
Line: 131
}
}).test().assertComplete();
assertTrue(d.isDisposed());
TestHelper.assertUndeliverable(errors, 0, TestException.class);
} finally {
RxJavaPlugins.reset();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/MaybeNo2Dot0Since.java
13 issues
Line: 42
boolean classDefPassed = false;
BufferedReader in = new BufferedReader(new FileReader(f));
try {
int ln = 1;
while (true) {
line = in.readLine();
Reported by PMD.
Line: 71
}
if (b.length() != 0) {
System.out.println(b);
fail(b.toString());
}
}
}
Reported by PMD.
Line: 32
public class MaybeNo2Dot0Since {
@Test
public void noSince20InMaybe() throws Exception {
File f = TestHelper.findSource(Maybe.class.getSimpleName());
String line;
Reported by PMD.
Line: 43
boolean classDefPassed = false;
BufferedReader in = new BufferedReader(new FileReader(f));
try {
int ln = 1;
while (true) {
line = in.readLine();
if (line == null) {
Reported by PMD.
Line: 57
}
if (classDefPassed) {
if (line.contains("@since") && line.contains("2.0") && !line.contains("2.0.")) {
b.append("java.lang.RuntimeException: @since 2.0 found").append("\r\n")
.append(" at io.reactivex.Maybe (Maybe.java:").append(ln).append(")\r\n\r\n");
;
}
}
Reported by PMD.
Line: 58
if (classDefPassed) {
if (line.contains("@since") && line.contains("2.0") && !line.contains("2.0.")) {
b.append("java.lang.RuntimeException: @since 2.0 found").append("\r\n")
.append(" at io.reactivex.Maybe (Maybe.java:").append(ln).append(")\r\n\r\n");
;
}
}
Reported by PMD.
Line: 60
if (line.contains("@since") && line.contains("2.0") && !line.contains("2.0.")) {
b.append("java.lang.RuntimeException: @since 2.0 found").append("\r\n")
.append(" at io.reactivex.Maybe (Maybe.java:").append(ln).append(")\r\n\r\n");
;
}
}
ln++;
}
Reported by PMD.
Line: 40
StringBuilder b = new StringBuilder();
boolean classDefPassed = false;
BufferedReader in = new BufferedReader(new FileReader(f));
try {
int ln = 1;
while (true) {
Reported by PMD.
Line: 40
StringBuilder b = new StringBuilder();
boolean classDefPassed = false;
BufferedReader in = new BufferedReader(new FileReader(f));
try {
int ln = 1;
while (true) {
Reported by PMD.
Line: 44
BufferedReader in = new BufferedReader(new FileReader(f));
try {
int ln = 1;
while (true) {
line = in.readLine();
if (line == null) {
break;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/schedulers/IoSchedulerInternalTest.java
12 issues
Line: 29
public class IoSchedulerInternalTest extends RxJavaTest {
@Test
public void expiredQueueEmpty() {
ConcurrentLinkedQueue<ThreadWorker> expire = new ConcurrentLinkedQueue<>();
CompositeDisposable cd = new CompositeDisposable();
CachedWorkerPool.evictExpiredWorkers(expire, cd);
}
Reported by PMD.
Line: 37
}
@Test
public void expiredWorkerRemoved() {
ConcurrentLinkedQueue<ThreadWorker> expire = new ConcurrentLinkedQueue<>();
CompositeDisposable cd = new CompositeDisposable();
ThreadWorker tw = new ThreadWorker(new RxThreadFactory("IoExpiryTest"));
Reported by PMD.
Line: 49
CachedWorkerPool.evictExpiredWorkers(expire, cd);
assertTrue(tw.isDisposed());
assertTrue(expire.isEmpty());
} finally {
tw.dispose();
}
}
Reported by PMD.
Line: 50
CachedWorkerPool.evictExpiredWorkers(expire, cd);
assertTrue(tw.isDisposed());
assertTrue(expire.isEmpty());
} finally {
tw.dispose();
}
}
Reported by PMD.
Line: 57
}
@Test
public void noExpiredWorker() {
ConcurrentLinkedQueue<ThreadWorker> expire = new ConcurrentLinkedQueue<>();
CompositeDisposable cd = new CompositeDisposable();
ThreadWorker tw = new ThreadWorker(new RxThreadFactory("IoExpiryTest"));
tw.setExpirationTime(System.nanoTime() + 10_000_000_000L);
Reported by PMD.
Line: 70
CachedWorkerPool.evictExpiredWorkers(expire, cd);
assertFalse(tw.isDisposed());
assertFalse(expire.isEmpty());
} finally {
tw.dispose();
}
}
Reported by PMD.
Line: 71
CachedWorkerPool.evictExpiredWorkers(expire, cd);
assertFalse(tw.isDisposed());
assertFalse(expire.isEmpty());
} finally {
tw.dispose();
}
}
Reported by PMD.
Line: 78
}
@Test
public void expireReuseRace() {
ConcurrentLinkedQueue<ThreadWorker> expire = new ConcurrentLinkedQueue<>();
CompositeDisposable cd = new CompositeDisposable();
ThreadWorker tw = new ThreadWorker(new RxThreadFactory("IoExpiryTest"));
tw.dispose();
Reported by PMD.
Line: 16
package io.reactivex.rxjava3.internal.schedulers;
import static org.junit.Assert.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.junit.Test;
import io.reactivex.rxjava3.core.RxJavaTest;
Reported by PMD.
Line: 23
import io.reactivex.rxjava3.core.RxJavaTest;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.internal.schedulers.IoScheduler.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class IoSchedulerInternalTest extends RxJavaTest {
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/util/CrashingMappedIterable.java
12 issues
Line: 28
* @param <T> the result type
*/
public final class CrashingMappedIterable<T> implements Iterable<T> {
int crashOnIterator;
final int crashOnHasNext;
final int crashOnNext;
Reported by PMD.
Line: 30
public final class CrashingMappedIterable<T> implements Iterable<T> {
int crashOnIterator;
final int crashOnHasNext;
final int crashOnNext;
final Function<Integer, T> mapper;
Reported by PMD.
Line: 32
final int crashOnHasNext;
final int crashOnNext;
final Function<Integer, T> mapper;
public CrashingMappedIterable(int crashOnIterator, int crashOnHasNext, int crashOnNext, Function<Integer, T> mapper) {
this.crashOnIterator = crashOnIterator;
Reported by PMD.
Line: 34
final int crashOnNext;
final Function<Integer, T> mapper;
public CrashingMappedIterable(int crashOnIterator, int crashOnHasNext, int crashOnNext, Function<Integer, T> mapper) {
this.crashOnIterator = crashOnIterator;
this.crashOnHasNext = crashOnHasNext;
this.crashOnNext = crashOnNext;
Reported by PMD.
Line: 45
@Override
public Iterator<T> iterator() {
if (--crashOnIterator <= 0) {
throw new TestException("iterator()");
}
return new CrashingMapperIterator<>(crashOnHasNext, crashOnNext, mapper);
}
Reported by PMD.
Line: 52
}
static final class CrashingMapperIterator<T> implements Iterator<T> {
int crashOnHasNext;
int crashOnNext;
int count;
Reported by PMD.
Line: 54
static final class CrashingMapperIterator<T> implements Iterator<T> {
int crashOnHasNext;
int crashOnNext;
int count;
final Function<Integer, T> mapper;
Reported by PMD.
Line: 56
int crashOnNext;
int count;
final Function<Integer, T> mapper;
CrashingMapperIterator(int crashOnHasNext, int crashOnNext, Function<Integer, T> mapper) {
this.crashOnHasNext = crashOnHasNext;
Reported by PMD.
Line: 58
int count;
final Function<Integer, T> mapper;
CrashingMapperIterator(int crashOnHasNext, int crashOnNext, Function<Integer, T> mapper) {
this.crashOnHasNext = crashOnHasNext;
this.crashOnNext = crashOnNext;
this.mapper = mapper;
Reported by PMD.
Line: 68
@Override
public boolean hasNext() {
if (--crashOnHasNext <= 0) {
throw new TestException("hasNext()");
}
return true;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableGroupByTests.java
12 issues
Line: 42
.blockingForEach(new Consumer<GroupedObservable<String, Event>>() {
@Override
public void accept(GroupedObservable<String, Event> v) {
System.out.println(v);
v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream
}
});
System.out.println("**** finished");
Reported by PMD.
Line: 47
}
});
System.out.println("**** finished");
Thread.sleep(200); // make sure the event streams receive their interrupt
}
@Test
Reported by PMD.
Line: 80
.blockingForEach(new Consumer<Object>() {
@Override
public void accept(Object pv) {
System.out.println(pv);
}
});
System.out.println("**** finished");
Reported by PMD.
Line: 84
}
});
System.out.println("**** finished");
Thread.sleep(200); // make sure the event streams receive their interrupt
}
}
Reported by PMD.
Line: 26
public class ObservableGroupByTests extends RxJavaTest {
@Test
public void takeUnsubscribesOnGroupBy() throws Exception {
Observable.merge(
ObservableEventStream.getEventStream("HTTP-ClusterA", 50),
ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
)
// group by type (2 clusters)
Reported by PMD.
Line: 26
public class ObservableGroupByTests extends RxJavaTest {
@Test
public void takeUnsubscribesOnGroupBy() throws Exception {
Observable.merge(
ObservableEventStream.getEventStream("HTTP-ClusterA", 50),
ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
)
// group by type (2 clusters)
Reported by PMD.
Line: 43
@Override
public void accept(GroupedObservable<String, Event> v) {
System.out.println(v);
v.take(1).subscribe(); // FIXME groups need consumption to a certain degree to cancel upstream
}
});
System.out.println("**** finished");
Reported by PMD.
Line: 53
}
@Test
public void takeUnsubscribesOnFlatMapOfGroupBy() throws Exception {
Observable.merge(
ObservableEventStream.getEventStream("HTTP-ClusterA", 50),
ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
)
// group by type (2 clusters)
Reported by PMD.
Line: 53
}
@Test
public void takeUnsubscribesOnFlatMapOfGroupBy() throws Exception {
Observable.merge(
ObservableEventStream.getEventStream("HTTP-ClusterA", 50),
ObservableEventStream.getEventStream("HTTP-ClusterB", 20)
)
// group by type (2 clusters)
Reported by PMD.
Line: 71
return g.map(new Function<Event, Object>() {
@Override
public Object apply(Event event) {
return event.instanceId + " - " + event.values.get("count200");
}
});
}
})
.take(20)
Reported by PMD.