Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Client interceptors close method not called after producer/consumer/reader close #23830

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.client.api;

import com.google.common.collect.Sets;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1088,6 +1090,135 @@ public void onAckTimeoutSend(Consumer<String> consumer, Set<MessageId> messageId
consumer.close();
}

@Test(dataProvider = "topicPartition")
public void testProducerInterceptorClose(int partitions) throws PulsarClientException, PulsarAdminException {
final String topic = "persistent://my-property/my-ns/my-topic";
if (partitions > 0) {
admin.topics().createPartitionedTopic(topic, partitions);
} else {
admin.topics().createNonPartitionedTopic(topic);
}

AtomicInteger closeCount = new AtomicInteger(0);
org.apache.pulsar.client.api.interceptor.ProducerInterceptor interceptor = new ProducerInterceptorAdaptor() {
@Override
public void close() {
closeCount.incrementAndGet();
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.intercept(interceptor)
.create();
for (int i = 0; i < 10; i++) {
producer.newMessage().value("Hello Pulsar!").send();
}
producer.close();
Assert.assertEquals(closeCount.get(), 1);
}

@Test(dataProvider = "topicPartition")
public void testConsumerInterceptorClose(int partitions) throws PulsarClientException, PulsarAdminException {
final String topic = "persistent://my-property/my-ns/my-topic";
if (partitions > 0) {
admin.topics().createPartitionedTopic(topic, partitions);
} else {
admin.topics().createNonPartitionedTopic(topic);
}

AtomicInteger closeCount = new AtomicInteger(0);
ConsumerInterceptor<String> interceptor = new ConsumerInterceptorAdaptor<String>() {
@Override
public void close() {
closeCount.incrementAndGet();
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-subscription")
.intercept(interceptor)
.subscribe();
for (int i = 0; i < 10; i++) {
producer.newMessage().value("Hello Pulsar!").send();
}
for (int i = 0; i < 10; i++) {
consumer.acknowledge(consumer.receive());
}
producer.close();
consumer.close();
Assert.assertEquals(closeCount.get(), 1);
}

@Test
public void testZeroQueueConsumerInterceptorClose() throws PulsarClientException, PulsarAdminException {
final String topic = "persistent://my-property/my-ns/my-topic";
admin.topics().createNonPartitionedTopic(topic);

AtomicInteger closeCount = new AtomicInteger(0);
ConsumerInterceptor<String> interceptor = new ConsumerInterceptorAdaptor<String>() {
@Override
public void close() {
closeCount.incrementAndGet();
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("my-subscription")
.receiverQueueSize(0)
.intercept(interceptor)
.subscribe();
for (int i = 0; i < 10; i++) {
producer.newMessage().value("Hello Pulsar!").send();
}
for (int i = 0; i < 10; i++) {
consumer.acknowledge(consumer.receive());
}
producer.close();
consumer.close();
Assert.assertEquals(closeCount.get(), 1);
}

@Test(dataProvider = "topicPartition")
public void testReaderInterceptorClose(int partitions) throws IOException, PulsarAdminException {
final String topic = "persistent://my-property/my-ns/my-topic";
if (partitions > 0) {
admin.topics().createPartitionedTopic(topic, partitions);
} else {
admin.topics().createNonPartitionedTopic(topic);
}

AtomicInteger closeCount = new AtomicInteger(0);
ReaderInterceptor<String> interceptor = new ReaderInterceptorAdaptor<String>() {
@Override
public void close() {
closeCount.incrementAndGet();
}
};
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.topic(topic)
.startMessageId(MessageId.earliest)
.intercept(interceptor)
.create();
for (int i = 0; i < 10; i++) {
producer.newMessage().value("Hello Pulsar!").send();
}
for (int i = 0; i < 10; i++) {
Message<String> discard = reader.readNext();
}
producer.close();
reader.close();
Assert.assertEquals(closeCount.get(), 1);
}

private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<byte[]> reader)
throws PulsarClientException {
for (int i = 0; i < msgCount; i++) {
Expand All @@ -1100,4 +1231,64 @@ private void produceAndConsume(int msgCount, Producer<byte[]> producer, Reader<b
}
}

static class ProducerInterceptorAdaptor implements org.apache.pulsar.client.api.interceptor.ProducerInterceptor {

@Override
public void close() {
}

@Override
public boolean eligible(Message message) {
return true;
}

@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
}
}

static class ConsumerInterceptorAdaptor<T> implements ConsumerInterceptor<T> {

@Override
public void close() {
}

@Override
public Message<T> beforeConsume(Consumer<T> consumer, Message<T> message) {
return message;
}

@Override
public void onAcknowledge(Consumer<T> consumer, MessageId messageId, Throwable exception) {
}

@Override
public void onAcknowledgeCumulative(Consumer<T> consumer, MessageId messageId, Throwable exception) {
}

@Override
public void onNegativeAcksSend(Consumer<T> consumer, Set<MessageId> messageIds) {
}

@Override
public void onAckTimeoutSend(Consumer<T> consumer, Set<MessageId> messageIds) {
}
}

static class ReaderInterceptorAdaptor<T> implements ReaderInterceptor<T> {

@Override
public void close() {
}

@Override
public Message<T> beforeRead(Reader<T> reader, Message<T> message) {
return message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import lombok.Getter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
Expand Down Expand Up @@ -1142,7 +1143,14 @@ public synchronized CompletableFuture<Void> closeAsync() {
}

ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
closeFutures.add(closeFuture.whenCompleteAsync((nil, ex) -> {
if (interceptors != null) {
interceptors.close();
}
if (ex != null) {
ExceptionUtils.rethrow(ex);
}
}, internalPinnedExecutor));
if (retryLetterProducer != null) {
closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ public void onPartitionsChange(String topicName, int partitions) {
}

@Override
public void close() throws IOException {
for (int i = 0, interceptorsSize = interceptors.size(); i < interceptorsSize; i++) {
public void close() {
for (ConsumerInterceptor<T> interceptor : interceptors) {
try {
interceptors.get(i).close();
interceptor.close();
} catch (Throwable e) {
log.error("Fail to close consumer interceptor ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -659,7 +660,14 @@ public CompletableFuture<Void> closeAsync() {
}
});

return closeFuture;
return closeFuture.whenCompleteAsync((nil, ex) -> {
if (interceptors != null) {
interceptors.close();
}
if (ex != null) {
ExceptionUtils.rethrow(ex);
}
}, client.getInternalExecutorService());
}

private void cleanupMultiConsumer() {
Expand Down Expand Up @@ -1650,8 +1658,7 @@ public void onPartitionsChange(String topicName, int partitions) {
}

@Override
public void close() throws IOException {
multiTopicInterceptors.close();
public void close() {
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
Expand All @@ -66,6 +67,7 @@ public class PartitionedProducerImpl<T> extends ProducerBase<T> {
private final Map<Integer, ProducerImpl<T>> producers = new ConcurrentHashMap<>();
private final MessageRouter routerPolicy;
private final PartitionedTopicProducerStatsRecorderImpl stats;
private final ProducerInterceptors internalProducerInterceptors;
private TopicMetadata topicMetadata;
private final int firstPartitionIndex;
private String overrideProducerName;
Expand All @@ -81,6 +83,9 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo
super(client, topic, conf, producerCreatedFuture, schema, interceptors);
this.topicMetadata = new TopicMetadataImpl(numPartitions);
this.routerPolicy = getMessageRouter();
this.internalProducerInterceptors = interceptors != null
? createInternalProducerInterceptors(interceptors)
: null;
stats = client.getConfiguration().getStatsIntervalSeconds() > 0
? new PartitionedTopicProducerStatsRecorderImpl()
: null;
Expand Down Expand Up @@ -221,7 +226,7 @@ private ProducerImpl<T> createProducer(final int partitionIndex, final Optional<
return producers.computeIfAbsent(partitionIndex, (idx) -> {
String partitionName = TopicName.get(topic).getPartition(idx).toString();
return client.newProducerImpl(partitionName, idx,
conf, schema, interceptors, new CompletableFuture<>(), overrideProducerName);
conf, schema, internalProducerInterceptors, new CompletableFuture<>(), overrideProducerName);
});
}

Expand Down Expand Up @@ -347,14 +352,19 @@ public CompletableFuture<Void> closeAsync() {
log.error("[{}] Could not close Partitioned Producer", topic, closeFail.get().getCause());
}
}

return null;
});
}

}

return closeFuture;
return closeFuture.whenCompleteAsync((nil, ex) -> {
if (interceptors != null) {
interceptors.close();
}
if (ex != null) {
ExceptionUtils.rethrow(ex);
}
}, client.getInternalExecutorService());
}

@Override
Expand Down Expand Up @@ -462,6 +472,30 @@ public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtende
}
}

private ProducerInterceptors createInternalProducerInterceptors(ProducerInterceptors interceptors) {
return new ProducerInterceptors(Collections.emptyList()) {
@Override
public Message beforeSend(Producer producer, Message message) {
return interceptors.beforeSend(producer, message);
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) {
interceptors.onSendAcknowledgement(producer, message, msgId, exception);
}

@Override
public void onPartitionsChange(String topicName, int partitions) {
// do nothing
}

@Override
public void close() {
// do nothing
}
};
}

private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1192,7 +1193,14 @@ public CompletableFuture<Void> closeAsync() {
return null;
});

return closeFuture;
return closeFuture.whenCompleteAsync((nil, ex) -> {
if (interceptors != null) {
interceptors.close();
}
if (ex != null) {
ExceptionUtils.rethrow(ex);
}
}, client.getInternalExecutorService());
}

@VisibleForTesting
Expand Down
Loading
Loading