The following issues were found:
src/test/java/io/reactivex/rxjava3/plugins/RxJavaPluginsTest.java
430 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.plugins;
import static org.junit.Assert.*;
import java.io.*;
import java.lang.Thread.UncaughtExceptionHandler;
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.plugins;
import static org.junit.Assert.*;
import java.io.*;
import java.lang.Thread.UncaughtExceptionHandler;
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.plugins;
import static org.junit.Assert.*;
import java.io.*;
import java.lang.Thread.UncaughtExceptionHandler;
Reported by PMD.
Line: 51
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
Reported by PMD.
Line: 51
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
Reported by PMD.
Line: 51
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
Reported by PMD.
Line: 51
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
Reported by PMD.
Line: 51
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
Reported by PMD.
Line: 51
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
Reported by PMD.
Line: 54
public class RxJavaPluginsTest extends RxJavaTest {
@Test
public void constructorShouldBePrivate() {
TestHelper.checkUtilityClass(RxJavaPlugins.class);
}
@SuppressWarnings({ "rawtypes" })
@Test
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupByTest.java
416 issues
Line: 1075
return new Function<Integer, T>() {
@Override
public T apply(Integer t1) {
throw new RuntimeException("Forced failure");
}
};
}
<T, R> Function<T, R> fail2(R dummy2) {
Reported by PMD.
Line: 1084
return new Function<T, R>() {
@Override
public R apply(T t1) {
throw new RuntimeException("Forced failure");
}
};
}
Function<Integer, Integer> dbl = new Function<Integer, Integer>() {
Reported by PMD.
Line: 140
@Override
public void onNext(String v) {
eventCounter.incrementAndGet();
System.out.println(v);
}
});
assertEquals(3, groupCounter.get());
Reported by PMD.
Line: 193
@Override
public void subscribe(final Observer<? super Event> observer) {
observer.onSubscribe(Disposable.empty());
System.out.println("*** Subscribing to EventStream ***");
subscribeCounter.incrementAndGet();
new Thread(new Runnable() {
@Override
public void run() {
Reported by PMD.
Line: 223
@Override
public Observable<String> apply(GroupedObservable<Integer, Event> eventGroupedObservable) {
System.out.println("GroupedObservable Key: " + eventGroupedObservable.getKey());
groupCounter.incrementAndGet();
return eventGroupedObservable.map(new Function<Event, String>() {
@Override
Reported by PMD.
Line: 250
@Override
public void onNext(String outputMessage) {
System.out.println(outputMessage);
eventCounter.incrementAndGet();
}
});
latch.await(5000, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 303
@Override
public Observable<String> apply(GroupedObservable<Integer, Event> eventGroupedObservable) {
System.out.println("testUnsubscribe => GroupedObservable Key: " + eventGroupedObservable.getKey());
groupCounter.incrementAndGet();
return eventGroupedObservable
.take(20) // limit to only 20 events on this group
.map(new Function<Event, String>() {
Reported by PMD.
Line: 332
@Override
public void onNext(String outputMessage) {
System.out.println(outputMessage);
eventCounter.incrementAndGet();
}
});
if (!latch.await(2000, TimeUnit.MILLISECONDS)) {
Reported by PMD.
Line: 384
@Override
public void accept(String s) {
eventCounter.incrementAndGet();
System.out.println("=> " + s);
}
});
assertEquals(30, eventCounter.get());
Reported by PMD.
Line: 437
@Override
public void accept(String s) {
eventCounter.incrementAndGet();
System.out.println("=> " + s);
}
});
assertEquals(15, eventCounter.get());
Reported by PMD.
src/test/java/io/reactivex/rxjava3/validators/JavadocWording.java
413 issues
Line: 220
}
if (e.length() != 0) {
System.out.println(e);
fail(e.toString());
}
}
Reported by PMD.
Line: 363
}
if (e.length() != 0) {
System.out.println(e);
fail(e.toString());
}
}
Reported by PMD.
Line: 505
}
if (e.length() != 0) {
System.out.println(e);
fail(e.toString());
}
}
Reported by PMD.
Line: 608
}
if (e.length() != 0) {
System.out.println(e);
fail(e.toString());
}
}
Reported by PMD.
Line: 784
}
if (e.length() != 0) {
System.out.println(e);
fail(e.toString());
}
}
Reported by PMD.
Line: 975
}
if (e.length() != 0) {
System.out.println(e);
fail(e.toString());
}
}
Reported by PMD.
Line: 29
/**
* Check if the method wording is consistent with the target base type.
*/
public class JavadocWording {
public static int lineNumber(CharSequence s, int index) {
int cnt = 1;
for (int i = 0; i < index; i++) {
if (s.charAt(i) == '\n') {
Reported by PMD.
Line: 29
/**
* Check if the method wording is consistent with the target base type.
*/
public class JavadocWording {
public static int lineNumber(CharSequence s, int index) {
int cnt = 1;
for (int i = 0; i < index; i++) {
if (s.charAt(i) == '\n') {
Reported by PMD.
Line: 29
/**
* Check if the method wording is consistent with the target base type.
*/
public class JavadocWording {
public static int lineNumber(CharSequence s, int index) {
int cnt = 1;
for (int i = 0; i < index; i++) {
if (s.charAt(i) == '\n') {
Reported by PMD.
Line: 29
/**
* Check if the method wording is consistent with the target base type.
*/
public class JavadocWording {
public static int lineNumber(CharSequence s, int index) {
int cnt = 1;
for (int i = 0; i < index; i++) {
if (s.charAt(i) == '\n') {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRetryTest.java
410 issues
Line: 55
@Override
public void subscribe(Subscriber<? super String> t1) {
t1.onSubscribe(new BooleanSubscription());
System.out.println(count.get() + " @ " + String.valueOf(last - System.currentTimeMillis()));
last = System.currentTimeMillis();
if (count.getAndDecrement() == 0) {
t1.onNext("hello");
t1.onComplete();
} else {
Reported by PMD.
Line: 86
.flatMap(new Function<Tuple, Flowable<Object>>() {
@Override
public Flowable<Object> apply(Tuple t) {
System.out.println("Retry # " + t.count);
return t.count > 20 ?
Flowable.<Object>error(t.n) :
Flowable.timer(t.count * 1L, TimeUnit.MILLISECONDS)
.cast(Object.class);
}});
Reported by PMD.
Line: 750
try {
for (int r = 0; r < NUM_LOOPS; r++) {
if (r % 10 == 0) {
System.out.println("testRetryWithBackpressureParallelLoop -> " + r);
}
final AtomicInteger timeouts = new AtomicInteger();
final Map<Integer, List<String>> data = new ConcurrentHashMap<>();
Reported by PMD.
Line: 782
}
} catch (Throwable t) {
timeouts.incrementAndGet();
System.out.println(j + " | " + cdl.getCount() + " !!! " + nexts.get());
}
cdl.countDown();
}
});
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.flowable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Subscriber<String> consumer = TestHelper.mockSubscriber();
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Subscriber<String> consumer = TestHelper.mockSubscriber();
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Subscriber<String> consumer = TestHelper.mockSubscriber();
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Subscriber<String> consumer = TestHelper.mockSubscriber();
Reported by PMD.
Line: 41
import io.reactivex.rxjava3.subscribers.*;
import io.reactivex.rxjava3.testsupport.*;
public class FlowableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Subscriber<String> consumer = TestHelper.mockSubscriber();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/testsupport/TestSubscriberExTest.java
402 issues
Line: 607
try {
ts.assertNotTerminated();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 621
try {
ts.assertNotTerminated();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 636
try {
ts.assertNotTerminated();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 652
try {
ts.assertNotTerminated();
throw new RuntimeException("Failed to report there were terminal event(s)!");
} catch (AssertionError ex) {
// expected
Throwable e = ex.getCause();
if (!(e instanceof CompositeException)) {
fail("Multiple Error present but the reported error doesn't have a composite cause!");
Reported by PMD.
Line: 671
try {
ts.assertNoValues();
throw new RuntimeException("Failed to report there were values!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 685
try {
ts.assertValueCount(3);
throw new RuntimeException("Failed to report there were values!");
} catch (AssertionError ex) {
// expected
}
}
Reported by PMD.
Line: 743
try {
ts.assertNotSubscribed();
throw new RuntimeException("Should have thrown");
} catch (AssertionError ex) {
// expected
}
assertTrue(ts.hasSubscription());
Reported by PMD.
Line: 766
try {
ts.assertNoValues();
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
try {
Reported by PMD.
Line: 773
try {
ts.assertValueCount(0);
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
ts.assertValueSequence(Collections.singletonList(1));
Reported by PMD.
Line: 782
try {
ts.assertValueSequence(Collections.singletonList(2));
throw new RuntimeException("Should have thrown");
} catch (AssertionError exc) {
// expected
}
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java
402 issues
Line: 52
Observable<String> combined = Observable.combineLatest(w1, w2, new BiFunction<String, String, String>() {
@Override
public String apply(String v1, String v2) {
throw new RuntimeException("I don't work.");
}
});
combined.subscribe(w);
w1.onNext("first value of w1");
Reported by PMD.
Line: 1096
Observable<Object> errorObservable = Observable.timer(100, TimeUnit.MILLISECONDS, testScheduler).map(new Function<Long, Object>() {
@Override
public Object apply(Long aLong) throws Exception {
throw new Exception();
}
});
Observable.combineLatestDelayError(
Arrays.asList(
Reported by PMD.
Line: 227
private Function3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
Function3<String, String, String, String> combineLatestFunction = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 227
private Function3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
Function3<String, String, String, String> combineLatestFunction = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 227
private Function3<String, String, String, String> getConcat3StringsCombineLatestFunction() {
Function3<String, String, String, String> combineLatestFunction = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 438
}
};
for (int i = 1; i <= n; i++) {
System.out.println("test1ToNSources: " + i + " sources");
List<Observable<Integer>> sources = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (int j = 0; j < i; j++) {
sources.add(Observable.just(j));
values.add(j);
Reported by PMD.
Line: 469
}
};
for (int i = 1; i <= n; i++) {
System.out.println("test1ToNSourcesScheduled: " + i + " sources");
List<Observable<Integer>> sources = new ArrayList<>();
List<Object> values = new ArrayList<>();
for (int j = 0; j < i; j++) {
sources.add(Observable.just(j).subscribeOn(Schedulers.io()));
values.add(j);
Reported by PMD.
Line: 1106
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("emptyObservable: " + integerNotification);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Reported by PMD.
Line: 1112
.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println("emptyObservable: doFinally");
}
}),
errorObservable
.doOnEach(new Consumer<Notification<Object>>() {
@Override
Reported by PMD.
Line: 1119
.doOnEach(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> integerNotification) throws Exception {
System.out.println("errorObservable: " + integerNotification);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java
399 issues
Line: 846
latch.await(1000, TimeUnit.MILLISECONDS);
if (!infiniteObservable.await(2000, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("didn't unsubscribe");
}
assertEquals(5, list.size());
assertEquals("1-1", list.get(0));
assertEquals("2-2", list.get(1));
Reported by PMD.
Line: 566
Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 566
Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 566
Function3<String, String, String, String> zipr = new Function3<String, String, String, String>() {
@Override
public String apply(String a1, String a2, String a3) {
if (a1 == null) {
a1 = "";
}
if (a2 == null) {
a2 = "";
Reported by PMD.
Line: 776
@Override
public void accept(String s) {
System.out.println(s);
list.add(s);
}
});
assertEquals(5, list.size());
Reported by PMD.
Line: 839
@Override
public void onNext(String s) {
System.out.println(s);
list.add(s);
}
});
latch.await(1000, TimeUnit.MILLISECONDS);
Reported by PMD.
Line: 892
@Override
public void accept(String s) {
System.out.println(s);
list.add(s);
}
});
assertEquals(4, list.size());
Reported by PMD.
Line: 921
@Override
public void accept(String s) {
System.out.println(s);
list.add(s);
}
});
assertEquals(0, list.size());
Reported by PMD.
Line: 990
to.awaitDone(5, TimeUnit.SECONDS);
to.assertNoErrors();
assertEquals(Observable.bufferSize() * 2, to.values().size());
System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get());
assertTrue(generatedA.get() < (Observable.bufferSize() * 3));
assertTrue(generatedB.get() < (Observable.bufferSize() * 3));
}
private Observable<Integer> createInfiniteObservable(final AtomicInteger generated) {
Reported by PMD.
Line: 1054
@Override
public void run() {
System.out.println("-------> subscribe to infinite sequence");
System.out.println("Starting thread: " + Thread.currentThread());
int i = 1;
while (!d.isDisposed()) {
o.onNext(i++);
Thread.yield();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRetryTest.java
385 issues
Line: 56
@Override
public void subscribe(Observer<? super String> t1) {
t1.onSubscribe(Disposable.empty());
System.out.println(count.get() + " @ " + String.valueOf(last - System.currentTimeMillis()));
last = System.currentTimeMillis();
if (count.getAndDecrement() == 0) {
t1.onNext("hello");
t1.onComplete();
} else {
Reported by PMD.
Line: 87
.flatMap(new Function<Tuple, Observable<Long>>() {
@Override
public Observable<Long> apply(Tuple t) {
System.out.println("Retry # " + t.count);
return t.count > 20 ?
Observable.<Long>error(t.n) :
Observable.timer(t.count * 1L, TimeUnit.MILLISECONDS);
}}).cast(Object.class);
}
Reported by PMD.
Line: 701
try {
for (int r = 0; r < NUM_LOOPS; r++) {
if (r % 10 == 0) {
System.out.println("testRetryWithBackpressureParallelLoop -> " + r);
}
final AtomicInteger timeouts = new AtomicInteger();
final Map<Integer, List<String>> data = new ConcurrentHashMap<>();
Reported by PMD.
Line: 733
}
} catch (Throwable t) {
timeouts.incrementAndGet();
System.out.println(j + " | " + cdl.getCount() + " !!! " + nexts.get());
}
cdl.countDown();
}
});
}
Reported by PMD.
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.observable;
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Observer<String> consumer = TestHelper.mockObserver();
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Observer<String> consumer = TestHelper.mockObserver();
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Observer<String> consumer = TestHelper.mockObserver();
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Observer<String> consumer = TestHelper.mockObserver();
Reported by PMD.
Line: 42
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableRetryTest extends RxJavaTest {
@Test
public void iterativeBackoff() {
Observer<String> consumer = TestHelper.mockObserver();
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableRefCountTest.java
383 issues
Line: 119
d2.dispose(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
d1.dispose();
System.out.println("onNext: " + nextCount.get());
// should emit once for both subscribers
assertEquals(nextCount.get(), receivedCount.get());
// only 1 subscribe
assertEquals(1, subscribeCount.get());
Reported by PMD.
Line: 166
d2.dispose(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one subscriber getting a value but not the other
d1.dispose();
System.out.println("onNext Count: " + nextCount.get());
// it will emit twice because it is synchronous
assertEquals(nextCount.get(), receivedCount.get() * 2);
// it will subscribe twice because it is synchronous
assertEquals(2, subscribeCount.get());
Reported by PMD.
Line: 181
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer l) {
System.out.println("onNext --------> " + l);
nextCount.incrementAndGet();
}
})
.take(4)
.publish().refCount();
Reported by PMD.
Line: 196
}
});
System.out.println("onNext: " + nextCount.get());
assertEquals(4, receivedCount.get());
assertEquals(4, receivedCount.get());
}
Reported by PMD.
Line: 210
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
System.out.println("******************************* Subscribe received");
// when we are subscribed
subscribeCount.incrementAndGet();
}
})
.doOnCancel(new Action() {
Reported by PMD.
Line: 218
.doOnCancel(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
// when we are unsubscribed
unsubscribeCount.incrementAndGet();
}
})
.publish().refCount();
Reported by PMD.
Line: 255
.doOnSubscribe(new Consumer<Subscription>() {
@Override
public void accept(Subscription s) {
System.out.println("******************************* Subscribe received");
// when we are subscribed
subscribeLatch.countDown();
}
})
.doOnCancel(new Action() {
Reported by PMD.
Line: 263
.doOnCancel(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
// when we are unsubscribed
unsubscribeLatch.countDown();
}
});
Reported by PMD.
Line: 271
TestSubscriberEx<Long> s = new TestSubscriberEx<>();
f.publish().refCount().subscribeOn(Schedulers.newThread()).subscribe(s);
System.out.println("send unsubscribe");
// wait until connected
subscribeLatch.await();
// now unsubscribe
s.cancel();
System.out.println("DONE sending unsubscribe ... now waiting");
Reported by PMD.
Line: 276
subscribeLatch.await();
// now unsubscribe
s.cancel();
System.out.println("DONE sending unsubscribe ... now waiting");
if (!unsubscribeLatch.await(3000, TimeUnit.MILLISECONDS)) {
System.out.println("Errors: " + s.errors());
if (s.errors().size() > 0) {
s.errors().get(0).printStackTrace();
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableRefCountTest.java
372 issues
Line: 104
d2.dispose(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one Observer getting a value but not the other
d1.dispose();
System.out.println("onNext: " + nextCount.get());
// should emit once for both subscribers
assertEquals(nextCount.get(), receivedCount.get());
// only 1 subscribe
assertEquals(1, subscribeCount.get());
Reported by PMD.
Line: 151
d2.dispose(); // unsubscribe s2 first as we're counting in 1 and there can be a race between unsubscribe and one Observer getting a value but not the other
d1.dispose();
System.out.println("onNext Count: " + nextCount.get());
// it will emit twice because it is synchronous
assertEquals(nextCount.get(), receivedCount.get() * 2);
// it will subscribe twice because it is synchronous
assertEquals(2, subscribeCount.get());
Reported by PMD.
Line: 166
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer l) {
System.out.println("onNext --------> " + l);
nextCount.incrementAndGet();
}
})
.take(4)
.publish().refCount();
Reported by PMD.
Line: 181
}
});
System.out.println("onNext: " + nextCount.get());
assertEquals(4, receivedCount.get());
assertEquals(4, receivedCount.get());
}
Reported by PMD.
Line: 195
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
System.out.println("******************************* Subscribe received");
// when we are subscribed
subscribeCount.incrementAndGet();
}
})
.doOnDispose(new Action() {
Reported by PMD.
Line: 203
.doOnDispose(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
// when we are unsubscribed
unsubscribeCount.incrementAndGet();
}
})
.publish().refCount();
Reported by PMD.
Line: 240
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable d) {
System.out.println("******************************* Subscribe received");
// when we are subscribed
subscribeLatch.countDown();
}
})
.doOnDispose(new Action() {
Reported by PMD.
Line: 248
.doOnDispose(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
// when we are unsubscribed
unsubscribeLatch.countDown();
}
});
Reported by PMD.
Line: 256
TestObserverEx<Long> observer = new TestObserverEx<>();
o.publish().refCount().subscribeOn(Schedulers.newThread()).subscribe(observer);
System.out.println("send unsubscribe");
// wait until connected
subscribeLatch.await();
// now unsubscribe
observer.dispose();
System.out.println("DONE sending unsubscribe ... now waiting");
Reported by PMD.
Line: 261
subscribeLatch.await();
// now unsubscribe
observer.dispose();
System.out.println("DONE sending unsubscribe ... now waiting");
if (!unsubscribeLatch.await(3000, TimeUnit.MILLISECONDS)) {
System.out.println("Errors: " + observer.errors());
if (observer.errors().size() > 0) {
observer.errors().get(0).printStackTrace();
}
Reported by PMD.