diff --git a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java index 6cff989315..7a3ac07822 100644 --- a/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java +++ b/servicetalk-loadbalancer/src/main/java/io/servicetalk/loadbalancer/DefaultLoadBalancer.java @@ -19,14 +19,17 @@ import io.servicetalk.client.api.LoadBalancedConnection; import io.servicetalk.client.api.LoadBalancer; import io.servicetalk.client.api.ServiceDiscovererEvent; +import io.servicetalk.concurrent.CompletableSource; import io.servicetalk.concurrent.PublisherSource.Processor; import io.servicetalk.concurrent.PublisherSource.Subscriber; import io.servicetalk.concurrent.PublisherSource.Subscription; import io.servicetalk.concurrent.api.Completable; import io.servicetalk.concurrent.api.CompositeCloseable; import io.servicetalk.concurrent.api.ListenableAsyncCloseable; +import io.servicetalk.concurrent.api.Processors; import io.servicetalk.concurrent.api.Publisher; import io.servicetalk.concurrent.api.Single; +import io.servicetalk.concurrent.api.SourceAdapters; import io.servicetalk.concurrent.internal.SequentialCancellable; import io.servicetalk.context.api.ContextMap; @@ -35,21 +38,13 @@ import java.util.ArrayList; import java.util.Collection; -import java.util.Comparator; import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.ListIterator; import java.util.Map; import java.util.Map.Entry; -import java.util.Spliterator; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.function.Consumer; import java.util.function.Predicate; -import java.util.function.UnaryOperator; -import java.util.stream.Stream; import javax.annotation.Nullable; import static io.servicetalk.client.api.LoadBalancerReadyEvent.LOAD_BALANCER_NOT_READY_EVENT; @@ -82,9 +77,6 @@ final class DefaultLoadBalancer usedHostsUpdater = - AtomicReferenceFieldUpdater.newUpdater(DefaultLoadBalancer.class, List.class, "usedHosts"); @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater nextResubscribeTimeUpdater = @@ -93,13 +85,18 @@ final class DefaultLoadBalancer> usedHosts = emptyList(); + private volatile boolean isClosed; private final String targetResource; private final String lbDescription; private final HostSelector hostSelector; private final Publisher>> eventPublisher; private final Processor eventStreamProcessor = newPublisherProcessorDropHeadOnOverflow(32); + private final Processor sequentialExecutionQueue = + newPublisherProcessorDropHeadOnOverflow(Integer.MAX_VALUE); private final Publisher eventStream; private final SequentialCancellable discoveryCancellable = new SequentialCancellable(); private final ConnectionFactory connectionFactory; @@ -138,30 +135,13 @@ final class DefaultLoadBalancer { - discoveryCancellable.cancel(); - eventStreamProcessor.onComplete(); - final CompositeCloseable compositeCloseable; - for (;;) { - List> currentList = usedHosts; - if (isClosedList(currentList) || - usedHostsUpdater.compareAndSet(this, currentList, new ClosedList<>(currentList))) { - compositeCloseable = newCompositeCloseable().appendAll(currentList).appendAll(connectionFactory); - LOGGER.debug("{} is closing {}gracefully. Last seen addresses (size={}): {}.", - this, graceful ? "" : "non", currentList.size(), currentList); - break; - } - } - return (graceful ? compositeCloseable.closeAsyncGracefully() : compositeCloseable.closeAsync()) - .beforeOnError(t -> { - if (!graceful) { - usedHosts = new ClosedList<>(emptyList()); - } - }) - .beforeOnComplete(() -> usedHosts = new ClosedList<>(emptyList())); - }); + this.asyncCloseable = toAsyncCloseable(this::doClose); // Maintain a Subscriber so signals are always delivered to replay and new Subscribers get the latest signal. eventStream.ignoreElements().subscribe(); + + // Start running the events forever. + SourceAdapters.fromSource(sequentialExecutionQueue).forEach(DefaultLoadBalancer::safeRun); + subscribeToEvents(false); } @@ -179,6 +159,34 @@ private void subscribeToEvents(boolean resubscribe) { } } + // This method is called eagerly, meaning the completable will be immediately subscribed to, + // so we don't need to do any Completable.defer business. + private Completable doClose(final boolean graceful) { + CompletableSource.Processor processor = Processors.newCompletableProcessor(); + executeSequentially(() -> { + if (!isClosed) { + discoveryCancellable.cancel(); + eventStreamProcessor.onComplete(); + } + isClosed = true; + List> currentList = usedHosts; + final CompositeCloseable compositeCloseable = newCompositeCloseable() + .appendAll(currentList) + .appendAll(connectionFactory); + LOGGER.debug("{} is closing {}gracefully. Last seen addresses (size={}): {}.", + this, graceful ? "" : "non", currentList.size(), currentList); + SourceAdapters.toSource((graceful ? compositeCloseable.closeAsyncGracefully() : + // We only want to empty the host list on error if we're closing non-gracefully. + compositeCloseable.closeAsync().beforeOnError(t -> + executeSequentially(() -> usedHosts = emptyList())) + ) + // we want to always empty out the host list if we complete successfully + .beforeOnComplete(() -> executeSequentially(() -> usedHosts = emptyList()))) + .subscribe(processor); + }); + return SourceAdapters.fromSource(processor); + } + private static long nextResubscribeTime( final HealthCheckConfig config, final DefaultLoadBalancer lb) { final long lowerNanos = config.healthCheckResubscribeLowerBound; @@ -254,81 +262,71 @@ public void onNext(@Nullable final Collection sequentialOnNext(events)); + } - boolean sendReadyEvent; - List> nextHosts; - for (;;) { - // TODO: we have some weirdness in the event that we fail the CAS namely that we can create a host - // that never gets used but is orphaned. It's fine so long as there is nothing to close but that - // guarantee may not always hold in the future. - @SuppressWarnings("unchecked") - List> usedHosts = usedHostsUpdater.get(DefaultLoadBalancer.this); - if (isClosedList(usedHosts)) { - // We don't update if the load balancer is closed. - return; - } - nextHosts = new ArrayList<>(usedHosts.size() + events.size()); - sendReadyEvent = false; - - // First we make a map of addresses to events so that we don't get quadratic behavior for diffing. - // Unfortunately we need to make this every iteration of the CAS loop since we remove entries - // for hosts that already exist. If this results in to many collisions and map rebuilds we should - // re-assess how we manage concurrency for list mutations. - final Map> eventMap = new HashMap<>(); - for (ServiceDiscovererEvent event : events) { - ServiceDiscovererEvent old = eventMap.put(event.address(), event); - if (old != null) { - LOGGER.debug("Multiple ServiceDiscoveryEvent's detected for address {}. Event: {}.", - event.address(), event); - } + private void sequentialOnNext(Collection> events) { + assert events != null && !events.isEmpty(); + + if (isClosed) { + // nothing to do if the load balancer is closed. + return; + } + + boolean sendReadyEvent = false; + final List> nextHosts = new ArrayList<>(usedHosts.size() + events.size()); + final List> oldUsedHosts = usedHosts; + // First we make a map of addresses to events so that we don't get quadratic behavior for diffing. + final Map> eventMap = new HashMap<>(); + for (ServiceDiscovererEvent event : events) { + ServiceDiscovererEvent old = eventMap.put(event.address(), event); + if (old != null) { + LOGGER.debug("Multiple ServiceDiscoveryEvent's detected for address {}. Event: {}.", + event.address(), event); } + } - // First thing we do is go through the existing hosts and see if we need to transfer them. These - // will be all existing hosts that either don't have a matching discovery event or are not marked - // as unavailable. If they are marked unavailable, we need to close them (which is idempotent). - for (Host host : usedHosts) { - ServiceDiscovererEvent event = eventMap.remove(host.address()); - if (event == null) { - // Host doesn't have a SD update so just copy it over. + // First thing we do is go through the existing hosts and see if we need to transfer them. These + // will be all existing hosts that either don't have a matching discovery event or are not marked + // as unavailable. If they are marked unavailable, we need to close them. + for (Host host : oldUsedHosts) { + ServiceDiscovererEvent event = eventMap.remove(host.address()); + if (event == null) { + // Host doesn't have a SD update so just copy it over. + nextHosts.add(host); + } else if (AVAILABLE.equals(event.status())) { + // We only send the ready event if the previous host list was empty. + sendReadyEvent = oldUsedHosts.isEmpty(); + // If the host is already in CLOSED state, we should discard it and create a new entry. + // For duplicate ACTIVE events the marking succeeds, so we will not add a new entry. + if (host.markActiveIfNotClosed()) { nextHosts.add(host); - } else if (AVAILABLE.equals(event.status())) { - // We only send the ready event if the previous host list was empty. - sendReadyEvent = usedHosts.isEmpty(); - // If the host is already in CLOSED state, we should discard it and create a new entry. - // For duplicate ACTIVE events or for repeated activation due to failed CAS - // of replacing the usedHosts array the marking succeeds so we will not add a new entry. - if (host.markActiveIfNotClosed()) { - nextHosts.add(host); - } else { - nextHosts.add(createHost(event.address())); - } - } else if (EXPIRED.equals(event.status())) { - if (!host.markExpired()) { - nextHosts.add(host); - } - } else if (UNAVAILABLE.equals(event.status())) { - host.markClosed(); } else { - LOGGER.warn("{}: Unsupported Status in event:" + - " {} (mapped to {}). Leaving usedHosts unchanged: {}", - DefaultLoadBalancer.this, event, event.status(), nextHosts); - nextHosts.add(host); - } - } - // Now process events that didn't have an existing host. The only ones that we actually care - // about are the AVAILABLE events which result in a new host. - for (ServiceDiscovererEvent event : eventMap.values()) { - if (AVAILABLE.equals(event.status())) { - sendReadyEvent = true; nextHosts.add(createHost(event.address())); } + } else if (EXPIRED.equals(event.status())) { + if (!host.markExpired()) { + nextHosts.add(host); + } + } else if (UNAVAILABLE.equals(event.status())) { + host.markClosed(); + } else { + LOGGER.warn("{}: Unsupported Status in event:" + + " {} (mapped to {}). Leaving usedHosts unchanged: {}", + DefaultLoadBalancer.this, event, event.status(), nextHosts); + nextHosts.add(host); } - // We've now built the new list so now we need to CAS it before we can move on. This should only be - // racing with closing hosts and closing the whole LB so it shouldn't be common to lose the race. - if (usedHostsUpdater.compareAndSet(DefaultLoadBalancer.this, usedHosts, nextHosts)) { - break; + } + // Now process events that didn't have an existing host. The only ones that we actually care + // about are the AVAILABLE events which result in a new host. + for (ServiceDiscovererEvent event : eventMap.values()) { + if (AVAILABLE.equals(event.status())) { + sendReadyEvent = true; + nextHosts.add(createHost(event.address())); } } + // We've built the new list so now set it for consumption and then send our events. + usedHosts = nextHosts; LOGGER.debug("{}: now using addresses (size={}): {}.", DefaultLoadBalancer.this, nextHosts.size(), nextHosts); @@ -368,13 +366,20 @@ private Host createHost(ResolvedAddress addr) { Host host = new DefaultHost<>(DefaultLoadBalancer.this.toString(), addr, connectionFactory, linearSearchSpace, healthCheckConfig); host.onClose().afterFinally(() -> - usedHostsUpdater.updateAndGet(DefaultLoadBalancer.this, previousHosts -> { - @SuppressWarnings("unchecked") - List> previousHostsTyped = - (List>) previousHosts; - return listWithHostRemoved(previousHostsTyped, current -> current == host); - } - )).subscribe(); + executeSequentially(() -> { + final List> currentHosts = usedHosts; + if (currentHosts.isEmpty()) { + // Can't remove an entry from an empty list. + return; + } + final List> nextHosts = listWithHostRemoved( + currentHosts, current -> current == host); + usedHosts = nextHosts; + if (nextHosts.isEmpty()) { + // We transitioned from non-empty to empty. That means we're not ready. + eventStreamProcessor.onNext(LOAD_BALANCER_NOT_READY_EVENT); + } + })).subscribe(); return host; } @@ -446,7 +451,7 @@ private Single selectConnection0(final Predicate selector, @Nullable final // It's possible that we're racing with updates from the `onNext` method but since it's intrinsically // racy it's fine to do these 'are there any hosts at all' checks here using the total host set. if (currentHosts.isEmpty()) { - return isClosedList(currentHosts) ? failedLBClosed(targetResource) : + return isClosed ? failedLBClosed(targetResource) : // This is the case when SD has emitted some items but none of the hosts are available. failed(Exceptions.StacklessNoAvailableHostException.newInstance( "No hosts are available to connect for " + targetResource + ".", @@ -504,8 +509,8 @@ public List>> usedAddresses() { return usedHosts.stream().map(host -> ((DefaultHost) host).asEntry()).collect(toList()); } - private static boolean isClosedList(List list) { - return list.getClass().equals(ClosedList.class); + private void executeSequentially(Runnable runnable) { + sequentialExecutionQueue.onNext(runnable); } private String makeDescription(String id, String targetResource) { @@ -515,161 +520,11 @@ private String makeDescription(String id, String targetResource) { '}'; } - private static final class ClosedList implements List { - private final List delegate; - - private ClosedList(final List delegate) { - this.delegate = requireNonNull(delegate); - } - - @Override - public int size() { - return delegate.size(); - } - - @Override - public boolean isEmpty() { - return delegate.isEmpty(); - } - - @Override - public boolean contains(final Object o) { - return delegate.contains(o); - } - - @Override - public Iterator iterator() { - return delegate.iterator(); - } - - @Override - public void forEach(final Consumer action) { - delegate.forEach(action); - } - - @Override - public Object[] toArray() { - return delegate.toArray(); - } - - @Override - public T1[] toArray(final T1[] a) { - return delegate.toArray(a); - } - - @Override - public boolean add(final T t) { - return delegate.add(t); - } - - @Override - public boolean remove(final Object o) { - return delegate.remove(o); - } - - @Override - public boolean containsAll(final Collection c) { - return delegate.containsAll(c); - } - - @Override - public boolean addAll(final Collection c) { - return delegate.addAll(c); - } - - @Override - public boolean addAll(final int index, final Collection c) { - return delegate.addAll(c); - } - - @Override - public boolean removeAll(final Collection c) { - return delegate.removeAll(c); - } - - @Override - public boolean removeIf(final Predicate filter) { - return delegate.removeIf(filter); - } - - @Override - public boolean retainAll(final Collection c) { - return delegate.retainAll(c); - } - - @Override - public void replaceAll(final UnaryOperator operator) { - delegate.replaceAll(operator); - } - - @Override - public void sort(final Comparator c) { - delegate.sort(c); - } - - @Override - public void clear() { - delegate.clear(); - } - - @Override - public T get(final int index) { - return delegate.get(index); - } - - @Override - public T set(final int index, final T element) { - return delegate.set(index, element); - } - - @Override - public void add(final int index, final T element) { - delegate.add(index, element); - } - - @Override - public T remove(final int index) { - return delegate.remove(index); - } - - @Override - public int indexOf(final Object o) { - return delegate.indexOf(o); - } - - @Override - public int lastIndexOf(final Object o) { - return delegate.lastIndexOf(o); - } - - @Override - public ListIterator listIterator() { - return delegate.listIterator(); - } - - @Override - public ListIterator listIterator(final int index) { - return delegate.listIterator(index); - } - - @Override - public List subList(final int fromIndex, final int toIndex) { - return new ClosedList<>(delegate.subList(fromIndex, toIndex)); - } - - @Override - public Spliterator spliterator() { - return delegate.spliterator(); - } - - @Override - public Stream stream() { - return delegate.stream(); - } - - @Override - public Stream parallelStream() { - return delegate.parallelStream(); + private static void safeRun(Runnable runnable) { + try { + runnable.run(); + } catch (Exception ex) { + LOGGER.error("Exception caught in sequential execution", ex); } } }