The following issues were found:
src/test/java/io/reactivex/rxjava3/maybe/MaybeTest.java
1040 issues
Line: 929
Maybe.unsafeCreate(new MaybeSource<Object>() {
@Override
public void subscribe(MaybeObserver<? super Object> observer) {
throw new NullPointerException("Forced failure");
}
}).test();
fail("Should have thrown!");
} catch (NullPointerException ex) {
Reported by PMD.
Line: 2950
@Test
public void onTerminateDetach() throws Exception {
System.gc();
Thread.sleep(150);
long before = usedMemoryNow();
Reported by PMD.
Line: 2989
source = null;
System.gc();
Thread.sleep(250);
long after = usedMemoryNow();
Reported by PMD.
Line: 3000
middle / 1024.0 / 1024.0,
after / 1024.0 / 1024.0);
System.out.printf(log);
if (before * 1.3 < after) {
fail("There seems to be a memory leak: " + log);
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.maybe;
import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.management.*;
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.maybe;
import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.management.*;
Reported by PMD.
Line: 44
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class MaybeTest extends RxJavaTest {
@Test
public void fromFlowableEmpty() {
Flowable.empty()
.singleElement()
Reported by PMD.
Line: 44
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class MaybeTest extends RxJavaTest {
@Test
public void fromFlowableEmpty() {
Flowable.empty()
.singleElement()
Reported by PMD.
Line: 44
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class MaybeTest extends RxJavaTest {
@Test
public void fromFlowableEmpty() {
Flowable.empty()
.singleElement()
Reported by PMD.
Line: 44
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class MaybeTest extends RxJavaTest {
@Test
public void fromFlowableEmpty() {
Flowable.empty()
.singleElement()
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayEagerTruncateTest.java
657 issues
Line: 472
@Override
public void accept(Integer v) {
effectCounter.incrementAndGet();
System.out.println("Sideeffect #" + v);
}
});
Flowable<Integer> result = source.replay(
new Function<Flowable<Integer>, Flowable<Integer>>() {
Reported by PMD.
Line: 486
for (int i = 1; i < 3; i++) {
effectCounter.set(0);
System.out.printf("- %d -%n", i);
result.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
System.out.println(t1);
Reported by PMD.
Line: 491
@Override
public void accept(Integer t1) {
System.out.println(t1);
}
}, new Consumer<Throwable>() {
@Override
Reported by PMD.
Line: 504
new Action() {
@Override
public void run() {
System.out.println("Done");
}
});
assertEquals(2, effectCounter.get());
}
}
Reported by PMD.
Line: 920
@Override
public void run() {
counter.incrementAndGet();
System.out.println("published observable being executed");
subscriber.onNext("one");
subscriber.onComplete();
}
}).start();
}
Reported by PMD.
Line: 937
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
// subscribe again
Reported by PMD.
Line: 948
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
Reported by PMD.
Line: 1182
source.subscribe(ts3);
ts3.assertNoErrors();
System.out.println(ts3.values());
ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertComplete();
}
@Test
Reported by PMD.
Line: 1237
source.subscribe(ts3);
ts3.assertNoErrors();
System.out.println(ts3.values());
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertComplete();
}
@Test
Reported by PMD.
Line: 1950
.takeLast(1)
;
System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOnTest.java
640 issues
Line: 335
// don't let this thing finish yet
try {
if (!nextLatch.await(1000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("it shouldn't have timed out");
}
} catch (InterruptedException e) {
throw new RuntimeException("it shouldn't have failed");
}
}
Reported by PMD.
Line: 338
throw new RuntimeException("it shouldn't have timed out");
}
} catch (InterruptedException e) {
throw new RuntimeException("it shouldn't have failed");
}
}
});
Reported by PMD.
Line: 91
@Test
public void threadName() throws InterruptedException {
System.out.println("Main Thread: " + Thread.currentThread().getName());
Flowable<String> obs = Flowable.just("one", "null", "two", "three", "four");
Subscriber<String> subscriber = TestHelper.mockSubscriber();
final String parentThreadName = Thread.currentThread().getName();
Reported by PMD.
Line: 105
@Override
public void accept(String s) {
String threadName = Thread.currentThread().getName();
System.out.println("Source ThreadName: " + threadName + " Expected => " + parentThreadName);
assertEquals(parentThreadName, threadName);
}
});
Reported by PMD.
Line: 118
public void accept(String t1) {
String threadName = Thread.currentThread().getName();
boolean correctThreadName = threadName.startsWith("RxNewThreadScheduler");
System.out.println("ObserveOn ThreadName: " + threadName + " Correct => " + correctThreadName);
assertTrue(correctThreadName);
}
}).doAfterTerminate(new Action() {
Reported by PMD.
Line: 320
@Override
public void onComplete() {
System.out.println("onComplete");
completeTime.set(System.nanoTime());
completedLatch.countDown();
}
@Override
Reported by PMD.
Line: 345
});
long afterSubscribeTime = System.nanoTime();
System.out.println("After subscribe: " + completedLatch.getCount());
assertEquals(1, completedLatch.getCount());
nextLatch.countDown();
completedLatch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(completeTime.get() > afterSubscribeTime);
System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime));
Reported by PMD.
Line: 350
nextLatch.countDown();
completedLatch.await(1000, TimeUnit.MILLISECONDS);
assertTrue(completeTime.get() > afterSubscribeTime);
System.out.println("onComplete nanos after subscribe: " + (completeTime.get() - afterSubscribeTime));
}
private static int randomIntFrom0to100() {
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
long x = System.nanoTime();
Reported by PMD.
Line: 431
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>() {
@Override
public void onNext(Integer t) {
System.err.println("c t = " + t + " thread " + Thread.currentThread());
super.onNext(t);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
Reported by PMD.
Line: 445
.take(3)
.subscribe(testSubscriber);
testSubscriber.awaitDone(5, TimeUnit.SECONDS);
System.err.println(testSubscriber.values());
testSubscriber.assertValues(0, 1, 2);
// it should be between the take num and requested batch size across the async boundary
System.out.println("Generated: " + generated.get());
assertTrue(generated.get() >= 3 && generated.get() <= Flowable.bufferSize());
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupByTest.java
633 issues
Line: 1093
return new Function<Integer, T>() {
@Override
public T apply(Integer t1) {
throw new RuntimeException("Forced failure");
}
};
}
<T, R> Function<T, R> fail2(R dummy2) {
Reported by PMD.
Line: 1102
return new Function<T, R>() {
@Override
public R apply(T t1) {
throw new RuntimeException("Forced failure");
}
};
}
Function<Integer, Integer> dbl = new Function<Integer, Integer>() {
Reported by PMD.
Line: 2021
try {
action.accept(notification.getValue());
} catch (Throwable ex) {
throw new RuntimeException(ex);
}
}
}).ticker(testTicker) //
.<Integer, Object>build().asMap();
}
Reported by PMD.
Line: 2152
try {
evictedListener.accept(v);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
return result;
}
Reported by PMD.
Line: 2208
try {
notify.accept(notification.getValue());
} catch (Throwable e) {
throw new RuntimeException(e);
}
}})
.<Integer, Object> build();
cacheOut.set(cache);
return cache.asMap();
Reported by PMD.
Line: 2230
try {
notify.accept(object);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});
}};
return evictingMapFactory;
Reported by PMD.
Line: 2610
try {
itemEvictConsumer.accept(notification.getValue());
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
})
.<T, Object>build().asMap();
}
Reported by PMD.
Line: 2891
try {
itemEvictConsumer.accept(n.getValue());
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
}).<T, Object>build().asMap();
}
Reported by PMD.
Line: 156
@Override
public void onNext(String v) {
eventCounter.incrementAndGet();
System.out.println(v);
}
});
assertEquals(3, groupCounter.get());
Reported by PMD.
Line: 211
@Override
public void subscribe(final Subscriber<? super Event> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
System.out.println("*** Subscribing to EventStream ***");
subscribeCounter.incrementAndGet();
new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplayTest.java
622 issues
Line: 475
@Override
public void accept(Integer v) {
effectCounter.incrementAndGet();
System.out.println("Sideeffect #" + v);
}
});
Flowable<Integer> result = source.replay(
new Function<Flowable<Integer>, Flowable<Integer>>() {
Reported by PMD.
Line: 489
for (int i = 1; i < 3; i++) {
effectCounter.set(0);
System.out.printf("- %d -%n", i);
result.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
System.out.println(t1);
Reported by PMD.
Line: 494
@Override
public void accept(Integer t1) {
System.out.println(t1);
}
}, new Consumer<Throwable>() {
@Override
Reported by PMD.
Line: 507
new Action() {
@Override
public void run() {
System.out.println("Done");
}
});
assertEquals(2, effectCounter.get());
}
}
Reported by PMD.
Line: 937
@Override
public void run() {
counter.incrementAndGet();
System.out.println("published observable being executed");
subscriber.onNext("one");
subscriber.onComplete();
}
}).start();
}
Reported by PMD.
Line: 954
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
// subscribe again
Reported by PMD.
Line: 965
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
Reported by PMD.
Line: 1247
source.subscribe(ts3);
ts3.assertNoErrors();
System.out.println(ts3.values());
ts3.assertValues(3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertComplete();
}
@Test
Reported by PMD.
Line: 1302
source.subscribe(ts3);
ts3.assertNoErrors();
System.out.println(ts3.values());
ts3.assertValues(2, 3, 4, 5, 6, 7, 8, 9, 10);
ts3.assertComplete();
}
@Test
Reported by PMD.
Line: 2015
.takeLast(1)
;
System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplayEagerTruncateTest.java
601 issues
Line: 472
@Override
public void accept(Integer v) {
effectCounter.incrementAndGet();
System.out.println("Sideeffect #" + v);
}
});
Observable<Integer> result = source.replay(
new Function<Observable<Integer>, Observable<Integer>>() {
Reported by PMD.
Line: 486
for (int i = 1; i < 3; i++) {
effectCounter.set(0);
System.out.printf("- %d -%n", i);
result.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer t1) {
System.out.println(t1);
Reported by PMD.
Line: 491
@Override
public void accept(Integer t1) {
System.out.println(t1);
}
}, new Consumer<Throwable>() {
@Override
Reported by PMD.
Line: 504
new Action() {
@Override
public void run() {
System.out.println("Done");
}
});
assertEquals(2, effectCounter.get());
}
}
Reported by PMD.
Line: 901
@Override
public void run() {
counter.incrementAndGet();
System.out.println("published Observable being executed");
observer.onNext("one");
observer.onComplete();
}
}).start();
}
Reported by PMD.
Line: 918
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
// subscribe again
Reported by PMD.
Line: 929
@Override
public void accept(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
Reported by PMD.
Line: 1664
.takeLast(1)
;
System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Reported by PMD.
Line: 1667
System.out.println("Bounded Replay Leak check: Wait before GC");
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Thread.sleep(500);
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
Reported by PMD.
Line: 1668
Thread.sleep(1000);
System.out.println("Bounded Replay Leak check: GC");
System.gc();
Thread.sleep(500);
final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBufferTest.java
574 issues
Line: 311
.doOnNext(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> pv) {
System.out.println(pv);
}
})
.subscribe(ts);
InOrder inOrder = Mockito.inOrder(subscriber);
Reported by PMD.
Line: 563
.doOnNext(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> pv) {
System.out.println(pv);
}
})
.subscribe(subscriber);
scheduler.advanceTimeBy(5, TimeUnit.SECONDS);
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableBufferTest extends RxJavaTest {
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableBufferTest extends RxJavaTest {
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableBufferTest extends RxJavaTest {
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableBufferTest extends RxJavaTest {
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
Reported by PMD.
Line: 44
public class FlowableBufferTest extends RxJavaTest {
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
Reported by PMD.
Line: 45
public class FlowableBufferTest extends RxJavaTest {
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
subscriber = TestHelper.mockSubscriber();
Reported by PMD.
Line: 46
private Subscriber<List<String>> subscriber;
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
@Before
public void before() {
subscriber = TestHelper.mockSubscriber();
scheduler = new TestScheduler();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/processors/ReplayProcessorTest.java
543 issues
Line: 281
@Override
public void onNext(String v) {
System.out.println("observer1: " + v);
lastValueForSubscriber1.set(v);
}
};
Reported by PMD.
Line: 305
@Override
public void onNext(String v) {
System.out.println("observer2: " + v);
if (v.equals("one")) {
oneReceived.countDown();
} else {
try {
makeSlow.await();
Reported by PMD.
Line: 330
// use subscribeOn to make this async otherwise we deadlock as we are using CountDownLatches
processor.subscribeOn(Schedulers.newThread()).subscribe(subscriber2);
System.out.println("before waiting for one");
// wait until observer2 starts having replay occur
oneReceived.await();
System.out.println("after waiting for one");
Reported by PMD.
Line: 335
// wait until observer2 starts having replay occur
oneReceived.await();
System.out.println("after waiting for one");
processor.onNext("three");
System.out.println("sent three");
Reported by PMD.
Line: 339
processor.onNext("three");
System.out.println("sent three");
// if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet
assertEquals("three", lastValueForSubscriber1.get());
System.out.println("about to send onComplete");
Reported by PMD.
Line: 344
// if subscription blocked existing subscribers then 'makeSlow' would cause this to not be there yet
assertEquals("three", lastValueForSubscriber1.get());
System.out.println("about to send onComplete");
processor.onComplete();
System.out.println("completed processor");
Reported by PMD.
Line: 348
processor.onComplete();
System.out.println("completed processor");
// release
makeSlow.countDown();
System.out.println("makeSlow released");
Reported by PMD.
Line: 353
// release
makeSlow.countDown();
System.out.println("makeSlow released");
completed.await();
// all of them should be emitted with the last being "three"
assertEquals("three", lastValueForSubscriber2.get());
Reported by PMD.
Line: 383
InOrder inOrder = inOrder(subscriber);
String v = "" + i;
src.onNext(v);
System.out.printf("Turn: %d%n", i);
src.firstElement().toFlowable()
.flatMap(new Function<String, Flowable<String>>() {
@Override
public Flowable<String> apply(String t1) {
Reported by PMD.
Line: 395
.subscribe(new DefaultSubscriber<String>() {
@Override
public void onNext(String t) {
System.out.println(t);
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/completable/CompletableTest.java
526 issues
Line: 356
public void createOnSubscribeThrowsNPE() {
Completable c = Completable.unsafeCreate(new CompletableSource() {
@Override
public void subscribe(CompletableObserver observer) { throw new NullPointerException(); }
});
c.blockingAwait();
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.completable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 47
/**
* Test Completable methods and operators.
*/
public class CompletableTest extends RxJavaTest {
/**
* Iterable that returns an Iterator that throws in its hasNext method.
*/
static final class IterableIteratorNextThrows implements Iterable<Completable> {
@Override
Reported by PMD.
Line: 47
/**
* Test Completable methods and operators.
*/
public class CompletableTest extends RxJavaTest {
/**
* Iterable that returns an Iterator that throws in its hasNext method.
*/
static final class IterableIteratorNextThrows implements Iterable<Completable> {
@Override
Reported by PMD.
Line: 47
/**
* Test Completable methods and operators.
*/
public class CompletableTest extends RxJavaTest {
/**
* Iterable that returns an Iterator that throws in its hasNext method.
*/
static final class IterableIteratorNextThrows implements Iterable<Completable> {
@Override
Reported by PMD.
Line: 47
/**
* Test Completable methods and operators.
*/
public class CompletableTest extends RxJavaTest {
/**
* Iterable that returns an Iterator that throws in its hasNext method.
*/
static final class IterableIteratorNextThrows implements Iterable<Completable> {
@Override
Reported by PMD.
Line: 47
/**
* Test Completable methods and operators.
*/
public class CompletableTest extends RxJavaTest {
/**
* Iterable that returns an Iterator that throws in its hasNext method.
*/
static final class IterableIteratorNextThrows implements Iterable<Completable> {
@Override
Reported by PMD.
Line: 47
/**
* Test Completable methods and operators.
*/
public class CompletableTest extends RxJavaTest {
/**
* Iterable that returns an Iterator that throws in its hasNext method.
*/
static final class IterableIteratorNextThrows implements Iterable<Completable> {
@Override
Reported by PMD.
Line: 103
private static final long serialVersionUID = 7192337844700923752L;
public final Completable completable = Completable.unsafeCreate(new CompletableSource() {
@Override
public void subscribe(CompletableObserver observer) {
getAndIncrement();
EmptyDisposable.complete(observer);
}
Reported by PMD.
Line: 128
private static final long serialVersionUID = 7192337844700923752L;
public final Completable completable = Completable.unsafeCreate(new CompletableSource() {
@Override
public void subscribe(CompletableObserver observer) {
getAndIncrement();
EmptyDisposable.error(new TestException(), observer);
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java
518 issues
Line: 54
Flowable<String> combined = Flowable.combineLatest(w1, w2, new BiFunction<String, String, String>() {
@Override
public String apply(String v1, String v2) {
throw new RuntimeException("I don't work.");
}
});
combined.subscribe(w);
w1.onNext("first value of w1");
Reported by PMD.
Line: 829
@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}
@Override
public void onNext(Integer t) {
latch.countDown();
Reported by PMD.
Line: 843
private static final Function<Object[], Integer> THROW_NON_FATAL = new Function<Object[], Integer>() {
@Override
public Integer apply(Object[] args) {
throw new RuntimeException();
}
};
@Test
Reported by PMD.
Line: 1412
Flowable<Object> errorFlowable = Flowable.timer(100, TimeUnit.MILLISECONDS, testScheduler).map(new Function<Long, Object>() {
@Override
public Object apply(Long aLong) throws Exception {
throw new Exception();
}
});
Flowable.combineLatestDelayError(
Arrays.asList(
Reported by PMD.
Line: 229
private Function3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
Function3<String, String, String, String> combineLatestFunction = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 229
private Function3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
Function3<String, String, String, String> combineLatestFunction = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 229
private Function3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
Function3<String, String, String, String> combineLatestFunction = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 440
}
};
for (int i = 1; i <= n; i++) {
System.out.println("test1ToNSources: " + i + " sources");
List<Flowable<Integer>> sources = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (int j = 0; j < i; j++) {
sources.add(Flowable.just(j));
values.add(j);
Reported by PMD.
Line: 471
}
};
for (int i = 1; i <= n; i++) {
System.out.println("test1ToNSourcesScheduled: " + i + " sources");
List<Flowable<Integer>> sources = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (int j = 0; j < i; j++) {
sources.add(Flowable.just(j).subscribeOn(Schedulers.io()));
values.add(j);
Reported by PMD.
Line: 1422
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("emptyFlowable: " + integerNotification);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Reported by PMD.