From 4350570e95a78fcfd0ddfc18b27505d1af30cd9d Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 10 Sep 2024 08:32:24 -0700 Subject: [PATCH 01/18] test with opensearch server 2.16.0 --- .../pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java index f74771c7f855d..5b8deabb114f6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java @@ -35,7 +35,7 @@ public class OpenSearchSinkTester extends ElasticSearchSinkTester { public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE")) - .orElse("opensearchproject/opensearch:1.2.4"); + .orElse("opensearchproject/opensearch:2.16.0"); private RestHighLevelClient elasticClient; From 68f7102267bab76d6d4446b865aa97eb351a6f74 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 10 Sep 2024 10:27:40 -0700 Subject: [PATCH 02/18] temporary, disable tests to run opensearch test faster / not skip on other failures --- build/run_integration_group.sh | 4 +- .../integration/io/sinks/PulsarSinksTest.java | 90 +++++++++---------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index b44f62d12f122..3c62880bbfef2 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -118,8 +118,8 @@ test_group_sql() { } test_group_pulsar_io() { - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink + #mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source + mvn_run_integration_test "$@" -fail-at-end -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink } test_group_pulsar_io_ora() { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index ceaeda137be92..bc6dd559c085c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -39,56 +39,56 @@ public Object[][] withSchema() { return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; } - @Test(groups = "sink") - public void testKafkaSink() throws Exception { - final String kafkaContainerName = "kafka-" + randomName(8); - testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); - } - - @Test(enabled = false, groups = "sink") - public void testCassandraSink() throws Exception { - testSink(CassandraSinkTester.createTester(true), true); - } - - @Test(enabled = false, groups = "sink") - public void testCassandraArchiveSink() throws Exception { - testSink(CassandraSinkTester.createTester(false), false); - } - - @Test(enabled = false, groups = "sink") - public void testHdfsSink() throws Exception { - testSink(new HdfsSinkTester(), false); - } - - @Test(groups = "sink", dataProvider = "withSchema") - public void testJdbcSink(boolean kvSchema) throws Exception { - testSink(new JdbcPostgresSinkTester(kvSchema), true); - } - - @Test(groups = "sink", dataProvider = "withSchema") - public void testElasticSearch7Sink(boolean withSchema) throws Exception { - testSink(new ElasticSearch7SinkTester(withSchema), true); - } - - @Test(groups = "sink", dataProvider = "withSchema") - public void testElasticSearch8Sink(boolean withSchema) throws Exception { - testSink(new ElasticSearch8SinkTester(withSchema), true); - } +// @Test(groups = "sink") +// public void testKafkaSink() throws Exception { +// final String kafkaContainerName = "kafka-" + randomName(8); +// testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); +// } +// +// @Test(enabled = false, groups = "sink") +// public void testCassandraSink() throws Exception { +// testSink(CassandraSinkTester.createTester(true), true); +// } +// +// @Test(enabled = false, groups = "sink") +// public void testCassandraArchiveSink() throws Exception { +// testSink(CassandraSinkTester.createTester(false), false); +// } +// +// @Test(enabled = false, groups = "sink") +// public void testHdfsSink() throws Exception { +// testSink(new HdfsSinkTester(), false); +// } +// +// @Test(groups = "sink", dataProvider = "withSchema") +// public void testJdbcSink(boolean kvSchema) throws Exception { +// testSink(new JdbcPostgresSinkTester(kvSchema), true); +// } + +// @Test(groups = "sink", dataProvider = "withSchema") +// public void testElasticSearch7Sink(boolean withSchema) throws Exception { +// testSink(new ElasticSearch7SinkTester(withSchema), true); +// } +// +// @Test(groups = "sink", dataProvider = "withSchema") +// public void testElasticSearch8Sink(boolean withSchema) throws Exception { +// testSink(new ElasticSearch8SinkTester(withSchema), true); +// } @Test(groups = "sink", dataProvider = "withSchema") public void testOpenSearchSinkRawData(boolean withSchema) throws Exception { testSink(new OpenSearchSinkTester(withSchema), true); } - @Test(enabled = false, groups = "sink") - public void testRabbitMQSink() throws Exception { - final String containerName = "rabbitmq-" + randomName(8); - testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); - } - - @Test(groups = "sink", dataProvider = "withSchema") - public void testKinesis(boolean withSchema) throws Exception { - testSink(new KinesisSinkTester(withSchema), true); - } +// @Test(enabled = false, groups = "sink") +// public void testRabbitMQSink() throws Exception { +// final String containerName = "rabbitmq-" + randomName(8); +// testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); +// } +// +// @Test(groups = "sink", dataProvider = "withSchema") +// public void testKinesis(boolean withSchema) throws Exception { +// testSink(new KinesisSinkTester(withSchema), true); +// } } From fc1f1794e681d7c8e15642a500eb2e5b90734cb7 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 10 Sep 2024 12:03:27 -0700 Subject: [PATCH 03/18] checkstyle --- .../pulsar/tests/integration/io/sinks/PulsarSinksTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index bc6dd559c085c..d8e4512423256 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -20,9 +20,9 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase; -import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester; -import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester; -import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; +//import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester; +//import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester; +//import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; From d1c9eaad9b8b86bf74382cc826ce5dfbb36229b0 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Tue, 10 Sep 2024 15:22:49 -0700 Subject: [PATCH 04/18] trying to upgrade the client --- pom.xml | 2 +- .../OpenSearchHighLevelRestClient.java | 18 +++++++++++------- tests/integration/pom.xml | 5 +++++ .../io/sinks/OpenSearchSinkTester.java | 1 + 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index d129fdfc48920..d06b80570f5cd 100644 --- a/pom.xml +++ b/pom.xml @@ -159,7 +159,7 @@ flexible messaging model and an intuitive client API. 2.7.5 3.3.5 2.4.10 - 1.2.4 + 2.16.0 8.5.2 334 2.13 diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 3b7059cafd1d5..7b7393db31f46 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -50,12 +50,12 @@ import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.client.indices.GetIndexRequest; -import org.opensearch.common.Strings; import org.opensearch.common.settings.Settings; -import org.opensearch.common.unit.ByteSizeUnit; -import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.common.Strings; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.builder.SearchSourceBuilder; @@ -229,7 +229,8 @@ public boolean indexDocument(String index, String documentId, String documentSou if (!Strings.isNullOrEmpty(documentId)) { indexRequest.id(documentId); } - indexRequest.type(config.getTypeName()); + // no longer needed? + //indexRequest.type(config.getTypeName()); indexRequest.source(documentSource, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); @@ -245,7 +246,8 @@ public boolean indexDocument(String index, String documentId, String documentSou public boolean deleteDocument(String index, String documentId) throws IOException { DeleteRequest deleteRequest = Requests.deleteRequest(index); deleteRequest.id(documentId); - deleteRequest.type(config.getTypeName()); + // no longer needed? + //deleteRequest.type(config.getTypeName()); DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); if (log.isDebugEnabled()) { log.debug("delete result {}", deleteResponse.getResult()); @@ -301,7 +303,8 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO if (!Strings.isNullOrEmpty(request.getDocumentId())) { indexRequest.id(request.getDocumentId()); } - indexRequest.type(config.getTypeName()); + // no longer needed? + //indexRequest.type(config.getTypeName()); indexRequest.source(request.getDocumentSource(), XContentType.JSON); if (log.isDebugEnabled()) { log.debug("append index request id={}, type={}, source={}", request.getDocumentId(), config.getTypeName(), @@ -314,7 +317,8 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException { DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord()); deleteRequest.id(request.getDocumentId()); - deleteRequest.type(config.getTypeName()); + // no longer needed? + //deleteRequest.type(config.getTypeName()); if (log.isDebugEnabled()) { log.debug("append delete request id={}, type={}", request.getDocumentId(), config.getTypeName()); } diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index 1d77c1bea5ae7..5045d645d0dc9 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -207,6 +207,11 @@ aws-java-sdk-core test + + org.projectlombok + lombok + test + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java index 5b8deabb114f6..11ab7c53df764 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java @@ -49,6 +49,7 @@ protected ElasticsearchContainer createElasticContainer() { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH) .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); return new ElasticsearchContainer(dockerImageName) + .withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!") .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("bootstrap.memory_lock", "true") .withEnv("plugins.security.disabled", "true"); From c4ca6a3cb16eacd80dc9a173ec86b2ab41ca8afe Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Wed, 11 Sep 2024 15:52:58 -0700 Subject: [PATCH 05/18] upgrade OS image in test base --- .../apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java index 506620c2db861..fca3fc5d056dd 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java @@ -46,7 +46,7 @@ public abstract class ElasticSearchTestBase { .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7"); public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE")) - .orElse("opensearchproject/opensearch:1.2.4"); + .orElse("opensearchproject/opensearch:2.16.0"); protected final String elasticImageName; From 8eb2449868df90cbc03183e2ddb194f894e1e1cc Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Wed, 11 Sep 2024 16:32:19 -0700 Subject: [PATCH 06/18] OS requires a password --- .../apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java index fca3fc5d056dd..58ebe3caf833c 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java @@ -59,6 +59,7 @@ protected ElasticsearchContainer createElasticsearchContainer() { if (elasticImageName.equals(OPENSEARCH)) { DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); elasticsearchContainer = new ElasticsearchContainer(dockerImageName) + .withEnv("OPENSEARCH_INITIAL_ADMIN_PASSWORD", "0pEn7earch!") .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("bootstrap.memory_lock", "true") .withEnv("plugins.security.disabled", "true"); From c161a83a2df4476373a0c93e221aeb0e2a6a692e Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Thu, 12 Sep 2024 16:58:15 -0700 Subject: [PATCH 07/18] correct GC setting for LS 2.10 --- .../latest-version-image/conf/functions_worker.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/docker-images/latest-version-image/conf/functions_worker.conf b/tests/docker-images/latest-version-image/conf/functions_worker.conf index 15f2ed0fb1e3e..da87695d40e03 100644 --- a/tests/docker-images/latest-version-image/conf/functions_worker.conf +++ b/tests/docker-images/latest-version-image/conf/functions_worker.conf @@ -22,6 +22,6 @@ autostart=false redirect_stderr=true stdout_logfile=/var/log/pulsar/functions_worker.log directory=/pulsar -environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/pulsar/logs/functions",PULSAR_GC="-XX:+UseZGC" +environment=PULSAR_MEM="-Xmx128M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/pulsar/logs/functions",PULSAR_GC="-XX:+UseG1GC" command=/pulsar/bin/pulsar functions-worker user=pulsar From af9c1b3c566a02a8216beba07891936c642a826f Mon Sep 17 00:00:00 2001 From: nikhil-ctds Date: Fri, 13 Sep 2024 12:51:44 +0530 Subject: [PATCH 08/18] updated jackson version to 2.16.0 (cherry picked from commit 1eb6441f5edcc56d59869fde24a80c7a28ad92a1) --- pom.xml | 2 +- .../java/org/apache/pulsar/common/util/FieldParser.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index d06b80570f5cd..70cbe87778f12 100644 --- a/pom.xml +++ b/pom.xml @@ -125,7 +125,7 @@ flexible messaging model and an intuitive client API. 2.18.0 1.69 1.0.2 - 2.14.2 + 2.16.0 0.9.11 1.6.10 8.37 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java index 40450584764a4..838f944b5d96d 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FieldParser.java @@ -21,8 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.lang.String.format; import static java.util.Objects.requireNonNull; -import com.fasterxml.jackson.databind.AnnotationIntrospector; -import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector; +import com.fasterxml.jackson.databind.DeserializationConfig; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.util.EnumResolver; import java.lang.reflect.Field; import java.lang.reflect.Method; @@ -58,7 +58,7 @@ public final class FieldParser { private static final Map CONVERTERS = new HashMap<>(); private static final Map, Class> WRAPPER_TYPES = new HashMap<>(); - private static final AnnotationIntrospector ANNOTATION_INTROSPECTOR = new JacksonAnnotationIntrospector(); + private static final DeserializationConfig DESERIALIZATION_CONFIG = new ObjectMapper().getDeserializationConfig(); static { // Preload converters and wrapperTypes. @@ -100,7 +100,7 @@ public static T convert(Object from, Class to) { if (to.isEnum()) { // Converting string to enum - EnumResolver r = EnumResolver.constructUsingToString((Class>) to, ANNOTATION_INTROSPECTOR); + EnumResolver r = EnumResolver.constructUsingToString(DESERIALIZATION_CONFIG, to); T value = (T) r.findEnum((String) from); if (value == null) { throw new RuntimeException("Invalid value '" + from + "' for enum " + to); From 8b0e4189dbd708d3357bab99d48f7e5e03f741fc Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 10:47:31 -0700 Subject: [PATCH 09/18] Revert "temporary, disable tests to run opensearch test faster / not skip on other failures" This reverts commit 68f7102267bab76d6d4446b865aa97eb351a6f74. --- build/run_integration_group.sh | 4 +- .../integration/io/sinks/PulsarSinksTest.java | 90 +++++++++---------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index 3c62880bbfef2..b44f62d12f122 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -118,8 +118,8 @@ test_group_sql() { } test_group_pulsar_io() { - #mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source - mvn_run_integration_test "$@" -fail-at-end -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink } test_group_pulsar_io_ora() { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index d8e4512423256..4e5b960d477a8 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -39,56 +39,56 @@ public Object[][] withSchema() { return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; } -// @Test(groups = "sink") -// public void testKafkaSink() throws Exception { -// final String kafkaContainerName = "kafka-" + randomName(8); -// testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); -// } -// -// @Test(enabled = false, groups = "sink") -// public void testCassandraSink() throws Exception { -// testSink(CassandraSinkTester.createTester(true), true); -// } -// -// @Test(enabled = false, groups = "sink") -// public void testCassandraArchiveSink() throws Exception { -// testSink(CassandraSinkTester.createTester(false), false); -// } -// -// @Test(enabled = false, groups = "sink") -// public void testHdfsSink() throws Exception { -// testSink(new HdfsSinkTester(), false); -// } -// -// @Test(groups = "sink", dataProvider = "withSchema") -// public void testJdbcSink(boolean kvSchema) throws Exception { -// testSink(new JdbcPostgresSinkTester(kvSchema), true); -// } - -// @Test(groups = "sink", dataProvider = "withSchema") -// public void testElasticSearch7Sink(boolean withSchema) throws Exception { -// testSink(new ElasticSearch7SinkTester(withSchema), true); -// } -// -// @Test(groups = "sink", dataProvider = "withSchema") -// public void testElasticSearch8Sink(boolean withSchema) throws Exception { -// testSink(new ElasticSearch8SinkTester(withSchema), true); -// } + @Test(groups = "sink") + public void testKafkaSink() throws Exception { + final String kafkaContainerName = "kafka-" + randomName(8); + testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); + } + + @Test(enabled = false, groups = "sink") + public void testCassandraSink() throws Exception { + testSink(CassandraSinkTester.createTester(true), true); + } + + @Test(enabled = false, groups = "sink") + public void testCassandraArchiveSink() throws Exception { + testSink(CassandraSinkTester.createTester(false), false); + } + + @Test(enabled = false, groups = "sink") + public void testHdfsSink() throws Exception { + testSink(new HdfsSinkTester(), false); + } + + @Test(groups = "sink", dataProvider = "withSchema") + public void testJdbcSink(boolean kvSchema) throws Exception { + testSink(new JdbcPostgresSinkTester(kvSchema), true); + } + + @Test(groups = "sink", dataProvider = "withSchema") + public void testElasticSearch7Sink(boolean withSchema) throws Exception { + testSink(new ElasticSearch7SinkTester(withSchema), true); + } + + @Test(groups = "sink", dataProvider = "withSchema") + public void testElasticSearch8Sink(boolean withSchema) throws Exception { + testSink(new ElasticSearch8SinkTester(withSchema), true); + } @Test(groups = "sink", dataProvider = "withSchema") public void testOpenSearchSinkRawData(boolean withSchema) throws Exception { testSink(new OpenSearchSinkTester(withSchema), true); } -// @Test(enabled = false, groups = "sink") -// public void testRabbitMQSink() throws Exception { -// final String containerName = "rabbitmq-" + randomName(8); -// testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); -// } -// -// @Test(groups = "sink", dataProvider = "withSchema") -// public void testKinesis(boolean withSchema) throws Exception { -// testSink(new KinesisSinkTester(withSchema), true); -// } + @Test(enabled = false, groups = "sink") + public void testRabbitMQSink() throws Exception { + final String containerName = "rabbitmq-" + randomName(8); + testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); + } + + @Test(groups = "sink", dataProvider = "withSchema") + public void testKinesis(boolean withSchema) throws Exception { + testSink(new KinesisSinkTester(withSchema), true); + } } From 685709fbe3771e32b94c8ca262cb937c9ebade8c Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 10:47:44 -0700 Subject: [PATCH 10/18] Revert "checkstyle" This reverts commit fc1f1794e681d7c8e15642a500eb2e5b90734cb7. --- .../pulsar/tests/integration/io/sinks/PulsarSinksTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index 4e5b960d477a8..ceaeda137be92 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -20,9 +20,9 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase; -//import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester; -//import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester; -//import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; +import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester; +import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester; +import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; From 8353d2bbbae5fa9df93c459bb6fb4871cb81333b Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 12:24:07 -0700 Subject: [PATCH 11/18] fail tests at the end --- build/run_integration_group.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index b44f62d12f122..f5076eedb4298 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -118,8 +118,8 @@ test_group_sql() { } test_group_pulsar_io() { - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -fae -DintegrationTests -Dgroups=source + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -fae -DintegrationTests -Dgroups=sink } test_group_pulsar_io_ora() { From 6381def34b88d5314dac05af6d01cfbb4660dcff Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 14:23:33 -0700 Subject: [PATCH 12/18] fix test --- .../elasticsearch/opensearch/OpenSearchClientSslTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java index 2fd9cafd16fc6..dc5f78a271349 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java @@ -77,7 +77,7 @@ public void testSslBasic() throws IOException { .setElasticSearchUrl("https://" + container.getHttpHostAddress()) .setIndexName(INDEX) .setUsername("admin") - .setPassword("admin") + .setPassword("0pEn7earch!") .setSsl(new ElasticSearchSslConfig() .setEnabled(true) .setTruststorePath(sslResourceDir + "/truststore.jks") @@ -102,7 +102,7 @@ public void testSslWithHostnameVerification() throws IOException { .setElasticSearchUrl("https://" + container.getHttpHostAddress()) .setIndexName(INDEX) .setUsername("admin") - .setPassword("admin") + .setPassword("0pEn7earch!") .setSsl(new ElasticSearchSslConfig() .setEnabled(true) .setProtocols("TLSv1.2") @@ -128,7 +128,7 @@ public void testSslWithClientAuth() throws IOException { .setElasticSearchUrl("https://" + container.getHttpHostAddress()) .setIndexName(INDEX) .setUsername("admin") - .setPassword("admin") + .setPassword("0pEn7earch!") .setSsl(new ElasticSearchSslConfig() .setEnabled(true) .setHostnameVerification(true) From 1f0e36cda65105b8eae74d53b4cef4f67a0e28a4 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 14:28:49 -0700 Subject: [PATCH 13/18] temoprary disable flakes --- .../integration/io/sinks/PulsarSinksTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index ceaeda137be92..b09fb74189820 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -22,7 +22,7 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase; import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester; import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester; -import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; +//import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -39,11 +39,11 @@ public Object[][] withSchema() { return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; } - @Test(groups = "sink") - public void testKafkaSink() throws Exception { - final String kafkaContainerName = "kafka-" + randomName(8); - testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); - } +// @Test(groups = "sink") +// public void testKafkaSink() throws Exception { +// final String kafkaContainerName = "kafka-" + randomName(8); +// testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); +// } @Test(enabled = false, groups = "sink") public void testCassandraSink() throws Exception { @@ -86,9 +86,9 @@ public void testRabbitMQSink() throws Exception { testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); } - @Test(groups = "sink", dataProvider = "withSchema") - public void testKinesis(boolean withSchema) throws Exception { - testSink(new KinesisSinkTester(withSchema), true); - } +// @Test(groups = "sink", dataProvider = "withSchema") +// public void testKinesis(boolean withSchema) throws Exception { +// testSink(new KinesisSinkTester(withSchema), true); +// } } From d6845e8ce203d3663611d838f14ea2ebb1b44a7f Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 15:31:58 -0700 Subject: [PATCH 14/18] disable more --- build/run_integration_group.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index f5076eedb4298..85f1a99187414 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -118,7 +118,7 @@ test_group_sql() { } test_group_pulsar_io() { - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -fae -DintegrationTests -Dgroups=source + #mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -fae -DintegrationTests -Dgroups=source mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -fae -DintegrationTests -Dgroups=sink } From 64b3e100218ad63de74435a51903c281d77ad363 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 17:07:20 -0700 Subject: [PATCH 15/18] Revert "disable more" This reverts commit d6845e8ce203d3663611d838f14ea2ebb1b44a7f. --- build/run_integration_group.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index 85f1a99187414..f5076eedb4298 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -118,7 +118,7 @@ test_group_sql() { } test_group_pulsar_io() { - #mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -fae -DintegrationTests -Dgroups=source + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -fae -DintegrationTests -Dgroups=source mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -fae -DintegrationTests -Dgroups=sink } From 8d44f1b9c969d276f5e54877e2c698d1758828b7 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 17:07:30 -0700 Subject: [PATCH 16/18] Revert "temoprary disable flakes" This reverts commit 1f0e36cda65105b8eae74d53b4cef4f67a0e28a4. --- .../integration/io/sinks/PulsarSinksTest.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java index b09fb74189820..ceaeda137be92 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java @@ -22,7 +22,7 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase; import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester; import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester; -//import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; +import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester; import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -39,11 +39,11 @@ public Object[][] withSchema() { return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; } -// @Test(groups = "sink") -// public void testKafkaSink() throws Exception { -// final String kafkaContainerName = "kafka-" + randomName(8); -// testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); -// } + @Test(groups = "sink") + public void testKafkaSink() throws Exception { + final String kafkaContainerName = "kafka-" + randomName(8); + testSink(new KafkaSinkTester(kafkaContainerName), true, new KafkaSourceTester(kafkaContainerName)); + } @Test(enabled = false, groups = "sink") public void testCassandraSink() throws Exception { @@ -86,9 +86,9 @@ public void testRabbitMQSink() throws Exception { testSink(new RabbitMQSinkTester(containerName), true, new RabbitMQSourceTester(containerName)); } -// @Test(groups = "sink", dataProvider = "withSchema") -// public void testKinesis(boolean withSchema) throws Exception { -// testSink(new KinesisSinkTester(withSchema), true); -// } + @Test(groups = "sink", dataProvider = "withSchema") + public void testKinesis(boolean withSchema) throws Exception { + testSink(new KinesisSinkTester(withSchema), true); + } } From fc543442277b70ffcd9fb239268c4a9177e6bf76 Mon Sep 17 00:00:00 2001 From: Andrey Yegorov Date: Fri, 13 Sep 2024 17:07:47 -0700 Subject: [PATCH 17/18] Revert "fail tests at the end" This reverts commit 8353d2bbbae5fa9df93c459bb6fb4871cb81333b. --- build/run_integration_group.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build/run_integration_group.sh b/build/run_integration_group.sh index f5076eedb4298..b44f62d12f122 100755 --- a/build/run_integration_group.sh +++ b/build/run_integration_group.sh @@ -118,8 +118,8 @@ test_group_sql() { } test_group_pulsar_io() { - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -fae -DintegrationTests -Dgroups=source - mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -fae -DintegrationTests -Dgroups=sink + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sources.xml -DintegrationTests -Dgroups=source + mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-io-sinks.xml -DintegrationTests -Dgroups=sink } test_group_pulsar_io_ora() { From 678898e39227193b02f74c51f5d13d6182a6b67b Mon Sep 17 00:00:00 2001 From: mukesh-ctds <151806568+mukesh-ctds@users.noreply.github.com> Date: Thu, 19 Sep 2024 10:18:52 +0530 Subject: [PATCH 18/18] Removed comments --- .../client/opensearch/OpenSearchHighLevelRestClient.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java index 7b7393db31f46..81c955c5ae000 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java @@ -229,8 +229,6 @@ public boolean indexDocument(String index, String documentId, String documentSou if (!Strings.isNullOrEmpty(documentId)) { indexRequest.id(documentId); } - // no longer needed? - //indexRequest.type(config.getTypeName()); indexRequest.source(documentSource, XContentType.JSON); IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); @@ -246,8 +244,6 @@ public boolean indexDocument(String index, String documentId, String documentSou public boolean deleteDocument(String index, String documentId) throws IOException { DeleteRequest deleteRequest = Requests.deleteRequest(index); deleteRequest.id(documentId); - // no longer needed? - //deleteRequest.type(config.getTypeName()); DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); if (log.isDebugEnabled()) { log.debug("delete result {}", deleteResponse.getResult()); @@ -303,8 +299,6 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO if (!Strings.isNullOrEmpty(request.getDocumentId())) { indexRequest.id(request.getDocumentId()); } - // no longer needed? - //indexRequest.type(config.getTypeName()); indexRequest.source(request.getDocumentSource(), XContentType.JSON); if (log.isDebugEnabled()) { log.debug("append index request id={}, type={}, source={}", request.getDocumentId(), config.getTypeName(), @@ -317,8 +311,6 @@ public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) throws IO public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) throws IOException { DeleteRequest deleteRequest = new DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord()); deleteRequest.id(request.getDocumentId()); - // no longer needed? - //deleteRequest.type(config.getTypeName()); if (log.isDebugEnabled()) { log.debug("append delete request id={}, type={}", request.getDocumentId(), config.getTypeName()); }