Skip to content

Commit

Permalink
Fix mop producer publish metric (#1553)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- authored Dec 11, 2024
1 parent 4764775 commit 6407272
Show file tree
Hide file tree
Showing 12 changed files with 121 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole,
.clientRestrictions(clientRestrictions)
.serverRestrictions(serverRestrictions)
.authData(authData)
.serverCnx(serverCnx)
.channel(channel)
.connectMessage(msg)
.connectionManager(connectionManager)
Expand Down Expand Up @@ -295,6 +296,7 @@ public void processDisconnect(MqttAdapterMessage adapterMsg) {
if (log.isDebugEnabled()) {
log.debug("[Disconnect] [{}] ", clientId);
}
qosPublishHandlers.qos0().closeProducer(connection);
// If client update session timeout interval property.
Optional<Integer> newSessionExpireInterval;
if ((newSessionExpireInterval = MqttPropertyUtils
Expand Down Expand Up @@ -328,6 +330,7 @@ public void processConnectionLost() {
if (log.isDebugEnabled()) {
log.debug("[Connection Lost] [{}] ", clientId);
}
qosPublishHandlers.qos0().closeProducer(connection);
metricsCollector.removeClient(NettyUtils.getAddress(channel));
WillMessage willMessage = connection.getWillMessage();
if (willMessage != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.streamnative.pulsar.handlers.mqtt.broker.qos;

import static io.streamnative.pulsar.handlers.mqtt.broker.impl.PulsarMessageConverter.toPulsarMsg;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
Expand All @@ -30,10 +31,12 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.MessageImpl;
Expand All @@ -42,12 +45,14 @@
/**
* Abstract class for publish handler.
*/
@Slf4j
public abstract class AbstractQosPublishHandler implements QosPublishHandler {

protected final PulsarService pulsarService;
protected final RetainedMessageHandler retainedMessageHandler;
protected final MQTTServerConfiguration configuration;
private final ConcurrentHashMap<String, Long> sequenceIdMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Producer> producerMap = new ConcurrentHashMap<>();


protected AbstractQosPublishHandler(MQTTService mqttService) {
Expand Down Expand Up @@ -109,6 +114,19 @@ protected CompletableFuture<Position> writeToPulsarTopic(Connection connection,
}
return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> {
long lastPublishedSequenceId = -1;
Producer producer = producerMap.compute(producerName, (k, v) -> {
if (v == null) {
v = MQTTProducer.create(topic, connection.getServerCnx(), producerName);
final CompletableFuture<Optional<Long>> producerFuture =
topic.addProducer(v, new CompletableFuture<>());
producerFuture.whenComplete((r, e) -> {
if (e != null) {
log.error("Failed to add producer", e);
}
});
}
return v;
});
if (topic instanceof PersistentTopic) {
final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> {
Expand All @@ -121,12 +139,14 @@ protected CompletableFuture<Position> writeToPulsarTopic(Connection connection,
return id;
});
}
final ByteBuf payload = msg.payload();
MessageImpl<byte[]> message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(),
msg.payload().nioBuffer());
payload.nioBuffer());
CompletableFuture<Position> ret = MessagePublishContext.publishMessages(producerName, message,
lastPublishedSequenceId, topic);
message.recycle();
return ret.thenApply(position -> {
topic.incrementPublishCount(producer, 1, payload.readableBytes());
if (checkSubscription && topic.getSubscriptions().isEmpty()) {
throw new MQTTNoMatchingSubscriberException(mqttTopicName);
}
Expand All @@ -135,4 +155,12 @@ protected CompletableFuture<Position> writeToPulsarTopic(Connection connection,
}).orElseGet(() -> FutureUtil.failedFuture(
new BrokerServiceException.TopicNotFoundException(mqttTopicName))));
}

@Override
public void closeProducer(Connection connection) {
final Producer producer = producerMap.remove(connection.getClientId());
if (producer != null) {
producer.close(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.mqtt.broker.qos;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;

public class MQTTProducer extends Producer {

public static final AtomicLong PRODUCER_ID = new AtomicLong();

public MQTTProducer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional<Long> topicEpoch,
boolean supportsPartialProducer) {
super(topic, cnx, producerId, producerName, appId, isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, accessMode, topicEpoch, supportsPartialProducer);
}

public static MQTTProducer create(Topic topic, TransportCnx cnx, String producerName) {
return new MQTTProducer(topic, cnx, PRODUCER_ID.incrementAndGet(), producerName, "",
false, null, SchemaVersion.Latest, 0, true,
ProducerAccessMode.Shared, Optional.empty(), true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@
public interface QosPublishHandler {

CompletableFuture<Void> publish(Connection connection, MqttAdapterMessage msg);

void closeProducer(Connection connection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.streamnative.pulsar.handlers.mqtt.broker.qos;


/**
* Qos publish handlers for different Qos message publish.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public QosPublishHandler qos1() {
public QosPublishHandler qos2() {
return this.qos2Handler;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.ServerCnx;

/**
* Value object to maintain the information of single connection, like ClientID, Channel, and clean
Expand Down Expand Up @@ -90,6 +91,9 @@ public class Connection {
@Getter
private AuthenticationDataSource authData;

@Getter
private ServerCnx serverCnx;

@Getter
private final boolean fromProxy;
private volatile ConnectionState connectionState = DISCONNECTED;
Expand Down Expand Up @@ -118,6 +122,7 @@ public class Connection {
this.processor = builder.processor;
this.fromProxy = builder.fromProxy;
this.authData = builder.authData;
this.serverCnx = builder.serverCnx;
this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData);
this.addIdleStateHandler();
this.manager.addConnection(this);
Expand Down Expand Up @@ -309,6 +314,7 @@ public static class ConnectionBuilder {
private boolean fromProxy;

private AuthenticationDataSource authData;
private ServerCnx serverCnx;

public ConnectionBuilder protocolVersion(int protocolVersion) {
this.protocolVersion = protocolVersion;
Expand Down Expand Up @@ -370,6 +376,11 @@ public ConnectionBuilder authData(AuthenticationDataSource authData) {
return this;
}

public ConnectionBuilder serverCnx(ServerCnx serverCnx) {
this.serverCnx = serverCnx;
return this;
}

public Connection build() {
return new Connection(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.streamnative.pulsar.handlers.mqtt.common.Connection;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.common.utils.FutureUtils;
Expand All @@ -39,14 +40,18 @@ public AdapterChannel(MQTTProxyAdapter adapter,
this.channelFuture = channelFuture;
}

public CompletableFuture<Void> writeAndFlush(final MqttAdapterMessage adapterMsg) {
public CompletableFuture<Void> writeAndFlush(final Connection connection, final MqttAdapterMessage adapterMsg) {
checkArgument(StringUtils.isNotBlank(adapterMsg.getClientId()), "clientId is blank");
final String clientId = adapterMsg.getClientId();
adapterMsg.setEncodeType(MqttAdapterMessage.EncodeType.ADAPTER_MESSAGE);
CompletableFuture<Void> future = channelFuture.thenCompose(channel -> {
if (!channel.isActive()) {
channelFuture = adapter.getChannel(broker);
return writeAndFlush(adapterMsg);
if (log.isDebugEnabled()) {
log.debug("channel is inactive, re-create channel to broker : {}", broker);
}
return writeConnectMessage(connection)
.thenCompose(__ -> writeAndFlush(connection, adapterMsg));
}
return FutureUtils.completableFuture(channel.writeAndFlush(adapterMsg));
});
Expand All @@ -58,6 +63,11 @@ public CompletableFuture<Void> writeAndFlush(final MqttAdapterMessage adapterMsg
return future;
}

private CompletableFuture<Void> writeConnectMessage(final Connection connection) {
final MqttConnectMessage connectMessage = connection.getConnectMessage();
return writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage));
}

/**
* When client subscribes, the adapter channel maybe close in exception, so register listener to close the
* related client channel and trigger reconnection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void processPingReq(final MqttAdapterMessage msg) {
topicBrokers.values().forEach(adapterChannel -> {
adapterChannel.thenAccept(channel -> {
msg.setClientId(clientId);
channel.writeAndFlush(msg);
channel.writeAndFlush(connection, msg);
});
});
}
Expand All @@ -269,7 +269,7 @@ public void processDisconnect(final MqttAdapterMessage msg) {
.filter(future -> !future.isCompletedExceptionally())
.map(CompletableFuture::join)
.collect(Collectors.toSet())
.forEach(channel -> channel.writeAndFlush(msg)));
.forEach(channel -> channel.writeAndFlush(connection, msg)));
} else {
if (log.isDebugEnabled()) {
log.debug("Disconnect is already triggered, ignore");
Expand Down Expand Up @@ -464,7 +464,7 @@ public void processUnSubscribe(final MqttAdapterMessage adapter) {

private CompletableFuture<Void> writeToBroker(final String topic, final MqttAdapterMessage msg) {
CompletableFuture<AdapterChannel> proxyExchanger = connectToBroker(topic);
return proxyExchanger.thenCompose(exchanger -> exchanger.writeAndFlush(msg));
return proxyExchanger.thenCompose(exchanger -> exchanger.writeAndFlush(connection, msg));
}

private CompletableFuture<AdapterChannel> connectToBroker(final String topic) {
Expand All @@ -473,8 +473,7 @@ private CompletableFuture<AdapterChannel> connectToBroker(final String topic) {
adapterChannels.computeIfAbsent(mqttBroker, key1 -> {
AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker);
final MqttConnectMessage connectMessage = connection.getConnectMessage();

adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(),
adapterChannel.writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(),
connectMessage));
return adapterChannel;
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import io.streamnative.pulsar.handlers.mqtt.common.Connection;
import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.common.MQTTConnectionManager;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage;
import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
import io.streamnative.pulsar.handlers.mqtt.proxy.channel.AdapterChannel;
Expand All @@ -38,7 +41,6 @@
import org.testng.annotations.Test;



public class AdapterChannelTest extends MQTTTestBase {

@Override
Expand Down Expand Up @@ -74,7 +76,13 @@ public void testAdapterChannelAutoGetConnection() throws InterruptedException {
MqttConnectMessage fakeConnectMessage = MqttMessageBuilders.connect()
.clientId(clientId).build();
MqttAdapterMessage mqttAdapterMessage = new MqttAdapterMessage(clientId, fakeConnectMessage);
adapterChannel.writeAndFlush(mqttAdapterMessage).join();
Connection connection = Connection.builder()
.channel(channel)
.clientRestrictions(ClientRestrictions.builder().keepAliveTime(10).build())
.connectionManager(Mockito.mock(MQTTConnectionManager.class))
.clientId(clientId)
.connectMessage(fakeConnectMessage).build();
adapterChannel.writeAndFlush(connection, mqttAdapterMessage).join();
CompletableFuture<Channel> channelFutureAfterSend = brokerChannels.get(key);
Channel channelAfterSend = channelFutureAfterSend.join();
assertNotEquals(channelAfterSend.id(), previousChannelId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,24 +346,27 @@ public void testPubAndSubWithDifferentTopics() {
@Test
public void testTopicUnload() throws Exception {
MQTT mqttConsumer = createMQTTProxyClient();
mqttConsumer.setClientId("client-consumer");
BlockingConnection consumer = mqttConsumer.blockingConnection();
consumer.connect();
String topicName1 = "topic-unload-1";
Topic[] topic1 = { new Topic(topicName1, QoS.AT_MOST_ONCE)};
consumer.subscribe(topic1);
MQTT mqttProducer = createMQTTProxyClient();
mqttProducer.setClientId("client-producer");
BlockingConnection producer = mqttProducer.blockingConnection();
producer.connect();
String msg1 = "hello topic1";
producer.publish(topicName1, msg1.getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false);
Message receive1 = consumer.receive();
Message receive1 = consumer.receive(10, TimeUnit.SECONDS);
Assert.assertEquals(new String(receive1.getPayload()), msg1);
Assert.assertEquals(receive1.getTopic(), topicName1);
admin.topics().unload(topicName1);
Thread.sleep(5000);
log.info("unloaded topic : {}", topicName1);
producer.publish(topicName1, msg1.getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false);
producer.disconnect();
Message receive2 = consumer.receive();
// producer.disconnect();
Message receive2 = consumer.receive(10, TimeUnit.SECONDS);
Assert.assertEquals(new String(receive2.getPayload()), msg1);
Assert.assertEquals(receive2.getTopic(), topicName1);
consumer.disconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ public void testBrokerThrowServiceNotReadyException() throws Exception {
.collect(Collectors.toList()))
.build();

admin.clusters().createNamespaceIsolationPolicy("test", "policy-1", isolationData);
try {
final Mqtt5PublishResult r2 = client.publishWith()
.topic(topic1)
Expand All @@ -104,7 +103,6 @@ public void testBrokerThrowServiceNotReadyException() throws Exception {
.send();
Assert.assertFalse(r3.getError().isPresent());
client.disconnect();
admin.clusters().deleteNamespaceIsolationPolicy("test", "policy-1");
}

@Test(invocationCount = 2)
Expand Down

0 comments on commit 6407272

Please sign in to comment.