Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publisher#takeUntil cancel before terminate #2413

Merged
merged 3 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2222,9 +2222,10 @@ public final Publisher<T> takeWhile(Predicate<? super T> predicate) {
* return results;
* }</pre>
*
* @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}.
* @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. If this
* {@link Publisher} maybe resubscribed the {@link Completable} should be also, or use
* {@link Publisher#defer(Supplier)} to create a new {@link Completable} on each subscribe.
* @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed.
*
* @see <a href="https://reactivex.io/documentation/operators/takeuntil.html">ReactiveX takeUntil operator.</a>
*/
public final Publisher<T> takeUntil(Completable until) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import static java.util.Objects.requireNonNull;

final class TakeUntilPublisher<T> extends AbstractSynchronousPublisherOperator<T, T> {

private final Completable until;

TakeUntilPublisher(Publisher<T> original, Completable until) {
Expand All @@ -42,6 +41,7 @@ public Subscriber<? super T> apply(Subscriber<? super T> subscriber) {
}

private static final class TakeUntilSubscriber<T> implements Subscriber<T> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<TakeUntilSubscriber, Cancellable> untilCancellableUpdater =
AtomicReferenceFieldUpdater.newUpdater(TakeUntilSubscriber.class, Cancellable.class,
"untilCancellable");
Expand Down Expand Up @@ -77,15 +77,19 @@ public void onSubscribe(Cancellable cancellable) {

@Override
public void onComplete() {
if (subscriber.processOnComplete()) {
try {
cancelDownstreamSubscription();
} finally {
subscriber.processOnComplete();
}
}

@Override
public void onError(Throwable t) {
if (subscriber.processOnError(t)) {
try {
cancelDownstreamSubscription();
} finally {
subscriber.processOnError(t);
}
}

Expand All @@ -104,15 +108,19 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (subscriber.processOnError(t)) {
try {
cancelUntil();
} finally {
subscriber.processOnError(t);
}
}

@Override
public void onComplete() {
if (subscriber.processOnComplete()) {
try {
cancelUntil();
} finally {
subscriber.processOnComplete();
}
}

Expand All @@ -128,7 +136,7 @@ private void cancelUntil() {
private static final class TakeUntilSubscription extends ConcurrentSubscription {
private final Cancellable cancellable;

protected TakeUntilSubscription(final Subscription subscription, Cancellable cancellable) {
private TakeUntilSubscription(final Subscription subscription, Cancellable cancellable) {
super(subscription);
this.cancellable = cancellable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,39 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;

import org.junit.jupiter.api.Test;
import org.mockito.stubbing.Answer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

class TakeUntilPublisherTest {

private final TestPublisher<String> publisher = new TestPublisher<>();
private final TestPublisherSubscriber<String> subscriber = new TestPublisherSubscriber<>();
private final TestSubscription subscription = new TestSubscription();

@Test
void testUntilComplete() {
LegacyTestCompletable completable = new LegacyTestCompletable();
TestCompletable completable = new TestCompletable();
Publisher<String> p = publisher.takeUntil(completable);
toSource(p).subscribe(subscriber);
publisher.onSubscribe(subscription);
Expand All @@ -49,7 +61,7 @@ void testUntilComplete() {

@Test
void testUntilError() {
LegacyTestCompletable completable = new LegacyTestCompletable();
TestCompletable completable = new TestCompletable();
Publisher<String> p = publisher.takeUntil(completable);
toSource(p).subscribe(subscriber);
publisher.onSubscribe(subscription);
Expand All @@ -63,7 +75,7 @@ void testUntilError() {

@Test
void testEmitsError() {
LegacyTestCompletable completable = new LegacyTestCompletable();
TestCompletable completable = new TestCompletable();
Publisher<String> p = publisher.takeUntil(completable);
toSource(p).subscribe(subscriber);
subscriber.awaitSubscription().request(4);
Expand All @@ -75,7 +87,7 @@ void testEmitsError() {

@Test
void testEmitsComplete() {
LegacyTestCompletable completable = new LegacyTestCompletable();
TestCompletable completable = new TestCompletable();
Publisher<String> p = publisher.takeUntil(completable);
toSource(p).subscribe(subscriber);
subscriber.awaitSubscription().request(4);
Expand All @@ -85,8 +97,12 @@ void testEmitsComplete() {
}

@Test
void testSubCancelled() {
LegacyTestCompletable completable = new LegacyTestCompletable();
void testSubCancelled() throws InterruptedException {
TestCancellable cancellable = new TestCancellable();
TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(cancellable);
return subscriber1;
});
Publisher<String> p = publisher.takeUntil(completable);
toSource(p).subscribe(subscriber);
publisher.onSubscribe(subscription);
Expand All @@ -95,6 +111,68 @@ void testSubCancelled() {
assertThat(subscriber.takeOnNext(2), contains("Hello1", "Hello2"));
subscriber.awaitSubscription().cancel();
assertTrue(subscription.isCancelled());
completable.verifyCancelled();
cancellable.awaitCancelled();
}

@Test
void resubscribe() throws InterruptedException {
// Intentionally have publisher outside the defer, we need to extract the TestPublisher from each subscribe.
final TestResubscribePublisher<String> resubscribePublisher = new TestResubscribePublisher<>();
final BlockingQueue<CompletableSource.Processor> processors = new LinkedTransferQueue<>();
Publisher<String> publisher = Publisher.defer(() -> {
CompletableSource.Processor processor = Processors.newCompletableProcessor();
processors.add(processor);
return resubscribePublisher.takeUntil(fromSource(processor));
});
@SuppressWarnings("unchecked")
Subscriber<String> resubscribeSubscriber = mock(Subscriber.class);
@SuppressWarnings("unchecked")
Subscriber<String> subscriber = mock(Subscriber.class);
doAnswer((Answer<Void>) invocation -> {
toSource(publisher).subscribe(subscriber);
return null;
}).when(resubscribeSubscriber).onComplete();
doAnswer((Answer<Void>) invocation -> {
Subscription s = invocation.getArgument(0);
s.request(3);
return null;
}).when(resubscribeSubscriber).onSubscribe(any());
doAnswer((Answer<Void>) invocation -> {
Subscription s = invocation.getArgument(0);
s.request(3);
return null;
}).when(subscriber).onSubscribe(any());

toSource(publisher).subscribe(resubscribeSubscriber);

TestPublisher<String> testPublisher1 = resubscribePublisher.publisher();
TestSubscription testSubscription1 = resubscribePublisher.subscription();
CompletableSource.Processor completable1 = processors.take();
testSubscription1.awaitRequestN(2);
testPublisher1.onNext("Hello1", "Hello2");

verify(resubscribeSubscriber).onNext("Hello1");
verify(resubscribeSubscriber).onNext("Hello2");

completable1.onComplete();
testSubscription1.awaitCancelled();

verify(resubscribeSubscriber).onComplete();
verify(resubscribeSubscriber, never()).onError(any());

verify(subscriber, never()).onNext(any());
verify(subscriber, never()).onComplete();
verify(subscriber, never()).onError(any());

TestPublisher<String> testPublisher2 = resubscribePublisher.publisher();
TestSubscription testSubscription2 = resubscribePublisher.subscription();
CompletableSource.Processor completable2 = processors.take();
testSubscription2.awaitRequestN(2);
testPublisher2.onNext("Hello3", "Hello4");

completable2.onComplete();
testSubscription2.awaitCancelled();
verify(subscriber).onComplete();
verify(subscriber, never()).onError(any());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright © 2022 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.PublisherSource;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;

import java.util.concurrent.atomic.AtomicReference;

import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource;
import static java.util.Objects.requireNonNull;

/**
* Wraps {@link TestPublisher} in a way that allows for sequential resubscribes.
* @param <T> The type of {@link TestPublisher}.
*/
public class TestResubscribePublisher<T> extends Publisher<T> {
private final AtomicReference<SubscriberState<T>> state = new AtomicReference<>();

@Override
protected void handleSubscribe(final PublisherSource.Subscriber<? super T> subscriber) {
SubscriberState<T> newState = new SubscriberState<>(state, subscriber);
SubscriberState<T> currState = state.get();
if (state.compareAndSet(null, newState)) {
newState.publisher.subscribe(subscriber);
} else {
deliverErrorFromSource(subscriber, new DuplicateSubscribeException(currState.subscriber, subscriber));
}
}

/**
* Get the current {@link TestPublisher}.
* @return the current {@link TestPublisher}.
*/
public TestPublisher<T> publisher() {
return state.get().publisher;
}

/**
* Get the current {@link TestSubscription}.
* @return the current {@link TestSubscription}.
*/
public TestSubscription subscription() {
return state.get().subscription;
}

private static final class SubscriberState<T> {
private final PublisherSource.Subscriber<? super T> subscriber;
private final TestPublisher<T> publisher;
private final TestSubscription subscription;

SubscriberState(AtomicReference<SubscriberState<T>> state,
PublisherSource.Subscriber<? super T> subscriber) {
this.subscriber = requireNonNull(subscriber);
this.subscription = new TestSubscription();
this.publisher = new TestPublisher.Builder<T>().disableAutoOnSubscribe().build(subscriber1 -> {
subscriber1.onSubscribe(new PublisherSource.Subscription() {
@Override
public void request(final long n) {
subscription.request(n);
}

@Override
public void cancel() {
try {
subscription.cancel();
} finally {
state.compareAndSet(SubscriberState.this, null);
}
}
});
return subscriber1;
});
}
}
}