From fe4e462f8392d1a61f7d52b31cdcc9d1ab654a2c Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Wed, 19 Oct 2022 17:12:26 -0700 Subject: [PATCH 1/3] Publisher#takeUntil cancel before terminate Motivation: Publisher#takeUntil will terminate downstream before cancelling upstream. There are some sources (e.g. NettyChannelPublisher) that allow resubscribe in a sequential fashion, but in order for this to work cancellation must be first. Modifications: - Switch ordering in TakeUtil to cancel before terminate - Add TestResubscribePublisher utility that wraps TestPublisher which allows for sequential resbuscribes. --- .../servicetalk/concurrent/api/Publisher.java | 30 ++++- .../concurrent/api/TakeUntilPublisher.java | 27 +++-- .../api/TakeUntilPublisherTest.java | 104 +++++++++++++++--- .../api/TestResubscribePublisher.java | 89 +++++++++++++++ .../tck/PublisherTakeUntilTckTest.java | 2 +- 5 files changed, 227 insertions(+), 25 deletions(-) create mode 100644 servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 98aefbc22f..89b9c709f3 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -2224,11 +2224,37 @@ public final Publisher takeWhile(Predicate predicate) { * * @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. * @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed. - * + * @deprecated Use {@link #takeUntil(Supplier)}. * @see ReactiveX takeUntil operator. */ + @Deprecated public final Publisher takeUntil(Completable until) { - return new TakeUntilPublisher<>(this, until); + return takeUntil(() -> until); + } + + /** + * Takes elements until {@link Completable} is terminated successfully or with failure. + *

+ * This method provides a means to take a limited number of results from this {@link Publisher} and in sequential + * programming is similar to: + *

{@code
+     *     List results = ...;
+     *     for (T t : resultOfThisPublisher()) {
+     *         if (isCompleted(until)) {
+     *             break;
+     *         }
+     *         takeResults.add(t);
+     *     }
+     *     return results;
+     * }
+ * + * @param untilSupplier {@link Supplier} that is invoked on each subscribe that provides a {@link Completable}, + * termination of which, terminates the returned {@link Publisher}. + * @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed. + * @see ReactiveX takeUntil operator. + */ + public final Publisher takeUntil(Supplier untilSupplier) { + return new TakeUntilPublisher<>(this, untilSupplier); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java index b33fdff34f..fc610775b7 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java @@ -21,6 +21,7 @@ import io.servicetalk.concurrent.internal.ConcurrentTerminalSubscriber; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Supplier; import javax.annotation.Nullable; import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL; @@ -28,20 +29,20 @@ import static java.util.Objects.requireNonNull; final class TakeUntilPublisher extends AbstractSynchronousPublisherOperator { + private final Supplier until; - private final Completable until; - - TakeUntilPublisher(Publisher original, Completable until) { + TakeUntilPublisher(Publisher original, Supplier until) { super(original); this.until = requireNonNull(until); } @Override public Subscriber apply(Subscriber subscriber) { - return new TakeUntilSubscriber<>(subscriber, until); + return new TakeUntilSubscriber<>(subscriber, until.get()); } private static final class TakeUntilSubscriber implements Subscriber { + @SuppressWarnings("rawtypes") private static final AtomicReferenceFieldUpdater untilCancellableUpdater = AtomicReferenceFieldUpdater.newUpdater(TakeUntilSubscriber.class, Cancellable.class, "untilCancellable"); @@ -77,15 +78,19 @@ public void onSubscribe(Cancellable cancellable) { @Override public void onComplete() { - if (subscriber.processOnComplete()) { + try { cancelDownstreamSubscription(); + } finally { + subscriber.processOnComplete(); } } @Override public void onError(Throwable t) { - if (subscriber.processOnError(t)) { + try { cancelDownstreamSubscription(); + } finally { + subscriber.processOnError(t); } } @@ -104,15 +109,19 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (subscriber.processOnError(t)) { + try { cancelUntil(); + } finally { + subscriber.processOnError(t); } } @Override public void onComplete() { - if (subscriber.processOnComplete()) { + try { cancelUntil(); + } finally { + subscriber.processOnComplete(); } } @@ -128,7 +137,7 @@ private void cancelUntil() { private static final class TakeUntilSubscription extends ConcurrentSubscription { private final Cancellable cancellable; - protected TakeUntilSubscription(final Subscription subscription, Cancellable cancellable) { + private TakeUntilSubscription(final Subscription subscription, Cancellable cancellable) { super(subscription); this.cancellable = cancellable; } diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java index 678485b9d4..6f0740d85d 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java @@ -15,10 +15,19 @@ */ package io.servicetalk.concurrent.api; +import io.servicetalk.concurrent.CompletableSource; +import io.servicetalk.concurrent.PublisherSource.Subscriber; +import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.function.Supplier; + +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static org.hamcrest.MatcherAssert.assertThat; @@ -26,17 +35,21 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.sameInstance; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; class TakeUntilPublisherTest { - private final TestPublisher publisher = new TestPublisher<>(); private final TestPublisherSubscriber subscriber = new TestPublisherSubscriber<>(); private final TestSubscription subscription = new TestSubscription(); @Test void testUntilComplete() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - Publisher p = publisher.takeUntil(completable); + TestCompletable completable = new TestCompletable(); + Publisher p = publisher.takeUntil(() -> completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(4); @@ -49,8 +62,8 @@ void testUntilComplete() { @Test void testUntilError() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - Publisher p = publisher.takeUntil(completable); + TestCompletable completable = new TestCompletable(); + Publisher p = publisher.takeUntil(() -> completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(4); @@ -63,8 +76,8 @@ void testUntilError() { @Test void testEmitsError() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - Publisher p = publisher.takeUntil(completable); + TestCompletable completable = new TestCompletable(); + Publisher p = publisher.takeUntil(() -> completable); toSource(p).subscribe(subscriber); subscriber.awaitSubscription().request(4); publisher.onNext("Hello1"); @@ -75,8 +88,8 @@ void testEmitsError() { @Test void testEmitsComplete() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - Publisher p = publisher.takeUntil(completable); + TestCompletable completable = new TestCompletable(); + Publisher p = publisher.takeUntil(() -> completable); toSource(p).subscribe(subscriber); subscriber.awaitSubscription().request(4); publisher.onNext("Hello1"); @@ -85,9 +98,13 @@ void testEmitsComplete() { } @Test - void testSubCancelled() { - LegacyTestCompletable completable = new LegacyTestCompletable(); - Publisher p = publisher.takeUntil(completable); + void testSubCancelled() throws InterruptedException { + TestCancellable cancellable = new TestCancellable(); + TestCompletable completable = new TestCompletable.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(cancellable); + return subscriber1; + }); + Publisher p = publisher.takeUntil(() -> completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(3); @@ -95,6 +112,67 @@ void testSubCancelled() { assertThat(subscriber.takeOnNext(2), contains("Hello1", "Hello2")); subscriber.awaitSubscription().cancel(); assertTrue(subscription.isCancelled()); - completable.verifyCancelled(); + cancellable.awaitCancelled(); + } + + @Test + void resubscribe() throws InterruptedException { + BlockingQueue processors = new LinkedTransferQueue<>(); + Supplier completableSupplier = () -> { + CompletableSource.Processor processor = Processors.newCompletableProcessor(); + processors.add(processor); + return fromSource(processor); + }; + TestResubscribePublisher resubscribePublisher = new TestResubscribePublisher<>(); + @SuppressWarnings("unchecked") + Subscriber resubscribeSubscriber = mock(Subscriber.class); + @SuppressWarnings("unchecked") + Subscriber subscriber = mock(Subscriber.class); + doAnswer((Answer) invocation -> { + toSource(resubscribePublisher.takeUntil(completableSupplier)).subscribe(subscriber); + return null; + }).when(resubscribeSubscriber).onComplete(); + doAnswer((Answer) invocation -> { + Subscription s = invocation.getArgument(0); + s.request(3); + return null; + }).when(resubscribeSubscriber).onSubscribe(any()); + doAnswer((Answer) invocation -> { + Subscription s = invocation.getArgument(0); + s.request(3); + return null; + }).when(subscriber).onSubscribe(any()); + + toSource(resubscribePublisher.takeUntil(completableSupplier)).subscribe(resubscribeSubscriber); + + TestPublisher testPublisher1 = resubscribePublisher.publisher(); + TestSubscription testSubscription1 = resubscribePublisher.subscription(); + CompletableSource.Processor completable1 = processors.take(); + testSubscription1.awaitRequestN(2); + testPublisher1.onNext("Hello1", "Hello2"); + + verify(resubscribeSubscriber).onNext("Hello1"); + verify(resubscribeSubscriber).onNext("Hello2"); + + completable1.onComplete(); + testSubscription1.awaitCancelled(); + + verify(resubscribeSubscriber).onComplete(); + verify(resubscribeSubscriber, never()).onError(any()); + + verify(subscriber, never()).onNext(any()); + verify(subscriber, never()).onComplete(); + verify(subscriber, never()).onError(any()); + + TestPublisher testPublisher2 = resubscribePublisher.publisher(); + TestSubscription testSubscription2 = resubscribePublisher.subscription(); + CompletableSource.Processor completable2 = processors.take(); + testSubscription2.awaitRequestN(2); + testPublisher2.onNext("Hello3", "Hello4"); + + completable2.onComplete(); + testSubscription2.awaitCancelled(); + verify(subscriber).onComplete(); + verify(subscriber, never()).onError(any()); } } diff --git a/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java new file mode 100644 index 0000000000..ddc250d65f --- /dev/null +++ b/servicetalk-concurrent-api/src/testFixtures/java/io/servicetalk/concurrent/api/TestResubscribePublisher.java @@ -0,0 +1,89 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.concurrent.api; + +import io.servicetalk.concurrent.PublisherSource; +import io.servicetalk.concurrent.internal.DuplicateSubscribeException; + +import java.util.concurrent.atomic.AtomicReference; + +import static io.servicetalk.concurrent.internal.SubscriberUtils.deliverErrorFromSource; +import static java.util.Objects.requireNonNull; + +/** + * Wraps {@link TestPublisher} in a way that allows for sequential resubscribes. + * @param The type of {@link TestPublisher}. + */ +public class TestResubscribePublisher extends Publisher { + private final AtomicReference> state = new AtomicReference<>(); + + @Override + protected void handleSubscribe(final PublisherSource.Subscriber subscriber) { + SubscriberState newState = new SubscriberState<>(state, subscriber); + SubscriberState currState = state.get(); + if (state.compareAndSet(null, newState)) { + newState.publisher.subscribe(subscriber); + } else { + deliverErrorFromSource(subscriber, new DuplicateSubscribeException(currState.subscriber, subscriber)); + } + } + + /** + * Get the current {@link TestPublisher}. + * @return the current {@link TestPublisher}. + */ + public TestPublisher publisher() { + return state.get().publisher; + } + + /** + * Get the current {@link TestSubscription}. + * @return the current {@link TestSubscription}. + */ + public TestSubscription subscription() { + return state.get().subscription; + } + + private static final class SubscriberState { + private final PublisherSource.Subscriber subscriber; + private final TestPublisher publisher; + private final TestSubscription subscription; + + SubscriberState(AtomicReference> state, + PublisherSource.Subscriber subscriber) { + this.subscriber = requireNonNull(subscriber); + this.subscription = new TestSubscription(); + this.publisher = new TestPublisher.Builder().disableAutoOnSubscribe().build(subscriber1 -> { + subscriber1.onSubscribe(new PublisherSource.Subscription() { + @Override + public void request(final long n) { + subscription.request(n); + } + + @Override + public void cancel() { + try { + subscription.cancel(); + } finally { + state.compareAndSet(SubscriberState.this, null); + } + } + }); + return subscriber1; + }); + } + } +} diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java index 90772aef9f..1b281c7eca 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java @@ -24,6 +24,6 @@ public class PublisherTakeUntilTckTest extends AbstractPublisherOperatorTckTest { @Override protected Publisher composePublisher(Publisher publisher, int elements) { - return publisher.takeUntil(Completable.never()); + return publisher.takeUntil(Completable::never); } } From 927b62193bdb326f40757a3f02e395b1dd14b542 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Mon, 7 Nov 2022 10:12:11 -0800 Subject: [PATCH 2/3] checkstyle --- .../src/main/java/io/servicetalk/concurrent/api/Publisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index 89b9c709f3..b96321cfb1 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -2224,8 +2224,8 @@ public final Publisher takeWhile(Predicate predicate) { * * @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. * @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed. - * @deprecated Use {@link #takeUntil(Supplier)}. * @see ReactiveX takeUntil operator. + * @deprecated Use {@link #takeUntil(Supplier)}. */ @Deprecated public final Publisher takeUntil(Completable until) { From 175ff5bf9473e61acd4b2d24d0bf5644288f51c7 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Tue, 8 Nov 2022 14:15:40 -0800 Subject: [PATCH 3/3] remove supplier --- .../servicetalk/concurrent/api/Publisher.java | 33 +++---------------- .../concurrent/api/TakeUntilPublisher.java | 7 ++-- .../api/TakeUntilPublisherTest.java | 26 +++++++-------- .../tck/PublisherTakeUntilTckTest.java | 2 +- 4 files changed, 21 insertions(+), 47 deletions(-) diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java index b96321cfb1..19e5861a30 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/Publisher.java @@ -2222,39 +2222,14 @@ public final Publisher takeWhile(Predicate predicate) { * return results; * } * - * @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. + * @param until {@link Completable}, termination of which, terminates the returned {@link Publisher}. If this + * {@link Publisher} maybe resubscribed the {@link Completable} should be also, or use + * {@link Publisher#defer(Supplier)} to create a new {@link Completable} on each subscribe. * @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed. * @see ReactiveX takeUntil operator. - * @deprecated Use {@link #takeUntil(Supplier)}. */ - @Deprecated public final Publisher takeUntil(Completable until) { - return takeUntil(() -> until); - } - - /** - * Takes elements until {@link Completable} is terminated successfully or with failure. - *

- * This method provides a means to take a limited number of results from this {@link Publisher} and in sequential - * programming is similar to: - *

{@code
-     *     List results = ...;
-     *     for (T t : resultOfThisPublisher()) {
-     *         if (isCompleted(until)) {
-     *             break;
-     *         }
-     *         takeResults.add(t);
-     *     }
-     *     return results;
-     * }
- * - * @param untilSupplier {@link Supplier} that is invoked on each subscribe that provides a {@link Completable}, - * termination of which, terminates the returned {@link Publisher}. - * @return A {@link Publisher} that only emits the items till {@code until} {@link Completable} is completed. - * @see ReactiveX takeUntil operator. - */ - public final Publisher takeUntil(Supplier untilSupplier) { - return new TakeUntilPublisher<>(this, untilSupplier); + return new TakeUntilPublisher<>(this, until); } /** diff --git a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java index fc610775b7..977ea86daf 100644 --- a/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java +++ b/servicetalk-concurrent-api/src/main/java/io/servicetalk/concurrent/api/TakeUntilPublisher.java @@ -21,7 +21,6 @@ import io.servicetalk.concurrent.internal.ConcurrentTerminalSubscriber; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Supplier; import javax.annotation.Nullable; import static io.servicetalk.concurrent.Cancellable.IGNORE_CANCEL; @@ -29,16 +28,16 @@ import static java.util.Objects.requireNonNull; final class TakeUntilPublisher extends AbstractSynchronousPublisherOperator { - private final Supplier until; + private final Completable until; - TakeUntilPublisher(Publisher original, Supplier until) { + TakeUntilPublisher(Publisher original, Completable until) { super(original); this.until = requireNonNull(until); } @Override public Subscriber apply(Subscriber subscriber) { - return new TakeUntilSubscriber<>(subscriber, until.get()); + return new TakeUntilSubscriber<>(subscriber, until); } private static final class TakeUntilSubscriber implements Subscriber { diff --git a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java index 6f0740d85d..c29dc70b48 100644 --- a/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java +++ b/servicetalk-concurrent-api/src/test/java/io/servicetalk/concurrent/api/TakeUntilPublisherTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedTransferQueue; -import java.util.function.Supplier; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; @@ -49,7 +48,7 @@ class TakeUntilPublisherTest { @Test void testUntilComplete() { TestCompletable completable = new TestCompletable(); - Publisher p = publisher.takeUntil(() -> completable); + Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(4); @@ -63,7 +62,7 @@ void testUntilComplete() { @Test void testUntilError() { TestCompletable completable = new TestCompletable(); - Publisher p = publisher.takeUntil(() -> completable); + Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(4); @@ -77,7 +76,7 @@ void testUntilError() { @Test void testEmitsError() { TestCompletable completable = new TestCompletable(); - Publisher p = publisher.takeUntil(() -> completable); + Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); subscriber.awaitSubscription().request(4); publisher.onNext("Hello1"); @@ -89,7 +88,7 @@ void testEmitsError() { @Test void testEmitsComplete() { TestCompletable completable = new TestCompletable(); - Publisher p = publisher.takeUntil(() -> completable); + Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); subscriber.awaitSubscription().request(4); publisher.onNext("Hello1"); @@ -104,7 +103,7 @@ void testSubCancelled() throws InterruptedException { subscriber1.onSubscribe(cancellable); return subscriber1; }); - Publisher p = publisher.takeUntil(() -> completable); + Publisher p = publisher.takeUntil(completable); toSource(p).subscribe(subscriber); publisher.onSubscribe(subscription); subscriber.awaitSubscription().request(3); @@ -117,19 +116,20 @@ void testSubCancelled() throws InterruptedException { @Test void resubscribe() throws InterruptedException { - BlockingQueue processors = new LinkedTransferQueue<>(); - Supplier completableSupplier = () -> { + // Intentionally have publisher outside the defer, we need to extract the TestPublisher from each subscribe. + final TestResubscribePublisher resubscribePublisher = new TestResubscribePublisher<>(); + final BlockingQueue processors = new LinkedTransferQueue<>(); + Publisher publisher = Publisher.defer(() -> { CompletableSource.Processor processor = Processors.newCompletableProcessor(); processors.add(processor); - return fromSource(processor); - }; - TestResubscribePublisher resubscribePublisher = new TestResubscribePublisher<>(); + return resubscribePublisher.takeUntil(fromSource(processor)); + }); @SuppressWarnings("unchecked") Subscriber resubscribeSubscriber = mock(Subscriber.class); @SuppressWarnings("unchecked") Subscriber subscriber = mock(Subscriber.class); doAnswer((Answer) invocation -> { - toSource(resubscribePublisher.takeUntil(completableSupplier)).subscribe(subscriber); + toSource(publisher).subscribe(subscriber); return null; }).when(resubscribeSubscriber).onComplete(); doAnswer((Answer) invocation -> { @@ -143,7 +143,7 @@ void resubscribe() throws InterruptedException { return null; }).when(subscriber).onSubscribe(any()); - toSource(resubscribePublisher.takeUntil(completableSupplier)).subscribe(resubscribeSubscriber); + toSource(publisher).subscribe(resubscribeSubscriber); TestPublisher testPublisher1 = resubscribePublisher.publisher(); TestSubscription testSubscription1 = resubscribePublisher.subscription(); diff --git a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java index 1b281c7eca..90772aef9f 100644 --- a/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java +++ b/servicetalk-concurrent-reactivestreams/src/test/java/io/servicetalk/concurrent/reactivestreams/tck/PublisherTakeUntilTckTest.java @@ -24,6 +24,6 @@ public class PublisherTakeUntilTckTest extends AbstractPublisherOperatorTckTest { @Override protected Publisher composePublisher(Publisher publisher, int elements) { - return publisher.takeUntil(Completable::never); + return publisher.takeUntil(Completable.never()); } }