Skip to content

Commit

Permalink
[FLINK-35287] Builder builds NetworkConfig for Elasticsearch connector 8
Browse files Browse the repository at this point in the history
  • Loading branch information
liuml07 committed May 3, 2024
1 parent 5d1f8d0 commit 76e4210
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Elasticsearch8AsyncSink Apache Flink's Async Sink that submits Operations into an Elasticsearch
Expand All @@ -47,15 +43,8 @@
public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> {
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);

private final String username;

private final String password;

private final String certificateFingerprint;

private final List<HttpHost> httpHosts;

private final List<Header> headers;
@VisibleForTesting
protected final NetworkConfig networkConfig;

protected Elasticsearch8AsyncSink(
ElementConverter<InputT, Operation> converter,
Expand All @@ -65,11 +54,7 @@ protected Elasticsearch8AsyncSink(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInByte,
String username,
String password,
String certificateFingerprint,
List<HttpHost> httpHosts,
List<Header> headers) {
NetworkConfig networkConfig) {
super(
converter,
maxBatchSize,
Expand All @@ -79,11 +64,7 @@ protected Elasticsearch8AsyncSink(
maxTimeInBufferMS,
maxRecordSizeInByte);

this.username = username;
this.password = password;
this.certificateFingerprint = certificateFingerprint;
this.httpHosts = checkNotNull(httpHosts, "Hosts must not be null");
this.headers = headers;
this.networkConfig = networkConfig;
}

@Override
Expand All @@ -98,11 +79,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
username,
password,
certificateFingerprint,
httpHosts,
headers,
networkConfig,
Collections.emptyList());
}

Expand All @@ -118,11 +95,7 @@ public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
username,
password,
certificateFingerprint,
httpHosts,
headers,
networkConfig,
recoveredState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public Elasticsearch8AsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
*/
public Elasticsearch8AsyncSinkBuilder<InputT> setHeaders(Header... headers) {
checkNotNull(hosts);
checkArgument(headers.length > 0, "Hosts cannot be empty");
checkArgument(headers.length > 0, "Headers cannot be empty");
this.headers = Arrays.asList(headers);
return this;
}
Expand All @@ -108,7 +108,7 @@ public Elasticsearch8AsyncSinkBuilder<InputT> setHeaders(Header... headers) {
*/
public Elasticsearch8AsyncSinkBuilder<InputT> setCertificateFingerprint(
String certificateFingerprint) {
checkNotNull(username, "certificateFingerprint must not be null");
checkNotNull(certificateFingerprint, "certificateFingerprint must not be null");
this.certificateFingerprint = certificateFingerprint;
return this;
}
Expand Down Expand Up @@ -171,18 +171,24 @@ public Elasticsearch8AsyncSink<InputT> build() {
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
username,
password,
certificateFingerprint,
hosts,
headers);
buildNetworkConfig());
}

private OperationConverter<InputT> buildOperationConverter(
ElementConverter<InputT, BulkOperationVariant> converter) {
return converter != null ? new OperationConverter<>(converter) : null;
}

private NetworkConfig buildNetworkConfig() {
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
return new NetworkConfig(
hosts,
username,
password,
headers,
certificateFingerprint);
}

/** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */
public static class OperationConverter<T> implements ElementConverter<T, Operation> {
private final ElementConverter<T, BulkOperationVariant> converter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,11 +83,7 @@ public Elasticsearch8AsyncWriter(
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
String username,
String password,
String certificateFingerprint,
List<HttpHost> httpHosts,
List<Header> headers,
NetworkConfig networkConfig,
Collection<BufferedRequestState<Operation>> state) {
super(
elementConverter,
Expand All @@ -104,9 +98,7 @@ public Elasticsearch8AsyncWriter(
.build(),
state);

this.esClient =
new NetworkConfig(httpHosts, username, password, headers, certificateFingerprint)
.create();
this.esClient = networkConfig.create();
final SinkWriterMetricGroup metricGroup = context.metricGroup();
checkNotNull(metricGroup);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,11 +245,13 @@ private Elasticsearch8AsyncWriter<DummyData> createWriter(
5 * 1024 * 1024,
5000,
1024 * 1024,
null,
null,
null,
esHost,
null) {
new NetworkConfig(
esHost,
null,
null,
null,
null
)) {
@Override
public StatefulSinkWriter createWriter(InitContext context) {
return new Elasticsearch8AsyncWriter<DummyData>(
Expand All @@ -261,11 +263,7 @@ public StatefulSinkWriter createWriter(InitContext context) {
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
null,
null,
null,
esHost,
null,
networkConfig,
Collections.emptyList()) {
@Override
protected void submitRequestEntries(
Expand Down

0 comments on commit 76e4210

Please sign in to comment.