Skip to content

Commit

Permalink
More thread safety stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-anderson committed Dec 8, 2023
1 parent 6ed40e2 commit cc412c6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final Single<C> selectConnection(@Nonnull Predicate<C> selector, @Nullabl

@Override
public final boolean isHealthy() {
// TODO: in the future we may want to make this more of a "are at least X hosts available" question
// so that we can compose a group of selectors into a priority set.
return !allUnhealthy(hosts);
}

Expand All @@ -67,7 +69,6 @@ private Single<C> noHostsFailure() {
this.getClass(), "selectConnection(...)"));
}

// This will be faster than `allHealthy` in the typical case since we expect hosts to be healthy most of the time.
private static <ResolvedAddress, C extends LoadBalancedConnection> boolean allUnhealthy(
final List<Host<ResolvedAddress, C>> usedHosts) {
boolean allUnhealthy = !usedHosts.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,26 @@ public Completable closeAsyncGracefully() {

@Override
public List<Entry<ResolvedAddress, List<C>>> usedAddresses() {
// This method is just for testing so we can use some awaiting to get the results in a thread safe way.
// If we're already in the executor we can't submit a task and wait for it without deadlock but
// the access is thread safe anyway so just go for it.
if (sequentialExecutor.isCurrentThreadDraining()) {
return sequentialUsedAddresses();
}
CompletableFuture<List<Entry<ResolvedAddress, List<C>>>> future = new CompletableFuture<>();
sequentialExecutor.execute(() -> future.complete(
usedHosts.stream().map(host -> ((DefaultHost<ResolvedAddress, C>) host).asEntry()).collect(toList())));
sequentialExecutor.execute(() -> future.complete(sequentialUsedAddresses()));
try {
// This method is just for testing, so it's fine to do some awaiting.
return future.get(5, TimeUnit.SECONDS);
} catch (Exception ex) {
throw new AssertionError("Failed to get results", ex);
}
}

// must be called from within the sequential executor.
private List<Entry<ResolvedAddress, List<C>>> sequentialUsedAddresses() {
return usedHosts.stream().map(host -> ((DefaultHost<ResolvedAddress, C>) host).asEntry()).collect(toList());
}

private String makeDescription(String id, String targetResource) {
return getClass().getSimpleName() + "{" +
"id=" + id + '@' + toHexString(identityHashCode(this)) +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,21 @@ public interface ExceptionHandler {

private final ExceptionHandler exceptionHandler;
private final AtomicReference<Cell> tail = new AtomicReference<>();
@Nullable
private Thread currentDrainingThread;

SequentialExecutor(final ExceptionHandler exceptionHandler) {
this.exceptionHandler = requireNonNull(exceptionHandler, "exceptionHandler");
}

public boolean isCurrentThreadDraining() {
// Even though `currentDrainingThread` is not a volatile field this is thread safe:
// the only way that `currentDrainingThread` will ever equal this thread, even if
// we get a stale value, is if _this_ thread set it.
// The null check is just an optimization: it's really the second check that matters.
return currentDrainingThread != null && currentDrainingThread == Thread.currentThread();
}

@Override
public void execute(Runnable command) {
// Make sure we propagate any sync contexts.
Expand All @@ -72,6 +82,8 @@ public void execute(Runnable command) {
}

private void drain(Cell next) {
final Thread thisThread = Thread.currentThread();
currentDrainingThread = thisThread;
for (;;) {
assert next != null;
try {
Expand All @@ -85,10 +97,14 @@ private void drain(Cell next) {
if (n == null) {
// There doesn't seem to be another element linked. See if it was the tail and if so terminate draining.
// Note that a successful CAS established a happens-before relationship with future draining threads.
// Note that we also have to clear the draining thread before the CAS to prevent races.
currentDrainingThread = null;
if (tail.compareAndSet(next, null)) {
break;
}
// next isn't the tail but the link hasn't resolved: we must poll until it does.
// Next isn't the tail but the link hasn't resolved: we must re-set the draining thread and poll until
// it does resolve then we can keep on trucking.
currentDrainingThread = thisThread;
while ((n = next.next) == null) {
// Still not resolved: yield and then try again.
Thread.yield();
Expand Down

0 comments on commit cc412c6

Please sign in to comment.