Skip to content

Commit

Permalink
add safety try-finally to ensure executor shutdown occurs
Browse files Browse the repository at this point in the history
  • Loading branch information
gemmellr committed Sep 27, 2024
1 parent f5cd939 commit f2d1113
Showing 1 changed file with 33 additions and 31 deletions.
64 changes: 33 additions & 31 deletions qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -380,43 +380,45 @@ protected boolean shutdown(Throwable cause) throws JMSException {
// Ensure that no asynchronous completion sends remain blocked after close but wait
// using the close timeout for the asynchronous sends to complete normally.
final ExecutorService completionExecutor = getCompletionExecutor();

synchronized (sessionInfo) {
// Producers are now quiesced and we can await completion of asynchronous sends
// that are still pending a result or timeout once we've done a quick check to
// see if any are actually pending or have completed already.
asyncSendsCompletion = connection.newProviderFuture();

if (asyncSendsCompletion != null) {
completionExecutor.execute(() -> {
if (asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
});
try {
synchronized (sessionInfo) {
// Producers are now quiesced and we can await completion of asynchronous sends
// that are still pending a result or timeout once we've done a quick check to
// see if any are actually pending or have completed already.
asyncSendsCompletion = connection.newProviderFuture();

if (asyncSendsCompletion != null) {
completionExecutor.execute(() -> {
if (asyncSendQueue.isEmpty()) {
asyncSendsCompletion.onSuccess();
}
});
}
}
}

try {
if (asyncSendsCompletion != null) {
asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
try {
if (asyncSendsCompletion != null) {
asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
}
} catch (Exception ex) {
LOG.trace("Exception during wait for asynchronous sends to complete", ex);
} finally {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}

// as a last task we want to fail any stragglers in the asynchronous send queue and then
// shutdown the queue to prevent any more submissions while the cleanup goes on.
completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
}
} catch (Exception ex) {
LOG.trace("Exception during wait for asynchronous sends to complete", ex);
} finally {
if (cause == null) {
cause = new JMSException("Session closed remotely before message transfer result was notified");
}

// as a last task we want to fail any stragglers in the asynchronous send queue and then
// shutdown the queue to prevent any more submissions while the cleanup goes on.
completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
completionExecutor.shutdown();
}

try {
completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
try {
completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.trace("Session close awaiting send completions was interrupted");
}
}

if (shutdownError != null) {
Expand Down

0 comments on commit f2d1113

Please sign in to comment.