Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer: HostSelector can be rebuilt each time the DefaultLoadBalancer gets a host set update #2774

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static java.util.Objects.requireNonNull;
Expand All @@ -27,17 +30,53 @@ abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnectio
implements HostSelector<ResolvedAddress, C> {

private final String targetResource;
BaseHostSelector(final String targetResource) {
private final List<Host<ResolvedAddress, C>> hosts;
BaseHostSelector(final List<Host<ResolvedAddress, C>> hosts, final String targetResource) {
this.hosts = hosts;
this.targetResource = requireNonNull(targetResource, "targetResource");
}

protected abstract Single<C> selectConnection0(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve);

@Override
public final Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return hosts.isEmpty() ? noHostsFailure() : selectConnection0(selector, context, forceNewConnectionAndReserve);
}

@Override
public final boolean isUnHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
// so that we can compose a group of selectors into a priority set.
return allUnhealthy(hosts);
}

protected final String getTargetResource() {
return targetResource;
}

protected final Single<C> noActiveHosts(List<Host<ResolvedAddress, C>> usedHosts) {
protected final Single<C> noActiveHostsFailure(List<Host<ResolvedAddress, C>> usedHosts) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
getTargetResource() + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
}

private Single<C> noHostsFailure() {
return failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection(...)"));
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
bryce-anderson marked this conversation as resolved.
Show resolved Hide resolved
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
for (Host<ResolvedAddress, C> host : usedHosts) {
if (!host.isUnhealthy()) {
allUnhealthy = false;
break;
}
}
return allUnhealthy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

import io.servicetalk.client.api.ConnectionFactory;
import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.NoActiveHostException;
import io.servicetalk.client.api.ServiceDiscovererEvent;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.PublisherSource.Processor;
import io.servicetalk.concurrent.PublisherSource.Subscriber;
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.CompositeCloseable;
import io.servicetalk.concurrent.api.ListenableAsyncCloseable;
Expand Down Expand Up @@ -84,14 +86,16 @@ final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnectio

private volatile long nextResubscribeTime = RESUBSCRIBING;

// writes to these fields protected by `sequentialExecutor` but they can be read from any thread.
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
private volatile boolean isClosed;
// writes are protected by `sequentialExecutor` but the field can be read by any thread.
private volatile HostSelector<ResolvedAddress, C> hostSelector;
// reads and writes are protected by `sequentialExecutor`.
private List<Host<ResolvedAddress, C>> usedHosts = emptyList();
// reads and writes are protected by `sequentialExecutor`.
private boolean isClosed;

private final String targetResource;
private final SequentialExecutor sequentialExecutor;
private final String lbDescription;
private final HostSelector<ResolvedAddress, C> hostSelector;
private final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher;
private final Processor<Object, Object> eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32);
private final Publisher<Object> eventStream;
Expand Down Expand Up @@ -174,9 +178,9 @@ private Completable doClose(final boolean graceful) {
SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() :
// We only want to empty the host list on error if we're closing non-gracefully.
compositeCloseable.closeAsync().beforeOnError(t ->
sequentialExecutor.execute(() -> usedHosts = emptyList())))
sequentialExecutor.execute(this::sequentialCompleteClosed))
// we want to always empty out the host list if we complete successfully
.beforeOnComplete(() -> sequentialExecutor.execute(() -> usedHosts = emptyList())))
.beforeOnComplete(() -> sequentialExecutor.execute(this::sequentialCompleteClosed))))
.subscribe(processor);
} catch (Throwable ex) {
processor.onError(ex);
Expand All @@ -185,6 +189,12 @@ private Completable doClose(final boolean graceful) {
return SourceAdapters.fromSource(processor);
}

// must be called from within the sequential executor.
private void sequentialCompleteClosed() {
usedHosts = emptyList();
hostSelector = new ClosedHostSelector();
}

private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
final HealthCheckConfig config, final DefaultLoadBalancer<R, C> lb) {
final long lowerNanos = config.healthCheckResubscribeLowerBound;
Expand All @@ -197,18 +207,6 @@ private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
return result;
}

private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
for (Host<ResolvedAddress, C> host : usedHosts) {
if (!host.isUnhealthy()) {
allUnhealthy = false;
break;
}
}
return allUnhealthy;
}

private static <ResolvedAddress> boolean onlyAvailable(
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean onlyAvailable = !events.isEmpty();
Expand Down Expand Up @@ -285,6 +283,7 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
// First thing we do is go through the existing hosts and see if we need to transfer them. These
// will be all existing hosts that either don't have a matching discovery event or are not marked
// as unavailable. If they are marked unavailable, we need to close them.
boolean hostSetChanged = false;
for (Host<ResolvedAddress, C> host : oldUsedHosts) {
ServiceDiscovererEvent<ResolvedAddress> event = eventMap.remove(host.address());
if (event == null) {
Expand All @@ -298,14 +297,20 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
if (host.markActiveIfNotClosed()) {
nextHosts.add(host);
} else {
// It's a new host, so the set changed.
hostSetChanged = true;
nextHosts.add(createHost(event.address()));
}
} else if (EXPIRED.equals(event.status())) {
if (!host.markExpired()) {
nextHosts.add(host);
} else {
// Marking it expired also resulted in removing it from the set.
hostSetChanged = true;
}
} else if (UNAVAILABLE.equals(event.status())) {
host.markClosed();
hostSetChanged = true;
} else {
LOGGER.warn("{}: Unsupported Status in event:" +
" {} (mapped to {}). Leaving usedHosts unchanged: {}",
Expand All @@ -318,11 +323,14 @@ private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<Resolv
for (ServiceDiscovererEvent<ResolvedAddress> event : eventMap.values()) {
if (AVAILABLE.equals(event.status())) {
sendReadyEvent = true;
hostSetChanged = true;
nextHosts.add(createHost(event.address()));
}
}
// We've built the new list so now set it for consumption and then send our events.
usedHosts = nextHosts;
// We've built a materially different host set so now set it for consumption and send our events.
if (hostSetChanged) {
sequentialUpdateUsedHosts(nextHosts);
}

LOGGER.debug("{}: now using addresses (size={}): {}.",
DefaultLoadBalancer.this, nextHosts.size(), nextHosts);
Expand Down Expand Up @@ -370,10 +378,13 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
}
final List<Host<ResolvedAddress, C>> nextHosts = listWithHostRemoved(
currentHosts, current -> current == host);
usedHosts = nextHosts;
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
// we only need to do anything else if we actually removed the host
if (nextHosts.size() != currentHosts.size()) {
sequentialUpdateUsedHosts(nextHosts);
if (nextHosts.isEmpty()) {
// We transitioned from non-empty to empty. That means we're not ready.
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
}
}
})).subscribe();
return host;
Expand Down Expand Up @@ -405,30 +416,36 @@ private List<Host<ResolvedAddress, C>> listWithHostRemoved(

@Override
public void onError(final Throwable t) {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onError(t);
}
LOGGER.error(
"{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
sequentialExecutor.execute(() -> {
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onError(t);
}
List<Host<ResolvedAddress, C>> hosts = usedHosts;
LOGGER.error(
"{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
});
}

@Override
public void onComplete() {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onComplete();
}
LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, hosts.size(), hosts);
sequentialExecutor.execute(() -> {
List<Host<ResolvedAddress, C>> hosts = usedHosts;
if (healthCheckConfig == null) {
// Terminate processor only if we will never re-subscribe
eventStreamProcessor.onComplete();
}
LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.",
DefaultLoadBalancer.this, hosts.size(), hosts);
});
}
}

private static <T> Single<T> failedLBClosed(String targetResource) {
return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
// must be called from within the SequentialExecutor
private void sequentialUpdateUsedHosts(List<Host<ResolvedAddress, C>> nextHosts) {
this.usedHosts = nextHosts;
this.hostSelector = hostSelector.rebuildWithHosts(usedHosts);
}

@Override
Expand All @@ -443,21 +460,11 @@ public Single<C> newConnection(@Nullable final ContextMap context) {

private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final ContextMap context,
final boolean forceNewConnectionAndReserve) {
final List<Host<ResolvedAddress, C>> currentHosts = this.usedHosts;
// It's possible that we're racing with updates from the `onNext` method but since it's intrinsically
// racy it's fine to do these 'are there any hosts at all' checks here using the total host set.
if (currentHosts.isEmpty()) {
return isClosed ? failedLBClosed(targetResource) :
// This is the case when SD has emitted some items but none of the hosts are available.
failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
this.getClass(), "selectConnection0(...)"));
}

Single<C> result = hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve);
final HostSelector<ResolvedAddress, C> currentHostSelector = hostSelector;
Single<C> result = currentHostSelector.selectConnection(selector, context, forceNewConnectionAndReserve);
if (healthCheckConfig != null) {
result = result.beforeOnError(exn -> {
if (exn instanceof Exceptions.StacklessNoActiveHostException && allUnhealthy(currentHosts)) {
if (exn instanceof NoActiveHostException && currentHostSelector.isUnHealthy()) {
final long currNextResubscribeTime = nextResubscribeTime;
if (currNextResubscribeTime >= 0 &&
healthCheckConfig.executor.currentTime(NANOSECONDS) >= currNextResubscribeTime &&
Expand Down Expand Up @@ -502,6 +509,24 @@ public Completable closeAsyncGracefully() {

@Override
public List<Entry<ResolvedAddress, List<C>>> usedAddresses() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing something here, but isn't that a check-then-act issue? What guarantees that the condition still holds after the IF.

Copy link
Contributor Author

@bryce-anderson bryce-anderson Dec 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify what your concern is? If it's whether the result from usedAddresses() is still valid, then yes: it's always going to be racy. Since it's only used for tests to inspect the internal state the usage is rather constrained it should be no worse than it always was. If you means the if block on line 514 then we know what the current sequence of calls are going to be for this thread and reason about it for the lifetime of this method call at least.

// If we're already in the executor we can't submit a task and wait for it without deadlock but
// the access is thread safe anyway so just go for it.
if (sequentialExecutor.isCurrentThreadDraining()) {
return sequentialUsedAddresses();
}
SingleSource.Processor<List<Entry<ResolvedAddress, List<C>>>, List<Entry<ResolvedAddress, List<C>>>> processor =
Processors.newSingleProcessor();
sequentialExecutor.execute(() -> processor.onSuccess(sequentialUsedAddresses()));
try {
// This method is just for testing and our tests have timeouts it's fine to do some awaiting.
return SourceAdapters.fromSource(processor).toFuture().get();
} catch (Exception ex) {
throw new AssertionError("Failed to get results", ex);
}
}

// must be called from within the sequential executor.
private List<Entry<ResolvedAddress, List<C>>> sequentialUsedAddresses() {
return usedHosts.stream().map(host -> ((DefaultHost<ResolvedAddress, C>) host).asEntry()).collect(toList());
}

Expand All @@ -511,4 +536,22 @@ private String makeDescription(String id, String targetResource) {
", targetResource=" + targetResource +
'}';
}

private final class ClosedHostSelector implements HostSelector<ResolvedAddress, C> {
@Override
public Single<C> selectConnection(Predicate<C> selector, @Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
return failed(new IllegalStateException("LoadBalancer for " + targetResource + " has closed"));
}

@Override
public HostSelector<ResolvedAddress, C> rebuildWithHosts(List<Host<ResolvedAddress, C>> hosts) {
return this;
}

@Override
public boolean isUnHealthy() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import javax.annotation.Nullable;

import static io.servicetalk.loadbalancer.HealthCheckConfig.DEFAULT_HEALTH_CHECK_FAILED_CONNECTIONS_THRESHOLD;
Expand Down Expand Up @@ -141,8 +142,8 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(String targetResource,
Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher,
loadBalancingPolicy.buildSelector(targetResource), connectionFactory, linearSearchSpace,
healthCheckConfig);
loadBalancingPolicy.buildSelector(Collections.emptyList(), targetResource), connectionFactory,
linearSearchSpace, healthCheckConfig);
}
}

Expand Down
Loading
Loading