diff --git a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSink.java b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSink.java index 297684c46..74a9da847 100644 --- a/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSink.java +++ b/cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSink.java @@ -760,7 +760,7 @@ private TimeRelatedValues getTimeRelatedValues(long notifiedRecvTimeTs, String a } // if else v.recvTimeDt = new Date(v.recvTimeTs); - v.idx = this.index + "-" + this.indexDateFormatter.format(v.recvTimeDt); + v.idx = this.index; return v; } // getTimeRelatedValues diff --git a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSinkTest.java b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSinkTest.java index 5fb067aeb..71e3f9ee3 100644 --- a/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSinkTest.java +++ b/cygnus-ngsi/src/test/java/com/telefonica/iot/cygnus/sinks/NGSIElasticsearchSinkTest.java @@ -781,7 +781,7 @@ public void testPersistBatchWithOneAttributeRow() throws Exception { sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -837,7 +837,7 @@ public void testPersistBatchWithOneAttributeColumn() throws Exception { byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -890,7 +890,7 @@ public void testPersistBatchWithTwoAttributesIgnoreEmptyRow() throws Exception { sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -944,7 +944,7 @@ public void testPersistBatchWithTwoAttributesWithEmptyRow() throws Exception { sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1019,7 +1019,7 @@ public void testPersistBatchWithTwoAttributesIgnoreEmptyColumn() throws Exceptio byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1075,7 +1075,7 @@ public void testPersistBatchWithTwoAttributesWithEmptyColumn() throws Exception byte[] bytes = MessageDigest.getInstance("md5").digest("roomtype:temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1130,9 +1130,9 @@ public void testPersistBatchWithThreeAttributesWithMetadataRow() throws Exceptio verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); if (timezone == "Asia/Tokyo") { - assertEquals("cygnus-room_service-room_service_path-room1-room-2018.01.02", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); } else { - assertEquals("cygnus-room_service-room_service_path-room1-room-2018.01.01", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); } assertEquals("cygnus_type", mappingTypeCaptor.getValue()); @@ -1243,9 +1243,9 @@ public void testPersistBatchWithThreeAttributesWithMetadataColumn() throws Excep String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); if (timezone == "Asia/Tokyo") { - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-2018.01.02", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); } else { - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-2018.01.01", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); } assertEquals("cygnus_type", mappingTypeCaptor.getValue()); @@ -1308,7 +1308,7 @@ public void testPersistBatchWithOneNullAttributeRow() throws Exception { sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1356,7 +1356,7 @@ public void testPersistBatchWithOneNullAttributeIgnoreWhiteSpacesRow() throws Ex sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1407,7 +1407,7 @@ public void testPersistBatchWithOneNullAttributeColumn() throws Exception { byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1457,7 +1457,7 @@ public void testPersistBatchWithOneNullAttributeIgnoreWhiteSpacesColumn() throws byte[] bytes = MessageDigest.getInstance("md5").digest("temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1504,7 +1504,7 @@ public void testPersistBatchWithTwoNullAttributesRow() throws Exception { sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1569,7 +1569,7 @@ public void testPersistBatchWithTwoNullAttributesIgnoreWhiteSpacesRow() throws E sink.persistBatch(batch); verify(mockBackend, times(1)).bulkInsert(idxCaptor.capture(), mappingTypeCaptor.capture(), dataCaptor.capture()); - assertEquals("cygnus-room_service-room_service_path-room1-room-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room", idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1637,7 +1637,7 @@ public void testPersistBatchWithTwoNullAttributesColumn() throws Exception { byte[] bytes = MessageDigest.getInstance("md5").digest("roomtype:temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); @@ -1686,7 +1686,7 @@ public void testPersistBatchWithTwoNullAttributesIgnoreWhiteSpacesColumn() throw byte[] bytes = MessageDigest.getInstance("md5").digest("roomtype:temperature:".getBytes(StandardCharsets.UTF_8)); String hash = DatatypeConverter.printHexBinary(bytes).toLowerCase(); - assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash + "-1970.01.15", idxCaptor.getValue()); + assertEquals("cygnus-room_service-room_service_path-room1-room-" + hash, idxCaptor.getValue()); assertEquals("cygnus_type", mappingTypeCaptor.getValue()); List> requestedData = dataCaptor.getValue(); diff --git a/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_elasticsearch_sink.md b/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_elasticsearch_sink.md index 6e490e9b8..33581221b 100644 --- a/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_elasticsearch_sink.md +++ b/doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_elasticsearch_sink.md @@ -61,7 +61,6 @@ So `NGSIElasticsearchSink` constructs the index name according to the following 4. when you use `Column-like storing`, append a hash string calculated by the attribute names to be stored * MD5 hash is calculated by concatinated attribute names such as `attrName1:attrName2:...`. 5. when it starts with `-, _, +`, append 'idx' at the beggning of the base string. -6. append the created <date> such as `yyyy.mm.dd`. According to the above rules, `NGSIElasticsearchSink` can handle the multiple subscriptions with different attributes of the same entity.