Skip to content

Commit

Permalink
provide arbitrary methods for kafka in message handler
Browse files Browse the repository at this point in the history
  • Loading branch information
Tenischev committed Dec 12, 2023
1 parent 5c0508b commit 1385c10
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
7 changes: 6 additions & 1 deletion filters/all.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ function replaceAll(originalStr, replacePattern, replaceString) {
}
filter.replaceAll = replaceAll;

function toTopicString(channelName, hasParameters, parameters, convertDots, replaceParameterValue, replaceDots = "\.") {
function toTopicString(channelName, hasParameters, parameters, convertDots, replaceParameterValue, replaceDots = "\\\\.") {
if (hasParameters) {
let topicName = channelName
if (convertDots) {
Expand All @@ -210,6 +210,11 @@ function toTopicString(channelName, hasParameters, parameters, convertDots, repl
}
}

function toKafkaTopicString(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, true, ".*")
}
filter.toKafkaTopicString = toKafkaTopicString

function toMqttTopicString(channelName, hasParameters, parameters) {
return toTopicString(channelName, hasParameters, parameters, false, "+")
}
Expand Down
14 changes: 4 additions & 10 deletions partials/KafkaConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,15 @@ public class Config {
{%- if hasSubscribe %}
{% if hasParameters %}
@Bean
public RoutingKafkaTemplate<Integer, Object> kafkaTemplate() {
public RoutingKafkaTemplate kafkaTemplate() {
ProducerFactory<Object, Object> producerFactory = producerFactory();

Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
{%- for channelName, channel in asyncapi.channels() %}
{%- set route = channelName %}
{%- if channel.hasParameters() %}
{%- set route = route | replaceAll(".", "\\.") %}
{%- for parameterName, parameter in channel.parameters() %}
{%- set route = route | replace("{" + parameterName + "}", ".*") %}
{%- endfor %}
{%- endif %}
map.put(Pattern.compile({{route}}), producerFactory);
{%- set route = channelName | toKafkaTopicString(channel.hasParameters(), channel.parameters()) | safe %}
map.put(Pattern.compile("{{route}}"), producerFactory);
{%- endfor %}
return new RoutingKafkaTemplate<>(map);
return new RoutingKafkaTemplate(map);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
{%- endif %}
{%- if asyncapi | isProtocol('mqtt') and hasPublish %}
import org.springframework.integration.mqtt.support.MqttHeaders;
Expand Down Expand Up @@ -71,21 +73,27 @@ public class MessageHandlerService {
{%- set javaDoc = javaDoc + ' */' %}
{% endif %}
{%- if asyncapi | isProtocol('kafka') %}
{%- set route = channelName %}
{%- if hasParameters %}
{%- set route = route | replaceAll(".", "\\.") %}
{%- for parameterName, parameter in channel.parameters() %}
{%- set route = route | replace("{" + parameterName + "}", ".*") %}
{%- endfor %}
{%- endif %}
{%- set route = channelName | toKafkaTopicString(channel.hasParameters(), channel.parameters()) | safe %}
{{javaDoc}}
@KafkaListener({% if hasParameters %}topicPattern{% else %}topics{% endif %} = "{{route}}"{% if channel.publish().binding('kafka') %}, groupId = "{{channel.publish().binding('kafka').groupId}}"{% endif %})
public void {{methodName}}(@Payload {{typeName}} payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_MESSAGE_KEY{% else %}RECEIVED_KEY{% endif -%}) Integer key,
@Header(KafkaHeaders.{%- if params.springBoot2 %}RECEIVED_PARTITION_ID{% else %}RECEIVED_PARTITION{% endif -%}) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
LOGGER.info("Key: " + key + ", Payload: " + payload.toString() + ", Timestamp: " + timestamp + ", Partition: " + partition);
{%- if hasParameters %}
List<String> parameters = decompileTopic("{{route}}", topic);
{{methodName}}({%- for parameterName, parameter in channel.parameters() %}parameters.get({{loop.index0}}), {% endfor %}payload, topic, key, partition, timestamp);
{%- endif %}
}
{%- if hasParameters %}
{{javaDoc}}
public void {{methodName}}({%- for parameterName, parameter in channel.parameters() %}String {{parameterName}}, {% endfor %}{{typeName}} payload,
String topic, Integer key, int partition, long timestamp) {
// parametrized listener
}
{%- endif %}
{% elif asyncapi | isProtocol('amqp') %}
{%- set propertyValueName = channelName | toAmqpNeutral(hasParameters, channel.parameters()) %}
{%- if hasParameters %}
Expand Down Expand Up @@ -132,6 +140,18 @@ public class MessageHandlerService {

{%- if anyChannelHasParameter %}
{%- if asyncapi | isProtocol('kafka') %}
private List<String> decompileTopic(String topicPattern, String topic) {
topicPattern = topicPattern.replaceAll("\\.\\*", "(.*)");
List<String> parameters = new ArrayList<>();
Pattern pattern = Pattern.compile(topicPattern);
Matcher matcher = pattern.matcher(topic);
if (matcher.find()) {
for (int i = 0; i < matcher.groupCount(); i++) {
parameters.add(matcher.group(i + 1));
}
}
return parameters;
}
{%- elif asyncapi | isProtocol('amqp') %}
private List<String> decompileRoutingKey(String pattern, String routKey) {
List<String> parameters = new ArrayList<>();
Expand Down

0 comments on commit 1385c10

Please sign in to comment.