Skip to content

Commit

Permalink
TLS Support
Browse files Browse the repository at this point in the history
  • Loading branch information
paralainer committed Nov 14, 2023
1 parent c18f9f5 commit 59eac98
Show file tree
Hide file tree
Showing 14 changed files with 315 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.net.Socket;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLSocketFactory;

/**
* Implement support for AWS ElastiCache node auto-discovery <a
Expand All @@ -52,6 +53,7 @@ public static class Builder {
private int configPort = 11211;
private long ttl = MINUTES.toMillis(1);
private int timeout = 5000; // ms
private boolean useTls = false;

private Builder(final String configHost) {
this.configHost = configHost;
Expand Down Expand Up @@ -95,13 +97,24 @@ public Builder withResolveTimeoutMillis(final int timeout) {
return this;
}

/**
* Configure TLS usage for connection. Default: false
*
* @param useTls whether resolver should use TLS connection
* @return The builder
*/
public Builder withTls(final boolean useTls) {
this.useTls = useTls;
return this;
}

/**
* Build the resolver
*
* @return The resolver
*/
public ElastiCacheResolver build() {
final Resolver resolver = new SocketResolver(configHost, configPort, timeout);
final Resolver resolver = new SocketResolver(configHost, configPort, timeout, useTls);

return new ElastiCacheResolver(resolver, ttl);
}
Expand Down Expand Up @@ -157,20 +170,22 @@ public interface Resolver {
}

public static class SocketResolver implements Resolver {

private final String configHost;
private final int configPort;
private final int timeout;
private final boolean useTls;
private final ResponseParser parser = new ResponseParser();

public SocketResolver(final String configHost, final int configPort, final int timeout) {
public SocketResolver(
final String configHost, final int configPort, final int timeout, final boolean useTls) {
this.configHost = configHost;
this.configPort = configPort;
this.timeout = timeout;
this.useTls = useTls;
}

public Response resolve() {
try (final Socket socket = new Socket()) {
try (final Socket socket = createSocket()) {
socket.setSoTimeout(timeout);
socket.connect(new InetSocketAddress(configHost, configPort), timeout);

Expand All @@ -181,5 +196,13 @@ public Response resolve() {
throw new RuntimeException("ElastiCache auto-discovery failed", e);
}
}

private Socket createSocket() throws IOException {
if (useTls) {
return SSLSocketFactory.getDefault().createSocket();
} else {
return new Socket();
}
}
}
}
5 changes: 5 additions & 0 deletions folsom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.spotify.folsom.client.NoopTracer;
import com.spotify.folsom.client.ascii.DefaultAsciiMemcacheClient;
import com.spotify.folsom.client.binary.DefaultBinaryMemcacheClient;
import com.spotify.folsom.client.tls.SSLEngineFactory;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.folsom.ketama.KetamaMemcacheClient;
Expand Down Expand Up @@ -125,6 +126,8 @@ public class MemcacheClientBuilder<V> {
private final List<UsernamePasswordPair> passwords = new ArrayList<>();
private boolean skipAuth = false;

private SSLEngineFactory sslEngineFactory = null;

/**
* Create a client builder for byte array values.
*
Expand Down Expand Up @@ -579,6 +582,11 @@ MemcacheClientBuilder<V> withoutAuthenticationValidation() {
return this;
}

public MemcacheClientBuilder<V> withSSLEngineFactory(final SSLEngineFactory sslEngineFactory) {
this.sslEngineFactory = sslEngineFactory;
return this;
}

/**
* Create a client that uses the binary memcache protocol.
*
Expand Down Expand Up @@ -732,6 +740,7 @@ private RawMemcacheClient createReconnectingClient(
metrics,
maxSetLength,
eventLoopGroup,
channelClass);
channelClass,
sslEngineFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.spotify.folsom.RawMemcacheClient;
import com.spotify.folsom.client.ascii.AsciiMemcacheDecoder;
import com.spotify.folsom.client.binary.BinaryMemcacheDecoder;
import com.spotify.folsom.client.tls.SSLEngineFactory;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.futures.CompletableFutures;
Expand All @@ -43,6 +44,7 @@
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
import io.netty.channel.EventLoopGroup;
Expand All @@ -52,6 +54,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.net.ConnectException;
Expand All @@ -66,6 +69,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -112,7 +116,8 @@ public static CompletionStage<RawMemcacheClient> connect(
final Metrics metrics,
final int maxSetLength,
final EventLoopGroup eventLoopGroup,
final Class<? extends Channel> channelClass) {
final Class<? extends Channel> channelClass,
final SSLEngineFactory sslEngineFactory) {

final ChannelInboundHandler decoder;
if (binary) {
Expand All @@ -124,14 +129,21 @@ public static CompletionStage<RawMemcacheClient> connect(
final ChannelHandler initializer =
new ChannelInitializer<Channel>() {
@Override
protected void initChannel(final Channel ch) throws Exception {
ch.pipeline()
.addLast(
new TcpTuningHandler(),
decoder,

// Downstream
new MemcacheEncoder());
protected void initChannel(final Channel ch) {
final ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new TcpTuningHandler());

if (sslEngineFactory != null) {
final SSLEngine sslEngine =
sslEngineFactory.createSSLEngine(address.getHostText(), address.getPort());
SslHandler sslHandler = new SslHandler(sslEngine);
// Disable SSL data aggregation
// it doesn't play well with memcached protocol and causes connection hangs
sslHandler.setWrapDataSize(0);
channelPipeline.addLast(sslHandler);
}

channelPipeline.addLast(decoder, new MemcacheEncoder());
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.spotify.folsom.client.tls;

import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

public class DefaultSSLEngineFactory implements SSLEngineFactory {
private final SSLContext sslContext;
private final boolean reuseSession;

public DefaultSSLEngineFactory(final boolean reuseSession) throws NoSuchAlgorithmException {
this(SSLContext.getDefault(), reuseSession);
}

public DefaultSSLEngineFactory(final SSLContext sslContext, final boolean reuseSession) {
this.sslContext = sslContext;
this.reuseSession = reuseSession;
}

@Override
public SSLEngine createSSLEngine(final String hostname, final int port) {
final SSLEngine sslEngine;

if (reuseSession) {
sslEngine = sslContext.createSSLEngine(hostname, port);
} else {
sslEngine = sslContext.createSSLEngine();
}

sslEngine.setUseClientMode(true);
return sslEngine;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.spotify.folsom.client.tls;

import javax.net.ssl.SSLEngine;

public interface SSLEngineFactory {
SSLEngine createSSLEngine(String hostname, int port);
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.spotify.folsom.client.DefaultRawMemcacheClient;
import com.spotify.folsom.client.NotConnectedClient;
import com.spotify.folsom.client.Request;
import com.spotify.folsom.client.tls.SSLEngineFactory;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -77,7 +78,8 @@ public ReconnectingClient(
final Metrics metrics,
final int maxSetLength,
final EventLoopGroup eventLoopGroup,
final Class<? extends Channel> channelClass) {
final Class<? extends Channel> channelClass,
final SSLEngineFactory sslEngineFactory) {
this(
backoffFunction,
scheduledExecutorService,
Expand All @@ -93,7 +95,8 @@ public ReconnectingClient(
metrics,
maxSetLength,
eventLoopGroup,
channelClass),
channelClass,
sslEngineFactory),
authenticator,
address,
reconnectionListener);
Expand All @@ -113,7 +116,8 @@ public ReconnectingClient(
final Metrics metrics,
final int maxSetLength,
final EventLoopGroup eventLoopGroup,
final Class<? extends Channel> channelClass) {
final Class<? extends Channel> channelClass,
final SSLEngineFactory sslEngineFactory) {
this(
backoffFunction,
scheduledExecutorService,
Expand All @@ -129,7 +133,8 @@ public ReconnectingClient(
metrics,
maxSetLength,
eventLoopGroup,
channelClass),
channelClass,
sslEngineFactory),
authenticator,
address,
new StandardReconnectionListener());
Expand Down
Loading

0 comments on commit 59eac98

Please sign in to comment.