Skip to content

Commit

Permalink
loadbalancer: follow ups to P2C selector logic
Browse files Browse the repository at this point in the history
Motivation:

We should try to get existing connections from the non-preferred
host if we fail to get connections from the preferred.

Modifications:

- adjust the selection logic
- add a test
- don't perform multiple selection attempts if there are only two hosts
- remove unnecessary @nullable annotation since the method never returns null
  • Loading branch information
bryce-anderson committed Nov 13, 2023
1 parent ff33868 commit 4d897d4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,19 @@ public Single<C> selectConnection(
throw new AssertionError("Selector for " + getTargetResource() +
" received an empty host set");
case 1:
Host<ResolvedAddress, C> host = hosts.get(0);
if (!forceNewConnectionAndReserve) {
C connection = host.pickConnection(selector, context);
if (connection != null) {
return succeeded(connection);
}
}
// Either we require a new connection or there wasn't one already established so
// try to make a new one if the host is healthy. If it's not healthy, we fail
// and let the higher level retries decide what to do.
if (!host.isActiveAndHealthy()) {
return noActiveHosts(hosts);
}
return host.newConnection(selector, forceNewConnectionAndReserve, context);
// There is only a single host, so we don't need to do any of the looping or comparison logic.
Single<C> connection = selectFromHost(hosts.get(0), selector, forceNewConnectionAndReserve, context);
return connection == null ? noActiveHosts(hosts) : connection;
default:
return p2c(size, hosts, getRandom(), selector, forceNewConnectionAndReserve, context);
}
}

@Nullable
private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random random, Predicate<C> selector,
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
for (int j = maxEffort; j > 0; j--) {
// If there are only two hosts we only try once since there is no chance we'll select different hosts
// on further iterations.
for (int j = hosts.size() == 2 ? 1 : maxEffort; j > 0; j--) {
// Pick two random indexes that don't collide. Limit the range on the second index to 1 less than
// the max value so that if there is a collision we can safety increment. We also increment if
// i2 > i1 to avoid biased toward lower numbers since we limited the range by 1.
Expand All @@ -102,34 +92,40 @@ private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random ran
t2 = tmp;
}

if (!forceNewConnectionAndReserve) {
// First we're going to see if we can get an existing connection regardless of health status. Since t1
// is 'better' we'll try it first. If it doesn't have any existing connections we don't fall back to t2
// or else we would cause a bias toward hosts with existing connections which could ultimately drive all
// traffic to the first host to make a connection in the case of a multiplexed session.
C c = t1.pickConnection(selector, contextMap);
if (c != null) {
return succeeded(c);
}
// We now need to consider the health status and make a new connection if either
// host is considered healthy.
// Attempt to get a connection from t1 first since it's 'better'. If we can't, then try t2.
Single<C> result = selectFromHost(t1, selector, forceNewConnectionAndReserve, contextMap);
if (result != null) {
return result;
}

// We either couldn't find a live connection or are being forced to make a new one. Either way we're
// going to make a new connection which means we need to consider health.
final boolean t1Healthy = t1.isActiveAndHealthy();
final boolean t2Healthy = t2.isActiveAndHealthy();
if (t1Healthy) {
return t1.newConnection(selector, forceNewConnectionAndReserve, contextMap);
} else if (t2Healthy) {
return t2.newConnection(selector, forceNewConnectionAndReserve, contextMap);
result = selectFromHost(t2, selector, forceNewConnectionAndReserve, contextMap);
if (result != null) {
return result;
}
// Neither are healthy and capable of making a connection: fall through, perhaps for another attempt.
// Neither t1 nor t2 yielded a connection. Fall through, potentially for another attempt.
}
// Max effort exhausted. We failed to find a healthy and active host.
return noActiveHosts(hosts);
}

@Nullable
private Single<C> selectFromHost(Host<ResolvedAddress, C> host, Predicate<C> selector,
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
// First see if we can get an existing connection regardless of health status.
if (!forceNewConnectionAndReserve) {
C c = host.pickConnection(selector, contextMap);
if (c != null) {
return succeeded(c);
}
}
// We need to make a new connection to the host but we'll only do so if it's considered healthy.
if (host.isActiveAndHealthy()) {
return host.newConnection(selector, forceNewConnectionAndReserve, contextMap);
}

// no selectable active connections and the host is unhealthy, so we return `null`.
return null;
}

private Random getRandom() {
return random == null ? ThreadLocalRandom.current() : random;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class P2CSelectorTest {
Expand Down Expand Up @@ -141,7 +143,7 @@ void twoUnHealthyHostsWithoutConnections() {
assertThat(e.getCause(), isA(NoActiveHostException.class));
}

@RepeatedTest(100)
@Test
void doesntBiasTowardHostsWithConnections() throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
// we setup the first host to always be preferred by score, but it also doesn't have any connections.
Expand All @@ -150,30 +152,39 @@ void doesntBiasTowardHostsWithConnections() throws Exception {
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, false).toFuture().get();
assertThat(connection.address(), equalTo("addr-1"));
// verify that we made a new connection to addr-1.
verify(hosts.get(0)).newConnection(any(), anyBoolean(), any());
}

@RepeatedTest(100)
void biasesTowardsActiveAndHealthyHostWhenNoConnections() throws Exception {
@Test
void selectsExistingConnectionsFromNonPreferredHost() throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
// we setup the first host to always be preferred by score, but it also doesn't have any connections
// and is unhealthy.
when(hosts.get(0).pickConnection(any(), any())).thenReturn(null);
when(hosts.get(0).isActiveAndHealthy()).thenReturn(false);
when(hosts.get(0).score()).thenReturn(10);
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, true).toFuture().get();
hosts, PREDICATE, null, false).toFuture().get();
assertThat(connection.address(), equalTo("addr-2"));
// Verify that we selected an existing connection.
verify(hosts.get(1), never()).newConnection(any(), anyBoolean(), any());
}

@RepeatedTest(100)
void biasesTowardTheHighestWeightHostForNewConnections() throws Exception {
biasesTowardTheHighestWeightHost(true);
}

@RepeatedTest(100)
void biasesTowardTheHighestWeightHostForExistingConnections() throws Exception {
biasesTowardTheHighestWeightHost(false);
@Test
void biasesTowardsActiveAndHealthyHostWhenMakingConnections() throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
when(hosts.get(0).isActiveAndHealthy()).thenReturn(false);
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, true).toFuture().get();
assertThat(connection.address(), equalTo("addr-2"));
}

@ParameterizedTest(name = "{displayName} [{index}]: forceNewConnection={0}")
@ValueSource(booleans = {false, true})
void biasesTowardTheHighestWeightHost(boolean forceNewConnection) throws Exception {
List<Host<String, TestLoadBalancedConnection>> hosts = connections("addr-1", "addr-2");
// Host 0 has the highest score so it should always get the new connection.
// Host 0 has the highest score, so it should always get the new connection.
when(hosts.get(0).score()).thenReturn(10);
TestLoadBalancedConnection connection = selector.selectConnection(
hosts, PREDICATE, null, forceNewConnection).toFuture().get();
Expand Down

0 comments on commit 4d897d4

Please sign in to comment.