The following issues were found
src/test/java/io/reactivex/rxjava3/internal/observers/FutureSingleObserverTest.java
50 issues
Line: 34
public class FutureSingleObserverTest extends RxJavaTest {
@Test
public void cancel() {
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
Reported by PMD.
Line: 34
public class FutureSingleObserverTest extends RxJavaTest {
@Test
public void cancel() {
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
Reported by PMD.
Line: 35
@Test
public void cancel() {
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
f.cancel(true);
Reported by PMD.
Line: 37
public void cancel() {
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
f.cancel(true);
assertTrue(f.isCancelled());
Reported by PMD.
Line: 37
public void cancel() {
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
f.cancel(true);
assertTrue(f.isCancelled());
Reported by PMD.
Line: 38
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
f.cancel(true);
assertTrue(f.isCancelled());
assertTrue(f.isDone());
Reported by PMD.
Line: 38
final Future<?> f = Single.never().toFuture();
assertFalse(f.isCancelled());
assertFalse(f.isDone());
f.cancel(true);
assertTrue(f.isCancelled());
assertTrue(f.isDone());
Reported by PMD.
Line: 40
assertFalse(f.isCancelled());
assertFalse(f.isDone());
f.cancel(true);
assertTrue(f.isCancelled());
assertTrue(f.isDone());
try {
Reported by PMD.
Line: 42
f.cancel(true);
assertTrue(f.isCancelled());
assertTrue(f.isDone());
try {
f.get();
fail("Should have thrown!");
Reported by PMD.
Line: 42
f.cancel(true);
assertTrue(f.isCancelled());
assertTrue(f.isDone());
try {
f.get();
fail("Should have thrown!");
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableToFutureTest.java
50 issues
Line: 32
public class ObservableToFutureTest extends RxJavaTest {
@Test
public void success() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Reported by PMD.
Line: 33
@Test
public void success() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Observer<Object> o = TestHelper.mockObserver();
Reported by PMD.
Line: 36
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Observer<Object> o = TestHelper.mockObserver();
TestObserver<Object> to = new TestObserver<>(o);
Reported by PMD.
Line: 36
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Observer<Object> o = TestHelper.mockObserver();
TestObserver<Object> to = new TestObserver<>(o);
Reported by PMD.
Line: 42
TestObserver<Object> to = new TestObserver<>(o);
Observable.fromFuture(future).subscribe(to);
to.dispose();
verify(o, times(1)).onNext(value);
verify(o, times(1)).onComplete();
Reported by PMD.
Line: 46
to.dispose();
verify(o, times(1)).onNext(value);
verify(o, times(1)).onComplete();
verify(o, never()).onError(any(Throwable.class));
verify(future, never()).cancel(true);
}
Reported by PMD.
Line: 47
to.dispose();
verify(o, times(1)).onNext(value);
verify(o, times(1)).onComplete();
verify(o, never()).onError(any(Throwable.class));
verify(future, never()).cancel(true);
}
@Test
Reported by PMD.
Line: 48
verify(o, times(1)).onNext(value);
verify(o, times(1)).onComplete();
verify(o, never()).onError(any(Throwable.class));
verify(future, never()).cancel(true);
}
@Test
public void successOperatesOnSuppliedScheduler() throws Exception {
Reported by PMD.
Line: 49
verify(o, times(1)).onNext(value);
verify(o, times(1)).onComplete();
verify(o, never()).onError(any(Throwable.class));
verify(future, never()).cancel(true);
}
@Test
public void successOperatesOnSuppliedScheduler() throws Exception {
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 53
}
@Test
public void successOperatesOnSuppliedScheduler() throws Exception {
@SuppressWarnings("unchecked")
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/TooManyEmptyNewLines.java
50 issues
Line: 86
if (fname.endsWith(".java")) {
List<String> lines = new ArrayList<>();
BufferedReader in = new BufferedReader(new FileReader(u));
try {
for (;;) {
String line = in.readLine();
if (line == null) {
break;
Reported by PMD.
Line: 51
static void findPattern(int newLines) throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
System.out.println("Unable to find sources of TestHelper.findSourceDir()");
return;
}
Queue<File> dirs = new ArrayDeque<>();
Reported by PMD.
Line: 135
}
if (total != 0) {
fail.insert(0, "Found " + total + " instances\n");
System.out.println(fail);
throw new AssertionError(fail.toString());
}
}
}
Reported by PMD.
Line: 26
/**
* Test verifying there are no 2..5 empty newlines in the code.
*/
public class TooManyEmptyNewLines {
@Test
public void tooManyEmptyNewLines2() throws Exception {
findPattern(2);
}
Reported by PMD.
Line: 26
/**
* Test verifying there are no 2..5 empty newlines in the code.
*/
public class TooManyEmptyNewLines {
@Test
public void tooManyEmptyNewLines2() throws Exception {
findPattern(2);
}
Reported by PMD.
Line: 29
public class TooManyEmptyNewLines {
@Test
public void tooManyEmptyNewLines2() throws Exception {
findPattern(2);
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
Reported by PMD.
Line: 29
public class TooManyEmptyNewLines {
@Test
public void tooManyEmptyNewLines2() throws Exception {
findPattern(2);
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
Reported by PMD.
Line: 34
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
findPattern(3);
}
@Test
public void tooManyEmptyNewLines4() throws Exception {
Reported by PMD.
Line: 34
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
findPattern(3);
}
@Test
public void tooManyEmptyNewLines4() throws Exception {
Reported by PMD.
Line: 39
}
@Test
public void tooManyEmptyNewLines4() throws Exception {
findPattern(4);
}
@Test
public void tooManyEmptyNewLines5() throws Exception {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorResumeWithTest.java
50 issues
Line: 76
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Reported by PMD.
Line: 127
System.out.println("running TestObservable thread");
for (String s : values) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
System.out.println("TestObservable onComplete");
Reported by PMD.
Line: 78
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Observable<String> observable = w.onErrorResumeWith(resume);
Reported by PMD.
Line: 117
@Override
public void subscribe(final Observer<? super String> observer) {
System.out.println("TestObservable subscribed to ...");
observer.onSubscribe(upstream);
t = new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 124
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("TestObservable onNext: " + s);
Reported by PMD.
Line: 129
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
System.out.println("TestObservable onComplete");
observer.onComplete();
} catch (Throwable e) {
Reported by PMD.
Line: 132
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
System.out.println("TestObservable onComplete");
observer.onComplete();
} catch (Throwable e) {
System.out.println("TestObservable onError: " + e);
observer.onError(e);
}
Reported by PMD.
Line: 135
System.out.println("TestObservable onComplete");
observer.onComplete();
} catch (Throwable e) {
System.out.println("TestObservable onError: " + e);
observer.onError(e);
}
}
});
Reported by PMD.
Line: 141
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 143
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
@Test
public void backpressure() {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelRunOnTest.java
50 issues
Line: 36
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ParallelRunOnTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.runOn(Schedulers.computation()));
Reported by PMD.
Line: 39
public class ParallelRunOnTest extends RxJavaTest {
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.runOn(Schedulers.computation()));
}
@Test
Reported by PMD.
Line: 40
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.runOn(Schedulers.computation()));
}
@Test
public void doubleError() {
Reported by PMD.
Line: 40
@Test
public void subscriberCount() {
ParallelFlowableTest.checkSubscriberCount(Flowable.range(1, 5).parallel()
.runOn(Schedulers.computation()));
}
@Test
public void doubleError() {
Reported by PMD.
Line: 45
}
@Test
public void doubleError() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {
new ParallelInvalid()
.runOn(ImmediateThinScheduler.INSTANCE)
.sequential()
Reported by PMD.
Line: 54
.test()
.assertFailure(TestException.class);
assertFalse(errors.isEmpty());
for (Throwable ex : errors) {
assertTrue(ex.toString(), ex.getCause() instanceof TestException);
}
} finally {
RxJavaPlugins.reset();
Reported by PMD.
Line: 64
}
@Test
public void conditionalPath() {
Flowable.range(1, 1000)
.parallel(2)
.runOn(Schedulers.computation())
.filter(new Predicate<Integer>() {
@Override
Reported by PMD.
Line: 83
}
@Test
public void missingBackpressure() {
new ParallelFlowable<Integer>() {
@Override
public int parallelism() {
return 1;
}
Reported by PMD.
Line: 105
}
@Test
public void error() {
Flowable.error(new TestException())
.parallel(1)
.runOn(ImmediateThinScheduler.INSTANCE)
.sequential()
.test()
Reported by PMD.
Line: 115
}
@Test
public void errorBackpressured() {
Flowable.error(new TestException())
.parallel(1)
.runOn(ImmediateThinScheduler.INSTANCE)
.sequential(1)
.test(0)
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleFlatMapTest.java
49 issues
Line: 260
public void flatMapValueErrorThrown() {
Single.just(1).flatMap(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
throw new RuntimeException("something went terribly wrong!");
}
})
.to(TestHelper.<Integer>testConsumer())
.assertNoValues()
.assertError(RuntimeException.class)
Reported by PMD.
Line: 28
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.testsupport.*;
public class SingleFlatMapTest extends RxJavaTest {
@Test
public void normal() {
final boolean[] b = { false };
Reported by PMD.
Line: 49
.test()
.assertResult();
assertTrue(b[0]);
}
@Test
public void error() {
final boolean[] b = { false };
Reported by PMD.
Line: 71
.test()
.assertFailure(TestException.class);
assertFalse(b[0]);
}
@Test
public void mapperThrows() {
final boolean[] b = { false };
Reported by PMD.
Line: 88
.test()
.assertFailure(TestException.class);
assertFalse(b[0]);
}
@Test
public void mapperReturnsNull() {
final boolean[] b = { false };
Reported by PMD.
Line: 105
.test()
.assertFailure(NullPointerException.class);
assertFalse(b[0]);
}
@Test
public void flatMapObservable() {
Single.just(1).flatMapObservable(new Function<Integer, Observable<Integer>>() {
Reported by PMD.
Line: 109
}
@Test
public void flatMapObservable() {
Single.just(1).flatMapObservable(new Function<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> apply(Integer v) throws Exception {
return Observable.range(v, 5);
}
Reported by PMD.
Line: 121
}
@Test
public void flatMapPublisher() {
Single.just(1).flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
return Flowable.range(v, 5);
}
Reported by PMD.
Line: 133
}
@Test
public void flatMapPublisherMapperThrows() {
final TestException ex = new TestException();
Single.just(1)
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
Reported by PMD.
Line: 148
}
@Test
public void flatMapPublisherSingleError() {
final TestException ex = new TestException();
Single.<Integer>error(ex)
.flatMapPublisher(new Function<Integer, Publisher<Integer>>() {
@Override
public Publisher<Integer> apply(Integer v) throws Exception {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/core/Completable.java
49 issues
Line: 2851
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull CompletableObserver observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.core;
import java.util.*;
import java.util.concurrent.*;
import org.reactivestreams.*;
Reported by PMD.
Line: 107
*
* @see io.reactivex.rxjava3.observers.DisposableCompletableObserver
*/
public abstract class Completable implements CompletableSource {
/**
* Returns a {@code Completable} which terminates as soon as one of the source {@code Completable}s
* terminates (normally or with an error) and disposes all other {@code Completable}s.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.ambArray.png" alt="">
Reported by PMD.
Line: 107
*
* @see io.reactivex.rxjava3.observers.DisposableCompletableObserver
*/
public abstract class Completable implements CompletableSource {
/**
* Returns a {@code Completable} which terminates as soon as one of the source {@code Completable}s
* terminates (normally or with an error) and disposes all other {@code Completable}s.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.ambArray.png" alt="">
Reported by PMD.
Line: 107
*
* @see io.reactivex.rxjava3.observers.DisposableCompletableObserver
*/
public abstract class Completable implements CompletableSource {
/**
* Returns a {@code Completable} which terminates as soon as one of the source {@code Completable}s
* terminates (normally or with an error) and disposes all other {@code Completable}s.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.ambArray.png" alt="">
Reported by PMD.
Line: 107
*
* @see io.reactivex.rxjava3.observers.DisposableCompletableObserver
*/
public abstract class Completable implements CompletableSource {
/**
* Returns a {@code Completable} which terminates as soon as one of the source {@code Completable}s
* terminates (normally or with an error) and disposes all other {@code Completable}s.
* <p>
* <img width="640" height="518" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.ambArray.png" alt="">
Reported by PMD.
Line: 127
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static Completable ambArray(@NonNull CompletableSource... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return complete();
}
if (sources.length == 1) {
return wrap(sources[0]);
Reported by PMD.
Line: 131
if (sources.length == 0) {
return complete();
}
if (sources.length == 1) {
return wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new CompletableAmb(sources, null));
}
Reported by PMD.
Line: 199
if (sources.length == 0) {
return complete();
} else
if (sources.length == 1) {
return wrap(sources[0]);
}
return RxJavaPlugins.onAssembly(new CompletableConcatArray(sources));
}
Reported by PMD.
Line: 223
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static Completable concatArrayDelayError(@NonNull CompletableSource... sources) {
return Flowable.fromArray(sources).concatMapCompletableDelayError(Functions.identity(), true, 2);
}
/**
* Returns a {@code Completable} which completes only when all sources complete, one after another.
* <p>
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromSupplierTest.java
49 issues
Line: 52
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
}
@Test
public void fromSupplierTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 56
}
@Test
public void fromSupplierTwice() {
final AtomicInteger atomicInteger = new AtomicInteger();
Supplier<Object> supplier = new Supplier<Object>() {
@Override
public Object get() throws Exception {
Reported by PMD.
Line: 67
}
};
Maybe.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 67
}
};
Maybe.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Reported by PMD.
Line: 71
.test()
.assertResult();
assertEquals(1, atomicInteger.get());
Maybe.fromSupplier(supplier)
.test()
.assertResult();
Reported by PMD.
Line: 73
assertEquals(1, atomicInteger.get());
Maybe.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 73
assertEquals(1, atomicInteger.get());
Maybe.fromSupplier(supplier)
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
Reported by PMD.
Line: 77
.test()
.assertResult();
assertEquals(2, atomicInteger.get());
}
@Test
public void fromSupplierInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Reported by PMD.
Line: 81
}
@Test
public void fromSupplierInvokesLazy() {
final AtomicInteger atomicInteger = new AtomicInteger();
Maybe<Object> completable = Maybe.fromSupplier(new Supplier<Object>() {
@Override
public Object get() throws Exception {
Reported by PMD.
Line: 92
}
});
assertEquals(0, atomicInteger.get());
completable
.test()
.assertResult();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeMergeArrayTest.java
49 issues
Line: 33
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.*;
public class MaybeMergeArrayTest extends RxJavaTest {
@Test
public void normal() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>().setInitialFusionMode(QueueFuseable.SYNC);
Reported by PMD.
Line: 36
public class MaybeMergeArrayTest extends RxJavaTest {
@Test
public void normal() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>().setInitialFusionMode(QueueFuseable.SYNC);
Maybe.mergeArray(Maybe.just(1), Maybe.just(2))
.subscribe(ts);
ts
Reported by PMD.
Line: 39
public void normal() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>().setInitialFusionMode(QueueFuseable.SYNC);
Maybe.mergeArray(Maybe.just(1), Maybe.just(2))
.subscribe(ts);
ts
.assertFuseable()
.assertFusionMode(QueueFuseable.NONE)
.assertResult(1, 2);
Reported by PMD.
Line: 41
Maybe.mergeArray(Maybe.just(1), Maybe.just(2))
.subscribe(ts);
ts
.assertFuseable()
.assertFusionMode(QueueFuseable.NONE)
.assertResult(1, 2);
}
Reported by PMD.
Line: 41
Maybe.mergeArray(Maybe.just(1), Maybe.just(2))
.subscribe(ts);
ts
.assertFuseable()
.assertFusionMode(QueueFuseable.NONE)
.assertResult(1, 2);
}
Reported by PMD.
Line: 48
}
@Test
public void fusedPollMixed() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>().setInitialFusionMode(QueueFuseable.ANY);
Maybe.mergeArray(Maybe.just(1), Maybe.<Integer>empty(), Maybe.just(2))
.subscribe(ts);
ts
Reported by PMD.
Line: 51
public void fusedPollMixed() {
TestSubscriberEx<Integer> ts = new TestSubscriberEx<Integer>().setInitialFusionMode(QueueFuseable.ANY);
Maybe.mergeArray(Maybe.just(1), Maybe.<Integer>empty(), Maybe.just(2))
.subscribe(ts);
ts
.assertFuseable()
.assertFusionMode(QueueFuseable.ASYNC)
.assertResult(1, 2);
Reported by PMD.
Line: 53
Maybe.mergeArray(Maybe.just(1), Maybe.<Integer>empty(), Maybe.just(2))
.subscribe(ts);
ts
.assertFuseable()
.assertFusionMode(QueueFuseable.ASYNC)
.assertResult(1, 2);
}
Reported by PMD.
Line: 53
Maybe.mergeArray(Maybe.just(1), Maybe.<Integer>empty(), Maybe.just(2))
.subscribe(ts);
ts
.assertFuseable()
.assertFusionMode(QueueFuseable.ASYNC)
.assertResult(1, 2);
}
Reported by PMD.
Line: 61
@SuppressWarnings("unchecked")
@Test
public void fusedEmptyCheck() {
Maybe.mergeArray(Maybe.just(1), Maybe.<Integer>empty(), Maybe.just(2))
.subscribe(new FlowableSubscriber<Integer>() {
QueueSubscription<Integer> qs;
@Override
public void onSubscribe(Subscription s) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableOnErrorReturnTest.java
49 issues
Line: 82
@Override
public String apply(Throwable e) {
capturedException.set(e);
throw new RuntimeException("exception from function");
}
});
Observer<String> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 116
@Override
public String apply(String s) {
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Reported by PMD.
Line: 203
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
}
}
Reported by PMD.
Line: 118
if ("fail".equals(s)) {
throw new RuntimeException("Forced Failure");
}
System.out.println("BadMapper:" + s);
return s;
}
});
Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {
Reported by PMD.
Line: 192
@Override
public void subscribe(final Observer<? super String> observer) {
observer.onSubscribe(Disposable.empty());
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {
@Override
public void run() {
try {
Reported by PMD.
Line: 198
@Override
public void run() {
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
Reported by PMD.
Line: 200
try {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
Reported by PMD.
Line: 210
}
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
Reported by PMD.
Line: 212
});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
}
}
@Test
public void returnItem() {
Reported by PMD.
Line: 38
@Test
public void resumeNext() {
TestObservable f = new TestObservable("one");
Observable<String> w = Observable.unsafeCreate(f);
final AtomicReference<Throwable> capturedException = new AtomicReference<>();
Observable<String> observable = w.onErrorReturn(new Function<Throwable, String>() {
Reported by PMD.