From f455e6f9c5337734a8c17b6fd7e6c18cc80c7229 Mon Sep 17 00:00:00 2001 From: Dariusz Jedrzejczyk Date: Tue, 17 Aug 2021 15:14:09 +0200 Subject: [PATCH] Load balancer health checks problematic hosts (#1709) Motivation: The current implementation of `RoundRobinLoadBalancer` cycles through all addresses that the `ServiceDiscoverer` provides and opens connections regardless of the behavior of the individual hosts behind those addresses. No passive health checking is performed and no feedback from connection establishment is provided to the RRLB to make any smart decisions with regard to the way hosts are chosen for connections or directing requests. The purpose of RRLB is to do exactly one thing - cycle through hosts and direct traffic attempting to distribute the load fairly with regard to this assumption. However, there are occasions when a particular `ServiceDiscoverer` (e.g. DNS-based) doesn't provide up-to-date health information about hosts. Meanwhile, some addresses might not be responding, but are considered active from the perspective of the discovery mechanism. Such addresses lead to unsuccessful connection establishment attempts and introduce unnecessary latency in the request path. In this PR, a mechanism for detecting such failures is introduced. Hosts that the RRLB consecutively fails to establish connections with are taken out of the selection process until a connection is established. A background task tries, at specified intervals, to connect to the given host. Upon success, the connection can be used for routing traffic and the host comes back to the pool and takes part in the selection. The mechanism described here is a specific type of health checking and can possibly be improved in the future to be more tunable. Currently, the user controls the interval at which the health checks are performed, the number of consecutive connection failures after which a host is considered unhealthy, and the background `io.servicetalk.concurrent.api.Executor` for running the checks. 
Modifications: - Consecutive failed connection attempts to ACTIVE hosts are counted in the internal RRLB's Host state, - After a threshold is met, a background task is scheduled which will attempt a connection at a specified interval, - Meanwhile, the particular address is not considered for directing traffic and opening connections, - Whenever the background task successfully establishes a connection, that connection is used for directing requests and the host comes back to the list of hosts eligible for selection in the request path, - `RoundRobinLoadBalancerFactory.Builder` was enhanced to incorporate this mechanism. Result: Problematic hosts are not used in the request path and are actively health checked in the background until they are reachable again. The overall latency should decrease for DNS `ServiceDiscoverer` users who stumble upon a situation where some addresses returned from the DNS queries are unreachable. --- .../loadbalancer/RoundRobinLoadBalancer.java | 315 ++++++++++++++---- .../RoundRobinLoadBalancerFactory.java | 115 ++++++- .../EagerRoundRobinLoadBalancerTest.java | 10 +- .../LingeringRoundRobinLoadBalancerTest.java | 10 +- .../RoundRobinLoadBalancerTest.java | 254 +++++++++++++- 5 files changed, 624 insertions(+), 80 deletions(-) diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java index 7adf4fceeb..2481ef0547 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancer.java @@ -28,15 +28,19 @@ import io.servicetalk.concurrent.api.AsyncCloseable; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; +import io.servicetalk.concurrent.api.Executor; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; 
import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.internal.DelayedCancellable; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.concurrent.internal.ThrowableUtils; +import io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.SharedExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.AbstractMap.SimpleImmutableEntry; import java.util.ArrayList; import java.util.Arrays; @@ -48,6 +52,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Stream; +import javax.annotation.Nullable; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_READY_EVENT; @@ -56,11 +61,15 @@ import static io.servicetalk.concurrent.api.Completable.completed; import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; import static io.servicetalk.concurrent.api.Publisher.from; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; +import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_INTERVAL; import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.EAGER_CONNECTION_SHUTDOWN_ENABLED; import static 
java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -129,7 +138,9 @@ public final class RoundRobinLoadBalancer> eventPublisher, final ConnectionFactory connectionFactory) { - this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED); + this(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, + new HealthCheckConfig(SharedExecutor.getInstance(), + DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } /** @@ -141,10 +152,15 @@ public RoundRobinLoadBalancer(final Publisher> eventPublisher, final ConnectionFactory connectionFactory, - final boolean eagerConnectionShutdown) { + final boolean eagerConnectionShutdown, + @Nullable final HealthCheckConfig healthCheckConfig) { Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); this.eventStream = fromSource(eventStreamProcessor); this.connectionFactory = requireNonNull(connectionFactory); @@ -221,7 +237,7 @@ private List> markHostAsExpired( } private Host createHost(ResolvedAddress addr) { - Host host = new Host<>(addr); + Host host = new Host<>(addr, healthCheckConfig); if (!eagerConnectionShutdown) { host.onClose().afterFinally(() -> usedHostsUpdater.updateAndGet(RoundRobinLoadBalancer.this, previousHosts -> { @@ -244,9 +260,9 @@ private List> addHostToList( // duplicates are not allowed for (Host host : oldHostsTyped) { if (host.address.equals(addr)) { - if (handleExpired && !host.tryToMarkActive()) { - // If the new state is not ACTIVE, the host is already in CLOSED state, we should create - // a new entry. For duplicate ACTIVE events or for repeated activation due to failed CAS + if (handleExpired && !host.markActiveIfNotClosed()) { + // If the host is already in CLOSED state, we should create a new entry. + // For duplicate ACTIVE events or for repeated activation due to failed CAS // of replacing the usedHosts array the marking succeeds so we will not add a new entry. 
break; } @@ -324,6 +340,10 @@ RoundRobinLoadBalancerFactory newRoundRobinFactory() { return new RoundRobinLoadBalancerFactory<>(); } + private static Single failedLBClosed() { + return failed(new IllegalStateException("LoadBalancer has closed")); + } + @Override public Single selectConnection(Predicate selector) { return defer(() -> selectConnection0(selector).subscribeShareContext()); @@ -367,8 +387,9 @@ private Single selectConnection0(Predicate selector) { } } - // don't open new connections for expired hosts, try a different one - if (host.isActive()) { + // Don't open new connections for expired or unhealthy hosts, try a different one. + // Unhealthy hosts have no open connections – that's why we don't fail earlier, the loop will not progress. + if (host.isActiveAndHealthy()) { pickedHost = host; break; } @@ -380,9 +401,15 @@ private Single selectConnection0(Predicate selector) { } // No connection was selected: create a new one. final Host host = pickedHost; + // This LB implementation does not automatically provide TransportObserver. Therefore, we pass "null" here. // Users can apply a ConnectionFactoryFilter if they need to override this "null" value with TransportObserver. - return connectionFactory.newConnection(host.address, null) + Single establishConnection = connectionFactory.newConnection(host.address, null); + if (host.healthCheckConfig != null) { + // Schedule health check before returning + establishConnection = establishConnection.beforeOnError(t -> host.markUnhealthy(connectionFactory)); + } + return establishConnection .flatMap(newCnx -> { // Invoke the selector before adding the connection to the pool, otherwise, connection can be // used concurrently and hence a new connection can be rejected by the selector. 
@@ -416,6 +443,11 @@ public Completable closeAsyncGracefully() { return asyncCloseable.closeAsyncGracefully(); } + // Visible for testing + List>> usedAddresses() { + return usedHosts.stream().map(Host::asEntry).collect(toList()); + } + /** * Please use {@link io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory} instead of this factory. * @@ -432,63 +464,73 @@ public static final class RoundRobinLoadBalancerFactory LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED); + return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, EAGER_CONNECTION_SHUTDOWN_ENABLED, + new HealthCheckConfig(SharedExecutor.getInstance(), + DEFAULT_HEALTH_CHECK_INTERVAL, DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD)); } } - // Visible for testing - List>> usedAddresses() { - return usedHosts.stream().map(Host::asEntry).collect(toList()); - } + static final class HealthCheckConfig { + private final Executor executor; + private final Duration healthCheckInterval; + private final int failedThreshold; - private static final class Host implements ListenableAsyncCloseable { - private static final ConnState EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, State.ACTIVE); - private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); + HealthCheckConfig(final Executor executor, final Duration healthCheckInterval, final int failedThreshold) { + this.executor = executor; + this.healthCheckInterval = healthCheckInterval; + this.failedThreshold = failedThreshold; + } + } - @SuppressWarnings("rawtypes") - private static final AtomicReferenceFieldUpdater connStateUpdater = - AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); + private static final class Host implements ListenableAsyncCloseable { private enum State { - ACTIVE, + // The enum is not exhaustive, as 
other states have dynamic properties. + // For clarity, the other state classes are listed as comments: + // ACTIVE - see ActiveState + // UNHEALTHY - see HealthCheck EXPIRED, CLOSED } - private static final class ConnState { - final Object[] connections; - final State state; + private static final ActiveState STATE_ACTIVE_NO_FAILURES = new ActiveState(); + private static final ConnState ACTIVE_EMPTY_CONN_STATE = new ConnState(EMPTY_ARRAY, STATE_ACTIVE_NO_FAILURES); + private static final ConnState CLOSED_CONN_STATE = new ConnState(EMPTY_ARRAY, State.CLOSED); - ConnState(final Object[] connections, final State state) { - this.connections = connections; - this.state = state; - } - } + @SuppressWarnings("rawtypes") + private static final AtomicReferenceFieldUpdater connStateUpdater = + AtomicReferenceFieldUpdater.newUpdater(Host.class, ConnState.class, "connState"); final Addr address; + @Nullable + private final HealthCheckConfig healthCheckConfig; private final ListenableAsyncCloseable closeable; + private volatile ConnState connState = ACTIVE_EMPTY_CONN_STATE; - private volatile ConnState connState = EMPTY_CONN_STATE; - - Host(Addr address) { + Host(Addr address, @Nullable HealthCheckConfig healthCheckConfig) { this.address = requireNonNull(address); + this.healthCheckConfig = healthCheckConfig; this.closeable = toAsyncCloseable(graceful -> graceful ? doClose(AsyncCloseable::closeAsyncGracefully) : doClose(AsyncCloseable::closeAsync)); } - boolean tryToMarkActive() { - return connStateUpdater.updateAndGet(this, oldConnState -> { + boolean markActiveIfNotClosed() { + final Object oldState = connStateUpdater.getAndUpdate(this, oldConnState -> { if (oldConnState.state == State.EXPIRED) { - return new ConnState(oldConnState.connections, State.ACTIVE); + return new ConnState(oldConnState.connections, STATE_ACTIVE_NO_FAILURES); } // If oldConnState.state == State.ACTIVE this could mean either a duplicate event, // or a repeated CAS operation. 
We could issue a warning, but as we don't know, we don't log anything. + // UNHEALTHY state cannot transition to ACTIVE without passing the health check. return oldConnState; - }).state == State.ACTIVE; + }).state; + return oldState != State.CLOSED; } void markClosed() { - final Object[] toRemove = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; + final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); + final Object[] toRemove = oldState.connections; + cancelIfHealthCheck(oldState.state); LOGGER.debug("Closing {} connection(s) gracefully to closed address: {}", toRemove.length, address); for (Object conn : toRemove) { @SuppressWarnings("unchecked") @@ -498,30 +540,96 @@ void markClosed() { } void markExpired() { - final ConnState newState = connStateUpdater.updateAndGet(this, - oldConnState -> oldConnState.connections.length == 0 ? - CLOSED_CONN_STATE : new ConnState(oldConnState.connections, State.EXPIRED)); - if (newState == CLOSED_CONN_STATE) { - // Trigger the callback to remove the host from usedHosts array. - this.closeAsync().subscribe(); + for (;;) { + ConnState oldState = connStateUpdater.get(this); + if (oldState.state == State.EXPIRED || oldState.state == State.CLOSED) { + break; + } + Object nextState = oldState.connections.length == 0 ? State.CLOSED : State.EXPIRED; + + if (connStateUpdater.compareAndSet(this, oldState, + new ConnState(oldState.connections, nextState))) { + cancelIfHealthCheck(oldState.state); + if (nextState == State.CLOSED) { + // Trigger the callback to remove the host from usedHosts array. + this.closeAsync().subscribe(); + } + break; + } } } - boolean isActive() { - return connState.state == State.ACTIVE; + void markHealthy() { + // Marking healthy is generally called from a successful health check, after a connection was added. 
+ // However, it is possible that in the meantime, the host entered an EXPIRED state, then ACTIVE, then failed + // to open connections and entered the UNHEALTHY state before the original thread continues execution here. + // In such case, the flipped state is not the same as the one that just succeeded to open a connection. + // In an unlikely scenario that the following connection attempts fail indefinitely, a health check task + // would leak and would not be cancelled. Therefore, we cancel it here and allow failures to trigger a new + // health check. + Object oldState = connStateUpdater.getAndUpdate(this, previous -> { + if (HealthCheck.class.equals(previous.state.getClass())) { + return new ConnState(previous.connections, STATE_ACTIVE_NO_FAILURES); + } + return previous; + }).state; + cancelIfHealthCheck(oldState); + } + + void markUnhealthy(ConnectionFactory connectionFactory) { + assert healthCheckConfig != null; + for (;;) { + ConnState previous = connStateUpdater.get(this); + + if (!ActiveState.class.equals(previous.state.getClass()) || previous.connections.length > 0) { + break; + } + + ActiveState previousState = (ActiveState) previous.state; + if (previousState.failedConnections + 1 < this.healthCheckConfig.failedThreshold) { + final ActiveState nextState = previousState.forNextFailedConnection(); + if (connStateUpdater.compareAndSet(this, previous, + new ConnState(previous.connections, nextState))) { + LOGGER.debug("Active host for address {} failed to open {} connections" + + " ({} consecutive failures trigger health check).", + address, nextState.failedConnections, healthCheckConfig.failedThreshold); + break; + } + // another thread won the race, try again + continue; + } + + final HealthCheck healthCheck = new HealthCheck<>(connectionFactory, this); + final ConnState nextState = new ConnState(previous.connections, healthCheck); + if (connStateUpdater.compareAndSet(this, previous, nextState)) { + LOGGER.debug("Triggering health check for address {} 
after {} failed attempts" + + " to open a new connection", address, previousState.failedConnections); + healthCheck.schedule(); + break; + } + } + } + + boolean isActiveAndHealthy() { + return ActiveState.class.equals(connState.state.getClass()); } boolean addConnection(C connection) { for (;;) { - final ConnState currentConnState = this.connState; - if (currentConnState == CLOSED_CONN_STATE) { + final ConnState previous = connStateUpdater.get(this); + if (previous == CLOSED_CONN_STATE) { return false; } - final Object[] existing = currentConnState.connections; + + final Object[] existing = previous.connections; Object[] newList = Arrays.copyOf(existing, existing.length + 1); newList[existing.length] = connection; + + Object newState = ActiveState.class.equals(previous.state.getClass()) ? + STATE_ACTIVE_NO_FAILURES : previous.state; + if (connStateUpdater.compareAndSet(this, - currentConnState, new ConnState(newList, currentConnState.state))) { + previous, new ConnState(newList, newState))) { break; } } @@ -543,8 +651,9 @@ currentConnState, new ConnState(newList, currentConnState.state))) { if (i == connections.length) { break; } else if (connections.length == 1) { - if (currentConnState.state == State.ACTIVE) { - if (connStateUpdater.compareAndSet(this, currentConnState, EMPTY_CONN_STATE)) { + if (ActiveState.class.equals(currentConnState.state.getClass())) { + if (connStateUpdater.compareAndSet(this, currentConnState, + new ConnState(EMPTY_ARRAY, currentConnState.state))) { break; } } else if (currentConnState.state == State.EXPIRED @@ -572,8 +681,8 @@ currentConnState, new ConnState(newList, currentConnState.state))) { }).subscribe(); return true; } - // Used for testing only + // Used for testing only @SuppressWarnings("unchecked") Entry> asEntry() { return new SimpleImmutableEntry<>(address, @@ -598,12 +707,23 @@ public Completable onClose() { @SuppressWarnings("unchecked") private Completable doClose(final Function closeFunction) { return Completable.defer(() 
-> { - final Object[] connections = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE).connections; + final ConnState oldState = connStateUpdater.getAndSet(this, CLOSED_CONN_STATE); + cancelIfHealthCheck(oldState.state); + final Object[] connections = oldState.connections; return connections.length == 0 ? completed() : from(connections).flatMapCompletableDelayError(conn -> closeFunction.apply((C) conn)); }); } + private void cancelIfHealthCheck(Object o) { + if (HealthCheck.class.equals(o.getClass())) { + @SuppressWarnings("unchecked") + HealthCheck healthCheck = (HealthCheck) o; + LOGGER.debug("Health check for address {} cancelled.", healthCheck.host.address); + healthCheck.cancel(); + } + } + @Override public String toString() { final ConnState connState = this.connState; @@ -613,6 +733,91 @@ public String toString() { ", #connections=" + connState.connections.length + '}'; } + + private static final class ActiveState { + private final int failedConnections; + + ActiveState() { + this(0); + } + + private ActiveState(int failedConnections) { + this.failedConnections = failedConnections; + } + + ActiveState forNextFailedConnection() { + return new ActiveState(addWithOverflowProtection(this.failedConnections, 1)); + } + + @Override + public String toString() { + return "ACTIVE(failedConnections=" + failedConnections + ')'; + } + } + + private static final class HealthCheck + extends DelayedCancellable { + private static final Exception RESCHEDULE_SIGNAL = ThrowableUtils.unknownStackTrace( + new ConnectionRejectedException("Connection rejected during health check."), + HealthCheck.class, "run()"); + private final ConnectionFactory connectionFactory; + private final Host host; + + private HealthCheck(final ConnectionFactory connectionFactory, + final Host host) { + this.connectionFactory = connectionFactory; + this.host = host; + } + + public void schedule() { + assert host.healthCheckConfig != null; + delayedCancellable( + retryWithConstantBackoffFullJitter(cause 
-> true, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.executor) + .apply(0, RESCHEDULE_SIGNAL) + .concat(reconnect() + .retryWhen(retryWithConstantBackoffFullJitter( + cause -> cause == RESCHEDULE_SIGNAL, + host.healthCheckConfig.healthCheckInterval, + host.healthCheckConfig.executor))) + .subscribe()); + } + + public Completable reconnect() { + return connectionFactory.newConnection(host.address, null) + .onErrorMap(cause -> { + LOGGER.debug("Health check failed for address {}.", host.address, cause); + return RESCHEDULE_SIGNAL; + }) + .flatMapCompletable(newCnx -> { + if (host.addConnection(newCnx)) { + LOGGER.debug("Health check passed for address {}.", host.address); + host.markHealthy(); + } else { + LOGGER.debug("Health check finished for address {}. Host rejected connection.", + host.address); + return newCnx.closeAsync(); + } + return completed(); + }); + } + + @Override + public String toString() { + return "UNHEALTHY"; + } + } + + private static final class ConnState { + final Object[] connections; + final Object state; + + ConnState(final Object[] connections, final Object state) { + this.connections = connections; + this.state = state; + } + } } private static final class StacklessNoAvailableHostException extends NoAvailableHostException { @@ -631,8 +836,4 @@ public static StacklessNoAvailableHostException newInstance(String message, Clas return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method); } } - - private static Single failedLBClosed() { - return failed(new IllegalStateException("LoadBalancer has closed")); - } } diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java index 797507eef4..5c2513b66f 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java +++ 
b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerFactory.java @@ -20,9 +20,17 @@ import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.LoadBalancerFactory; import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.api.DefaultThreadFactory; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.Executors; import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.loadbalancer.RoundRobinLoadBalancer.HealthCheckConfig; +import java.time.Duration; import java.util.function.Predicate; +import javax.annotation.Nullable; + +import static java.util.Objects.requireNonNull; /** * {@link LoadBalancerFactory} that creates {@link LoadBalancer} instances which use a round robin strategy @@ -40,6 +48,12 @@ * to addresses marked as {@link ServiceDiscovererEvent#isAvailable() unavailable} are used for requests, * but no new connections are created for them. In case the address' connections are busy, another host is tried. * If all hosts are busy, selection fails with a {@link io.servicetalk.client.api.ConnectionRejectedException}. + *
  • For hosts to which consecutive connection attempts fail, a background health checking task is created and + * the host is not considered for opening new connections until the background check succeeds to create a connection. + * Upon such event, the connection can immediately be reused and future attempts will again consider this host. + * This behaviour can be disabled using a negative argument for + * {@link Builder#healthCheckFailedConnectionsThreshold(int)} and the failing host will take part in the regular + * round robin cycle for trying to establish a connection on the request path.
  • * * * @param The resolved address type. @@ -49,17 +63,26 @@ public final class RoundRobinLoadBalancerFactory { static final boolean EAGER_CONNECTION_SHUTDOWN_ENABLED = true; + static final Duration DEFAULT_HEALTH_CHECK_INTERVAL = Duration.ofSeconds(1); + static final int DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD = 5; // higher than default for AutoRetryStrategy + private final boolean eagerConnectionShutdown; - private RoundRobinLoadBalancerFactory(boolean eagerConnectionShutdown) { + @Nullable + private final HealthCheckConfig healthCheckConfig; + + private RoundRobinLoadBalancerFactory(boolean eagerConnectionShutdown, + @Nullable HealthCheckConfig healthCheckConfig) { this.eagerConnectionShutdown = eagerConnectionShutdown; + this.healthCheckConfig = healthCheckConfig; } @Override public LoadBalancer newLoadBalancer( final Publisher> eventPublisher, final ConnectionFactory connectionFactory) { - return new RoundRobinLoadBalancer<>(eventPublisher, connectionFactory, eagerConnectionShutdown); + return new RoundRobinLoadBalancer<>( + eventPublisher, connectionFactory, eagerConnectionShutdown, healthCheckConfig); } /** @@ -70,6 +93,10 @@ public LoadBalancer newLoadBalancer( */ public static final class Builder { private boolean eagerConnectionShutdown = EAGER_CONNECTION_SHUTDOWN_ENABLED; + @Nullable + private Executor backgroundExecutor; + private Duration healthCheckInterval = DEFAULT_HEALTH_CHECK_INTERVAL; + private int healthCheckFailedConnectionsThreshold = DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; /** * Creates a new instance with default settings. @@ -95,13 +122,95 @@ public RoundRobinLoadBalancerFactory.Builder eagerConnection return this; } + /** + * This {@link LoadBalancer} may monitor hosts to which connection establishment has failed + * using health checks that run in the background. The health check tries to establish a new connection + * and if it succeeds, the host is returned to the load balancing pool. 
As long as the connection + * establishment fails, the host is not considered for opening new connections for processed requests. + * If an {@link Executor} is not provided using this method, a default shared instance is used + * for all {@link LoadBalancer LoadBalancers} created by this factory. + *

    + * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable this mechanism and always + * consider all hosts for establishing new connections. + * + * @param backgroundExecutor {@link Executor} on which to schedule health checking. + * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) + */ + public RoundRobinLoadBalancerFactory.Builder backgroundExecutor( + Executor backgroundExecutor) { + this.backgroundExecutor = requireNonNull(backgroundExecutor); + return this; + } + + /** + * Configure an interval for health checking a host that failed to open connections. If no interval is provided + * using this method, a default value will be used. + *

    + * {@link #healthCheckFailedConnectionsThreshold(int)} can be used to disable the health checking mechanism + * and always consider all hosts for establishing new connections. + * @param interval interval at which a background health check will be scheduled. + * @return {@code this}. + * @see #healthCheckFailedConnectionsThreshold(int) + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckInterval(Duration interval) { + if (interval.isNegative() || interval.isZero()) { + throw new IllegalArgumentException("Health check interval should be greater than 0"); + } + this.healthCheckInterval = interval; + return this; + } + + /** + * Configure a threshold for consecutive connection failures to a host. When the {@link LoadBalancer} + * consecutively fails to open connections in the amount greater or equal to the specified value, + * the host will be marked as unhealthy and connection establishment will take place in the background + * repeatedly until a connection is established. During that time, the host will not take part in + * load balancing selection. + *

    + * Use a negative value of the argument to disable health checking. + * @param threshold number of consecutive connection failures to consider a host unhealthy and eligible for + * background health checking. Use negative value to disable the health checking mechanism. + * @return {@code this}. + * @see #backgroundExecutor(Executor) + * @see #healthCheckInterval(Duration) + */ + public RoundRobinLoadBalancerFactory.Builder healthCheckFailedConnectionsThreshold( + int threshold) { + if (threshold == 0) { + throw new IllegalArgumentException("Health check failed connections threshold should not be 0"); + } + this.healthCheckFailedConnectionsThreshold = threshold; + return this; + } + /** * Builds the {@link RoundRobinLoadBalancerFactory} configured by this builder. * * @return a new instance of {@link RoundRobinLoadBalancerFactory} with settings from this builder. */ public RoundRobinLoadBalancerFactory build() { - return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown); + if (this.healthCheckFailedConnectionsThreshold < 0) { + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, null); + } + + HealthCheckConfig healthCheckConfig = new HealthCheckConfig( + this.backgroundExecutor == null ? 
SharedExecutor.getInstance() : this.backgroundExecutor, + healthCheckInterval, healthCheckFailedConnectionsThreshold); + + return new RoundRobinLoadBalancerFactory<>(eagerConnectionShutdown, healthCheckConfig); + } + } + + static final class SharedExecutor { + private static final Executor INSTANCE = Executors.newFixedSizeExecutor(1, + new DefaultThreadFactory("round-robin-load-balancer-executor")); + + private SharedExecutor() { + } + + static Executor getInstance() { + return INSTANCE; } } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java index c21e3dc629..ee51ebd05c 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/EagerRoundRobinLoadBalancerTest.java @@ -106,13 +106,7 @@ private void validateConnectionClosedGracefully(final TestLoadBalancedConnection } @Override - protected RoundRobinLoadBalancer defaultLb() { - return newTestLoadBalancer(true); - } - - @Override - protected RoundRobinLoadBalancer defaultLb( - RoundRobinLoadBalancerTest.DelegatingConnectionFactory connectionFactory) { - return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, true); + protected boolean eagerConnectionShutdown() { + return true; } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java index 7770370e23..5c79fb7b02 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/LingeringRoundRobinLoadBalancerTest.java @@ -309,13 +309,7 @@ public void 
handleDiscoveryEventsForNotConnectedHosts() { } @Override - protected RoundRobinLoadBalancer defaultLb() { - return newTestLoadBalancer(false); - } - - @Override - protected RoundRobinLoadBalancer defaultLb( - RoundRobinLoadBalancerTest.DelegatingConnectionFactory connectionFactory) { - return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, false); + protected boolean eagerConnectionShutdown() { + return false; } } diff --git a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java index 07fbca0d08..eda50e7991 100644 --- a/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java +++ b/servicetalk-loadbalancer/src/test/java/io/servicetalk/loadbalancer/RoundRobinLoadBalancerTest.java @@ -25,10 +25,13 @@ import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Completable; +import io.servicetalk.concurrent.api.Executor; +import io.servicetalk.concurrent.api.ExecutorRule; import io.servicetalk.concurrent.api.LegacyTestSingle; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.TestExecutor; import io.servicetalk.concurrent.api.TestPublisher; import io.servicetalk.concurrent.api.TestSubscription; import io.servicetalk.concurrent.internal.DeliberateException; @@ -44,6 +47,7 @@ import org.junit.rules.ExpectedException; import org.junit.rules.Timeout; +import java.time.Duration; import java.util.AbstractMap; import java.util.Arrays; import java.util.List; @@ -65,14 +69,18 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import static 
io.servicetalk.concurrent.api.AsyncCloseables.emptyAsyncCloseable; import static io.servicetalk.concurrent.api.BlockingTestUtils.awaitIndefinitely; +import static io.servicetalk.concurrent.api.RetryStrategies.retryWithConstantBackoffFullJitter; +import static io.servicetalk.concurrent.api.Single.defer; import static io.servicetalk.concurrent.api.Single.failed; import static io.servicetalk.concurrent.api.Single.succeeded; import static io.servicetalk.concurrent.api.SourceAdapters.toSource; import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; import static io.servicetalk.concurrent.internal.TestTimeoutConstants.DEFAULT_TIMEOUT_SECONDS; +import static io.servicetalk.loadbalancer.RoundRobinLoadBalancerFactory.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toSet; @@ -90,6 +98,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -105,6 +114,9 @@ abstract class RoundRobinLoadBalancerTest { @Rule public final ExpectedException thrown = ExpectedException.none(); + @Rule + public final ExecutorRule executor = ExecutorRule.withTestExecutor(); + protected final TestSingleSubscriber selectConnectionListener = new TestSingleSubscriber<>(); protected final List connectionsCreated = new CopyOnWriteArrayList<>(); @@ -116,6 +128,8 @@ abstract class RoundRobinLoadBalancerTest { protected RoundRobinLoadBalancer lb; + protected TestExecutor testExecutor; + protected static Predicate any() { return __ -> true; } @@ -124,13 +138,20 @@ protected Predicate alwaysNewConnectionFilter() { return cnx -> 
lb.usedAddresses().stream().noneMatch(addr -> addr.getValue().stream().anyMatch(cnx::equals)); } - protected abstract RoundRobinLoadBalancer defaultLb(); + protected RoundRobinLoadBalancer defaultLb() { + return newTestLoadBalancer(eagerConnectionShutdown()); + } + + protected RoundRobinLoadBalancer defaultLb( + DelegatingConnectionFactory connectionFactory) { + return newTestLoadBalancer(serviceDiscoveryPublisher, connectionFactory, eagerConnectionShutdown()); + } - protected abstract RoundRobinLoadBalancer defaultLb( - DelegatingConnectionFactory connectionFactory); + protected abstract boolean eagerConnectionShutdown(); @Before public void initialize() { + testExecutor = executor.executor(); lb = defaultLb(); connectionsCreated.clear(); connectionRealizers.clear(); @@ -381,6 +402,182 @@ public void newConnectionIsClosedWhenSelectorRejects() throws Exception { awaitIndefinitely(connection.onClose()); } + @Test + public void unhealthyHostTakenOutOfPoolForSelection() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + lb = defaultLb(connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + sendServiceDiscoveryEvents(upEvent("address-2")); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD * 2; ++i) { + try { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } catch (Exception e) { + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + } + + for (int i = 0; i < 10; ++i) { + final 
TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(1)); + + for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + for (int i = 0; i < 10; ++i) { + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + } + + @Test + public void disabledHealthCheckDoesntRun() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .healthCheckFailedConnectionsThreshold(-1) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), 
is(DELIBERATE_EXCEPTION)); + } + + unhealthyHostConnectionFactory.advanceTime(testExecutor); + assertThat(testExecutor.scheduledTasksPending(), equalTo(0)); + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + + @Test + public void hostUnhealthyIsHealthChecked() throws Exception { + serviceDiscoveryPublisher.onComplete(); + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = defaultLb(connectionFactory); + + sendServiceDiscoveryEvents(upEvent("address-1")); + + for (int i = 0; i < DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD; ++i) { + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), is(DELIBERATE_EXCEPTION)); + } + + for (int i = 0; i < timeAdvancementsTillHealthy; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + } + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + + // 5 failed attempts trigger health check, 2 health check attempts fail, 3rd health check attempt + // uses the proper connection, final selection reuses that connection. 8 total creation attempts. + int expectedConnectionAttempts = 8; + assertThat(unhealthyHostConnectionFactory.requests.get(), equalTo(expectedConnectionAttempts)); + } + + // Concurrency test, run multiple times (at least 1000). 
+ @Test + public void hostUnhealthyDoesntRaceToRunHealthCheck() throws Exception { + serviceDiscoveryPublisher.onComplete(); + + final Single properConnection = newRealizedConnectionSingle("address-1"); + final int timeAdvancementsTillHealthy = 3; + final UnhealthyHostConnectionFactory unhealthyHostConnectionFactory = new UnhealthyHostConnectionFactory( + "address-1", timeAdvancementsTillHealthy, properConnection); + final DelegatingConnectionFactory connectionFactory = unhealthyHostConnectionFactory.createFactory(); + + lb = defaultLb(connectionFactory); + sendServiceDiscoveryEvents(upEvent("address-1")); + + // Imitate concurrency by running multiple threads attempting to establish connections. + ExecutorService executor = Executors.newFixedThreadPool(3); + try { + final Runnable runnable = () -> + assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + + for (int i = 0; i < 1000; i++) { + executor.submit(runnable); + } + + // From test main thread, wait until the host becomes UNHEALTHY, which is apparent from + // NoHostAvailableException being thrown from selection AFTER a health check was scheduled by any thread. + final Executor executorForRetries = io.servicetalk.concurrent.api.Executors.newFixedSizeExecutor(1); + try { + awaitIndefinitely(lb.selectConnection(any()).retryWhen(retryWithConstantBackoffFullJitter((t) -> + // DeliberateException comes from connection opening, check for that first + // Next, NoAvailableHostException is thrown when the host is unhealthy, + // but we still wait until the health check is scheduled and only then stop retrying. 
+ t instanceof DeliberateException || testExecutor.scheduledTasksPending() == 0, + // try to prevent stack overflow + Duration.ofMillis(30), executorForRetries))); + } catch (Exception e) { + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } finally { + executorForRetries.closeAsync().toFuture().get(); + } + + // At this point, either the above selection caused the host to be marked as UNHEALTHY, + // or any background thread. We also know that a health check is pending to be executed. + // Now we can validate if there is just one health check happening and confirm that by asserting the host + // is not selected. If our assumption doesn't hold, it means more than one health check was scheduled. + for (int i = 0; i < timeAdvancementsTillHealthy - 1; ++i) { + unhealthyHostConnectionFactory.advanceTime(testExecutor); + + // Assert still unhealthy + Exception e = assertThrows(ExecutionException.class, () -> lb.selectConnection(any()).toFuture().get()); + assertThat(e.getCause(), instanceOf(NoAvailableHostException.class)); + } + } finally { + // Shutdown the concurrent validation of unhealthiness. + executor.shutdownNow(); + executor.awaitTermination(10, SECONDS); + } + + unhealthyHostConnectionFactory.advanceTime(testExecutor); + + final TestLoadBalancedConnection selectedConnection = lb.selectConnection(any()).toFuture().get(); + assertThat(selectedConnection, equalTo(properConnection.toFuture().get())); + } + @SuppressWarnings("unchecked") protected void sendServiceDiscoveryEvents(final ServiceDiscovererEvent... 
events) { sendServiceDiscoveryEvents(serviceDiscoveryPublisher, events); @@ -408,7 +605,12 @@ protected RoundRobinLoadBalancer newTestLoad protected RoundRobinLoadBalancer newTestLoadBalancer( final TestPublisher> serviceDiscoveryPublisher, final DelegatingConnectionFactory connectionFactory, final boolean eagerConnectionShutdown) { - return new RoundRobinLoadBalancer<>(serviceDiscoveryPublisher, connectionFactory, eagerConnectionShutdown); + return (RoundRobinLoadBalancer) + new RoundRobinLoadBalancerFactory.Builder() + .eagerConnectionShutdown(eagerConnectionShutdown) + .backgroundExecutor(testExecutor) + .build() + .newLoadBalancer(serviceDiscoveryPublisher, connectionFactory); } @SafeVarargs @@ -510,4 +712,48 @@ boolean isClosed() { return closed.get(); } } + + protected static class UnhealthyHostConnectionFactory { + private final String failingHost; + private final AtomicInteger momentInTime = new AtomicInteger(); + final AtomicInteger requests = new AtomicInteger(); + final Single properConnection; + final List> connections; + + Function> factory = + new Function>() { + + @Override + public Single apply(final String s) { + return defer(() -> { + if (s.equals(failingHost)) { + requests.incrementAndGet(); + if (momentInTime.get() >= connections.size()) { + return properConnection; + } + return connections.get(momentInTime.get()); + } + return properConnection; + }); + } + }; + + UnhealthyHostConnectionFactory(final String failingHost, int timeAdvancementsTillHealthy, + Single properConnection) { + this.failingHost = failingHost; + this.connections = IntStream.range(0, timeAdvancementsTillHealthy) + .>mapToObj(__ -> failed(DELIBERATE_EXCEPTION)) + .collect(Collectors.toList()); + this.properConnection = properConnection; + } + + DelegatingConnectionFactory createFactory() { + return new DelegatingConnectionFactory(this.factory); + } + + void advanceTime(TestExecutor executor) { + momentInTime.incrementAndGet(); + executor.advanceTimeBy(1, SECONDS); + } + } }