diff --git a/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java b/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java index 7d0f705c..f4617d9d 100644 --- a/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java +++ b/folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java @@ -40,7 +40,9 @@ import com.spotify.folsom.client.tls.SSLEngineFactory; import com.spotify.folsom.guava.HostAndPort; import com.spotify.folsom.ketama.AddressAndClient; +import com.spotify.folsom.ketama.Continuum; import com.spotify.folsom.ketama.KetamaMemcacheClient; +import com.spotify.folsom.ketama.NodeLocator; import com.spotify.folsom.ketama.ResolvingKetamaClient; import com.spotify.folsom.reconnect.CatchingReconnectionListener; import com.spotify.folsom.reconnect.ReconnectingClient; @@ -56,12 +58,14 @@ import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -126,6 +130,7 @@ public class MemcacheClientBuilder { private final List passwords = new ArrayList<>(); private boolean skipAuth = false; + private Function, NodeLocator> nodeLocator = Continuum::new; private SSLEngineFactory sslEngineFactory = null; /** @@ -571,6 +576,22 @@ public MemcacheClientBuilder withUsernamePassword( return this; } + /** + * When using ketama client use provided NodeLocator to find client for given key. NodeLocator + * will be recreated when clients appear and disappear when using dynamic resolver. + * + *

This option can be used to change default hashing algorithm or vnode_ratio in consistent + * hashing algorithm. + * + * @param nodeLocator mapper from client collection to NodeLocator + * @return itself + */ + public MemcacheClientBuilder withNodeLocator( + Function, NodeLocator> nodeLocator) { + this.nodeLocator = nodeLocator; + return this; + } + /** * Disable authentication validation - only useful for tests against jmemcached which does not * support binary NOOP @@ -673,7 +694,7 @@ protected RawMemcacheClient connectRaw(boolean binary, Authenticator authenticat aac.add(new AddressAndClient(address, clients.get(i))); } - client = new KetamaMemcacheClient(aac); + client = new KetamaMemcacheClient(aac, nodeLocator.apply(aac)); } else { client = clients.get(0); } @@ -705,7 +726,8 @@ private RawMemcacheClient createResolvingClient( TimeUnit.MILLISECONDS, input -> createClient(input, binary, authenticator), shutdownDelay, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS, + nodeLocator); client.start(); return client; diff --git a/folsom/src/main/java/com/spotify/folsom/ketama/Continuum.java b/folsom/src/main/java/com/spotify/folsom/ketama/Continuum.java index d31af819..f352a134 100644 --- a/folsom/src/main/java/com/spotify/folsom/ketama/Continuum.java +++ b/folsom/src/main/java/com/spotify/folsom/ketama/Continuum.java @@ -24,25 +24,29 @@ import java.util.Map.Entry; import java.util.TreeMap; -public class Continuum { +public class Continuum implements NodeLocator { private static final int VNODE_RATIO = 100; private final TreeMap ringOfFire; public Continuum(final Collection clients) { - this.ringOfFire = buildRing(clients); + this(clients, VNODE_RATIO); + } + + public Continuum(final Collection clients, int vnodeRatio) { + this.ringOfFire = buildRing(clients, vnodeRatio); } private TreeMap buildRing( - final Collection clients) { + final Collection clients, int vnodeRatio) { final TreeMap r = new TreeMap<>(); for (final AddressAndClient client : clients) { final String address = client.getAddress().toString(); byte[] hash = addressToBytes(address); - for (int i = 0; i < VNODE_RATIO; i++) { + for (int i = 0; i < vnodeRatio; i++) { final HashCode hashCode = Hasher.hash(hash); hash = hashCode.asBytes(); r.put(hashCode.asInt(), client.getClient()); @@ -51,6 +55,7 @@ private TreeMap buildRing( return r; } + @Override public RawMemcacheClient findClient(final byte[] key) { final int keyHash = Hasher.hash(key).asInt(); diff --git a/folsom/src/main/java/com/spotify/folsom/ketama/KetamaMemcacheClient.java b/folsom/src/main/java/com/spotify/folsom/ketama/KetamaMemcacheClient.java index ebf88c5d..33979ae0 100644 --- a/folsom/src/main/java/com/spotify/folsom/ketama/KetamaMemcacheClient.java +++ b/folsom/src/main/java/com/spotify/folsom/ketama/KetamaMemcacheClient.java @@ -47,19 +47,19 @@ private static Collection clientsOnly( return clients; } - private final Continuum continuum; + private final NodeLocator nodeLocator; - public KetamaMemcacheClient(final Collection clients) { + public KetamaMemcacheClient(final Collection clients, NodeLocator nodeLocator) { super(clientsOnly(clients)); if (clients.isEmpty()) { throw new IllegalArgumentException("Can not create ketama client from empty list"); } - this.continuum = new Continuum(clients); + this.nodeLocator = nodeLocator; } private RawMemcacheClient getClient(final byte[] key) { - return continuum.findClient(key); + return nodeLocator.findClient(key); } @Override diff --git a/folsom/src/main/java/com/spotify/folsom/ketama/NodeLocator.java b/folsom/src/main/java/com/spotify/folsom/ketama/NodeLocator.java new file mode 100644 index 00000000..c6a228cd --- /dev/null +++ b/folsom/src/main/java/com/spotify/folsom/ketama/NodeLocator.java @@ -0,0 +1,8 @@ +package com.spotify.folsom.ketama; + +import com.spotify.folsom.RawMemcacheClient; + +public interface NodeLocator { + + RawMemcacheClient findClient(final byte[] key); +} diff --git a/folsom/src/main/java/com/spotify/folsom/ketama/ResolvingKetamaClient.java b/folsom/src/main/java/com/spotify/folsom/ketama/ResolvingKetamaClient.java index 46dfb0c7..4f3e10d8 100644 --- a/folsom/src/main/java/com/spotify/folsom/ketama/ResolvingKetamaClient.java +++ b/folsom/src/main/java/com/spotify/folsom/ketama/ResolvingKetamaClient.java @@ -36,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -64,6 +65,7 @@ public class ResolvingKetamaClient extends AbstractRawMemcacheClient { private volatile RawMemcacheClient currentClient; private volatile RawMemcacheClient pendingClient = null; private boolean shutdown = false; + private final Function, NodeLocator> nodeLocator; public ResolvingKetamaClient( Resolver resolver, @@ -72,7 +74,8 @@ public ResolvingKetamaClient( TimeUnit periodUnit, final Connector connector, long shutdownDelay, - TimeUnit shutdownUnit) { + TimeUnit shutdownUnit, + Function, NodeLocator> nodeLocator) { this.resolver = resolver; this.connector = connector; this.shutdownDelay = shutdownDelay; @@ -80,6 +83,7 @@ public ResolvingKetamaClient( this.executor = executor; this.currentClient = NotConnectedClient.INSTANCE; this.ttl = TimeUnit.SECONDS.convert(period, periodUnit); + this.nodeLocator = nodeLocator; } public void start() { @@ -216,7 +220,8 @@ private void setPendingClient(final ImmutableList.Builder rem // This may invalidate an existing pendingClient but should be fine since it doesn't have any // important state of its own. - final KetamaMemcacheClient newClient = new KetamaMemcacheClient(addressAndClients); + final KetamaMemcacheClient newClient = + new KetamaMemcacheClient(addressAndClients, nodeLocator.apply(addressAndClients)); this.pendingClient = newClient; newClient diff --git a/folsom/src/test/java/com/spotify/folsom/ResolvingKetamaClientTest.java b/folsom/src/test/java/com/spotify/folsom/ResolvingKetamaClientTest.java index 78e23cdc..8b33e1f1 100644 --- a/folsom/src/test/java/com/spotify/folsom/ResolvingKetamaClientTest.java +++ b/folsom/src/test/java/com/spotify/folsom/ResolvingKetamaClientTest.java @@ -24,6 +24,7 @@ import com.spotify.dns.LookupResult; import com.spotify.folsom.client.test.FakeRawMemcacheClient; import com.spotify.folsom.guava.HostAndPort; +import com.spotify.folsom.ketama.Continuum; import com.spotify.folsom.ketama.ResolvingKetamaClient; import java.util.Collections; import java.util.HashMap; @@ -65,7 +66,8 @@ public void testSimple() throws Exception { TimeUnit.MILLISECONDS, connector, 1000, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS, + Continuum::new); executor.tick(1000, TimeUnit.SECONDS); assertFalse(ketamaClient.isConnected()); @@ -141,7 +143,8 @@ public void testReusingConnections() { TimeUnit.MILLISECONDS, connector, 1000, - TimeUnit.MILLISECONDS); + TimeUnit.MILLISECONDS, + Continuum::new); executor.tick(1000, TimeUnit.SECONDS); assertFalse(ketamaClient.isConnected()); diff --git a/folsom/src/test/java/com/spotify/folsom/ketama/KetamaMemcacheClientTest.java b/folsom/src/test/java/com/spotify/folsom/ketama/KetamaMemcacheClientTest.java index 1fbab292..0da80f73 100644 --- a/folsom/src/test/java/com/spotify/folsom/ketama/KetamaMemcacheClientTest.java +++ b/folsom/src/test/java/com/spotify/folsom/ketama/KetamaMemcacheClientTest.java @@ -73,7 +73,8 @@ private void test( } } - final KetamaMemcacheClient ketamaMemcacheClient = new KetamaMemcacheClient(clients); + final KetamaMemcacheClient ketamaMemcacheClient = + new KetamaMemcacheClient(clients, new Continuum(clients)); final MemcacheClient memcacheClient = buildClient(ketamaMemcacheClient, binary); final List requestedKeys = new ArrayList<>(); diff --git a/folsom/src/test/java/com/spotify/folsom/ketama/ContinuumTest.java b/folsom/src/test/java/com/spotify/folsom/ketama/NodeLocatorTest.java similarity index 83% rename from folsom/src/test/java/com/spotify/folsom/ketama/ContinuumTest.java rename to folsom/src/test/java/com/spotify/folsom/ketama/NodeLocatorTest.java index d836c472..7227d3bf 100644 --- a/folsom/src/test/java/com/spotify/folsom/ketama/ContinuumTest.java +++ b/folsom/src/test/java/com/spotify/folsom/ketama/NodeLocatorTest.java @@ -29,9 +29,12 @@ import java.util.List; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.MockitoAnnotations; +import org.mockito.junit.MockitoJUnitRunner; -public class ContinuumTest { +@RunWith(MockitoJUnitRunner.class) +public class NodeLocatorTest { private static final HostAndPort ADDRESS1 = HostAndPort.fromParts("127.0.0.1", 11211); private static final HostAndPort ADDRESS2 = HostAndPort.fromParts("127.0.0.1", 11212); @@ -183,6 +186,39 @@ public void testWrapDisconnected() { assertSame(CLIENT2, c.findClient(bytes("key1561"))); } + @Test + public void testPrefixNodeLocator() { + final List clients = ImmutableList.of(AAC1, AAC2, AAC3); + final Continuum c = new Continuum(clients); + final NodeLocator nodeLocator = + key -> { + String[] keyParts = new String(key, StandardCharsets.US_ASCII).split("-"); + if (keyParts.length > 1) { + return clients.get(Integer.parseInt(keyParts[0]) - 1).getClient(); + } else { + return c.findClient(key); + } + }; + + List actual = + Arrays.asList( + nodeLocator.findClient(bytes("1-key1")), + nodeLocator.findClient(bytes("1-key2")), + nodeLocator.findClient(bytes("1-key3")), + nodeLocator.findClient(bytes("2-key1")), + nodeLocator.findClient(bytes("3-key1")), + nodeLocator.findClient(bytes(KEY1)), + nodeLocator.findClient(bytes(KEY3))); + + List expected = + Arrays.asList( + CLIENT1, CLIENT1, CLIENT1, // keys prefixed with 1 + CLIENT2, // keys prefixed with 2 + CLIENT3, // keys prefixed with 3 + CLIENT1, CLIENT2); // fallback to default NodeLocator + assertEquals(expected, actual); + } + private static byte[] bytes(String key) { return key.getBytes(StandardCharsets.US_ASCII); }