loadbalancer: Use a sequential execution concurrency model in DefaultLoadBalancer #2768
Conversation
Motivation:

We use an atomic field and CAS operations to manage concurrency in the DefaultLoadBalancer. Because the CAS operation has to succeed before changes take effect, it's not easy to make events coordinate with that model. A few examples:

- It will be awkward to add an event observer to the load balancer since we may end up re-doing server-set updates if we fail a CAS.
- There is also a question of ordering of observer events since the observer interactions may get out of order with respect to the state mutations that induced them.
- We can't currently send a LOAD_BALANCER_NOT_READY_EVENT when the last host expires because we would risk the event racing with SD updates that triggered related events.

CAS failures may also be relatively common due to re-entrance: SD update events can cause hosts to be closed, which may synchronously cause the hosts' `.afterFinally` listeners to remove themselves from the host list, resulting in a follow-up CAS failure.

Modifications:

- Use an unbounded processor as a queue for Runnables that are executed sequentially and funnel sensitive operations through that queue.
- Send the LOAD_BALANCER_NOT_READY_EVENT when a host expires and results in an empty host set.

Result:

DefaultLoadBalancer state mutations and the events they generate are executed sequentially, so they no longer race or get re-ordered, and the load balancer now emits LOAD_BALANCER_NOT_READY_EVENT when its last host expires.
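For context, here is a minimal standalone sketch of the sequencing pattern described above. The actual implementation uses an unbounded processor as the queue and lives in `SequentialExecutor.java` in this PR; the class, field, and method names below are illustrative, and the exception-handler constructor argument reflects the review discussion further down.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Illustrative sketch: tasks are enqueued and drained by whichever thread wins the
// `draining` flag, so tasks run one at a time and in submission order without blocking
// submitters. Re-entrant submissions (a task that enqueues another task) simply fail
// the flag CAS and are picked up by the drain that is already in progress.
final class SequentialExecutorSketch {
    private final Consumer<Throwable> exceptionHandler;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean draining = new AtomicBoolean();

    SequentialExecutorSketch(final Consumer<Throwable> exceptionHandler) {
        this.exceptionHandler = exceptionHandler;
    }

    void execute(final Runnable task) {
        queue.add(task);
        // Keep trying to drain while there is work and nobody else is draining: this
        // closes the race where a task is enqueued just as another drain finishes.
        while (!queue.isEmpty() && draining.compareAndSet(false, true)) {
            try {
                Runnable next;
                while ((next = queue.poll()) != null) {
                    try {
                        next.run();
                    } catch (Throwable t) {
                        // The owner decides what a failure means (log, clean up, close, ...).
                        exceptionHandler.accept(t);
                    }
                }
            } finally {
                draining.set(false);
            }
        }
    }
}
```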
```java
@@ -179,6 +154,34 @@ private void subscribeToEvents(boolean resubscribe) {
        }
    }

    // This method is called eagerly, meaning the completable will be immediately subscribed to,
    // so we don't need to do any Completable.defer business.
    private Completable doClose(final boolean graceful) {
```
This is ugly and looks a lot like the `io.servicetalk.concurrent.api.Executor.submit` method. I didn't go with that because all submissions to that type are cancellable (but that's intrinsically racy so I suppose we could just ignore cancels) and there are a bunch of other methods on there I don't think we want to support. However, maybe it's fine: we might have a time source in the healthCheck already. Opinions appreciated.
I took a quick peek ... iiuc this won't impact connection selection on the hot path and is targeted at discovery/control events (closure, ..)?
Yes, that's right: it's entirely for sequencing control events. The hot path remains the same volatile read pattern that existed before.
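To make the split concrete, here is a hypothetical shape of the two paths, building on the sketch above. These names are illustrative, not the real DefaultLoadBalancer fields.

```java
import java.util.Collections;
import java.util.List;

// Illustrative only: the hot path does a single volatile read of the current snapshot,
// while every mutation is funnelled through the sequential executor, which is the only
// writer of `usedHosts`.
final class HostListHolder {
    private final SequentialExecutorSketch sequentialExecutor =
            new SequentialExecutorSketch(Throwable::printStackTrace);
    private volatile List<String> usedHosts = Collections.emptyList();

    // Hot path, e.g. connection selection: one volatile read, no locks, no queueing.
    List<String> currentHosts() {
        return usedHosts;
    }

    // Control path: service-discovery updates, host expiry, close, etc.
    void updateHosts(final List<String> newHosts) {
        sequentialExecutor.execute(() -> usedHosts = List.copyOf(newHosts));
    }
}
```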
general approach lgtm, few comments/suggestions
```java
try {
    runnable.run();
} catch (Exception ex) {
    logger.error("Exception caught in sequential execution", ex);
```
Do we always want to swallow/log exceptions, or should we save/propagate them after we are done draining? It is simpler to swallow/log, but it may also limit visibility and error handling outside. That way we also wouldn't need to take a logger as a constructor arg.
There is some ambiguity in the propagation of exceptions since it may not be the thread that requested the work that runs it. We could add an exception handler to make it configurable.
I added an exception handler as a constructor arg. It is handy for performing resource disposal if we get an exception.
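For illustration, here is how such a handler behaves against the sketch above (the real constructor signature may differ): the handler sees failures from any task, and a failing task does not stop later tasks from running.

```java
// Illustrative usage only: the owner decides what a failure means. The actual
// DefaultLoadBalancer handler (quoted further down) logs and closes the load balancer.
SequentialExecutorSketch executor = new SequentialExecutorSketch(uncaught ->
        System.err.println("Uncaught exception in sequential execution: " + uncaught));
executor.execute(() -> {
    throw new IllegalStateException("boom"); // routed to the handler
});
executor.execute(() -> System.out.println("still runs after the earlier failure"));
```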
```java
CompletableSource.Processor processor = Processors.newCompletableProcessor();
sequentialExecutor.execute(() -> {
    if (!isClosed) {
        discoveryCancellable.cancel();
```
Let's put the other code in try/finally since this may call externally and throw.
You're right. I think I've done what you've suggested but it's worth a double check.
I also feel this is messier than it needs to be, but otoh the io.st.concurrent.api.Executor is a bit thicker of an API than I think we want. Any ideas appreciated.
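A hedged sketch of the shape being suggested, not the actual doClose body: the names `sequentialExecutor`, `isClosed`, `discoveryCancellable`, and `eventStreamProcessor` come from the snippets quoted in this review, while the `SourceAdapters.fromSource` bridge and the placement of the try/finally are my assumptions. The real close also has to deal with closing hosts and graceful vs. hard close, which is omitted here. The only point is that the externally visible close signal must terminate even if the guarded calls throw.

```java
private Completable doClose(final boolean graceful) {
    final CompletableSource.Processor processor = Processors.newCompletableProcessor();
    sequentialExecutor.execute(() -> {
        try {
            if (!isClosed) {
                discoveryCancellable.cancel();      // may call externally and throw
                eventStreamProcessor.onComplete();  // may call externally and throw
            }
            isClosed = true;
        } finally {
            // Always terminate the close signal so subscribers to closeAsync() aren't left hanging.
            processor.onComplete();
        }
    });
    return SourceAdapters.fromSource(processor);
}
```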
No comments here, I like the new approach. My only consideration would be whether we should extract the "sequential" execution into an operator; it feels awkward to have a custom execution model localized here, and it seems like a feature in itself. See https://reactivex.io/documentation/operators/serialize.html (we would still have cancellation/close to race with).
This has a few reviews and it seems like the pattern, which is the most important part, is desirable, so I'm going to merge this to unblock other work and we can iterate on it more in the future.
The approach LGTM, have a few questions and also opened a follow-up with minor corrections: #2775
```java
this.sequentialExecutor = new SequentialExecutor((uncaughtException) -> {
    LOGGER.error("{}: Uncaught exception in SequentialExecutor triggered closing of the load balancer.",
            this, uncaughtException);
    closeAsync().subscribe();
```
Seems dangerous that one slipped exception can close the entire LB, stopping all traffic. I understand that this way we will be notified immediately, but taking into account that a fix might take time to deliver, would it be safer to keep the traffic flowing and just log the exception?
I suppose it depends on the reason for the failure. Host-set updates may have been corrupted, and because we use a differential stream it may never recover. This is catastrophizing a bit, and this problem exists to a small extent even without update corruption, but failing on a host removal could mean that we may forever try sending traffic to a destination that may not be who we intended.
That said, I'm happy to just log for now.
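The log-only alternative would be roughly the snippet quoted above minus the close, for illustration:

```java
// Illustrative log-only handler: keep serving traffic and surface the bug through logs
// instead of closing the load balancer on the first uncaught exception.
this.sequentialExecutor = new SequentialExecutor(uncaughtException ->
        LOGGER.error("{}: Uncaught exception in SequentialExecutor.", this, uncaughtException));
```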
```java
try {
    if (!isClosed) {
        discoveryCancellable.cancel();
        eventStreamProcessor.onComplete();
```
Why are these 2 lines protected by `if (!isClosed)`? `toAsyncCloseable` has a CAS internally to make sure it's executed only once.
I believe in `toAsyncCloseable` the `CloseableResource` can be called twice: once for a graceful close and then again for a hard close.
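For illustration, the wiring that makes this possible looks roughly like the following; the signatures here are from memory and may not match `AsyncCloseables` exactly.

```java
// Illustrative: both close paths delegate to the same CloseableResource, so
// doClose(graceful) can legitimately run twice — first with graceful = true, then again
// with graceful = false for the hard close — hence the isClosed guard in doClose.
private final ListenableAsyncCloseable asyncCloseable =
        AsyncCloseables.toAsyncCloseable(this::doClose);

@Override
public Completable closeAsync() {
    return asyncCloseable.closeAsync();             // hard close, may follow a graceful close
}

@Override
public Completable closeAsyncGracefully() {
    return asyncCloseable.closeAsyncGracefully();   // graceful close
}
```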
```java
private volatile List<Host<ResolvedAddress, C>> usedHosts = emptyList();
private volatile boolean isClosed;
```
What is the motivation to introduce an additional `volatile` boolean instead of using `ClosedList` like it was before?
To delete code and be more explicit. In a follow-up here I make `isClosed` non-volatile.
```java
            event.address(), event);
}

private void sequentialOnNext(Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
    assert events != null && !events.isEmpty();
```
Technically, it's possible to receive an empty collection. It's not necessary to fail on it.
The filtering for null or empty updates still happens in `onNext`. This was to assert that connection, but maybe it's unnecessary.
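In other words, the shape is roughly the following. This is a hypothetical sketch of the relationship between the two methods, not the actual DefaultLoadBalancer code.

```java
// Degenerate updates are filtered in onNext before handing off to the sequential
// executor, which is why sequentialOnNext can assert a non-empty collection.
@Override
public void onNext(final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
    if (events == null || events.isEmpty()) {
        return; // dropping here keeps the assertion below valid
    }
    sequentialExecutor.execute(() -> sequentialOnNext(events));
}

private void sequentialOnNext(final Collection<? extends ServiceDiscovererEvent<ResolvedAddress>> events) {
    assert events != null && !events.isEmpty();
    // ... apply the host-set diff on the sequential executor ...
}
```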
```java
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
```
Please prefer using hamcrest for new tests; it generates much better exception messages compared to JUnit asserts.
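For example, the matcher form reads left to right and its failure message describes both the expectation and the actual value (the test name and values here are made up for illustration):

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

import java.util.List;
import org.junit.jupiter.api.Test;

class HamcrestStyleExample {
    @Test
    void preferHamcrestMatchers() {
        final List<String> events = List.of("expand", "shrink");
        // Instead of assertEquals/assertNotNull, use matchers: on failure they print the
        // matcher description alongside the actual value, not just "expected X but was Y".
        assertThat(events, contains("expand", "shrink"));
        assertThat(events.get(0), is(notNullValue()));
    }
}
```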
```java
executor = new SequentialExecutor(exceptionHandler);
final RuntimeException ex = new RuntimeException("expected");
executor.execute(() -> {
    throw ex;
```
We have a special `DELIBERATE_EXCEPTION` type for cases like this.
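A sketch of the suggested change to the quoted test, assuming the constant lives in `io.servicetalk.concurrent.internal.DeliberateException` (the import path may differ depending on the test-support module used):

```java
// Same test as quoted above, but using the shared deliberate-exception constant instead
// of constructing a new RuntimeException, so the failure is clearly intentional.
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;

executor = new SequentialExecutor(exceptionHandler);
executor.execute(() -> {
    throw DELIBERATE_EXCEPTION;
});
```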