diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java index d8c20ae5..c4f6250b 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/processor/MQTTBrokerProtocolMethodProcessor.java @@ -152,6 +152,7 @@ public void doProcessConnect(MqttAdapterMessage adapterMsg, String userRole, .clientRestrictions(clientRestrictions) .serverRestrictions(serverRestrictions) .authData(authData) + .serverCnx(serverCnx) .channel(channel) .connectMessage(msg) .connectionManager(connectionManager) @@ -295,6 +296,7 @@ public void processDisconnect(MqttAdapterMessage adapterMsg) { if (log.isDebugEnabled()) { log.debug("[Disconnect] [{}] ", clientId); } + qosPublishHandlers.qos0().closeProducer(connection); // If client update session timeout interval property. Optional newSessionExpireInterval; if ((newSessionExpireInterval = MqttPropertyUtils @@ -328,6 +330,7 @@ public void processConnectionLost() { if (log.isDebugEnabled()) { log.debug("[Connection Lost] [{}] ", clientId); } + qosPublishHandlers.qos0().closeProducer(connection); metricsCollector.removeClient(NettyUtils.getAddress(channel)); WillMessage willMessage = connection.getWillMessage(); if (willMessage != null) { diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java index 716c126a..19654745 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/AbstractQosPublishHandler.java @@ -14,6 +14,7 @@ package io.streamnative.pulsar.handlers.mqtt.broker.qos; import static io.streamnative.pulsar.handlers.mqtt.broker.impl.PulsarMessageConverter.toPulsarMsg; +import io.netty.buffer.ByteBuf; import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration; @@ -30,10 +31,12 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerServiceException; +import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.MessageImpl; @@ -42,12 +45,14 @@ /** * Abstract class for publish handler. */ +@Slf4j public abstract class AbstractQosPublishHandler implements QosPublishHandler { protected final PulsarService pulsarService; protected final RetainedMessageHandler retainedMessageHandler; protected final MQTTServerConfiguration configuration; private final ConcurrentHashMap sequenceIdMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap producerMap = new ConcurrentHashMap<>(); protected AbstractQosPublishHandler(MQTTService mqttService) { @@ -109,6 +114,19 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, } return getTopicReference(mqttTopicName).thenCompose(topicOp -> topicOp.map(topic -> { long lastPublishedSequenceId = -1; + Producer producer = producerMap.compute(producerName, (k, v) -> { + if (v == null) { + v = MQTTProducer.create(topic, connection.getServerCnx(), producerName); + final CompletableFuture> producerFuture = + topic.addProducer(v, new CompletableFuture<>()); + producerFuture.whenComplete((r, e) -> { + if (e != null) { + log.error("Failed to add producer", e); + } + }); + } + return v; + }); if (topic instanceof PersistentTopic) { final long lastPublishedId = ((PersistentTopic) topic).getLastPublishedSequenceId(producerName); lastPublishedSequenceId = sequenceIdMap.compute(producerName, (k, v) -> { @@ -121,12 +139,14 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, return id; }); } + final ByteBuf payload = msg.payload(); MessageImpl message = toPulsarMsg(configuration, topic, msg.variableHeader().properties(), - msg.payload().nioBuffer()); + payload.nioBuffer()); CompletableFuture ret = MessagePublishContext.publishMessages(producerName, message, lastPublishedSequenceId, topic); message.recycle(); return ret.thenApply(position -> { + topic.incrementPublishCount(producer, 1, payload.readableBytes()); if (checkSubscription && topic.getSubscriptions().isEmpty()) { throw new MQTTNoMatchingSubscriberException(mqttTopicName); } @@ -135,4 +155,12 @@ protected CompletableFuture writeToPulsarTopic(Connection connection, }).orElseGet(() -> FutureUtil.failedFuture( new BrokerServiceException.TopicNotFoundException(mqttTopicName)))); } + + @Override + public void closeProducer(Connection connection) { + final Producer producer = producerMap.remove(connection.getClientId()); + if (producer != null) { + producer.close(true); + } + } } diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java new file mode 100644 index 00000000..a71fe521 --- /dev/null +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/MQTTProducer.java @@ -0,0 +1,42 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.mqtt.broker.qos; + +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.TransportCnx; +import org.apache.pulsar.common.api.proto.ProducerAccessMode; +import org.apache.pulsar.common.protocol.schema.SchemaVersion; + +public class MQTTProducer extends Producer { + + public static final AtomicLong PRODUCER_ID = new AtomicLong(); + + public MQTTProducer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId, + boolean isEncrypted, Map metadata, SchemaVersion schemaVersion, long epoch, + boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional topicEpoch, + boolean supportsPartialProducer) { + super(topic, cnx, producerId, producerName, appId, isEncrypted, metadata, schemaVersion, epoch, + userProvidedProducerName, accessMode, topicEpoch, supportsPartialProducer); + } + + public static MQTTProducer create(Topic topic, TransportCnx cnx, String producerName) { + return new MQTTProducer(topic, cnx, PRODUCER_ID.incrementAndGet(), producerName, "", + false, null, SchemaVersion.Latest, 0, true, + ProducerAccessMode.Shared, Optional.empty(), true); + } +} diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java index 1007269d..d0b41382 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandler.java @@ -23,4 +23,6 @@ public interface QosPublishHandler { CompletableFuture publish(Connection connection, MqttAdapterMessage msg); + + void closeProducer(Connection connection); } diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java index 48abab39..ee219794 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlers.java @@ -13,7 +13,6 @@ */ package io.streamnative.pulsar.handlers.mqtt.broker.qos; - /** * Qos publish handlers for different Qos message publish. */ diff --git a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java index 99c1bde2..111511f4 100644 --- a/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java +++ b/mqtt-broker/src/main/java/io/streamnative/pulsar/handlers/mqtt/broker/qos/QosPublishHandlersImpl.java @@ -44,4 +44,6 @@ public QosPublishHandler qos1() { public QosPublishHandler qos2() { return this.qos2Handler; } + + } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java index b631ec1a..96cecfed 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/Connection.java @@ -50,6 +50,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.ServerCnx; /** * Value object to maintain the information of single connection, like ClientID, Channel, and clean @@ -90,6 +91,9 @@ public class Connection { @Getter private AuthenticationDataSource authData; + @Getter + private ServerCnx serverCnx; + @Getter private final boolean fromProxy; private volatile ConnectionState connectionState = DISCONNECTED; @@ -118,6 +122,7 @@ public class Connection { this.processor = builder.processor; this.fromProxy = builder.fromProxy; this.authData = builder.authData; + this.serverCnx = builder.serverCnx; this.channel.attr(AUTH_DATA_ATTRIBUTE_KEY).set(authData); this.addIdleStateHandler(); this.manager.addConnection(this); @@ -309,6 +314,7 @@ public static class ConnectionBuilder { private boolean fromProxy; private AuthenticationDataSource authData; + private ServerCnx serverCnx; public ConnectionBuilder protocolVersion(int protocolVersion) { this.protocolVersion = protocolVersion; @@ -370,6 +376,11 @@ public ConnectionBuilder authData(AuthenticationDataSource authData) { return this; } + public ConnectionBuilder serverCnx(ServerCnx serverCnx) { + this.serverCnx = serverCnx; + return this; + } + public Connection build() { return new Connection(this); } diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java index 54901fb3..fdbe75da 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/channel/AdapterChannel.java @@ -15,6 +15,7 @@ import static com.google.common.base.Preconditions.checkArgument; import io.netty.channel.Channel; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.streamnative.pulsar.handlers.mqtt.common.Connection; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage; import io.streamnative.pulsar.handlers.mqtt.common.utils.FutureUtils; @@ -39,14 +40,18 @@ public AdapterChannel(MQTTProxyAdapter adapter, this.channelFuture = channelFuture; } - public CompletableFuture writeAndFlush(final MqttAdapterMessage adapterMsg) { + public CompletableFuture writeAndFlush(final Connection connection, final MqttAdapterMessage adapterMsg) { checkArgument(StringUtils.isNotBlank(adapterMsg.getClientId()), "clientId is blank"); final String clientId = adapterMsg.getClientId(); adapterMsg.setEncodeType(MqttAdapterMessage.EncodeType.ADAPTER_MESSAGE); CompletableFuture future = channelFuture.thenCompose(channel -> { if (!channel.isActive()) { channelFuture = adapter.getChannel(broker); - return writeAndFlush(adapterMsg); + if (log.isDebugEnabled()) { + log.debug("channel is inactive, re-create channel to broker : {}", broker); + } + return writeConnectMessage(connection) + .thenCompose(__ -> writeAndFlush(connection, adapterMsg)); } return FutureUtils.completableFuture(channel.writeAndFlush(adapterMsg)); }); @@ -58,6 +63,11 @@ public CompletableFuture writeAndFlush(final MqttAdapterMessage adapterMsg return future; } + private CompletableFuture writeConnectMessage(final Connection connection) { + final MqttConnectMessage connectMessage = connection.getConnectMessage(); + return writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage)); + } + /** * When client subscribes, the adapter channel maybe close in exception, so register listener to close the * related client channel and trigger reconnection. diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java index 9d495418..d7f138d3 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/impl/MQTTProxyProtocolMethodProcessor.java @@ -245,7 +245,7 @@ public void processPingReq(final MqttAdapterMessage msg) { topicBrokers.values().forEach(adapterChannel -> { adapterChannel.thenAccept(channel -> { msg.setClientId(clientId); - channel.writeAndFlush(msg); + channel.writeAndFlush(connection, msg); }); }); } @@ -269,7 +269,7 @@ public void processDisconnect(final MqttAdapterMessage msg) { .filter(future -> !future.isCompletedExceptionally()) .map(CompletableFuture::join) .collect(Collectors.toSet()) - .forEach(channel -> channel.writeAndFlush(msg))); + .forEach(channel -> channel.writeAndFlush(connection, msg))); } else { if (log.isDebugEnabled()) { log.debug("Disconnect is already triggered, ignore"); @@ -464,7 +464,7 @@ public void processUnSubscribe(final MqttAdapterMessage adapter) { private CompletableFuture writeToBroker(final String topic, final MqttAdapterMessage msg) { CompletableFuture proxyExchanger = connectToBroker(topic); - return proxyExchanger.thenCompose(exchanger -> exchanger.writeAndFlush(msg)); + return proxyExchanger.thenCompose(exchanger -> exchanger.writeAndFlush(connection, msg)); } private CompletableFuture connectToBroker(final String topic) { @@ -473,8 +473,7 @@ private CompletableFuture connectToBroker(final String topic) { adapterChannels.computeIfAbsent(mqttBroker, key1 -> { AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker); final MqttConnectMessage connectMessage = connection.getConnectMessage(); - - adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(), + adapterChannel.writeAndFlush(connection, new MqttAdapterMessage(connection.getClientId(), connectMessage)); return adapterChannel; }) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java index f8c4ba20..48954a74 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/broker/AdapterChannelTest.java @@ -21,8 +21,11 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage; import io.netty.handler.codec.mqtt.MqttMessageBuilders; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import io.streamnative.pulsar.handlers.mqtt.common.Connection; import io.streamnative.pulsar.handlers.mqtt.common.MQTTCommonConfiguration; +import io.streamnative.pulsar.handlers.mqtt.common.MQTTConnectionManager; import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterMessage; +import io.streamnative.pulsar.handlers.mqtt.common.mqtt5.restrictions.ClientRestrictions; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.proxy.channel.AdapterChannel; @@ -38,7 +41,6 @@ import org.testng.annotations.Test; - public class AdapterChannelTest extends MQTTTestBase { @Override @@ -74,7 +76,13 @@ public void testAdapterChannelAutoGetConnection() throws InterruptedException { MqttConnectMessage fakeConnectMessage = MqttMessageBuilders.connect() .clientId(clientId).build(); MqttAdapterMessage mqttAdapterMessage = new MqttAdapterMessage(clientId, fakeConnectMessage); - adapterChannel.writeAndFlush(mqttAdapterMessage).join(); + Connection connection = Connection.builder() + .channel(channel) + .clientRestrictions(ClientRestrictions.builder().keepAliveTime(10).build()) + .connectionManager(Mockito.mock(MQTTConnectionManager.class)) + .clientId(clientId) + .connectMessage(fakeConnectMessage).build(); + adapterChannel.writeAndFlush(connection, mqttAdapterMessage).join(); CompletableFuture channelFutureAfterSend = brokerChannels.get(key); Channel channelAfterSend = channelFutureAfterSend.join(); assertNotEquals(channelAfterSend.id(), previousChannelId); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java index cf99213d..454d64f5 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt3/fusesource/proxy/ProxyTest.java @@ -346,24 +346,27 @@ public void testPubAndSubWithDifferentTopics() { @Test public void testTopicUnload() throws Exception { MQTT mqttConsumer = createMQTTProxyClient(); + mqttConsumer.setClientId("client-consumer"); BlockingConnection consumer = mqttConsumer.blockingConnection(); consumer.connect(); String topicName1 = "topic-unload-1"; Topic[] topic1 = { new Topic(topicName1, QoS.AT_MOST_ONCE)}; consumer.subscribe(topic1); MQTT mqttProducer = createMQTTProxyClient(); + mqttProducer.setClientId("client-producer"); BlockingConnection producer = mqttProducer.blockingConnection(); producer.connect(); String msg1 = "hello topic1"; producer.publish(topicName1, msg1.getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false); - Message receive1 = consumer.receive(); + Message receive1 = consumer.receive(10, TimeUnit.SECONDS); Assert.assertEquals(new String(receive1.getPayload()), msg1); Assert.assertEquals(receive1.getTopic(), topicName1); admin.topics().unload(topicName1); Thread.sleep(5000); + log.info("unloaded topic : {}", topicName1); producer.publish(topicName1, msg1.getBytes(StandardCharsets.UTF_8), QoS.AT_MOST_ONCE, false); - producer.disconnect(); - Message receive2 = consumer.receive(); +// producer.disconnect(); + Message receive2 = consumer.receive(10, TimeUnit.SECONDS); Assert.assertEquals(new String(receive2.getPayload()), msg1); Assert.assertEquals(receive2.getTopic(), topicName1); consumer.disconnect(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java index 36b39472..ab5b0542 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java @@ -85,7 +85,6 @@ public void testBrokerThrowServiceNotReadyException() throws Exception { .collect(Collectors.toList())) .build(); - admin.clusters().createNamespaceIsolationPolicy("test", "policy-1", isolationData); try { final Mqtt5PublishResult r2 = client.publishWith() .topic(topic1) @@ -104,7 +103,6 @@ public void testBrokerThrowServiceNotReadyException() throws Exception { .send(); Assert.assertFalse(r3.getError().isPresent()); client.disconnect(); - admin.clusters().deleteNamespaceIsolationPolicy("test", "policy-1"); } @Test(invocationCount = 2)