Skip to content

Commit

Permalink
Avoid more race condition weirdness
Browse files Browse the repository at this point in the history
  • Loading branch information
lukebemish committed Dec 10, 2024
1 parent 07bbbd2 commit 341c907
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions src/main/java/dev/lukebemish/immaculate/ForkFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkFormatter implements FileFormatter {
Expand Down Expand Up @@ -123,7 +124,7 @@ private ResultListener(Socket socket) throws IOException {
}

public synchronized CompletableFuture<String> submit(int id, String fileName, String text) throws IOException {
if (closed) {
if (closed.get()) {
throw new IOException("Listener is closed");
}
var out = results.computeIfAbsent(id, i -> new CompletableFuture<>());
Expand All @@ -138,49 +139,41 @@ public synchronized CompletableFuture<String> submit(int id, String fileName, St
return out;
}

private volatile boolean closed = false;
private final AtomicBoolean closed = new AtomicBoolean();

public void shutdown() throws IOException {
shutdown(new IOException("Execution was interrupted"));
}

private void shutdown(Throwable t) throws IOException {
if (!this.closed.compareAndSet(false, true)) return;

private void beginClose(Throwable e) throws IOException {
if (this.closed) return;
this.closed = true;
for (var future : results.values()) {
future.completeExceptionally(e);
future.completeExceptionally(t);
}
results.clear();

socket.shutdownInput();
}

private void finishClose() throws IOException {
if (!socket.isClosed()) {
output.writeInt(-1);
output.flush();
socket.close();
}
}

public void shutdown() throws IOException {
shutdown(new IOException("Execution was interrupted"));
}

private synchronized void shutdown(Throwable t) throws IOException {
this.beginClose(t);
if (Thread.currentThread() != this) {
try {
this.join();
} catch (InterruptedException e) {
// continue, it's fine
}
}
this.finishClose();

output.writeInt(-1);
output.flush();
socket.close();
}

@Override
public void run() {
try {
if (!closed) {
if (!closed.get()) {
var input = new DataInputStream(socket.getInputStream());
while (!closed) {
while (!closed.get()) {
int id = input.readInt();
boolean success = input.readBoolean();
if (success) {
Expand Down

0 comments on commit 341c907

Please sign in to comment.