The following issues were found
src/test/java/io/reactivex/rxjava3/parallel/ParallelSortedJoinTest.java
57 issues
Line: 34
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ParallelSortedJoinTest extends RxJavaTest {
@Test
public void cancel() {
PublishProcessor<Integer> pp = PublishProcessor.create();
Reported by PMD.
Line: 37
public class ParallelSortedJoinTest extends RxJavaTest {
@Test
public void cancel() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = pp
.parallel()
.sorted(Functions.<Integer>naturalComparator())
Reported by PMD.
Line: 40
public void cancel() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = pp
.parallel()
.sorted(Functions.<Integer>naturalComparator())
.test();
assertTrue(pp.hasSubscribers());
Reported by PMD.
Line: 40
public void cancel() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = pp
.parallel()
.sorted(Functions.<Integer>naturalComparator())
.test();
assertTrue(pp.hasSubscribers());
Reported by PMD.
Line: 40
public void cancel() {
PublishProcessor<Integer> pp = PublishProcessor.create();
TestSubscriber<Integer> ts = pp
.parallel()
.sorted(Functions.<Integer>naturalComparator())
.test();
assertTrue(pp.hasSubscribers());
Reported by PMD.
Line: 45
.sorted(Functions.<Integer>naturalComparator())
.test();
assertTrue(pp.hasSubscribers());
ts.cancel();
assertFalse(pp.hasSubscribers());
}
Reported by PMD.
Line: 45
.sorted(Functions.<Integer>naturalComparator())
.test();
assertTrue(pp.hasSubscribers());
ts.cancel();
assertFalse(pp.hasSubscribers());
}
Reported by PMD.
Line: 47
assertTrue(pp.hasSubscribers());
ts.cancel();
assertFalse(pp.hasSubscribers());
}
@Test
Reported by PMD.
Line: 49
ts.cancel();
assertFalse(pp.hasSubscribers());
}
@Test
public void error() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
Line: 49
ts.cancel();
assertFalse(pp.hasSubscribers());
}
@Test
public void error() {
List<Throwable> errors = TestHelper.trackPluginErrors();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTest.java
56 issues
Line: 34
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableThrottleFirstTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
Reported by PMD.
Line: 36
public class FlowableThrottleFirstTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
@Before
public void before() {
Reported by PMD.
Line: 37
public class FlowableThrottleFirstTest extends RxJavaTest {
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
@Before
public void before() {
scheduler = new TestScheduler();
Reported by PMD.
Line: 38
private TestScheduler scheduler;
private Scheduler.Worker innerScheduler;
private Subscriber<String> subscriber;
@Before
public void before() {
scheduler = new TestScheduler();
innerScheduler = scheduler.createWorker();
Reported by PMD.
Line: 53
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new BooleanSubscription());
publishNext(subscriber, 100, "one"); // publish as it's first
publishNext(subscriber, 300, "two"); // skip as it's last within the first 400
publishNext(subscriber, 900, "three"); // publish
publishNext(subscriber, 905, "four"); // skip
publishCompleted(subscriber, 1000); // Should be published as soon as the timeout expires.
}
Reported by PMD.
Line: 62
});
Flowable<String> sampled = source.throttleFirst(400, TimeUnit.MILLISECONDS, scheduler);
sampled.subscribe(subscriber);
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
Reported by PMD.
Line: 67
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(0)).onNext("two");
inOrder.verify(subscriber, times(1)).onNext("three");
inOrder.verify(subscriber, times(0)).onNext("four");
inOrder.verify(subscriber, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
Reported by PMD.
Line: 67
InOrder inOrder = inOrder(subscriber);
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(0)).onNext("two");
inOrder.verify(subscriber, times(1)).onNext("three");
inOrder.verify(subscriber, times(0)).onNext("four");
inOrder.verify(subscriber, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
Reported by PMD.
Line: 68
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(0)).onNext("two");
inOrder.verify(subscriber, times(1)).onNext("three");
inOrder.verify(subscriber, times(0)).onNext("four");
inOrder.verify(subscriber, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
Reported by PMD.
Line: 68
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(subscriber, times(1)).onNext("one");
inOrder.verify(subscriber, times(0)).onNext("two");
inOrder.verify(subscriber, times(1)).onNext("three");
inOrder.verify(subscriber, times(0)).onNext("four");
inOrder.verify(subscriber, times(1)).onComplete();
inOrder.verifyNoMoreInteractions();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFromSourceTest.java
56 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import java.util.List;
import org.junit.*;
import org.reactivestreams.Subscription;
Reported by PMD.
Line: 29
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableFromSourceTest extends RxJavaTest {
PublishAsyncEmitter source;
PublishAsyncEmitterNoCancel sourceNoCancel;
Reported by PMD.
Line: 31
public class FlowableFromSourceTest extends RxJavaTest {
PublishAsyncEmitter source;
PublishAsyncEmitterNoCancel sourceNoCancel;
TestSubscriberEx<Integer> ts;
Reported by PMD.
Line: 33
PublishAsyncEmitter source;
PublishAsyncEmitterNoCancel sourceNoCancel;
TestSubscriberEx<Integer> ts;
@Before
public void before() {
Reported by PMD.
Line: 35
PublishAsyncEmitterNoCancel sourceNoCancel;
TestSubscriberEx<Integer> ts;
@Before
public void before() {
source = new PublishAsyncEmitter();
sourceNoCancel = new PublishAsyncEmitterNoCancel();
Reported by PMD.
Line: 46
@Test
public void normalBuffered() {
Flowable.create(source, BackpressureStrategy.BUFFER).subscribe(ts);
source.onNext(1);
source.onNext(2);
source.onComplete();
Reported by PMD.
Line: 67
@Test
public void normalDrop() {
Flowable.create(source, BackpressureStrategy.DROP).subscribe(ts);
source.onNext(1);
ts.request(1);
Reported by PMD.
Line: 85
@Test
public void normalLatest() {
Flowable.create(source, BackpressureStrategy.LATEST).subscribe(ts);
source.onNext(1);
source.onNext(2);
source.onComplete();
Reported by PMD.
Line: 103
@Test
public void normalMissing() {
Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
source.onNext(1);
source.onNext(2);
source.onComplete();
Reported by PMD.
Line: 116
@Test
public void normalMissingRequested() {
Flowable.create(source, BackpressureStrategy.MISSING).subscribe(ts);
ts.request(2);
source.onNext(1);
source.onNext(2);
source.onComplete();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableMergeWithMaybe.java
56 issues
Line: 37
*/
public final class FlowableMergeWithMaybe<T> extends AbstractFlowableWithUpstream<T, T> {
final MaybeSource<? extends T> other;
public FlowableMergeWithMaybe(Flowable<T> source, MaybeSource<? extends T> other) {
super(source);
this.other = other;
}
Reported by PMD.
Line: 52
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 52
other.subscribe(parent.otherObserver);
}
static final class MergeWithObserver<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 53
}
static final class MergeWithObserver<T> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
Reported by PMD.
Line: 57
private static final long serialVersionUID = -4592979584110982903L;
final Subscriber<? super T> downstream;
final AtomicReference<Subscription> mainSubscription;
final OtherObserver<T> otherObserver;
Reported by PMD.
Line: 59
final Subscriber<? super T> downstream;
final AtomicReference<Subscription> mainSubscription;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
Reported by PMD.
Line: 61
final AtomicReference<Subscription> mainSubscription;
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
final AtomicLong requested;
Reported by PMD.
Line: 63
final OtherObserver<T> otherObserver;
final AtomicThrowable errors;
final AtomicLong requested;
final int prefetch;
Reported by PMD.
Line: 65
final AtomicThrowable errors;
final AtomicLong requested;
final int prefetch;
final int limit;
Reported by PMD.
Line: 67
final AtomicLong requested;
final int prefetch;
final int limit;
volatile SimplePlainQueue<T> queue;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableTakeLastTest.java
56 issues
Line: 33
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableTakeLastTest extends RxJavaTest {
@Test
public void takeLastEmpty() {
Observable<String> w = Observable.empty();
Observable<String> take = w.takeLast(2);
Reported by PMD.
Line: 38
@Test
public void takeLastEmpty() {
Observable<String> w = Observable.empty();
Observable<String> take = w.takeLast(2);
Observer<String> observer = TestHelper.mockObserver();
take.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
Reported by PMD.
Line: 41
Observable<String> take = w.takeLast(2);
Observer<String> observer = TestHelper.mockObserver();
take.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
Line: 42
Observer<String> observer = TestHelper.mockObserver();
take.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
Reported by PMD.
Line: 43
Observer<String> observer = TestHelper.mockObserver();
take.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void takeLast1() {
Reported by PMD.
Line: 44
take.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
@Test
public void takeLast1() {
Observable<String> w = Observable.just("one", "two", "three");
Reported by PMD.
Line: 49
@Test
public void takeLast1() {
Observable<String> w = Observable.just("one", "two", "three");
Observable<String> take = w.takeLast(2);
Observer<String> observer = TestHelper.mockObserver();
InOrder inOrder = inOrder(observer);
take.subscribe(observer);
Reported by PMD.
Line: 50
@Test
public void takeLast1() {
Observable<String> w = Observable.just("one", "two", "three");
Observable<String> take = w.takeLast(2);
Observer<String> observer = TestHelper.mockObserver();
InOrder inOrder = inOrder(observer);
take.subscribe(observer);
inOrder.verify(observer, times(1)).onNext("two");
Reported by PMD.
Line: 54
Observer<String> observer = TestHelper.mockObserver();
InOrder inOrder = inOrder(observer);
take.subscribe(observer);
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
verify(observer, never()).onNext("one");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
Reported by PMD.
Line: 55
Observer<String> observer = TestHelper.mockObserver();
InOrder inOrder = inOrder(observer);
take.subscribe(observer);
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
verify(observer, never()).onNext("one");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onComplete();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/NoAnonymousInnerClassesTest.java
55 issues
Line: 78
boolean found = false;
FileInputStream fin = new FileInputStream(f);
try {
byte[] data = new byte[fin.available()];
fin.read(data);
String content = new String(data, "ISO-8859-1");
Reported by PMD.
Line: 31
String fs = f.toString().toLowerCase().replace("\\", "/");
System.out.println("Found " + fs);
// running this particular test from IntelliJ will have the wrong class directory
// gradle will generate test classes into a separate directory too
int idx = fs.indexOf("/test");
if (idx >= 0) {
Reported by PMD.
Line: 22
import org.junit.Test;
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
Reported by PMD.
Line: 22
import org.junit.Test;
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
Reported by PMD.
Line: 25
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
String fs = f.toString().toLowerCase().replace("\\", "/");
Reported by PMD.
Line: 25
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
String fs = f.toString().toLowerCase().replace("\\", "/");
Reported by PMD.
Line: 25
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
String fs = f.toString().toLowerCase().replace("\\", "/");
Reported by PMD.
Line: 25
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
String fs = f.toString().toLowerCase().replace("\\", "/");
Reported by PMD.
Line: 25
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
String fs = f.toString().toLowerCase().replace("\\", "/");
Reported by PMD.
Line: 25
public class NoAnonymousInnerClassesTest {
@Test
public void verify() throws Exception {
URL u = NoAnonymousInnerClassesTest.class.getResource("/");
File f = new File(u.toURI());
String fs = f.toString().toLowerCase().replace("\\", "/");
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/NewLinesBeforeAnnotation.java
55 issues
Line: 106
if (fname.endsWith(".java")) {
List<String> lines = new ArrayList<>();
BufferedReader in = new BufferedReader(new FileReader(u));
try {
for (;;) {
String line = in.readLine();
if (line == null) {
break;
Reported by PMD.
Line: 71
static void findPattern(int newLines) throws Exception {
File f = TestHelper.findSource("Flowable");
if (f == null) {
System.out.println("Unable to find sources of RxJava");
return;
}
Queue<File> dirs = new ArrayDeque<>();
Reported by PMD.
Line: 160
fail.append("Found ")
.append(total)
.append(" instances");
System.out.println(fail);
throw new AssertionError(fail.toString());
}
}
}
Reported by PMD.
Line: 41
* @Override
* </code></pre>
*/
public class NewLinesBeforeAnnotation {
@Test
public void missingEmptyNewLine() throws Exception {
findPattern(0);
}
Reported by PMD.
Line: 41
* @Override
* </code></pre>
*/
public class NewLinesBeforeAnnotation {
@Test
public void missingEmptyNewLine() throws Exception {
findPattern(0);
}
Reported by PMD.
Line: 44
public class NewLinesBeforeAnnotation {
@Test
public void missingEmptyNewLine() throws Exception {
findPattern(0);
}
@Test
public void tooManyEmptyNewLines2() throws Exception {
Reported by PMD.
Line: 44
public class NewLinesBeforeAnnotation {
@Test
public void missingEmptyNewLine() throws Exception {
findPattern(0);
}
@Test
public void tooManyEmptyNewLines2() throws Exception {
Reported by PMD.
Line: 49
}
@Test
public void tooManyEmptyNewLines2() throws Exception {
findPattern(2);
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
Reported by PMD.
Line: 49
}
@Test
public void tooManyEmptyNewLines2() throws Exception {
findPattern(2);
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
Reported by PMD.
Line: 54
}
@Test
public void tooManyEmptyNewLines3() throws Exception {
findPattern(3);
}
@Test
public void tooManyEmptyNewLines4() throws Exception {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableMergeWithCompletableTest.java
55 issues
Line: 31
public class ObservableMergeWithCompletableTest extends RxJavaTest {
@Test
public void normal() {
final TestObserver<Integer> to = new TestObserver<>();
Observable.range(1, 5).mergeWith(
Completable.fromAction(new Action() {
@Override
Reported by PMD.
Line: 48
}
@Test
public void take() {
final TestObserver<Integer> to = new TestObserver<>();
Observable.range(1, 5).mergeWith(
Completable.complete()
)
Reported by PMD.
Line: 51
public void take() {
final TestObserver<Integer> to = new TestObserver<>();
Observable.range(1, 5).mergeWith(
Completable.complete()
)
.take(3)
.subscribe(to);
Reported by PMD.
Line: 51
public void take() {
final TestObserver<Integer> to = new TestObserver<>();
Observable.range(1, 5).mergeWith(
Completable.complete()
)
.take(3)
.subscribe(to);
Reported by PMD.
Line: 51
public void take() {
final TestObserver<Integer> to = new TestObserver<>();
Observable.range(1, 5).mergeWith(
Completable.complete()
)
.take(3)
.subscribe(to);
Reported by PMD.
Line: 61
}
@Test
public void cancel() {
final PublishSubject<Integer> ps = PublishSubject.create();
final CompletableSubject cs = CompletableSubject.create();
TestObserver<Integer> to = ps.mergeWith(cs).test();
Reported by PMD.
Line: 65
final PublishSubject<Integer> ps = PublishSubject.create();
final CompletableSubject cs = CompletableSubject.create();
TestObserver<Integer> to = ps.mergeWith(cs).test();
assertTrue(ps.hasObservers());
assertTrue(cs.hasObservers());
to.dispose();
Reported by PMD.
Line: 65
final PublishSubject<Integer> ps = PublishSubject.create();
final CompletableSubject cs = CompletableSubject.create();
TestObserver<Integer> to = ps.mergeWith(cs).test();
assertTrue(ps.hasObservers());
assertTrue(cs.hasObservers());
to.dispose();
Reported by PMD.
Line: 67
TestObserver<Integer> to = ps.mergeWith(cs).test();
assertTrue(ps.hasObservers());
assertTrue(cs.hasObservers());
to.dispose();
assertFalse(ps.hasObservers());
Reported by PMD.
Line: 67
TestObserver<Integer> to = ps.mergeWith(cs).test();
assertTrue(ps.hasObservers());
assertTrue(cs.hasObservers());
to.dispose();
assertFalse(ps.hasObservers());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSubscribeOnTest.java
55 issues
Line: 202
Flowable.range(1, 10000000).subscribeOn(Schedulers.newThread()).take(20).subscribe(ts);
latch.await();
Thread t = ts.lastThread();
System.out.println("First schedule: " + t);
assertTrue(t.getName().startsWith("Rx"));
ts.request(10);
ts.awaitDone(20, TimeUnit.SECONDS);
System.out.println("After reschedule: " + ts.lastThread());
assertEquals(t, ts.lastThread());
Reported by PMD.
Line: 206
assertTrue(t.getName().startsWith("Rx"));
ts.request(10);
ts.awaitDone(20, TimeUnit.SECONDS);
System.out.println("After reschedule: " + ts.lastThread());
assertEquals(t, ts.lastThread());
}
@Test
public void setProducerSynchronousRequest() {
Reported by PMD.
Line: 36
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableSubscribeOnTest extends RxJavaTest {
@Test
public void issue813() throws InterruptedException {
// https://github.com/ReactiveX/RxJava/issues/813
final CountDownLatch scheduled = new CountDownLatch(1);
Reported by PMD.
Line: 39
public class FlowableSubscribeOnTest extends RxJavaTest {
@Test
public void issue813() throws InterruptedException {
// https://github.com/ReactiveX/RxJava/issues/813
final CountDownLatch scheduled = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch doneLatch = new CountDownLatch(1);
Reported by PMD.
Line: 63
}
subscriber.onComplete();
} catch (Throwable e) {
subscriber.onError(e);
} finally {
doneLatch.countDown();
}
}
Reported by PMD.
Line: 82
}
@Test
public void onError() {
TestSubscriberEx<String> ts = new TestSubscriberEx<>();
Flowable.unsafeCreate(new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> s) {
Reported by PMD.
Line: 98
}
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
public SlowScheduler() {
this(Schedulers.computation(), 2, TimeUnit.SECONDS);
Reported by PMD.
Line: 99
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
public SlowScheduler() {
this(Schedulers.computation(), 2, TimeUnit.SECONDS);
}
Reported by PMD.
Line: 100
public static class SlowScheduler extends Scheduler {
final Scheduler actual;
final long delay;
final TimeUnit unit;
public SlowScheduler() {
this(Schedulers.computation(), 2, TimeUnit.SECONDS);
}
Reported by PMD.
Line: 120
private final class SlowInner extends Worker {
private final Scheduler.Worker actualInner;
private SlowInner(Worker actual) {
this.actualInner = actual;
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUnsubscribeOnTest.java
55 issues
Line: 184
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException("failed to initialize and get inner thread");
}
}
@NonNull
@Override
Reported by PMD.
Line: 75
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
ts.assertValues(1, 2);
ts.assertTerminated();
Reported by PMD.
Line: 76
// True for Schedulers.newThread()
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
ts.assertValues(1, 2);
ts.assertTerminated();
} finally {
Reported by PMD.
Line: 124
assertNotSame(Thread.currentThread(), subscribeThread.get());
// True for Schedulers.newThread()
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
ts.assertValues(1, 2);
Reported by PMD.
Line: 125
// True for Schedulers.newThread()
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
ts.assertValues(1, 2);
ts.assertTerminated();
Reported by PMD.
Line: 126
System.out.println("UI Thread: " + uiEventLoop.getThread());
System.out.println("unsubscribeThread: " + unsubscribeThread);
System.out.println("subscribeThread.get(): " + subscribeThread.get());
assertSame(unsubscribeThread, uiEventLoop.getThread());
ts.assertValues(1, 2);
ts.assertTerminated();
} finally {
Reported by PMD.
Line: 143
@Override
public void cancel() {
System.out.println("unsubscribe invoked: " + Thread.currentThread());
thread = Thread.currentThread();
latch.countDown();
}
public Thread getThread() throws InterruptedException {
Reported by PMD.
Line: 38
public class FlowableUnsubscribeOnTest extends RxJavaTest {
@Test
public void unsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnSameThread() throws InterruptedException {
UIEventLoopScheduler uiEventLoop = new UIEventLoopScheduler();
try {
final ThreadSubscription subscription = new ThreadSubscription();
final AtomicReference<Thread> subscribeThread = new AtomicReference<>();
Flowable<Integer> w = Flowable.unsafeCreate(new Publisher<Integer>() {
Reported by PMD.
Line: 59
});
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
w.subscribeOn(uiEventLoop).observeOn(Schedulers.computation())
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(ts);
ts.awaitDone(1, TimeUnit.SECONDS);
Reported by PMD.
Line: 59
});
TestSubscriberEx<Integer> ts = new TestSubscriberEx<>();
w.subscribeOn(uiEventLoop).observeOn(Schedulers.computation())
.unsubscribeOn(uiEventLoop)
.take(2)
.subscribe(ts);
ts.awaitDone(1, TimeUnit.SECONDS);
Reported by PMD.