Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34369][connectors/elasticsearch] Elasticsearch connector supports SSL context #91

Merged
merged 6 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,9 @@ DeliveryGuarantee getDeliveryGuarantee() {
BulkResponseInspectorFactory getBulkResponseInspectorFactory() {
return bulkResponseInspectorFactory;
}

@VisibleForTesting
NetworkClientConfig getNetworkClientConfig() {
return networkClientConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,18 @@
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.function.SerializableSupplier;

import org.apache.http.HttpHost;
import org.apache.http.conn.ssl.TrustAllStrategy;
import org.apache.http.ssl.SSLContexts;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;

import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -60,6 +69,8 @@ public abstract class ElasticsearchSinkBuilderBase<
private Integer connectionTimeout;
private Integer connectionRequestTimeout;
private Integer socketTimeout;
private SerializableSupplier<SSLContext> sslContextSupplier;
private SerializableSupplier<HostnameVerifier> hostnameVerifierSupplier;
private FailureHandler failureHandler = new DefaultFailureHandler();
private BulkResponseInspectorFactory bulkResponseInspectorFactory;

Expand Down Expand Up @@ -263,6 +274,51 @@ public B setSocketTimeout(int timeout) {
return self();
}

/**
* Allows to bypass the certificates chain validation and connect to insecure network endpoints
* (for example, servers which use self-signed certificates).
*
* @return this builder
*/
public B allowInsecure() {
this.sslContextSupplier =
() -> {
try {
return SSLContexts.custom()
.loadTrustMaterial(TrustAllStrategy.INSTANCE)
.build();
} catch (final NoSuchAlgorithmException
| KeyStoreException
| KeyManagementException ex) {
throw new IllegalStateException("Unable to create custom SSL context", ex);
}
};
return self();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there is overlap between this setting and suppliers, you may just rely on SSL context supplier here:

this.allowInsecure = allowInsecure;
if (this.allowInsecure) {
    return setSslContextSupplier(SSLContexts.custom().loadTrustMaterial(TrustAllStrategy.INSTANCE)::build);
}

}

/**
* Sets the supplier for getting an {@link SSLContext} instance.
*
* @param sslContextSupplier the serializable SSLContext supplier function
* @return this builder
*/
public B setSslContextSupplier(SerializableSupplier<SSLContext> sslContextSupplier) {
this.sslContextSupplier = checkNotNull(sslContextSupplier);
return self();
}

/**
* Sets the supplier for getting an SSL {@link HostnameVerifier} instance.
*
* @param sslHostnameVerifierSupplier the serializable hostname verifier supplier function
* @return this builder
*/
public B setSslHostnameVerifier(
SerializableSupplier<HostnameVerifier> sslHostnameVerifierSupplier) {
this.hostnameVerifierSupplier = sslHostnameVerifierSupplier;
return self();
}

/**
* Overrides the default {@link FailureHandler}. A custom failure handler can handle partial
* failures gracefully. See {@link #bulkResponseInspectorFactory} for more extensive control.
Expand Down Expand Up @@ -329,14 +385,15 @@ public ElasticsearchSink<IN> build() {

private NetworkClientConfig buildNetworkClientConfig() {
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");

return new NetworkClientConfig(
username,
password,
connectionPathPrefix,
connectionRequestTimeout,
connectionTimeout,
socketTimeout);
socketTimeout,
sslContextSupplier,
hostnameVerifierSupplier);
}

private BulkProcessorConfig buildBulkProcessorConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,32 @@ private static RestClientBuilder configureRestClientBuilder(
if (networkClientConfig.getConnectionPathPrefix() != null) {
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
}
if (networkClientConfig.getPassword() != null
&& networkClientConfig.getUsername() != null) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
networkClientConfig.getUsername(), networkClientConfig.getPassword()));

final CredentialsProvider credentialsProvider = getCredentialsProvider(networkClientConfig);
if (credentialsProvider != null
|| networkClientConfig.getSSLContextSupplier() != null
|| networkClientConfig.getSslHostnameVerifier() != null) {
builder.setHttpClientConfigCallback(
httpClientBuilder ->
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
httpClientBuilder -> {
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

if (networkClientConfig.getSSLContextSupplier() != null) {
// client creates SSL context using the configured supplier
httpClientBuilder.setSSLContext(
networkClientConfig.getSSLContextSupplier().get());
}

if (networkClientConfig.getSslHostnameVerifier() != null) {
httpClientBuilder.setSSLHostnameVerifier(
networkClientConfig.getSslHostnameVerifier().get());
}

return httpClientBuilder;
});
}

if (networkClientConfig.getConnectionRequestTimeout() != null
|| networkClientConfig.getConnectionTimeout() != null
|| networkClientConfig.getSocketTimeout() != null) {
Expand All @@ -195,6 +210,25 @@ private static RestClientBuilder configureRestClientBuilder(
return builder;
}

/**
* Get an http client credentials provider given network client config.
*
* <p>If network client config is not configured with username or password, return null.
*/
private static CredentialsProvider getCredentialsProvider(
NetworkClientConfig networkClientConfig) {
CredentialsProvider credentialsProvider = null;
if (networkClientConfig.getPassword() != null
&& networkClientConfig.getUsername() != null) {
credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
networkClientConfig.getUsername(), networkClientConfig.getPassword()));
}
return credentialsProvider;
}

private BulkProcessor createBulkProcessor(
BulkProcessorBuilderFactory bulkProcessorBuilderFactory,
BulkProcessorConfig bulkProcessorConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.util.function.SerializableSupplier;

import javax.annotation.Nullable;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;

import java.io.Serializable;

Expand All @@ -30,20 +34,26 @@ class NetworkClientConfig implements Serializable {
@Nullable private final Integer connectionRequestTimeout;
@Nullable private final Integer connectionTimeout;
@Nullable private final Integer socketTimeout;
@Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;
@Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;

NetworkClientConfig(
@Nullable String username,
@Nullable String password,
@Nullable String connectionPathPrefix,
@Nullable Integer connectionRequestTimeout,
@Nullable Integer connectionTimeout,
@Nullable Integer socketTimeout) {
@Nullable Integer socketTimeout,
@Nullable SerializableSupplier<SSLContext> sslContextSupplier,
@Nullable SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
this.username = username;
this.password = password;
this.connectionPathPrefix = connectionPathPrefix;
this.connectionRequestTimeout = connectionRequestTimeout;
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.sslContextSupplier = sslContextSupplier;
this.sslHostnameVerifier = sslHostnameVerifier;
}

@Nullable
Expand Down Expand Up @@ -75,4 +85,14 @@ public Integer getSocketTimeout() {
public String getConnectionPathPrefix() {
return connectionPathPrefix;
}

@Nullable
public SerializableSupplier<SSLContext> getSSLContextSupplier() {
return sslContextSupplier;
}

@Nullable
public SerializableSupplier<HostnameVerifier> getSslHostnameVerifier() {
return sslHostnameVerifier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Stream<DynamicTest> testValidBuilders() {
.setBulkFlushBackoffStrategy(FlushBackoffType.CONSTANT, 1, 1),
createMinimalBuilder()
.setConnectionUsername("username")
.setConnectionPassword("password"));
.setConnectionPassword("password"),
createMinimalBuilder().allowInsecure());

return DynamicTest.stream(
validBuilders,
Expand All @@ -68,6 +69,17 @@ void testDefaultDeliveryGuarantee() {
.isEqualTo(DeliveryGuarantee.AT_LEAST_ONCE);
}

@Test
void testAllowInsecureSetSslContextSupplier() {
assertThat(
createMinimalBuilder()
.allowInsecure()
.build()
.getNetworkClientConfig()
.getSSLContextSupplier())
.isNotNull();
}

@Test
void testThrowIfExactlyOnceConfigured() {
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ private static ElasticsearchWriter<Tuple2<Integer, String>> createWriter(
bulkProcessorConfig,
new TestBulkProcessorBuilderFactory(),
new DefaultBulkResponseInspector(),
new NetworkClientConfig(null, null, null, null, null, null),
new NetworkClientConfig(null, null, null, null, null, null, null, null),
metricGroup,
new TestMailbox());
}
Expand Down