diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 98aefbc22f..19e5861a30 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -2222,9 +2222,10 @@ public final Publisher takeWhile(Predicate predicate) { * return results; * } * - * @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. + * @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. If this + * {@link Publisher} maybe resubscribed the {@link Completable} should be also, or use + * {@link Publisher#defer(Supplier)} to create a new {@link Completable} on each subscribe. * @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed. - * * @see ReactiveX takeUntil operator. */ public final Publisher takeUntil(Completable until) { diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java index b33fdff34f..977ea86daf 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java @@ -28,7 +28,6 @@ import static java.util.Objects.requireNonNull; final class TakeUntilPublisher extends AbstractSynchronousPublisherOperator { - private final Completable until; TakeUntilPublisher(Publisher original, Completable until) { @@ -42,6 +41,7 @@ public Subscriber apply(Subscriber subscriber) { } private static final class TakeUntilSubscriber implements Subscriber { + @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater untilCancellableUpdater = AtomicReferenceFieldUpdater.newUpdater(TakeUntilSubscriber.class, Cancellable.class, "untilCancellable"); @@ -77,15 +77,19 @@ public void onSubscribe(Cancellable cancellable) { @Override public void onComplete() { - if (subscriber.processOnComplete()) { + try { cancelDownstreamSubscription(); + } finally { + subscriber.processOnComplete(); } } @Override public void onError(Throwable t) { - if (subscriber.processOnError(t)) { + try { cancelDownstreamSubscription(); + } finally { + subscriber.processOnError(t); } } @@ -104,15 +108,19 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (subscriber.processOnError(t)) { + try { cancelUntil(); + } finally { + subscriber.processOnError(t); } } @Override public void onComplete() { - if (subscriber.processOnComplete()) { + try { cancelUntil(); + } finally { + subscriber.processOnComplete(); } } @@ -128,7 +136,7 @@ private void cancelUntil() { private static final class TakeUntilSubscription extends ConcurrentSubscription { private final Cancellable cancellable; - protected TakeUntilSubscription(final Subscription subscription, Cancellable cancellable) { + private TakeUntilSubscription(final Subscription subscription, Cancellable cancellable) { super(subscription); this.cancellable = cancellable; } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java index 678485b9d4..c29dc70b48 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java @@ -15,10 +15,18 @@ */ package io.servicetalk.concurrent.api; +import io.servicetalk.concurrent.CompletableSource; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; + +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static org.hamcrest.MatcherAssert.assertThat; @@ -26,16 +34,20 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; class TakeUntilPublisherTest { - private final TestPublisher publisher = new TestPublisher<>(); private final TestPublisherSubscriber subscriber = new TestPublisherSubscriber<>(); private final TestSubscription subscription = new TestSubscription(); @Test void testUntilComplete() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + TestCompletable completable = new TestCompletable(); Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); @@ -49,7 +61,7 @@ void testUntilComplete() { @Test void testUntilError() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + TestCompletable completable = new TestCompletable(); Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); @@ -63,7 +75,7 @@ void testUntilError() { @Test void testEmitsError() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + TestCompletable completable = new TestCompletable(); Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); subscriber.awaitSubscription().request(4); @@ -75,7 +87,7 @@ void testEmitsError() { @Test void testEmitsComplete() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + TestCompletable completable = new TestCompletable(); Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); subscriber.awaitSubscription().request(4); @@ -85,8 +97,12 @@ void testEmitsComplete() { } @Test - void testSubCancelled() { - LegacyTestCompletable completable = new LegacyTestCompletable(); + void testSubCancelled() throws InterruptedException { + TestCancellable cancellable = new TestCancellable(); + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(cancellable); + return subscriber1; + }); Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); @@ -95,6 +111,68 @@ void testSubCancelled() { assertThat(subscriber.takeOnNext(2), contains("Hello1", "Hello2")); subscriber.awaitSubscription().cancel(); assertTrue(subscription.isCancelled()); - completable.verifyCancelled(); + cancellable.awaitCancelled(); + } + + @Test + void resubscribe() throws InterruptedException { + // Intentionally have publisher outside the defer, we need to extract the TestPublisher from each subscribe. + final TestResubscribePublisher resubscribePublisher = new TestResubscribePublisher<>(); + final BlockingQueue processors = new LinkedTransferQueue<>(); + Publisher publisher = Publisher.defer(() -> { + CompletableSource.Processor processor = Processors.newCompletableProcessor(); + processors.add(processor); + return resubscribePublisher.takeUntil(fromSource(processor)); + }); + @SuppressWarnings("unchecked") + Subscriber resubscribeSubscriber = mock(Subscriber.class); + @SuppressWarnings("unchecked") + Subscriber subscriber = mock(Subscriber.class); + doAnswer((Answer) invocation -> { + toSource(publisher).subscribe(subscriber); + return null; + }).when(resubscribeSubscriber).onComplete(); + doAnswer((Answer) invocation -> { + Subscription s = invocation.getArgument(0); + s.request(3); + return null; + }).when(resubscribeSubscriber).onSubscribe(any()); + doAnswer((Answer) invocation -> { + Subscription s = invocation.getArgument(0); + s.request(3); + return null; + }).when(subscriber).onSubscribe(any()); + + toSource(publisher).subscribe(resubscribeSubscriber); + + TestPublisher testPublisher1 = resubscribePublisher.publisher(); + TestSubscription testSubscription1 = resubscribePublisher.subscription(); + CompletableSource.Processor completable1 = processors.take(); + testSubscription1.awaitRequestN(2); + testPublisher1.onNext("Hello1", "Hello2"); + + verify(resubscribeSubscriber).onNext("Hello1"); + verify(resubscribeSubscriber).onNext("Hello2"); + + completable1.onComplete(); + testSubscription1.awaitCancelled(); + + verify(resubscribeSubscriber).onComplete(); + verify(resubscribeSubscriber, never()).onError(any()); + + verify(subscriber, never()).onNext(any()); + verify(subscriber, never()).onComplete(); + verify(subscriber, never()).onError(any()); + + TestPublisher testPublisher2 = resubscribePublisher.publisher(); + TestSubscription testSubscription2 = resubscribePublisher.subscription(); + CompletableSource.Processor completable2 = processors.take(); + testSubscription2.awaitRequestN(2); + testPublisher2.onNext("Hello3", "Hello4"); + + completable2.onComplete(); + testSubscription2.awaitCancelled(); + verify(subscriber).onComplete(); + verify(subscriber, never()).onError(any()); } } diff --git a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java new file mode 100644 index 0000000000..ddc250d65f --- /dev/null +++ b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java @@ -0,0 +1,89 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.internal.DuplicateSubscribeException; + +import java.util.concurrent.atomic.AtomicReference; + +import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource; +import static java.util.Objects.requireNonNull; + +/** + * Wraps {@link TestPublisher} in a way that allows for sequential resubscribes. + * @param The type of {@link TestPublisher}. + */ +public class TestResubscribePublisher extends Publisher { + private final AtomicReference> state = new AtomicReference<>(); + + @Override + protected void handleSubscribe(final PublisherSource.Subscriber subscriber) { + SubscriberState newState = new SubscriberState<>(state, subscriber); + SubscriberState currState = state.get(); + if (state.compareAndSet(null, newState)) { + newState.publisher.subscribe(subscriber); + } else { + deliverErrorFromSource(subscriber, new DuplicateSubscribeException(currState.subscriber, subscriber)); + } + } + + /** + * Get the current {@link TestPublisher}. + * @return the current {@link TestPublisher}. + */ + public TestPublisher publisher() { + return state.get().publisher; + } + + /** + * Get the current {@link TestSubscription}. + * @return the current {@link TestSubscription}. + */ + public TestSubscription subscription() { + return state.get().subscription; + } + + private static final class SubscriberState { + private final PublisherSource.Subscriber subscriber; + private final TestPublisher publisher; + private final TestSubscription subscription; + + SubscriberState(AtomicReference> state, + PublisherSource.Subscriber subscriber) { + this.subscriber = requireNonNull(subscriber); + this.subscription = new TestSubscription(); + this.publisher = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(new PublisherSource.Subscription() { + @Override + public void request(final long n) { + subscription.request(n); + } + + @Override + public void cancel() { + try { + subscription.cancel(); + } finally { + state.compareAndSet(SubscriberState.this, null); + } + } + }); + return subscriber1; + }); + } + } +}