Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom note locator #234

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions folsom/src/main/java/com/spotify/folsom/MemcacheClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
import com.spotify.folsom.client.tls.SSLEngineFactory;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.AddressAndClient;
import com.spotify.folsom.ketama.Continuum;
import com.spotify.folsom.ketama.KetamaMemcacheClient;
import com.spotify.folsom.ketama.NodeLocator;
import com.spotify.folsom.ketama.ResolvingKetamaClient;
import com.spotify.folsom.reconnect.CatchingReconnectionListener;
import com.spotify.folsom.reconnect.ReconnectingClient;
Expand All @@ -56,12 +58,14 @@
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -126,6 +130,7 @@ public class MemcacheClientBuilder<V> {
private final List<UsernamePasswordPair> passwords = new ArrayList<>();
private boolean skipAuth = false;

private Function<Collection<AddressAndClient>, NodeLocator> nodeLocator = Continuum::new;
private SSLEngineFactory sslEngineFactory = null;

/**
Expand Down Expand Up @@ -571,6 +576,22 @@ public MemcacheClientBuilder<V> withUsernamePassword(
return this;
}

/**
* When using ketama client use provided NodeLocator to find client for given key. NodeLocator
* will be recreated when clients appear and disappear when using dynamic resolver.
*
* <p>This option can be used to change default hashing algorithm or vnode_ratio in consistent
* hashing algorithm.
*
* @param nodeLocator mapper from client collection to NodeLocator
* @return itself
*/
public MemcacheClientBuilder<V> withNodeLocator(
Function<Collection<AddressAndClient>, NodeLocator> nodeLocator) {
this.nodeLocator = nodeLocator;
return this;
}

/**
* Disable authentication validation - only useful for tests against jmemcached which does not
* support binary NOOP
Expand Down Expand Up @@ -673,7 +694,7 @@ protected RawMemcacheClient connectRaw(boolean binary, Authenticator authenticat
aac.add(new AddressAndClient(address, clients.get(i)));
}

client = new KetamaMemcacheClient(aac);
client = new KetamaMemcacheClient(aac, nodeLocator.apply(aac));
} else {
client = clients.get(0);
}
Expand Down Expand Up @@ -705,7 +726,8 @@ private RawMemcacheClient createResolvingClient(
TimeUnit.MILLISECONDS,
input -> createClient(input, binary, authenticator),
shutdownDelay,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS,
nodeLocator);

client.start();
return client;
Expand Down
13 changes: 9 additions & 4 deletions folsom/src/main/java/com/spotify/folsom/ketama/Continuum.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,29 @@
import java.util.Map.Entry;
import java.util.TreeMap;

public class Continuum {
public class Continuum implements NodeLocator {

private static final int VNODE_RATIO = 100;

private final TreeMap<Integer, RawMemcacheClient> ringOfFire;

public Continuum(final Collection<AddressAndClient> clients) {
this.ringOfFire = buildRing(clients);
this(clients, VNODE_RATIO);
}

public Continuum(final Collection<AddressAndClient> clients, int vnodeRatio) {
this.ringOfFire = buildRing(clients, vnodeRatio);
}

private TreeMap<Integer, RawMemcacheClient> buildRing(
final Collection<AddressAndClient> clients) {
final Collection<AddressAndClient> clients, int vnodeRatio) {

final TreeMap<Integer, RawMemcacheClient> r = new TreeMap<>();
for (final AddressAndClient client : clients) {
final String address = client.getAddress().toString();

byte[] hash = addressToBytes(address);
for (int i = 0; i < VNODE_RATIO; i++) {
for (int i = 0; i < vnodeRatio; i++) {
final HashCode hashCode = Hasher.hash(hash);
hash = hashCode.asBytes();
r.put(hashCode.asInt(), client.getClient());
Expand All @@ -51,6 +55,7 @@ private TreeMap<Integer, RawMemcacheClient> buildRing(
return r;
}

@Override
public RawMemcacheClient findClient(final byte[] key) {
final int keyHash = Hasher.hash(key).asInt();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,19 @@ private static Collection<RawMemcacheClient> clientsOnly(
return clients;
}

private final Continuum continuum;
private final NodeLocator nodeLocator;

public KetamaMemcacheClient(final Collection<AddressAndClient> clients) {
public KetamaMemcacheClient(final Collection<AddressAndClient> clients, NodeLocator nodeLocator) {
super(clientsOnly(clients));
if (clients.isEmpty()) {
throw new IllegalArgumentException("Can not create ketama client from empty list");
}

this.continuum = new Continuum(clients);
this.nodeLocator = nodeLocator;
}

private RawMemcacheClient getClient(final byte[] key) {
return continuum.findClient(key);
return nodeLocator.findClient(key);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.spotify.folsom.ketama;

import com.spotify.folsom.RawMemcacheClient;

public interface NodeLocator {

RawMemcacheClient findClient(final byte[] key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand Down Expand Up @@ -64,6 +65,7 @@ public class ResolvingKetamaClient extends AbstractRawMemcacheClient {
private volatile RawMemcacheClient currentClient;
private volatile RawMemcacheClient pendingClient = null;
private boolean shutdown = false;
private final Function<Collection<AddressAndClient>, NodeLocator> nodeLocator;

public ResolvingKetamaClient(
Resolver resolver,
Expand All @@ -72,14 +74,16 @@ public ResolvingKetamaClient(
TimeUnit periodUnit,
final Connector connector,
long shutdownDelay,
TimeUnit shutdownUnit) {
TimeUnit shutdownUnit,
Function<Collection<AddressAndClient>, NodeLocator> nodeLocator) {
this.resolver = resolver;
this.connector = connector;
this.shutdownDelay = shutdownDelay;
this.shutdownUnit = shutdownUnit;
this.executor = executor;
this.currentClient = NotConnectedClient.INSTANCE;
this.ttl = TimeUnit.SECONDS.convert(period, periodUnit);
this.nodeLocator = nodeLocator;
}

public void start() {
Expand Down Expand Up @@ -216,7 +220,8 @@ private void setPendingClient(final ImmutableList.Builder<RawMemcacheClient> rem

// This may invalidate an existing pendingClient but should be fine since it doesn't have any
// important state of its own.
final KetamaMemcacheClient newClient = new KetamaMemcacheClient(addressAndClients);
final KetamaMemcacheClient newClient =
new KetamaMemcacheClient(addressAndClients, nodeLocator.apply(addressAndClients));
this.pendingClient = newClient;

newClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.spotify.dns.LookupResult;
import com.spotify.folsom.client.test.FakeRawMemcacheClient;
import com.spotify.folsom.guava.HostAndPort;
import com.spotify.folsom.ketama.Continuum;
import com.spotify.folsom.ketama.ResolvingKetamaClient;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -65,7 +66,8 @@ public void testSimple() throws Exception {
TimeUnit.MILLISECONDS,
connector,
1000,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS,
Continuum::new);
executor.tick(1000, TimeUnit.SECONDS);

assertFalse(ketamaClient.isConnected());
Expand Down Expand Up @@ -141,7 +143,8 @@ public void testReusingConnections() {
TimeUnit.MILLISECONDS,
connector,
1000,
TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS,
Continuum::new);
executor.tick(1000, TimeUnit.SECONDS);

assertFalse(ketamaClient.isConnected());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private void test(
}
}

final KetamaMemcacheClient ketamaMemcacheClient = new KetamaMemcacheClient(clients);
final KetamaMemcacheClient ketamaMemcacheClient =
new KetamaMemcacheClient(clients, new Continuum(clients));
final MemcacheClient<String> memcacheClient = buildClient(ketamaMemcacheClient, binary);

final List<String> requestedKeys = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,12 @@
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockitoAnnotations;
import org.mockito.junit.MockitoJUnitRunner;

public class ContinuumTest {
@RunWith(MockitoJUnitRunner.class)
public class NodeLocatorTest {

private static final HostAndPort ADDRESS1 = HostAndPort.fromParts("127.0.0.1", 11211);
private static final HostAndPort ADDRESS2 = HostAndPort.fromParts("127.0.0.1", 11212);
Expand Down Expand Up @@ -183,6 +186,39 @@ public void testWrapDisconnected() {
assertSame(CLIENT2, c.findClient(bytes("key1561")));
}

@Test
public void testPrefixNodeLocator() {
final List<AddressAndClient> clients = ImmutableList.of(AAC1, AAC2, AAC3);
final Continuum c = new Continuum(clients);
final NodeLocator nodeLocator =
key -> {
String[] keyParts = new String(key, StandardCharsets.US_ASCII).split("-");
if (keyParts.length > 1) {
return clients.get(Integer.parseInt(keyParts[0]) - 1).getClient();
} else {
return c.findClient(key);
}
};

List<RawMemcacheClient> actual =
Arrays.asList(
nodeLocator.findClient(bytes("1-key1")),
nodeLocator.findClient(bytes("1-key2")),
nodeLocator.findClient(bytes("1-key3")),
nodeLocator.findClient(bytes("2-key1")),
nodeLocator.findClient(bytes("3-key1")),
nodeLocator.findClient(bytes(KEY1)),
nodeLocator.findClient(bytes(KEY3)));

List<RawMemcacheClient> expected =
Arrays.asList(
CLIENT1, CLIENT1, CLIENT1, // keys prefixed with 1
CLIENT2, // keys prefixed with 2
CLIENT3, // keys prefixed with 3
CLIENT1, CLIENT2); // fallback to default NodeLocator
assertEquals(expected, actual);
}

private static byte[] bytes(String key) {
return key.getBytes(StandardCharsets.US_ASCII);
}
Expand Down
Loading