diff --git a/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecord.java b/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecord.java index 99a7fbdb4..9b34b35ee 100644 --- a/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecord.java +++ b/core/src/main/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecord.java @@ -3,6 +3,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.util.Map; @@ -11,16 +12,24 @@ public class ConsumerJsonRecord { private final JsonNode value; private final Map headers; + private static final ObjectMapper objectMapper = new ObjectMapper(); + @JsonCreator public ConsumerJsonRecord( - @JsonProperty("key") JsonNode key, + @JsonProperty("key") Object key, @JsonProperty("value") JsonNode value, @JsonProperty("headers") Map headers) { - this.key = key; + this.key = convertToJsonNode(key); this.value = value; this.headers = headers; } + private static JsonNode convertToJsonNode(Object key) { + if (key instanceof JsonNode) { + return (JsonNode) key; + } + return objectMapper.convertValue(key, JsonNode.class); + } public JsonNode getKey() { return key; } @@ -36,7 +45,7 @@ public Map getHeaders() { @Override public String toString() { return "Record{" + - "key='" + key + '\'' + + "key='" + key+ '\'' + ", value=" + value + '}'; } diff --git a/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java b/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java index cf243282a..f7ec3fa06 100644 --- a/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java +++ b/core/src/test/java/org/jsmart/zerocode/core/kafka/helper/KafkaConsumerHelperTest.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -146,7 +147,7 @@ public void should_read_json_with_headers_in_record() throws IOException { Mockito.when(consumerRecord.key()).thenReturn("\"key\""); Mockito.when(consumerRecord.value()).thenReturn("\"value\""); Mockito.when(consumerRecord.headers()) - .thenReturn(new RecordHeaders().add("headerKey", "headerValue".getBytes())); + .thenReturn(new RecordHeaders().add("headerKey", "headerValue".getBytes())); // when List consumerJsonRecords = new ArrayList<>(); @@ -162,6 +163,30 @@ public void should_read_json_with_headers_in_record() throws IOException { Assert.assertEquals(Collections.singletonMap("headerKey", "headerValue"), consumerJsonRecord.getHeaders()); } + @Test + public void should_read_json_with_headers_in_record_with_key_of_object_type() throws IOException { + // given + Object key = UUID.randomUUID(); + ConsumerRecord consumerRecord = Mockito.mock(ConsumerRecord.class); + Mockito.when(consumerRecord.key()).thenReturn(key); + Mockito.when(consumerRecord.value()).thenReturn("\"value\""); + Mockito.when(consumerRecord.headers()) + .thenReturn(new RecordHeaders().add("headerKey", "headerValue".getBytes())); + + // when + List consumerJsonRecords = new ArrayList<>(); + KafkaConsumerHelper.readJson(consumerJsonRecords, Iterators.forArray(consumerRecord),null); + + // then + Assert.assertEquals(1, consumerJsonRecords.size()); + ConsumerJsonRecord consumerJsonRecord = consumerJsonRecords.get(0); + Assert.assertTrue(consumerJsonRecord.getKey() instanceof JsonNode); + Assert.assertTrue(consumerJsonRecord.getValue() instanceof JsonNode); + Assert.assertEquals("\"" +key+"\"", consumerJsonRecord.getKey().toString()); + Assert.assertEquals("\"value\"", consumerJsonRecord.getValue().toString()); + Assert.assertEquals(Collections.singletonMap("headerKey", "headerValue"), consumerJsonRecord.getHeaders()); + } + @Test public void test_firstPoll_exits_early_on_assignment() { diff --git a/core/src/test/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecordTest.java b/core/src/test/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecordTest.java index aad106ca3..c90d1b33e 100644 --- a/core/src/test/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecordTest.java +++ b/core/src/test/java/org/jsmart/zerocode/core/kafka/receive/message/ConsumerJsonRecordTest.java @@ -11,6 +11,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.UUID; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; @@ -34,15 +35,28 @@ public void testSer() throws IOException { record = new ConsumerJsonRecord(key1, value, null); json = objectMapper.writeValueAsString(record); - System.out.println("1 json >> " + json); + System.out.println("2 json >> " + json); JsonNode key2 = objectMapper.readTree("23.45"); record = new ConsumerJsonRecord(key2, value, null); json = objectMapper.writeValueAsString(record); - System.out.println("2 json >> " + json); - } + System.out.println("3 json >> " + json); + +// UUID as String type + JsonNode key3 = objectMapper.readTree(objectMapper.writeValueAsString(UUID.randomUUID().toString())); + + record = new ConsumerJsonRecord(key3, value, null); + json = objectMapper.writeValueAsString(record); + System.out.println("4 json >> " + json); + +// UUID as Object Type + Object key4 = UUID.randomUUID(); + record = new ConsumerJsonRecord(key4, value, null); + json = objectMapper.writeValueAsString(record); + System.out.println("5 json >> " + json); +} @Test public void should_serialize_a_record_with_headers() throws JsonProcessingException {