The following issues were found
src/test/java/io/reactivex/rxjava3/internal/subscribers/BoundedSubscriberTest.java
106 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.subscribers;
import static org.junit.Assert.*;
import java.util.*;
Reported by PMD.
Line: 32
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.testsupport.*;
public class BoundedSubscriberTest extends RxJavaTest {
@Test
public void onSubscribeThrows() {
final List<Object> received = new ArrayList<>();
Reported by PMD.
Line: 35
public class BoundedSubscriberTest extends RxJavaTest {
@Test
public void onSubscribeThrows() {
final List<Object> received = new ArrayList<>();
BoundedSubscriber<Object> subscriber = new BoundedSubscriber<>(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Reported by PMD.
Line: 60
}
}, 128);
assertFalse(subscriber.isDisposed());
Flowable.just(1).subscribe(subscriber);
assertTrue(received.toString(), received.get(0) instanceof TestException);
assertEquals(received.toString(), 1, received.size());
Reported by PMD.
Line: 62
assertFalse(subscriber.isDisposed());
Flowable.just(1).subscribe(subscriber);
assertTrue(received.toString(), received.get(0) instanceof TestException);
assertEquals(received.toString(), 1, received.size());
assertTrue(subscriber.isDisposed());
Reported by PMD.
Line: 65
Flowable.just(1).subscribe(subscriber);
assertTrue(received.toString(), received.get(0) instanceof TestException);
assertEquals(received.toString(), 1, received.size());
assertTrue(subscriber.isDisposed());
}
@Test
Reported by PMD.
Line: 67
assertTrue(received.toString(), received.get(0) instanceof TestException);
assertEquals(received.toString(), 1, received.size());
assertTrue(subscriber.isDisposed());
}
@Test
public void onNextThrows() {
final List<Object> received = new ArrayList<>();
Reported by PMD.
Line: 71
}
@Test
public void onNextThrows() {
final List<Object> received = new ArrayList<>();
BoundedSubscriber<Object> subscriber = new BoundedSubscriber<>(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Reported by PMD.
Line: 96
}
}, 128);
assertFalse(subscriber.isDisposed());
Flowable.just(1).subscribe(subscriber);
assertTrue(received.toString(), received.get(0) instanceof TestException);
assertEquals(received.toString(), 1, received.size());
Reported by PMD.
Line: 98
assertFalse(subscriber.isDisposed());
Flowable.just(1).subscribe(subscriber);
assertTrue(received.toString(), received.get(0) instanceof TestException);
assertEquals(received.toString(), 1, received.size());
assertTrue(subscriber.isDisposed());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/schedulers/ExecutorSchedulerFairTest.java
106 issues
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ExecutorSchedulerFairTest extends AbstractSchedulerConcurrencyTests {
static final Executor executor = Executors.newFixedThreadPool(2, new RxThreadFactory("TestCustomPool"));
@Override
protected Scheduler getScheduler() {
Reported by PMD.
Line: 46
}
@Test
public final void handledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTestHelper.handledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test
public void cancelledTaskRetention() throws InterruptedException {
Reported by PMD.
Line: 51
}
@Test
public void cancelledTaskRetention() throws InterruptedException {
ExecutorService exec = Executors.newSingleThreadExecutor();
Scheduler s = Schedulers.from(exec, false, true);
try {
Scheduler.Worker w = s.createWorker();
try {
Reported by PMD.
Line: 59
try {
ExecutorSchedulerTest.cancelledRetention(w, false);
} finally {
w.dispose();
}
w = s.createWorker();
try {
ExecutorSchedulerTest.cancelledRetention(w, true);
Reported by PMD.
Line: 66
try {
ExecutorSchedulerTest.cancelledRetention(w, true);
} finally {
w.dispose();
}
} finally {
exec.shutdownNow();
}
}
Reported by PMD.
Line: 75
/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
static final class TestExecutor implements Executor {
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
@Override
public void execute(Runnable command) {
queue.offer(command);
}
public void executeOne() {
Reported by PMD.
Line: 88
}
public void executeAll() {
Runnable r;
while ((r = queue.poll()) != null) {
r.run();
}
}
}
Reported by PMD.
Line: 105
};
TestExecutor exec = new TestExecutor();
Scheduler custom = Schedulers.from(exec, false, true);
Worker w = custom.createWorker();
try {
Disposable d1 = w.schedule(task);
Disposable d2 = w.schedule(task);
Disposable d3 = w.schedule(task);
Reported by PMD.
Line: 111
Disposable d2 = w.schedule(task);
Disposable d3 = w.schedule(task);
d1.dispose();
d2.dispose();
d3.dispose();
exec.executeAll();
Reported by PMD.
Line: 112
Disposable d3 = w.schedule(task);
d1.dispose();
d2.dispose();
d3.dispose();
exec.executeAll();
assertEquals(0, calls.get());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/subjects/ReplaySubjectConcurrencyTest.java
105 issues
Line: 224
if (success) {
System.out.println("Success! " + sums.size() + " each had the same sum of " + expected);
} else {
throw new RuntimeException("Concurrency Bug");
}
}
/**
Reported by PMD.
Line: 279
t4.join();
t5.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals("value", value1.get());
assertEquals("value", t2.value.get());
assertEquals("value", t3.value.get());
Reported by PMD.
Line: 47
@Override
public void subscribe(Observer<? super Long> o) {
o.onSubscribe(Disposable.empty());
System.out.println("********* Start Source Data ***********");
for (long l = 1; l <= 10000; l++) {
o.onNext(l);
}
System.out.println("********* Finished Source Data ***********");
o.onComplete();
Reported by PMD.
Line: 51
for (long l = 1; l <= 10000; l++) {
o.onNext(l);
}
System.out.println("********* Finished Source Data ***********");
o.onComplete();
}
}).subscribe(replay);
}
});
Reported by PMD.
Line: 72
@Override
public void onComplete() {
System.out.println("*** Slow Observer completed");
slowLatch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 83
@Override
public void onNext(Long args) {
if (args == 1) {
System.out.println("*** Slow Observer STARTED");
}
try {
if (args % 10 == 0) {
Thread.sleep(1);
}
Reported by PMD.
Line: 113
@Override
public void onComplete() {
System.out.println("*** Fast Observer completed");
fastLatch.countDown();
}
@Override
public void onError(Throwable e) {
Reported by PMD.
Line: 124
@Override
public void onNext(Long args) {
if (args == 1) {
System.out.println("*** Fast Observer STARTED");
}
}
};
replay.subscribe(fast);
try {
Reported by PMD.
Line: 157
@Override
public void subscribe(Observer<? super Long> o) {
o.onSubscribe(Disposable.empty());
System.out.println("********* Start Source Data ***********");
for (long l = 1; l <= 10000; l++) {
o.onNext(l);
}
System.out.println("********* Finished Source Data ***********");
o.onComplete();
Reported by PMD.
Line: 161
for (long l = 1; l <= 10000; l++) {
o.onNext(l);
}
System.out.println("********* Finished Source Data ***********");
o.onComplete();
}
}).subscribe(replay);
}
});
Reported by PMD.
src/main/java/io/reactivex/rxjava3/core/Flowable.java
104 issues
Line: 15928
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
/**
* Operator implementations (both source and intermediate) should implement this method that
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.core;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;
Reported by PMD.
Line: 156
* @see ParallelFlowable
* @see io.reactivex.rxjava3.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
Reported by PMD.
Line: 156
* @see ParallelFlowable
* @see io.reactivex.rxjava3.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
Reported by PMD.
Line: 156
* @see ParallelFlowable
* @see io.reactivex.rxjava3.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
Reported by PMD.
Line: 156
* @see ParallelFlowable
* @see io.reactivex.rxjava3.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
Reported by PMD.
Line: 156
* @see ParallelFlowable
* @see io.reactivex.rxjava3.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
Reported by PMD.
Line: 156
* @see ParallelFlowable
* @see io.reactivex.rxjava3.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
Reported by PMD.
Line: 197
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> Flowable<T> amb(@NonNull Iterable<@NonNull ? extends Publisher<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableAmb<>(null, sources));
}
/**
* Mirrors the one {@link Publisher} in an array of several {@code Publisher}s that first either emits an item or sends
Reported by PMD.
Line: 241
if (len == 0) {
return empty();
} else
if (len == 1) {
return fromPublisher(sources[0]);
}
return RxJavaPlugins.onAssembly(new FlowableAmb<>(sources, null));
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/flowable/FlowableConcatTests.java
104 issues
Line: 29
public class FlowableConcatTests extends RxJavaTest {
@Test
public void concatSimple() {
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
Reported by PMD.
Line: 30
@Test
public void concatSimple() {
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
Reported by PMD.
Line: 30
@Test
public void concatSimple() {
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
Reported by PMD.
Line: 31
@Test
public void concatSimple() {
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
Reported by PMD.
Line: 31
@Test
public void concatSimple() {
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
Reported by PMD.
Line: 33
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
Reported by PMD.
Line: 33
Flowable<String> f1 = Flowable.just("one", "two");
Flowable<String> f2 = Flowable.just("three", "four");
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
Reported by PMD.
Line: 35
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
}
Reported by PMD.
Line: 35
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
}
Reported by PMD.
Line: 36
List<String> values = Flowable.concat(f1, f2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
}
@Test
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowTimed.java
104 issues
Line: 487
}
}
UnicastProcessor<T> createNewWindow(UnicastProcessor<T> window) {
if (window != null) {
window.onComplete();
window = null;
}
Reported by PMD.
Line: 487
}
}
UnicastProcessor<T> createNewWindow(UnicastProcessor<T> window) {
if (window != null) {
window.onComplete();
window = null;
}
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.processors.UnicastProcessor;
public final class FlowableWindowTimed<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
Reported by PMD.
Line: 34
public final class FlowableWindowTimed<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
Reported by PMD.
Line: 35
public final class FlowableWindowTimed<T> extends AbstractFlowableWithUpstream<T, Flowable<T>> {
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
Reported by PMD.
Line: 36
final long timespan;
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public FlowableWindowTimed(Flowable<T> source,
Reported by PMD.
Line: 37
final long timeskip;
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public FlowableWindowTimed(Flowable<T> source,
long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
Reported by PMD.
Line: 38
final TimeUnit unit;
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public FlowableWindowTimed(Flowable<T> source,
long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
int bufferSize, boolean restartTimerOnMaxSize) {
Reported by PMD.
Line: 39
final Scheduler scheduler;
final long maxSize;
final int bufferSize;
final boolean restartTimerOnMaxSize;
public FlowableWindowTimed(Flowable<T> source,
long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, long maxSize,
int bufferSize, boolean restartTimerOnMaxSize) {
super(source);
Reported by PMD.
Line: 78
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = 5724293814035355511L;
final Subscriber<? super Flowable<T>> downstream;
final SimplePlainQueue<Object> queue;
final long timespan;
final TimeUnit unit;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/observable/ObservableConcatTests.java
103 issues
Line: 30
public class ObservableConcatTests extends RxJavaTest {
@Test
public void concatSimple() {
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
Reported by PMD.
Line: 31
@Test
public void concatSimple() {
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
Reported by PMD.
Line: 31
@Test
public void concatSimple() {
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
Reported by PMD.
Line: 32
@Test
public void concatSimple() {
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
Reported by PMD.
Line: 32
@Test
public void concatSimple() {
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
Reported by PMD.
Line: 34
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
Reported by PMD.
Line: 34
Observable<String> o1 = Observable.just("one", "two");
Observable<String> o2 = Observable.just("three", "four");
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
Reported by PMD.
Line: 36
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
}
Reported by PMD.
Line: 36
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
}
Reported by PMD.
Line: 37
List<String> values = Observable.concat(o1, o2).toList().blockingGet();
assertEquals("one", values.get(0));
assertEquals("two", values.get(1));
assertEquals("three", values.get(2));
assertEquals("four", values.get(3));
}
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReduceTest.java
103 issues
Line: 419
@Override
public void accept(String s) throws Exception {
count.incrementAndGet();
System.out.println("Completed with " + s);
}
})
.toFlowable();
}
}
Reported by PMD.
Line: 459
@Override
public void accept(String s) throws Exception {
count.incrementAndGet();
System.out.println("Completed with " + s);
}
})
;
}
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 37
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableReduceTest extends RxJavaTest {
Subscriber<Object> subscriber;
SingleObserver<Object> singleObserver;
@Before
Reported by PMD.
Line: 38
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableReduceTest extends RxJavaTest {
Subscriber<Object> subscriber;
SingleObserver<Object> singleObserver;
@Before
public void before() {
Reported by PMD.
Line: 40
public class FlowableReduceTest extends RxJavaTest {
Subscriber<Object> subscriber;
SingleObserver<Object> singleObserver;
@Before
public void before() {
subscriber = TestHelper.mockSubscriber();
singleObserver = TestHelper.mockSingleObserver();
Reported by PMD.
Line: 48
singleObserver = TestHelper.mockSingleObserver();
}
BiFunction<Integer, Integer, Integer> sum = new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer t1, Integer t2) {
return t1 + t2;
}
};
Reported by PMD.
Line: 68
result.subscribe(subscriber);
verify(subscriber).onNext(1 + 2 + 3 + 4 + 5);
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
Reported by PMD.
Line: 69
result.subscribe(subscriber);
verify(subscriber).onNext(1 + 2 + 3 + 4 + 5);
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
public void aggregateAsIntSumSourceThrowsFlowable() {
Reported by PMD.
Line: 70
verify(subscriber).onNext(1 + 2 + 3 + 4 + 5);
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any(Throwable.class));
}
@Test
public void aggregateAsIntSumSourceThrowsFlowable() {
Flowable<Integer> result = Flowable.concat(Flowable.just(1, 2, 3, 4, 5),
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMapTest.java
101 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 39
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableMapTest extends RxJavaTest {
Subscriber<String> stringSubscriber;
Subscriber<String> stringSubscriber2;
static final BiFunction<String, Integer, String> APPEND_INDEX = new BiFunction<String, Integer, String>() {
Reported by PMD.
Line: 41
public class FlowableMapTest extends RxJavaTest {
Subscriber<String> stringSubscriber;
Subscriber<String> stringSubscriber2;
static final BiFunction<String, Integer, String> APPEND_INDEX = new BiFunction<String, Integer, String>() {
@Override
public String apply(String value, Integer index) {
Reported by PMD.
Line: 42
public class FlowableMapTest extends RxJavaTest {
Subscriber<String> stringSubscriber;
Subscriber<String> stringSubscriber2;
static final BiFunction<String, Integer, String> APPEND_INDEX = new BiFunction<String, Integer, String>() {
@Override
public String apply(String value, Integer index) {
return value + index;
Reported by PMD.
Line: 66
Flowable<String> m = flowable.map(new Function<Map<String, String>, String>() {
@Override
public String apply(Map<String, String> map) {
return map.get("firstName");
}
});
m.subscribe(stringSubscriber);
Reported by PMD.
Line: 72
m.subscribe(stringSubscriber);
verify(stringSubscriber, never()).onError(any(Throwable.class));
verify(stringSubscriber, times(1)).onNext("OneFirst");
verify(stringSubscriber, times(1)).onNext("TwoFirst");
verify(stringSubscriber, times(1)).onComplete();
}
Reported by PMD.
Line: 73
m.subscribe(stringSubscriber);
verify(stringSubscriber, never()).onError(any(Throwable.class));
verify(stringSubscriber, times(1)).onNext("OneFirst");
verify(stringSubscriber, times(1)).onNext("TwoFirst");
verify(stringSubscriber, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 74
verify(stringSubscriber, never()).onError(any(Throwable.class));
verify(stringSubscriber, times(1)).onNext("OneFirst");
verify(stringSubscriber, times(1)).onNext("TwoFirst");
verify(stringSubscriber, times(1)).onComplete();
}
@Test
public void mapMany() {
Reported by PMD.
Line: 75
verify(stringSubscriber, never()).onError(any(Throwable.class));
verify(stringSubscriber, times(1)).onNext("OneFirst");
verify(stringSubscriber, times(1)).onNext("TwoFirst");
verify(stringSubscriber, times(1)).onComplete();
}
@Test
public void mapMany() {
/* simulate a top-level async call which returns IDs */
Reported by PMD.
Line: 89
@Override
public Flowable<String> apply(Integer id) {
/* simulate making a nested async call which creates another Flowable */
Flowable<Map<String, String>> subFlowable = null;
if (id == 1) {
Map<String, String> m1 = getMap("One");
Map<String, String> m2 = getMap("Two");
subFlowable = Flowable.just(m1, m2);
} else {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleUsingTest.java
101 issues
Line: 32
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.testsupport.*;
public class SingleUsingTest extends RxJavaTest {
Function<Disposable, Single<Integer>> mapper = new Function<Disposable, Single<Integer>>() {
@Override
public Single<Integer> apply(Disposable d) throws Exception {
return Single.just(1);
Reported by PMD.
Line: 34
public class SingleUsingTest extends RxJavaTest {
Function<Disposable, Single<Integer>> mapper = new Function<Disposable, Single<Integer>>() {
@Override
public Single<Integer> apply(Disposable d) throws Exception {
return Single.just(1);
}
};
Reported by PMD.
Line: 41
}
};
Function<Disposable, Single<Integer>> mapperThrows = new Function<Disposable, Single<Integer>>() {
@Override
public Single<Integer> apply(Disposable d) throws Exception {
throw new TestException("Mapper");
}
};
Reported by PMD.
Line: 48
}
};
Consumer<Disposable> disposer = new Consumer<Disposable>() {
@Override
public void accept(Disposable d) throws Exception {
d.dispose();
}
};
Reported by PMD.
Line: 55
}
};
Consumer<Disposable> disposerThrows = new Consumer<Disposable>() {
@Override
public void accept(Disposable d) throws Exception {
throw new TestException("Disposer");
}
};
Reported by PMD.
Line: 58
Consumer<Disposable> disposerThrows = new Consumer<Disposable>() {
@Override
public void accept(Disposable d) throws Exception {
throw new TestException("Disposer");
}
};
@Test
public void resourceSupplierThrows() {
Reported by PMD.
Line: 63
};
@Test
public void resourceSupplierThrows() {
Single.using(new Supplier<Integer>() {
@Override
public Integer get() throws Exception {
throw new TestException();
}
Reported by PMD.
Line: 75
}
@Test
public void normalEager() {
Single.using(Functions.justSupplier(1), Functions.justFunction(Single.just(1)), Functions.emptyConsumer())
.test()
.assertResult(1);
}
Reported by PMD.
Line: 76
@Test
public void normalEager() {
Single.using(Functions.justSupplier(1), Functions.justFunction(Single.just(1)), Functions.emptyConsumer())
.test()
.assertResult(1);
}
@Test
Reported by PMD.
Line: 76
@Test
public void normalEager() {
Single.using(Functions.justSupplier(1), Functions.justFunction(Single.just(1)), Functions.emptyConsumer())
.test()
.assertResult(1);
}
@Test
Reported by PMD.