diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSink.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSink.java index 3d984599..4d0032ab 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSink.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSink.java @@ -21,21 +21,17 @@ package org.apache.flink.connector.elasticsearch.sink; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.connector.base.sink.AsyncSinkBase; import org.apache.flink.connector.base.sink.writer.BufferedRequestState; import org.apache.flink.connector.base.sink.writer.ElementConverter; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.http.Header; -import org.apache.http.HttpHost; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; -import java.util.List; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * Elasticsearch8AsyncSink Apache Flink's Async Sink that submits Operations into an Elasticsearch @@ -47,15 +43,8 @@ public class Elasticsearch8AsyncSink extends AsyncSinkBase { private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class); - private final String username; - - private final String password; - - private final String certificateFingerprint; - - private final List httpHosts; - - private final List
headers; + @VisibleForTesting + protected final NetworkConfig networkConfig; protected Elasticsearch8AsyncSink( ElementConverter converter, @@ -65,11 +54,7 @@ protected Elasticsearch8AsyncSink( long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInByte, - String username, - String password, - String certificateFingerprint, - List httpHosts, - List
headers) { + NetworkConfig networkConfig) { super( converter, maxBatchSize, @@ -79,11 +64,7 @@ protected Elasticsearch8AsyncSink( maxTimeInBufferMS, maxRecordSizeInByte); - this.username = username; - this.password = password; - this.certificateFingerprint = certificateFingerprint; - this.httpHosts = checkNotNull(httpHosts, "Hosts must not be null"); - this.headers = headers; + this.networkConfig = networkConfig; } @Override @@ -98,11 +79,7 @@ public StatefulSinkWriter> createWriter( getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), - username, - password, - certificateFingerprint, - httpHosts, - headers, + networkConfig, Collections.emptyList()); } @@ -118,11 +95,7 @@ public StatefulSinkWriter> restoreWriter getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), - username, - password, - certificateFingerprint, - httpHosts, - headers, + networkConfig, recoveredState); } diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java index 391b5fec..bcf4a4a5 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java @@ -94,7 +94,7 @@ public Elasticsearch8AsyncSinkBuilder setHosts(HttpHost... hosts) { */ public Elasticsearch8AsyncSinkBuilder setHeaders(Header... headers) { checkNotNull(hosts); - checkArgument(headers.length > 0, "Hosts cannot be empty"); + checkArgument(headers.length > 0, "Headers cannot be empty"); this.headers = Arrays.asList(headers); return this; } @@ -108,7 +108,7 @@ public Elasticsearch8AsyncSinkBuilder setHeaders(Header... headers) { */ public Elasticsearch8AsyncSinkBuilder setCertificateFingerprint( String certificateFingerprint) { - checkNotNull(username, "certificateFingerprint must not be null"); + checkNotNull(certificateFingerprint, "certificateFingerprint must not be null"); this.certificateFingerprint = certificateFingerprint; return this; } @@ -171,11 +171,7 @@ public Elasticsearch8AsyncSink build() { Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B), Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS), Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B), - username, - password, - certificateFingerprint, - hosts, - headers); + buildNetworkConfig()); } private OperationConverter buildOperationConverter( @@ -183,6 +179,16 @@ private OperationConverter buildOperationConverter( return converter != null ? new OperationConverter<>(converter) : null; } + private NetworkConfig buildNetworkConfig() { + checkArgument(!hosts.isEmpty(), "Hosts cannot be empty."); + return new NetworkConfig( + hosts, + username, + password, + headers, + certificateFingerprint); + } + /** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */ public static class OperationConverter implements ElementConverter { private final ElementConverter converter; diff --git a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java index e0f5b42f..ab68f696 100644 --- a/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java +++ b/flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriter.java @@ -35,8 +35,6 @@ import co.elastic.clients.elasticsearch.core.BulkRequest; import co.elastic.clients.elasticsearch.core.BulkResponse; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; -import org.apache.http.Header; -import org.apache.http.HttpHost; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,11 +83,7 @@ public Elasticsearch8AsyncWriter( long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, - String username, - String password, - String certificateFingerprint, - List httpHosts, - List
headers, + NetworkConfig networkConfig, Collection> state) { super( elementConverter, @@ -104,9 +98,7 @@ public Elasticsearch8AsyncWriter( .build(), state); - this.esClient = - new NetworkConfig(httpHosts, username, password, headers, certificateFingerprint) - .create(); + this.esClient = networkConfig.create(); final SinkWriterMetricGroup metricGroup = context.metricGroup(); checkNotNull(metricGroup); diff --git a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java index 891f30b7..3bf2c5ef 100644 --- a/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java +++ b/flink-connector-elasticsearch8/src/test/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncWriterITCase.java @@ -245,11 +245,13 @@ private Elasticsearch8AsyncWriter createWriter( 5 * 1024 * 1024, 5000, 1024 * 1024, - null, - null, - null, - esHost, - null) { + new NetworkConfig( + esHost, + null, + null, + null, + null + )) { @Override public StatefulSinkWriter createWriter(InitContext context) { return new Elasticsearch8AsyncWriter( @@ -261,11 +263,7 @@ public StatefulSinkWriter createWriter(InitContext context) { getMaxBatchSizeInBytes(), getMaxTimeInBufferMS(), getMaxRecordSizeInBytes(), - null, - null, - null, - esHost, - null, + networkConfig, Collections.emptyList()) { @Override protected void submitRequestEntries(