Skip to content

Commit

Permalink
IoTConsensusV2: Fix some consensus group missing due to recover faile…
Browse files Browse the repository at this point in the history
…d or blocked (#14613)

* fix recover failed

* fix review

* fix review
  • Loading branch information
Pengzna authored Jan 10, 2025
1 parent c8d7c5d commit 937df76
Showing 1 changed file with 42 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -123,7 +126,7 @@ public PipeConsensus(ConsensusConfig config, IStateMachine.Registry registry) {

@Override
public synchronized void start() throws IOException {
initAndRecover();
Future<Void> recoverFuture = initAndRecover();

rpcService.initSyncedServiceImpl(new PipeConsensusRPCServiceProcessor(this, config.getPipe()));
try {
Expand All @@ -132,19 +135,31 @@ public synchronized void start() throws IOException {
throw new IOException(e);
}

try {
recoverFuture.get();
} catch (CancellationException ce) {
LOGGER.info("IoTV2 Recover Task is cancelled", ce);
} catch (ExecutionException ee) {
LOGGER.error("Exception while waiting for recover future completion", ee);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOGGER.warn("IoTV2 Recover Task is interrupted", ie);
}
// only when we recover all consensus group can we launch async backend checker thread
consensusPipeGuardian.start(
CONSENSUS_PIPE_GUARDIAN_TASK_ID,
this::checkAllConsensusPipe,
config.getPipe().getConsensusPipeGuardJobIntervalInSeconds());
}

private void initAndRecover() throws IOException {
private Future<Void> initAndRecover() throws IOException {
if (!storageDir.exists()) {
// init
if (!storageDir.mkdirs()) {
LOGGER.warn("Unable to create consensus dir at {}", storageDir);
throw new IOException(String.format("Unable to create consensus dir at %s", storageDir));
}
return CompletableFuture.completedFuture(null);
} else {
// asynchronously recover, retry logic is implemented at PipeConsensusImpl
CompletableFuture<Void> future =
Expand All @@ -155,27 +170,39 @@ private void initAndRecover() throws IOException {
for (Path path : stream) {
ConsensusGroupId consensusGroupId =
parsePeerFileName(path.getFileName().toString());
PipeConsensusServerImpl consensus =
new PipeConsensusServerImpl(
new Peer(consensusGroupId, thisNodeId, thisNode),
registry.apply(consensusGroupId),
path.toString(),
new ArrayList<>(),
config,
consensusPipeManager,
syncClientManager);
stateMachineMap.put(consensusGroupId, consensus);
checkPeerListAndStartIfEligible(consensusGroupId, consensus);
try {
PipeConsensusServerImpl consensus =
new PipeConsensusServerImpl(
new Peer(consensusGroupId, thisNodeId, thisNode),
registry.apply(consensusGroupId),
path.toString(),
new ArrayList<>(),
config,
consensusPipeManager,
syncClientManager);
stateMachineMap.put(consensusGroupId, consensus);
checkPeerListAndStartIfEligible(consensusGroupId, consensus);
} catch (Exception e) {
LOGGER.error(
"Failed to recover consensus from {} for {}, ignore it and continue recover other group, async backend checker thread will automatically deregister related pipe side effects for this failed consensus group.",
storageDir,
consensusGroupId,
e);
}
}
} catch (Exception e) {
LOGGER.error("Failed to recover consensus from {}", storageDir, e);
} catch (IOException e) {
LOGGER.error(
"Failed to recover consensus from {} because read dir failed",
storageDir,
e);
}
})
.exceptionally(
e -> {
LOGGER.error("Failed to recover consensus from {}", storageDir, e);
return null;
});
return future;
}
}

Expand Down

0 comments on commit 937df76

Please sign in to comment.