Skip to content

Commit

Permalink
provide arbitrary methods for mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
Tenischev committed Dec 11, 2023
1 parent fbcff6a commit 554dcee
Showing 1 changed file with 111 additions and 76 deletions.
187 changes: 111 additions & 76 deletions template/src/main/java/com/asyncapi/service/MessageHandlerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,27 @@
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
{% if asyncapi | isProtocol('kafka') and hasPublish %}
{%- if asyncapi | isProtocol('kafka') and hasPublish %}
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
{% if asyncapi | isProtocol('amqp') and hasPublish %}
{%- endif %}
{%- if asyncapi | isProtocol('mqtt') and hasPublish %}
import org.springframework.integration.mqtt.support.MqttHeaders;
{%- endif %}
{%- if asyncapi | isProtocol('amqp') and hasPublish %}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
{%- endif %}
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- for message in channel.publish().messages() %}
import {{ params['userJavaPackage'] }}.model.{{message.payload().uid() | camelCase | upperFirst}};
{%- endfor %}
{%- endif %}
{%- endfor %}
{% endif %}
{%- endif %}
{% endfor %}
import javax.annotation.processing.Generated;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -48,64 +44,95 @@
@Service
public class MessageHandlerService {

{%- set anyChannelHasParameter = false %}
private static final Logger LOGGER = LoggerFactory.getLogger(MessageHandlerService.class);
{% if asyncapi | isProtocol('kafka') %}
{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- if channel.publish().hasMultipleMessages() %}
{%- set typeName = "Object" %}
{%- else %}
{%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- endif %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
{%- set route = channelName %}
{%- if channel.hasParameters() %}
{%- set route = route | replaceAll(".", "\\.") %}
{%- for parameterName, parameter in channel.parameters() %}
{%- set route = route | replace("{" + parameterName + "}", ".*") %}
{%- endfor %}
{%- endif %}
@KafkaListener({% if channel.hasParameters() %}topicPattern{% else %}topics{% endif %} = "{{route}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %})
public void {{channel.publish().id() | camelCase}}(@Payload {{typeName}} payload,

{% for channelName, channel in asyncapi.channels() %}
{%- if channel.hasPublish() %}
{%- set hasParameters = channel.hasParameters() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or hasParameters %}
{%- set methodName = channel.publish().id() | camelCase%}
{%- if channel.publish().hasMultipleMessages() %}
{%- set typeName = "Object" %}
{%- else %}
{%- set typeName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- endif %}
{% set javaDoc = '' %}
{% if channel.description() or channel.publish().description() %}
{%- set javaDoc = javaDoc + '/**\n' %}
{%- for line in channel.description() | splitByLines %}
{%- set javaDoc = javaDoc + ' * ' + (line | safe) %}
{%- set javaDoc = javaDoc + '\n' %}
{%- endfor %}
{%- for line in channel.publish().description() | splitByLines %}
{%- set javaDoc = javaDoc + ' * ' + (line | safe) %}
{%- set javaDoc = javaDoc + '\n' %}
{%- endfor %}
{%- set javaDoc = javaDoc + ' */' %}
{% endif %}
{%- if asyncapi | isProtocol('kafka') %}
{%- set route = channelName %}
{%- if hasParameters %}
{%- set route = route | replaceAll(".", "\\.") %}
{%- for parameterName, parameter in channel.parameters() %}
{%- set route = route | replace("{" + parameterName + "}", ".*") %}
{%- endfor %}
{%- endif %}
{{javaDoc}}
@KafkaListener({% if hasParameters %}topicPattern{% else %}topics{% endif %} = "{{route}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %})
public void {{methodName}}(@Payload {{typeName}} payload,
@Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_MESSAGE_KEY{% else %}RECEIVED_KEY{% endif -%}) Integer key,
@Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_PARTITION_ID{% else %}RECEIVED_PARTITION{% endif -%}) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
LOGGER.info("Key: " + key + ", Payload: " + payload.toString() + ", Timestamp: " + timestamp + ", Partition: " + partition);
}
{%- endif %}
{% endfor %}

{% elif asyncapi | isProtocol('amqp') %}
{%- set anyChannelHasParameter = false %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{%- set anyChannelHasParameter = anyChannelHasParameter or channel.hasParameters() %}
{%- set schemaName = channel.publish().message().payload().uid() | camelCase | upperFirst %}
{%- set varName = channelName | toAmqpNeutral(channel.hasParameters(), channel.parameters()) %}
{%- if channel.hasParameters() %}
@Value("${amqp.{{- varName -}}.routingKey}")
private String {{varName}}RoutingKey;

{%- endif %}
@RabbitListener(queues = "${amqp.{{- varName -}}.queue}")
public void {{channel.publish().id() | camelCase}}({{schemaName}} payload, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routKey) {
{%- if channel.hasParameters() %}
List<String> parameters = decompileRoutingKey({{varName}}RoutingKey, routKey);
{{channel.publish().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {%- endfor %}payload);
{% elif asyncapi | isProtocol('amqp') %}
{%- set propertyValueName = channelName | toAmqpNeutral(hasParameters, channel.parameters()) %}
{%- if hasParameters %}
@Value("${amqp.{{- propertyValueName -}}.routingKey}")
private String {{propertyValueName}}RoutingKey;
{% endif %}
LOGGER.info("Message received from {{- varName -}} : " + payload);
{{javaDoc}}
@RabbitListener(queues = "${amqp.{{- propertyValueName -}}.queue}")
public void {{methodName}}({{typeName}} payload, @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String routKey) {
{%- if hasParameters %}
List<String> parameters = decompileRoutingKey({{propertyValueName}}RoutingKey, routKey);
{{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}payload);
{% endif %}
LOGGER.info("Message received from {{- propertyValueName -}} : " + payload);
}
{%- if channel.hasParameters() %}
public void {{channel.publish().id() | camelCase}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {%- endfor %}{{schemaName}} payload) {
{% if hasParameters %}
{{javaDoc}}
public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload) {
// parametrized listener
}
{%- endif %}
{%- else %}
{%- if hasParameters %}
@Value("${mqtt.topic.{{-methodName-}}}")
private String {{methodName}}Topic;

{%- endif %}
{{javaDoc}}
public void handle{{methodName | upperFirst}}(Message<?> message) {
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
{%- if hasParameters %}
List<String> parameters = decodeTopic({{methodName}}Topic, topic);
{%- endif %}
{{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}({{typeName}}) message.getPayload());
}
{{javaDoc}}
public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload) {
LOGGER.info("handler {{channelName}}");
LOGGER.info(String.valueOf(payload.toString()));
}
{% endif %}
{% endif %}
{% endfor %}
{%- if anyChannelHasParameter %}
{% endif %}
{% endfor %}

{%- if anyChannelHasParameter %}
{%- if asyncapi | isProtocol('kafka') %}
{%- elif asyncapi | isProtocol('amqp') %}
private List<String> decompileRoutingKey(String pattern, String routKey) {
List<String> parameters = new ArrayList<>();
int routeKeyPossition = 0;
Expand All @@ -126,19 +153,27 @@ private List<String> decompileRoutingKey(String pattern, String routKey) {
}
return parameters;
}
{%- endif %}
{% else %}
{% for channelName, channel in asyncapi.channels() %}
{% if channel.hasPublish() %}
{% if channel.description() or channel.publish().description() %}/**{% for line in channel.description() | splitByLines %}
* {{line | safe}}{% endfor %}{% for line in channel.publish().description() | splitByLines %}
* {{line | safe}}{% endfor %}
*/{% endif %}
public void handle{{channel.publish().id() | camelCase | upperFirst}}(Message<?> message) {
LOGGER.info("handler {{channelName}}");
LOGGER.info(String.valueOf(message.getPayload().toString()));
{%- else %}
private List<String> decodeTopic(String topicPattern, String topic) {
List<String> parameters = new ArrayList<>();
int topicPossition = 0;
int patternPosition = 0;
while (topicPossition < topic.length()) {
while (topicPattern.charAt(patternPosition) == topic.charAt(topicPossition)) {
topicPossition++;
patternPosition++;
}
topicPossition++;
patternPosition += 2; // skip +
StringBuilder parameter = new StringBuilder();
while (topicPattern.charAt(patternPosition) != topic.charAt(topicPossition)) {
parameter.append(topic.charAt(topicPossition));
topicPossition++;
}
parameters.add(parameter.toString());
}
return parameters;
}
{% endif %}
{% endfor %}
{%- endif %}
{% endif %}
}

0 comments on commit 554dcee

Please sign in to comment.