Skip to content

Commit

Permalink
loadbalancer: HostSelector can be rebuilt
Browse files Browse the repository at this point in the history
Motivation:

For algorithms like weighted round robin and weighted
random we need to build some data structures for each
unique host set.

Modifications:

Add a `.rebuild(List<Host>)` method to the `HostSelector`
interface. This lets a host potentially rebuild data
structures it needs to properly balance but only when
there is a host set update.
  • Loading branch information
bryce-anderson committed Dec 7, 2023
1 parent d288479 commit ebb41e4
Show file tree
Hide file tree
Showing 14 changed files with 446 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static java.util.Objects.requireNonNull;
Expand All @@ -27,17 +31,34 @@ abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnectio
implements HostSelector<ResolvedAddress, C> {

private final String targetResource;
BaseHostSelector(final String targetResource) {
private final boolean isEmpty;
BaseHostSelector(final boolean isEmpty, final String targetResource) {
this.isEmpty = isEmpty;
this.targetResource = requireNonNull(targetResource, "targetResource");
}

protected abstract Single<C> selectConnection0(@Nonnull Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve);

@Override
public final Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return isEmpty ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

protected final String getTargetResource() {
return targetResource;
}

protected final Single<C> noActiveHosts(List<Host<ResolvedAddress, C>> usedHosts) {
protected final Single<C> noActiveHostsFailure(List<Host<ResolvedAddress, C>> usedHosts) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
getTargetResource() + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
}

private Single<C> noHostsFailure() {
return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection0(...)"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio

// writes to these fields protected by `executeSequentially` but they can be read from any thread.
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
private volatile HostSelector<ResolvedAddress, C> hostSelector;
private volatile boolean isClosed;

private final String targetResource;
private final SequentialExecutor sequentialExecutor;
private final String lbDescription;
private final HostSelector<ResolvedAddress, C> hostSelector;
private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
private final Processor<Object, Object> eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32);
private final Publisher<Object> eventStream;
Expand Down Expand Up @@ -177,10 +177,10 @@ private Completable doClose(final boolean graceful) {
SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() :
// We only want to empty the host list on error if we're closing non-gracefully.
compositeCloseable.closeAsync().beforeOnError(t ->
sequentialExecutor.execute(() -> usedHosts = emptyList()))
sequentialExecutor.execute(() -> updateUsedHosts(emptyList())))
)
// we want to always empty out the host list if we complete successfully
.beforeOnComplete(() -> sequentialExecutor.execute(() -> usedHosts = emptyList())))
.beforeOnComplete(() -> sequentialExecutor.execute(() -> updateUsedHosts(emptyList()))))
.subscribe(processor);
} catch (Throwable ex) {
processor.onError(ex);
Expand Down Expand Up @@ -328,7 +328,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
}
}
// We've built the new list so now set it for consumption and then send our events.
usedHosts = nextHosts;
updateUsedHosts(nextHosts);

LOGGER.debug("{}: now using addresses (size={}): {}.",
DefaultLoadBalancer.this, nextHosts.size(), nextHosts);
Expand Down Expand Up @@ -376,7 +376,7 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
}
final List<Host<ResolvedAddress, C>> nextHosts = listWithHostRemoved(
currentHosts, current -> current == host);
usedHosts = nextHosts;
updateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
Expand Down Expand Up @@ -433,6 +433,11 @@ public void onComplete() {
}
}

private void updateUsedHosts(List<Host<ResolvedAddress, C>> nextHosts) {
this.usedHosts = nextHosts;
this.hostSelector = hostSelector.rebuildWithHosts(usedHosts);
}

private static <T> Single<T> failedLBClosed(String targetResource) {
return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
}
Expand Down Expand Up @@ -460,7 +465,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
this.getClass(), "selectConnection0(...)"));
}

Single<C> result = hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve);
Single<C> result = hostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
if (healthCheckConfig != null) {
result = result.beforeOnError(exn -> {
if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;

import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
Expand Down Expand Up @@ -141,8 +142,8 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(targetResource), connectionFactory, linearSearchSpace,
healthCheckConfig);
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource), connectionFactory,
linearSearchSpace, healthCheckConfig);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@

/**
* Interface abstracting away the method of host selection.
* <p>
* Thread Safety
* Because the HostSelector is used on the hot path some care must be paid to thread safety. The easiest
* pattern to use is to make the internal state effectively immutable and rebuilds (see below) generate new
* immutable instances as necessary. The interface is expected to adhere to the following rules:
*
* <li>The {@link HostSelector#selectConnection(Predicate, ContextMap, boolean)} method will be used
* concurrently with calls to itself as well as calls to {@link HostSelector#rebuildWithHosts(List)}.</li>
* <li>The {@link HostSelector#rebuildWithHosts(List)} will only be called sequentially with respect to itself.</li>
* <p>
* Note that the HostSelector does not own the provided {@link Host}s and therefore should not
* attempt to manage their lifecycle.
*/
interface HostSelector<ResolvedAddress, C extends LoadBalancedConnection> {

Expand All @@ -35,6 +47,17 @@ interface HostSelector<ResolvedAddress, C extends LoadBalancedConnection> {
* This method will be called concurrently with other selectConnection calls and
* hostSetChanged calls and must be thread safe under those conditions.
*/
Single<C> selectConnection(@Nonnull List<Host<ResolvedAddress, C>> hosts, @Nonnull Predicate<C> selector,
@Nullable ContextMap context, boolean forceNewConnectionAndReserve);
Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve);

/**
* Generate another HostSelector using the provided host list.
* <p>
* This method is will be called when the host set is updated and provides a way for the
* HostSelector to rebuild any data structures necessary. Note that the method can return
* {@code this} or a new selector depending on the convenience of implementation.
* @param hosts the new list of {@link Host}s the returned selector should choose from.
* @return the next selector that should be used for host selection.
*/
HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<Host<ResolvedAddress, C>> hosts);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.List;
import javax.annotation.Nonnull;

/**
* Definition of the selector mechanism used for load balancing.
*/
interface LoadBalancingPolicy<ResolvedAddress, C extends LoadBalancedConnection> {
/**
* The name of the load balancing policy
*
* @return the name of the load balancing policy
*/
String name();

/**
* Construct a {@link HostSelector}.
* @param hosts the set of {@link Host}s to select from.
* @param targetResource the name of the target resource, useful for debugging purposes.
* @return a {@link HostSelector}
*/
HostSelector<ResolvedAddress, C> buildSelector(String targetResource);
HostSelector<ResolvedAddress, C> buildSelector(@Nonnull List<Host<ResolvedAddress, C>> hosts,
String targetResource);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;

import java.util.List;
import java.util.Random;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -47,8 +49,9 @@ private P2CLoadBalancingPolicy(final int maxEffort, @Nullable final Random rando
}

@Override
public HostSelector<ResolvedAddress, C> buildSelector(String targetResource) {
return new P2CSelector<>(targetResource, maxEffort, random);
public HostSelector<ResolvedAddress, C> buildSelector(
@Nonnull List<Host<ResolvedAddress, C>> hosts, String targetResource) {
return new P2CSelector<>(hosts, targetResource, maxEffort, random);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,23 @@ final class P2CSelector<ResolvedAddress, C extends LoadBalancedConnection>
@Nullable
private final Random random;
private final int maxEffort;
private final List<Host<ResolvedAddress, C>> hosts;

P2CSelector(final String targetResource, final int maxEffort, @Nullable final Random random) {
super(targetResource);
P2CSelector(@Nonnull List<Host<ResolvedAddress, C>> hosts,
final String targetResource, final int maxEffort, @Nullable final Random random) {
super(hosts.isEmpty(), targetResource);
this.hosts = hosts;
this.maxEffort = maxEffort;
this.random = random;
}

@Override
public Single<C> selectConnection(
@Nonnull List<Host<ResolvedAddress, C>> hosts,
public HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<Host<ResolvedAddress, C>> hosts) {
return new P2CSelector<>(hosts, getTargetResource(), maxEffort, random);
}

@Override
protected Single<C> selectConnection0(
@Nonnull Predicate<C> selector,
@Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
Expand All @@ -64,7 +71,7 @@ public Single<C> selectConnection(
case 1:
// There is only a single host, so we don't need to do any of the looping or comparison logic.
Single<C> connection = selectFromHost(hosts.get(0), selector, forceNewConnectionAndReserve, context);
return connection == null ? noActiveHosts(hosts) : connection;
return connection == null ? noActiveHostsFailure(hosts) : connection;
default:
return p2c(size, hosts, getRandom(), selector, forceNewConnectionAndReserve, context);
}
Expand Down Expand Up @@ -104,7 +111,7 @@ private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random ran
// Neither t1 nor t2 yielded a connection. Fall through, potentially for another attempt.
}
// Max effort exhausted. We failed to find a healthy and active host.
return noActiveHosts(hosts);
return noActiveHostsFailure(hosts);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -95,7 +96,8 @@ private <T extends C> LoadBalancer<T> useNewRoundRobinLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher, new RoundRobinSelector<>(targetResource),
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
new RoundRobinSelector<>(Collections.emptyList(), targetResource),
connectionFactory, linearSearchSpace, healthCheckConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import io.servicetalk.client.api.LoadBalancedConnection;

import java.util.List;
import javax.annotation.Nonnull;

/**
* A round-robin load balancing policy.
*
Expand All @@ -33,8 +36,9 @@ private RoundRobinLoadBalancingPolicy() {
}

@Override
public HostSelector<ResolvedAddress, C> buildSelector(final String targetResource) {
return new RoundRobinSelector<>(targetResource);
public HostSelector<ResolvedAddress, C>
buildSelector(@Nonnull final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
return new RoundRobinSelector<>(hosts, targetResource);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,33 +20,36 @@
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.succeeded;

final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection>
extends BaseHostSelector<ResolvedAddress, C> {

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<RoundRobinSelector> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index");
private final AtomicInteger index;
private final List<Host<ResolvedAddress, C>> usedHosts;

@SuppressWarnings("unused")
private volatile int index;
RoundRobinSelector(final List<Host<ResolvedAddress, C>> usedHosts, final String targetResource) {
this(new AtomicInteger(), usedHosts, targetResource);
}

RoundRobinSelector(final String targetResource) {
super(targetResource);
private RoundRobinSelector(final AtomicInteger index, final List<Host<ResolvedAddress, C>> usedHosts,
final String targetResource) {
super(usedHosts.isEmpty(), targetResource);
this.index = index;
this.usedHosts = usedHosts;
}

@Override
public Single<C> selectConnection(
final List<Host<ResolvedAddress, C>> usedHosts,
protected Single<C> selectConnection0(
final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
// try one loop over hosts and if all are expired, give up
final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % usedHosts.size();
final int cursor = (index.getAndIncrement() & Integer.MAX_VALUE) % usedHosts.size();
Host<ResolvedAddress, C> pickedHost = null;
for (int i = 0; i < usedHosts.size(); ++i) {
// for a particular iteration we maintain a local cursor without contention with other requests
Expand All @@ -70,9 +73,14 @@ public Single<C> selectConnection(
}
}
if (pickedHost == null) {
return noActiveHosts(usedHosts);
return noActiveHostsFailure(usedHosts);
}
// We have a host but no connection was selected: create a new one.
return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context);
}

@Override
public HostSelector<ResolvedAddress, C> rebuildWithHosts(@Nonnull List<Host<ResolvedAddress, C>> hosts) {
return new RoundRobinSelector<>(index, hosts, getTargetResource());
}
}
Loading

0 comments on commit ebb41e4

Please sign in to comment.