Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

loadbalancer: rename NewRoundRobinLoadBalancer to DefaultLoadBalancer #2763

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,18 @@
* @param <ResolvedAddress> The resolved address type.
* @param <C> The type of connection.
*/
final class NewRoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
final class DefaultLoadBalancer<ResolvedAddress, C extends LoadBalancedConnection>
implements TestableLoadBalancer<ResolvedAddress, C> {

private static final Logger LOGGER = LoggerFactory.getLogger(NewRoundRobinLoadBalancer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLoadBalancer.class);

@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<NewRoundRobinLoadBalancer, List> usedHostsUpdater =
AtomicReferenceFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, List.class, "usedHosts");
private static final AtomicReferenceFieldUpdater<DefaultLoadBalancer, List> usedHostsUpdater =
AtomicReferenceFieldUpdater.newUpdater(DefaultLoadBalancer.class, List.class, "usedHosts");

@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<NewRoundRobinLoadBalancer> nextResubscribeTimeUpdater =
AtomicLongFieldUpdater.newUpdater(NewRoundRobinLoadBalancer.class, "nextResubscribeTime");
private static final AtomicLongFieldUpdater<DefaultLoadBalancer> nextResubscribeTimeUpdater =
AtomicLongFieldUpdater.newUpdater(DefaultLoadBalancer.class, "nextResubscribeTime");

private static final long RESUBSCRIBING = -1L;

Expand All @@ -111,7 +111,7 @@ final class NewRoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedCon
/**
* Creates a new instance.
*
* @param id a (unique) ID to identify the created {@link NewRoundRobinLoadBalancer}.
* @param id a (unique) ID to identify the created {@link DefaultLoadBalancer}.
* @param targetResourceName {@link String} representation of the target resource for which this instance
* is performing load balancing.
* @param eventPublisher provides a stream of addresses to connect to.
Expand All @@ -121,16 +121,17 @@ final class NewRoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalancedCon
* continues being eligible for connecting on the request path).
* @see RoundRobinLoadBalancerFactory
*/
NewRoundRobinLoadBalancer(
DefaultLoadBalancer(
final String id,
final String targetResourceName,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final HostSelector<ResolvedAddress, C> hostSelector,
final ConnectionFactory<ResolvedAddress, ? extends C> connectionFactory,
final int linearSearchSpace,
@Nullable final HealthCheckConfig healthCheckConfig) {
this.targetResource = requireNonNull(targetResourceName);
this.lbDescription = makeDescription(id, targetResource);
this.hostSelector = new RoundRobinSelector<>(targetResource);
this.hostSelector = requireNonNull(hostSelector, "hostSelector");
this.eventPublisher = requireNonNull(eventPublisher);
this.eventStream = fromSource(eventStreamProcessor)
.replay(1); // Allow for multiple subscribers and provide new subscribers with last signal.
Expand Down Expand Up @@ -179,13 +180,14 @@ private void subscribeToEvents(boolean resubscribe) {
}

private static <R, C extends LoadBalancedConnection> long nextResubscribeTime(
final HealthCheckConfig config, final NewRoundRobinLoadBalancer<R, C> lb) {
final long lower = config.healthCheckResubscribeLowerBound;
final long upper = config.healthCheckResubscribeUpperBound;
final long currentTime = config.executor.currentTime(NANOSECONDS);
final long result = currentTime + (lower == upper ? lower : ThreadLocalRandom.current().nextLong(lower, upper));
final HealthCheckConfig config, final DefaultLoadBalancer<R, C> lb) {
final long lowerNanos = config.healthCheckResubscribeLowerBound;
final long upperNanos = config.healthCheckResubscribeUpperBound;
final long currentTimeNanos = config.executor.currentTime(NANOSECONDS);
final long result = currentTimeNanos + (lowerNanos == upperNanos ? lowerNanos :
ThreadLocalRandom.current().nextLong(lowerNanos, upperNanos));
LOGGER.debug("{}: current time {}, next resubscribe attempt can be performed at {}.",
lb, currentTime, result);
lb, currentTimeNanos, result);
return result;
}

Expand Down Expand Up @@ -249,7 +251,7 @@ public void onSubscribe(final Subscription s) {
public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
if (events == null || events.isEmpty()) {
LOGGER.debug("{}: unexpectedly received null or empty list instead of events.",
NewRoundRobinLoadBalancer.this);
DefaultLoadBalancer.this);
return;
}

Expand All @@ -260,7 +262,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
// that never gets used but is orphaned. It's fine so long as there is nothing to close but that
// guarantee may not always hold in the future.
@SuppressWarnings("unchecked")
List<Host<ResolvedAddress, C>> usedHosts = usedHostsUpdater.get(NewRoundRobinLoadBalancer.this);
List<Host<ResolvedAddress, C>> usedHosts = usedHostsUpdater.get(DefaultLoadBalancer.this);
if (isClosedList(usedHosts)) {
// We don't update if the load balancer is closed.
return;
Expand Down Expand Up @@ -309,7 +311,7 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
} else {
LOGGER.warn("{}: Unsupported Status in event:" +
" {} (mapped to {}). Leaving usedHosts unchanged: {}",
NewRoundRobinLoadBalancer.this, event, event.status(), nextHosts);
DefaultLoadBalancer.this, event, event.status(), nextHosts);
nextHosts.add(host);
}
}
Expand All @@ -323,13 +325,13 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R
}
// We've now built the new list so now we need to CAS it before we can move on. This should only be
// racing with closing hosts and closing the whole LB so it shouldn't be common to lose the race.
if (usedHostsUpdater.compareAndSet(NewRoundRobinLoadBalancer.this, usedHosts, nextHosts)) {
if (usedHostsUpdater.compareAndSet(DefaultLoadBalancer.this, usedHosts, nextHosts)) {
break;
}
}

LOGGER.debug("{}: now using addresses (size={}): {}.",
NewRoundRobinLoadBalancer.this, nextHosts.size(), nextHosts);
DefaultLoadBalancer.this, nextHosts.size(), nextHosts);
if (nextHosts.isEmpty()) {
eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT);
} else if (sendReadyEvent) {
Expand Down Expand Up @@ -363,10 +365,10 @@ public void onNext(@Nullable final Collection<? extends ServiceDiscovererEvent<R

private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
// All hosts will share the healthcheck config of the parent RR loadbalancer.
Host<ResolvedAddress, C> host = new DefaultHost<>(NewRoundRobinLoadBalancer.this.toString(), addr,
Host<ResolvedAddress, C> host = new DefaultHost<>(DefaultLoadBalancer.this.toString(), addr,
connectionFactory, linearSearchSpace, healthCheckConfig);
host.onClose().afterFinally(() ->
usedHostsUpdater.updateAndGet(NewRoundRobinLoadBalancer.this, previousHosts -> {
usedHostsUpdater.updateAndGet(DefaultLoadBalancer.this, previousHosts -> {
@SuppressWarnings("unchecked")
List<Host<ResolvedAddress, C>> previousHostsTyped =
(List<Host<ResolvedAddress, C>>) previousHosts;
Expand All @@ -379,7 +381,7 @@ private Host<ResolvedAddress, C> createHost(ResolvedAddress addr) {
private List<Host<ResolvedAddress, C>> listWithHostRemoved(
List<Host<ResolvedAddress, C>> oldHostsTyped, Predicate<Host<ResolvedAddress, C>> hostPredicate) {
if (oldHostsTyped.isEmpty()) {
// this can happen when an expired host is removed during closing of the NewRoundRobinLoadBalancer,
// this can happen when an expired host is removed during closing of the DefaultLoadBalancer,
// but all of its connections have already been closed
return oldHostsTyped;
}
Expand Down Expand Up @@ -409,7 +411,7 @@ public void onError(final Throwable t) {
}
LOGGER.error(
"{}: service discoverer {} emitted an error. Last seen addresses (size={}): {}.",
NewRoundRobinLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
DefaultLoadBalancer.this, eventPublisher, hosts.size(), hosts, t);
}

@Override
Expand All @@ -420,7 +422,7 @@ public void onComplete() {
eventStreamProcessor.onComplete();
}
LOGGER.error("{}: service discoverer completed. Last seen addresses (size={}): {}.",
NewRoundRobinLoadBalancer.this, hosts.size(), hosts);
DefaultLoadBalancer.this, hosts.size(), hosts);
}
}

Expand Down Expand Up @@ -507,7 +509,7 @@ private static boolean isClosedList(List<?> list) {
}

private String makeDescription(String id, String targetResource) {
return "NewRoundRobinLoadBalancer{" +
return getClass().getSimpleName() + "{" +
"id=" + id + '@' + toHexString(identityHashCode(this)) +
", targetResource=" + targetResource +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,8 @@ public <T extends C> LoadBalancer<T> newLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return useNewRoundRobin ?
new NewRoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig)
: new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
return useNewRoundRobin ? useNewRoundRobinLoadBalancer(targetResource, eventPublisher, connectionFactory)
: new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

Expand All @@ -92,12 +90,19 @@ public LoadBalancer<C> newLoadBalancer(
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, C> connectionFactory,
final String targetResource) {
return useNewRoundRobin ? new NewRoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig)
return useNewRoundRobin ? useNewRoundRobinLoadBalancer(targetResource, eventPublisher, connectionFactory)
: new RoundRobinLoadBalancer<>(id, targetResource, eventPublisher, connectionFactory,
linearSearchSpace, healthCheckConfig);
}

private <T extends C> LoadBalancer<T> useNewRoundRobinLoadBalancer(
final String targetResource,
final Publisher<? extends Collection<? extends ServiceDiscovererEvent<ResolvedAddress>>> eventPublisher,
final ConnectionFactory<ResolvedAddress, T> connectionFactory) {
return new DefaultLoadBalancer<>(id, targetResource, eventPublisher, new RoundRobinSelector<>(targetResource),
connectionFactory, linearSearchSpace, healthCheckConfig);
}

@Override
public ExecutionStrategy requiredOffloads() {
// We do not block
Expand Down