The following issues were found
src/jmh/java/io/reactivex/rxjava3/core/JustAsyncPerf.java
53 issues
Line: 30
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class JustAsyncPerf {
Flowable<Integer> subscribeOnFlowable;
Flowable<Integer> observeOnFlowable;
Reported by PMD.
Line: 30
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class JustAsyncPerf {
Flowable<Integer> subscribeOnFlowable;
Flowable<Integer> observeOnFlowable;
Reported by PMD.
Line: 32
@State(Scope.Thread)
public class JustAsyncPerf {
Flowable<Integer> subscribeOnFlowable;
Flowable<Integer> observeOnFlowable;
Flowable<Integer> pipelineFlowable;
Reported by PMD.
Line: 32
@State(Scope.Thread)
public class JustAsyncPerf {
Flowable<Integer> subscribeOnFlowable;
Flowable<Integer> observeOnFlowable;
Flowable<Integer> pipelineFlowable;
Reported by PMD.
Line: 34
Flowable<Integer> subscribeOnFlowable;
Flowable<Integer> observeOnFlowable;
Flowable<Integer> pipelineFlowable;
Observable<Integer> subscribeOnObservable;
Reported by PMD.
Line: 34
Flowable<Integer> subscribeOnFlowable;
Flowable<Integer> observeOnFlowable;
Flowable<Integer> pipelineFlowable;
Observable<Integer> subscribeOnObservable;
Reported by PMD.
Line: 36
Flowable<Integer> observeOnFlowable;
Flowable<Integer> pipelineFlowable;
Observable<Integer> subscribeOnObservable;
Observable<Integer> observeOnObservable;
Reported by PMD.
Line: 36
Flowable<Integer> observeOnFlowable;
Flowable<Integer> pipelineFlowable;
Observable<Integer> subscribeOnObservable;
Observable<Integer> observeOnObservable;
Reported by PMD.
Line: 38
Flowable<Integer> pipelineFlowable;
Observable<Integer> subscribeOnObservable;
Observable<Integer> observeOnObservable;
Observable<Integer> pipelineObservable;
Reported by PMD.
Line: 38
Flowable<Integer> pipelineFlowable;
Observable<Integer> subscribeOnObservable;
Observable<Integer> observeOnObservable;
Observable<Integer> pipelineObservable;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableBuffer.java
53 issues
Line: 29
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
public final class FlowableBuffer<T, C extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, C> {
final int size;
final int skip;
final Supplier<C> bufferSupplier;
Reported by PMD.
Line: 31
public final class FlowableBuffer<T, C extends Collection<? super T>> extends AbstractFlowableWithUpstream<T, C> {
final int size;
final int skip;
final Supplier<C> bufferSupplier;
public FlowableBuffer(Flowable<T> source, int size, int skip, Supplier<C> bufferSupplier) {
super(source);
Reported by PMD.
Line: 33
final int skip;
final Supplier<C> bufferSupplier;
public FlowableBuffer(Flowable<T> source, int size, int skip, Supplier<C> bufferSupplier) {
super(source);
this.size = size;
this.skip = skip;
Reported by PMD.
Line: 56
static final class PublisherBufferExactSubscriber<T, C extends Collection<? super T>>
implements FlowableSubscriber<T>, Subscription {
final Subscriber<? super C> downstream;
final Supplier<C> bufferSupplier;
final int size;
Reported by PMD.
Line: 58
final Subscriber<? super C> downstream;
final Supplier<C> bufferSupplier;
final int size;
C buffer;
Reported by PMD.
Line: 60
final Supplier<C> bufferSupplier;
final int size;
C buffer;
Subscription upstream;
Reported by PMD.
Line: 62
final int size;
C buffer;
Subscription upstream;
boolean done;
Reported by PMD.
Line: 64
C buffer;
Subscription upstream;
boolean done;
int index;
Reported by PMD.
Line: 66
Subscription upstream;
boolean done;
int index;
PublisherBufferExactSubscriber(Subscriber<? super C> actual, int size, Supplier<C> bufferSupplier) {
this.downstream = actual;
Reported by PMD.
Line: 68
boolean done;
int index;
PublisherBufferExactSubscriber(Subscriber<? super C> actual, int size, Supplier<C> bufferSupplier) {
this.downstream = actual;
this.size = size;
this.bufferSupplier = bufferSupplier;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapTest.java
53 issues
Line: 232
return Flowable.fromCallable(new Callable<Integer>() {
@Override public Integer call() throws Exception {
if (integer >= 100) {
throw new NullPointerException("test null exp");
}
return integer;
}
});
}
Reported by PMD.
Line: 251
return Flowable.fromCallable(new Callable<Integer>() {
@Override public Integer call() throws Exception {
if (integer >= 100) {
throw new NullPointerException("test null exp");
}
return integer;
}
});
}
Reported by PMD.
Line: 33
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class FlowableConcatMapTest extends RxJavaTest {
@Test
public void simpleSubscriptionRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
SimpleScalarSubscription<Integer> ws = new SimpleScalarSubscription<>(1, ts);
Reported by PMD.
Line: 36
public class FlowableConcatMapTest extends RxJavaTest {
@Test
public void simpleSubscriptionRequest() {
TestSubscriber<Integer> ts = new TestSubscriber<>(0);
SimpleScalarSubscription<Integer> ws = new SimpleScalarSubscription<>(1, ts);
ts.onSubscribe(ws);
ws.request(0);
Reported by PMD.
Line: 55
}
@Test
public void boundaryFusion() {
Flowable.range(1, 10000)
.observeOn(Schedulers.single())
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
Reported by PMD.
Line: 61
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
Reported by PMD.
Line: 62
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
})
Reported by PMD.
Line: 62
@Override
public String apply(Integer t) throws Exception {
String name = Thread.currentThread().getName();
if (name.contains("RxSingleScheduler")) {
return "RxSingleScheduler";
}
return name;
}
})
Reported by PMD.
Line: 83
}
@Test
public void innerScalarRequestRace() {
Flowable<Integer> just = Flowable.just(1);
int n = 1000;
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
Reported by PMD.
Line: 89
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
TestSubscriber<Integer> ts = source
.concatMap(v -> v, n + 1)
.test(1L);
TestHelper.race(() -> {
for (int j = 0; j < n; j++) {
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFromCallableTest.java
53 issues
Line: 36
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableFromCallableTest extends RxJavaTest {
@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
Reported by PMD.
Line: 38
public class ObservableFromCallableTest extends RxJavaTest {
@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Reported by PMD.
Line: 40
@SuppressWarnings("unchecked")
@Test
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Observable<Object> fromCallableObservable = Observable.fromCallable(func);
Reported by PMD.
Line: 43
public void shouldNotInvokeFuncUntilSubscription() throws Exception {
Callable<Object> func = mock(Callable.class);
when(func.call()).thenReturn(new Object());
Observable<Object> fromCallableObservable = Observable.fromCallable(func);
verifyNoInteractions(func);
Reported by PMD.
Line: 49
verifyNoInteractions(func);
fromCallableObservable.subscribe();
verify(func).call();
}
@SuppressWarnings("unchecked")
Reported by PMD.
Line: 51
fromCallableObservable.subscribe();
verify(func).call();
}
@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Reported by PMD.
Line: 56
@SuppressWarnings("unchecked")
@Test
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Observable<String> fromCallableObservable = Observable.fromCallable(func);
Reported by PMD.
Line: 59
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Observable<String> fromCallableObservable = Observable.fromCallable(func);
Observer<Object> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 59
public void shouldCallOnNextAndOnCompleted() throws Exception {
Callable<String> func = mock(Callable.class);
when(func.call()).thenReturn("test_value");
Observable<String> fromCallableObservable = Observable.fromCallable(func);
Observer<Object> observer = TestHelper.mockObserver();
Reported by PMD.
Line: 65
Observer<Object> observer = TestHelper.mockObserver();
fromCallableObservable.subscribe(observer);
verify(observer).onNext("test_value");
verify(observer).onComplete();
verify(observer, never()).onError(any(Throwable.class));
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAllTest.java
53 issues
Line: 33
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.testsupport.*;
public class ObservableAllTest extends RxJavaTest {
@Test
public void allObservable() {
Observable<String> obs = Observable.just("one", "two", "six");
Reported by PMD.
Line: 37
@Test
public void allObservable() {
Observable<String> obs = Observable.just("one", "two", "six");
Observer <Boolean> observer = TestHelper.mockObserver();
obs.all(new Predicate<String>() {
@Override
Reported by PMD.
Line: 37
@Test
public void allObservable() {
Observable<String> obs = Observable.just("one", "two", "six");
Observer <Boolean> observer = TestHelper.mockObserver();
obs.all(new Predicate<String>() {
@Override
Reported by PMD.
Line: 37
@Test
public void allObservable() {
Observable<String> obs = Observable.just("one", "two", "six");
Observer <Boolean> observer = TestHelper.mockObserver();
obs.all(new Predicate<String>() {
@Override
Reported by PMD.
Line: 50
.subscribe(observer);
verify(observer).onSubscribe((Disposable)any());
verify(observer).onNext(true);
verify(observer).onComplete();
verifyNoMoreInteractions(observer);
}
@Test
Reported by PMD.
Line: 51
verify(observer).onSubscribe((Disposable)any());
verify(observer).onNext(true);
verify(observer).onComplete();
verifyNoMoreInteractions(observer);
}
@Test
public void notAllObservable() {
Reported by PMD.
Line: 70
.subscribe(observer);
verify(observer).onSubscribe((Disposable)any());
verify(observer).onNext(false);
verify(observer).onComplete();
verifyNoMoreInteractions(observer);
}
@Test
Reported by PMD.
Line: 71
verify(observer).onSubscribe((Disposable)any());
verify(observer).onNext(false);
verify(observer).onComplete();
verifyNoMoreInteractions(observer);
}
@Test
public void emptyObservable() {
Reported by PMD.
Line: 90
.subscribe(observer);
verify(observer).onSubscribe((Disposable)any());
verify(observer).onNext(true);
verify(observer).onComplete();
verifyNoMoreInteractions(observer);
}
@Test
Reported by PMD.
Line: 91
verify(observer).onSubscribe((Disposable)any());
verify(observer).onNext(true);
verify(observer).onComplete();
verifyNoMoreInteractions(observer);
}
@Test
public void errorObservable() {
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatest.java
52 issues
Line: 28
import io.reactivex.rxjava3.internal.util.AtomicThrowable;
public final class ObservableCombineLatest<T, R> extends Observable<R> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> combiner;
final int bufferSize;
final boolean delayError;
Reported by PMD.
Line: 29
public final class ObservableCombineLatest<T, R> extends Observable<R> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> combiner;
final int bufferSize;
final boolean delayError;
public ObservableCombineLatest(ObservableSource<? extends T>[] sources,
Reported by PMD.
Line: 30
public final class ObservableCombineLatest<T, R> extends Observable<R> {
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> combiner;
final int bufferSize;
final boolean delayError;
public ObservableCombineLatest(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Reported by PMD.
Line: 31
final ObservableSource<? extends T>[] sources;
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> combiner;
final int bufferSize;
final boolean delayError;
public ObservableCombineLatest(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> combiner, int bufferSize,
Reported by PMD.
Line: 32
final Iterable<? extends ObservableSource<? extends T>> sourcesIterable;
final Function<? super Object[], ? extends R> combiner;
final int bufferSize;
final boolean delayError;
public ObservableCombineLatest(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> combiner, int bufferSize,
boolean delayError) {
Reported by PMD.
Line: 34
final int bufferSize;
final boolean delayError;
public ObservableCombineLatest(ObservableSource<? extends T>[] sources,
Iterable<? extends ObservableSource<? extends T>> sourcesIterable,
Function<? super Object[], ? extends R> combiner, int bufferSize,
boolean delayError) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
Reported by PMD.
Line: 55
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = Objects.requireNonNull(p, "The Iterator returned a null ObservableSource");
}
Reported by PMD.
Line: 61
}
sources[count++] = Objects.requireNonNull(p, "The Iterator returned a null ObservableSource");
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
EmptyDisposable.error(ex, observer);
return;
}
} else {
Reported by PMD.
Line: 79
lc.subscribe(sources);
}
static final class LatestCoordinator<T, R> extends AtomicInteger implements Disposable {
private static final long serialVersionUID = 8567835998786448817L;
final Observer<? super R> downstream;
final Function<? super Object[], ? extends R> combiner;
final CombinerObserver<T, R>[] observers;
Reported by PMD.
Line: 79
lc.subscribe(sources);
}
static final class LatestCoordinator<T, R> extends AtomicInteger implements Disposable {
private static final long serialVersionUID = 8567835998786448817L;
final Observer<? super R> downstream;
final Function<? super Object[], ? extends R> combiner;
final CombinerObserver<T, R>[] observers;
Reported by PMD.
src/test/java/io/reactivex/rxjava3/internal/jdk8/ObservableCollectWithCollectorTest.java
52 issues
Line: 14
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.jdk8;
import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.util.*;
Reported by PMD.
Line: 35
import io.reactivex.rxjava3.subjects.PublishSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;
public class ObservableCollectWithCollectorTest extends RxJavaTest {
@Test
public void basic() {
Observable.range(1, 5)
.collect(Collectors.toList())
Reported by PMD.
Line: 38
public class ObservableCollectWithCollectorTest extends RxJavaTest {
@Test
public void basic() {
Observable.range(1, 5)
.collect(Collectors.toList())
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
Reported by PMD.
Line: 39
@Test
public void basic() {
Observable.range(1, 5)
.collect(Collectors.toList())
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
Reported by PMD.
Line: 39
@Test
public void basic() {
Observable.range(1, 5)
.collect(Collectors.toList())
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
Reported by PMD.
Line: 39
@Test
public void basic() {
Observable.range(1, 5)
.collect(Collectors.toList())
.test()
.assertResult(Arrays.asList(1, 2, 3, 4, 5));
}
Reported by PMD.
Line: 46
}
@Test
public void empty() {
Observable.empty()
.collect(Collectors.toList())
.test()
.assertResult(Collections.emptyList());
}
Reported by PMD.
Line: 47
@Test
public void empty() {
Observable.empty()
.collect(Collectors.toList())
.test()
.assertResult(Collections.emptyList());
}
Reported by PMD.
Line: 47
@Test
public void empty() {
Observable.empty()
.collect(Collectors.toList())
.test()
.assertResult(Collections.emptyList());
}
Reported by PMD.
Line: 47
@Test
public void empty() {
Observable.empty()
.collect(Collectors.toList())
.test()
.assertResult(Collections.emptyList());
}
Reported by PMD.
src/test/java/io/reactivex/rxjava3/parallel/ParallelFromPublisherTest.java
52 issues
Line: 36
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.testsupport.*;
public class ParallelFromPublisherTest extends RxJavaTest {
@Test
public void sourceOverflow() {
new Flowable<Integer>() {
@Override
Reported by PMD.
Line: 39
public class ParallelFromPublisherTest extends RxJavaTest {
@Test
public void sourceOverflow() {
new Flowable<Integer>() {
@Override
protected void subscribeActual(Subscriber<? super Integer> s) {
s.onSubscribe(new BooleanSubscription());
for (int i = 0; i < 10; i++) {
Reported by PMD.
Line: 56
}
@Test
public void fusedFilterBecomesEmpty() {
Flowable.just(1)
.filter(Functions.alwaysFalse())
.parallel()
.sequential()
.test()
Reported by PMD.
Line: 57
@Test
public void fusedFilterBecomesEmpty() {
Flowable.just(1)
.filter(Functions.alwaysFalse())
.parallel()
.sequential()
.test()
.assertResult();
Reported by PMD.
Line: 57
@Test
public void fusedFilterBecomesEmpty() {
Flowable.just(1)
.filter(Functions.alwaysFalse())
.parallel()
.sequential()
.test()
.assertResult();
Reported by PMD.
Line: 57
@Test
public void fusedFilterBecomesEmpty() {
Flowable.just(1)
.filter(Functions.alwaysFalse())
.parallel()
.sequential()
.test()
.assertResult();
Reported by PMD.
Line: 57
@Test
public void fusedFilterBecomesEmpty() {
Flowable.just(1)
.filter(Functions.alwaysFalse())
.parallel()
.sequential()
.test()
.assertResult();
Reported by PMD.
Line: 57
@Test
public void fusedFilterBecomesEmpty() {
Flowable.just(1)
.filter(Functions.alwaysFalse())
.parallel()
.sequential()
.test()
.assertResult();
Reported by PMD.
Line: 67
static final class StripBoundary<T> extends Flowable<T> implements FlowableTransformer<T, T> {
final Flowable<T> source;
StripBoundary(Flowable<T> source) {
this.source = source;
}
Reported by PMD.
Line: 113
}
@Test
public void syncFusedMapCrash() {
Flowable.just(1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
throw new TestException();
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupJoin.java
52 issues
Line: 33
public final class ObservableGroupJoin<TLeft, TRight, TLeftEnd, TRightEnd, R> extends AbstractObservableWithUpstream<TLeft, R> {
final ObservableSource<? extends TRight> other;
final Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
Reported by PMD.
Line: 35
final ObservableSource<? extends TRight> other;
final Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super Observable<TRight>, ? extends R> resultSelector;
Reported by PMD.
Line: 37
final Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd;
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super Observable<TRight>, ? extends R> resultSelector;
public ObservableGroupJoin(
ObservableSource<TLeft> source,
Reported by PMD.
Line: 39
final Function<? super TRight, ? extends ObservableSource<TRightEnd>> rightEnd;
final BiFunction<? super TLeft, ? super Observable<TRight>, ? extends R> resultSelector;
public ObservableGroupJoin(
ObservableSource<TLeft> source,
ObservableSource<? extends TRight> other,
Function<? super TLeft, ? extends ObservableSource<TLeftEnd>> leftEnd,
Reported by PMD.
Line: 63
observer.onSubscribe(parent);
LeftRightObserver left = new LeftRightObserver(parent, true);
parent.disposables.add(left);
LeftRightObserver right = new LeftRightObserver(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
Reported by PMD.
Line: 65
LeftRightObserver left = new LeftRightObserver(parent, true);
parent.disposables.add(left);
LeftRightObserver right = new LeftRightObserver(parent, false);
parent.disposables.add(right);
source.subscribe(left);
other.subscribe(right);
}
Reported by PMD.
Line: 84
void innerCloseError(Throwable ex);
}
static final class GroupJoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Disposable, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 84
void innerCloseError(Throwable ex);
}
static final class GroupJoinDisposable<TLeft, TRight, TLeftEnd, TRightEnd, R>
extends AtomicInteger implements Disposable, JoinSupport {
private static final long serialVersionUID = -6071216598687999801L;
final Observer<? super R> downstream;
Reported by PMD.
Line: 89
private static final long serialVersionUID = -6071216598687999801L;
final Observer<? super R> downstream;
final SpscLinkedArrayQueue<Object> queue;
final CompositeDisposable disposables;
Reported by PMD.
Line: 91
final Observer<? super R> downstream;
final SpscLinkedArrayQueue<Object> queue;
final CompositeDisposable disposables;
final Map<Integer, UnicastSubject<TRight>> lefts;
Reported by PMD.
src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCreate.java
51 issues
Line: 137
}
@Override
public boolean tryOnError(Throwable t) {
if (emitter.isCancelled() || done) {
return false;
}
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
Reported by PMD.
Line: 273
}
@Override
public final void onError(Throwable e) {
if (e == null) {
e = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
if (!signalError(e)) {
RxJavaPlugins.onError(e);
Reported by PMD.
Line: 283
}
@Override
public final boolean tryOnError(Throwable e) {
if (e == null) {
e = ExceptionHelper.createNullPointerException("tryOnError called with a null Throwable.");
}
return signalError(e);
}
Reported by PMD.
Line: 33
public final class FlowableCreate<T> extends Flowable<T> {
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
Reported by PMD.
Line: 35
final FlowableOnSubscribe<T> source;
final BackpressureStrategy backpressure;
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
Reported by PMD.
Line: 72
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
Reported by PMD.
Line: 89
private static final long serialVersionUID = 4883307006032401862L;
final BaseEmitter<T> emitter;
final AtomicThrowable errors;
final SimplePlainQueue<T> queue;
Reported by PMD.
Line: 91
final BaseEmitter<T> emitter;
final AtomicThrowable errors;
final SimplePlainQueue<T> queue;
volatile boolean done;
Reported by PMD.
Line: 93
final AtomicThrowable errors;
final SimplePlainQueue<T> queue;
volatile boolean done;
SerializedEmitter(BaseEmitter<T> emitter) {
this.emitter = emitter;
Reported by PMD.
Line: 95
final SimplePlainQueue<T> queue;
volatile boolean done;
SerializedEmitter(BaseEmitter<T> emitter) {
this.emitter = emitter;
this.errors = new AtomicThrowable();
this.queue = new SpscLinkedArrayQueue<>(16);
Reported by PMD.