Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer: Add a P2C host selector #2743

Merged
merged 5 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.concurrent.api.Single;

import java.util.List;

import static io.servicetalk.concurrent.api.Single.failed;
import static java.util.Objects.requireNonNull;

abstract class BaseHostSelector<ResolvedAddress, C extends LoadBalancedConnection>
implements HostSelector<ResolvedAddress, C> {

private final String targetResource;
BaseHostSelector(final String targetResource) {
this.targetResource = requireNonNull(targetResource, "targetResource");
}

protected final String getTargetResource() {
return targetResource;
}

protected final Single<C> noActiveHostsException(List<Host<ResolvedAddress, C>> usedHosts) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
getTargetResource() + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
}
}

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private static <ResolvedAddress, C extends LoadBalancedConnection> boolean notAv
final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
boolean available = false;
for (ServiceDiscovererEvent<ResolvedAddress> event : events) {
if (host.address.equals(event.address())) {
if (host.address().equals(event.address())) {
available = true;
break;
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
}
} else if (UNAVAILABLE.equals(eventStatus)) {
return listWithHostRemoved(oldHostsTyped, host -> {
boolean match = host.address.equals(addr);
boolean match = host.address().equals(addr);
if (match) {
host.markClosed();
}
Expand Down Expand Up @@ -331,7 +331,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
private List<Host<ResolvedAddress, C>> markHostAsExpired(
final List<Host<ResolvedAddress, C>> oldHostsTyped, final ResolvedAddress addr) {
for (Host<ResolvedAddress, C> host : oldHostsTyped) {
if (host.address.equals(addr)) {
if (host.address().equals(addr)) {
// Host removal will be handled by the Host's onClose::afterFinally callback
host.markExpired();
break; // because duplicates are not allowed, we can stop iteration
Expand All @@ -342,7 +342,7 @@ private List<Host<ResolvedAddress, C>> markHostAsExpired(

private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
// All hosts will share the healthcheck config of the parent RR loadbalancer.
Host<ResolvedAddress, C> host = new Host<>(NewRoundRobinLoadBalancer.this.toString(), addr,
Host<ResolvedAddress, C> host = new DefaultHost<>(NewRoundRobinLoadBalancer.this.toString(), addr,
connectionFactory, linearSearchSpace, healthCheckConfig);
host.onClose().afterFinally(() ->
usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, previousHosts -> {
Expand All @@ -363,7 +363,7 @@ private List<Host<ResolvedAddress, C>> addHostToList(

// duplicates are not allowed
for (Host<ResolvedAddress, C> host : oldHostsTyped) {
if (host.address.equals(addr)) {
if (host.address().equals(addr)) {
if (!host.markActiveIfNotClosed()) {
// If the host is already in CLOSED state, we should create a new entry.
// For duplicate ACTIVE events or for repeated activation due to failed CAS
Expand Down Expand Up @@ -450,7 +450,7 @@ private Single<C> selectConnection0(final Predicate<C> selector, @Nullable final
// This is the case when SD has emitted some items but none of the hosts are available.
failed(Exceptions.StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect for " + targetResource + ".",
NewRoundRobinLoadBalancer.class, "selectConnection0(...)"));
this.getClass(), "selectConnection0(...)"));
}

Single<C> result = hostSelector.selectConnection(currentHosts, selector, context, forceNewConnectionAndReserve);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright © 2023 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.loadbalancer;

import io.servicetalk.client.api.LoadBalancedConnection;
import io.servicetalk.client.api.LoadBalancer;
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.context.api.ContextMap;

import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.succeeded;

/**
* This {@link LoadBalancer} selection algorithm is based on work by Michael David Mitzenmacher in The Power of Two
* Choices in Randomized Load Balancing.
*
* @see <a href="https://www.eecs.harvard.edu/~michaelm/postscripts/tpds2001.pdf">Mitzenmacher (2001) The Power of Two
* Choices in Randomized Load Balancing</a>
*/
final class P2CSelector<ResolvedAddress, C extends LoadBalancedConnection>
extends BaseHostSelector<ResolvedAddress, C> {

@Nullable
private final Random random;
private final int maxEffort;

P2CSelector(final String targetResource, final int maxEffort, @Nullable final Random random) {
super(targetResource);
this.maxEffort = maxEffort;
this.random = random;
}

@Override
public Single<C> selectConnection(
@Nonnull List<Host<ResolvedAddress, C>> hosts,
@Nonnull Predicate<C> selector,
@Nullable ContextMap context,
boolean forceNewConnectionAndReserve) {
final int size = hosts.size();
switch (size) {
case 0:
// We shouldn't get called if the load balancer doesn't have any hosts.
throw new AssertionError("Selector for " + getTargetResource() +
" received an empty host set");
case 1:
Host<ResolvedAddress, C> host = hosts.get(0);
if (!forceNewConnectionAndReserve) {
C connection = host.pickConnection(selector, context);
if (connection != null) {
return succeeded(connection);
}
}
// Either we require a new connection or there wasn't one already established so
// try to make a new one if the host is healthy. If it's not healthy, we fail
// and let the higher level retries decide what to do.
if (!host.isActiveAndHealthy()) {
return noActiveHostsException(hosts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this isn't an exception it's a failed Single. I might expect return noActiveHosts(hosts).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't see this until I had merged. I'll clean it up in a follow up.

}
return host.newConnection(selector, forceNewConnectionAndReserve, context);
default:
return p2c(size, hosts, getRandom(), selector, forceNewConnectionAndReserve, context);
}
}

@Nullable
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method always returns a non-null value, this annotation is not required

private Single<C> p2c(int size, List<Host<ResolvedAddress, C>> hosts, Random random, Predicate<C> selector,
boolean forceNewConnectionAndReserve, @Nullable ContextMap contextMap) {
for (int j = maxEffort; j > 0; j--) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will it make sense to limit efforts based on number of hosts? For example, if there are only 2 hosts with maxEffort > 1, should we try again or fail fast?

// Pick two random indexes that don't collide. Limit the range on the second index to 1 less than
// the max value so that if there is a collision we can safety increment. We also increment if
// i2 > i1 to avoid biased toward lower numbers since we limited the range by 1.
final int i1 = random.nextInt(size);
int i2 = random.nextInt(size - 1);
if (i2 >= i1) {
++i2;
}
Host<ResolvedAddress, C> t1 = hosts.get(i1);
Host<ResolvedAddress, C> t2 = hosts.get(i2);
// Make t1 the preferred host by score to make the logic below a bit cleaner.
if (t1.score() < t2.score()) {
Host<ResolvedAddress, C> tmp = t1;
t1 = t2;
t2 = tmp;
}

if (!forceNewConnectionAndReserve) {
// First we're going to see if we can get an existing connection regardless of health status. Since t1
// is 'better' we'll try it first. If it doesn't have any existing connections we don't fall back to t2
// or else we would cause a bias toward hosts with existing connections which could ultimately drive all
// traffic to the first host to make a connection in the case of a multiplexed session.
C c = t1.pickConnection(selector, contextMap);
if (c != null) {
return succeeded(c);
}
// We now need to consider the health status and make a new connection if either
// host is considered healthy.
}

// We either couldn't find a live connection or are being forced to make a new one. Either way we're
// going to make a new connection which means we need to consider health.
final boolean t1Healthy = t1.isActiveAndHealthy();
final boolean t2Healthy = t2.isActiveAndHealthy();
Comment on lines +120 to +121
Copy link
Contributor Author

@bryce-anderson bryce-anderson Nov 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this follows the pattern set by the RoundRobinSelector which is to consider health only for making new connections. I think that is born of the only current circuit breaker being the L4 consecutive failures and the current hesitancy to mark a connection as inactive.
I think we should change that behavior in that we should consider health for any connections and also allow an unhealthy connection to be selected as a last resort (likely configurable). This allows us to have the same expected behavior for all circuit breakers and we don't need to special case the L4. Additionally, I think the L4 case is even improved because if we can't connect to a host there is a good chance existing connections are either not responsive or draining.
However, that is a reasonably big reinterpretation of what health means. I'd like to change it for RR and P2C at the same time, or maybe switch RR first and then merge this. Feedback appreciated on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree on that - RR is a good battle tested source for feedback, but if we are designing building blocks of a LB abstraction we should re-consider some of the design decisions and evaluate tradeoffs.
My mental model about this was

  • List of hosts as seen from SD
  • Subset of hosts as identified by any feedback controller (e.g., L4 / L7)
  • Select among these
  • Fallback strategy (default to what RR is doing)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I think we can potentially go further than RR and offer a panic mode, maybe in two flavors: existing connections only and even for new connections, that at least lets the LB try. At least for the latter it's probably important to add some restrictions at the connection factory level to avoid connection storms so we should proceed with caution there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for improving and interating on this area and shaping definition of "unhealthy" state.

if we can't connect to a host there is a good chance existing connections are either not responsive or draining

In order to keep track of use-cases and make sure we don't miss anything, it's worth mentioning that ServerListenContext API allows users to pause acceptance of new connections without affecting pre-existing connections. We should account for that as well.

if (t1Healthy) {
return t1.newConnection(selector, forceNewConnectionAndReserve, contextMap);
} else if (t2Healthy) {
return t2.newConnection(selector, forceNewConnectionAndReserve, contextMap);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you clarified reasoning to move re-iteration in a separate big redesign, will it make sense for the time being to follow the following logic?

  1. Try t1.pickConnection, if not
  2. Check t1 health, if ok do t1.newConnection if not:
  3. Try t2.pickConnection, if not
  4. Check t2 health, if ok do t1.newConnection if not:
  5. go to the next effort

}
// Neither are healthy and capable of making a connection: fall through, perhaps for another attempt.
}
// Max effort exhausted. We failed to find a healthy and active host.
return noActiveHostsException(hosts);
}

private Random getRandom() {
return random == null ? ThreadLocalRandom.current() : random;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,20 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static java.util.Objects.requireNonNull;

final class RoundRobinSelector<ResolvedAddress, C extends LoadBalancedConnection>
implements HostSelector<ResolvedAddress, C> {
extends BaseHostSelector<ResolvedAddress, C> {

@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<RoundRobinSelector> indexUpdater =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinSelector.class, "index");

private final String targetResource;
@SuppressWarnings("unused")
private volatile int index;

RoundRobinSelector(final String targetResource) {
this.targetResource = requireNonNull(targetResource);
super(targetResource);
}

@Override
Expand Down Expand Up @@ -73,9 +70,7 @@ public Single<C> selectConnection(
}
}
if (pickedHost == null) {
return failed(Exceptions.StacklessNoActiveHostException.newInstance("Failed to pick an active host for " +
targetResource + ". Either all are busy, expired, or unhealthy: " + usedHosts,
this.getClass(), "selectConnection(...)"));
return noActiveHostsException(usedHosts);
}
// We have a host but no connection was selected: create a new one.
return pickedHost.newConnection(selector, forceNewConnectionAndReserve, context);
Expand Down
Loading