Skip to content

Commit

Permalink
update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Tenischev committed Dec 12, 2023
1 parent 5f751b6 commit ed3121a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 5 deletions.
52 changes: 47 additions & 5 deletions tests/__snapshots__/kafka.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.asyncapi.model.LightMeasuredPayload;
Expand All @@ -145,6 +147,7 @@ public class MessageHandlerService {
@KafkaListener(topics = "event.lighting.measured", groupId = "my-group")
public void readLightMeasurement(@Payload LightMeasuredPayload payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
Expand Down Expand Up @@ -519,6 +522,8 @@ import org.springframework.stereotype.Service;
import com.asyncapi.model.LightMeasuredPayload;
import javax.annotation.processing.Generated;
import java.util.HashMap;
import java.util.Map;
@Generated(value="com.asyncapi.generator.template.spring", date="AnyDate")
@Service
Expand All @@ -530,11 +535,26 @@ public class PublisherServiceImpl implements PublisherService {
public void updateLightMeasurement(Integer key, LightMeasuredPayload lightMeasuredPayload) {
Message<LightMeasuredPayload> message = MessageBuilder.withPayload(lightMeasuredPayload)
.setHeader(KafkaHeaders.TOPIC, "event.lighting.measured")
.setHeader(KafkaHeaders.TOPIC, getUpdateLightMeasurementTopic())
.setHeader(KafkaHeaders.KEY, key)
.build();
kafkaTemplate.send(message);
}
private String getUpdateLightMeasurementTopic() {
Map<String, String> parameters = null;
return replaceParameters("event.lighting.measured", parameters);
}
private String replaceParameters(String topic, Map<String, String> parameters) {
if (parameters != null) {
String compiledTopic = topic;
for (String key : parameters.keySet()) {
compiledTopic = compiledTopic.replace("{" + key + "}", parameters.get(key));
}
return compiledTopic;
}
return topic;
}
}
"
`;
Expand Down Expand Up @@ -580,12 +600,12 @@ public class Config {
private int concurrency;
@Bean
public RoutingKafkaTemplate<Integer, Object> kafkaTemplate() {
public RoutingKafkaTemplate kafkaTemplate() {
ProducerFactory<Object, Object> producerFactory = producerFactory();
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile(event&#92;.lighting&#92;..*&#92;.measured&#92;..*), producerFactory);
return new RoutingKafkaTemplate<>(map);
map.put(Pattern.compile("event\\\\.lighting\\\\..*\\\\.measured\\\\..*"), producerFactory);
return new RoutingKafkaTemplate(map);
}
@Bean
Expand Down Expand Up @@ -653,6 +673,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.asyncapi.model.LightMeasuredPayload;
Expand All @@ -669,16 +691,36 @@ public class MessageHandlerService {
@KafkaListener(topicPattern = "event&#92;.lighting&#92;..*&#92;.measured&#92;..*", groupId = "my-group")
@KafkaListener(topicPattern = "event\\\\.lighting\\\\..*\\\\.measured\\\\..*", groupId = "my-group")
public void readLightMeasurement(@Payload LightMeasuredPayload payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
LOGGER.info("Key: " + key + ", Payload: " + payload.toString() + ", Timestamp: " + timestamp + ", Partition: " + partition);
List<String> parameters = decompileTopic("event\\\\.lighting\\\\..*\\\\.measured\\\\..*", topic);
readLightMeasurement(parameters.get(0), parameters.get(1), payload, topic, key, partition, timestamp);
}
public void readLightMeasurement(String streetlightId, String zoneId, LightMeasuredPayload payload,
String topic, Integer key, int partition, long timestamp) {
// parametrized listener
}
private List<String> decompileTopic(String topicPattern, String topic) {
topicPattern = topicPattern.replaceAll("\\\\.\\\\*", "(.*)");
List<String> parameters = new ArrayList<>();
Pattern pattern = Pattern.compile(topicPattern);
Matcher matcher = pattern.matcher(topic);
if (matcher.find()) {
for (int i = 0; i < matcher.groupCount(); i++) {
parameters.add(matcher.group(i + 1));
}
}
return parameters;
}
}
"
Expand Down
3 changes: 3 additions & 0 deletions tests/__snapshots__/oneOf.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.asyncapi.model.AnonymousSchema1;
import com.asyncapi.model.AnonymousSchema7;
Expand All @@ -31,6 +33,7 @@ public class MessageHandlerService {
@KafkaListener(topics = "song.released")
public void release(@Payload Object payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
Expand Down
3 changes: 3 additions & 0 deletions tests/__snapshots__/parameters.test.js.snap
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.asyncapi.model.LightMeasuredPayload;
Expand All @@ -164,6 +166,7 @@ public class MessageHandlerService {
@KafkaListener(topics = "event.lighting.measured", groupId = "my-group")
public void readLightMeasurement(@Payload LightMeasuredPayload payload,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long timestamp) {
Expand Down

0 comments on commit ed3121a

Please sign in to comment.