-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Enhancement] Lake compaction scheduler optimize in fe restart scenarios #54881
base: main
Are you sure you want to change the base?
[Enhancement] Lake compaction scheduler optimize in fe restart scenarios #54881
Conversation
readUnlock(); | ||
} | ||
} | ||
|
||
// Check whether there is committed txns on partitionId. | ||
public boolean hasCommittedTxnOnPartition(long tableId, long partitionId) { | ||
readLock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The most risky bug in this code is:
Potential deadlock due to holding a read lock while performing long-running operations (e.g., sorting/stream operations) which might inadvertently acquire further locks.
You can modify the code like this:
public List<TransactionState> getLakeCompactionCommittedTxnList() {
List<TransactionState> committedTransactionStates;
readLock();
try {
// Collect transactions while holding the read lock
committedTransactionStates = idToRunningTransactionState.values().stream()
.filter(transactionState -> (transactionState.getTransactionStatus() ==
TransactionStatus.COMMITTED))
.filter(transactionState -> transactionState.getSourceType() ==
TransactionState.LoadJobSourceType.LAKE_COMPACTION)
.collect(Collectors.toList());
} finally {
readUnlock();
}
// Perform sorting outside of the lock to reduce risk of deadlocks
return committedTransactionStates.stream()
.sorted(Comparator.comparing(TransactionState::getCommitTime))
.collect(Collectors.toList());
}
} | ||
return runningCompactions; | ||
} | ||
|
||
public void handleLoadingFinished(PartitionIdentifier partition, long version, long versionTime, | ||
Quantiles compactionScore) { | ||
PartitionVersion currentVersion = new PartitionVersion(version, versionTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The most risky bug in this code is:
Potential deadlock due to improper lock acquisition and release pattern.
You can modify the code like this:
protected Map<PartitionIdentifier, CompactionJob> rebuildRunningCompactionsOnRestart() {
Map<PartitionIdentifier, CompactionJob> runningCompactions = new ConcurrentHashMap<>();
List<Long> activeTxnIds =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getLakeCompactionActiveTxnIds();
for (PartitionStatistics statistics : partitionStatisticsHashMap.values()) {
PartitionIdentifier partitionIdentifier = statistics.getPartition();
long compactionTxnId = statistics.getLastCompactionTxnId();
if (!activeTxnIds.contains(compactionTxnId)) {
continue;
}
Database db = GlobalStateMgr.getCurrentState().getDb(partitionIdentifier.getDbId());
if (db == null) {
continue;
}
// Ensure that locker is only created if necessary
Locker locker = new Locker();
try {
locker.lockDatabase(db, LockType.READ);
OlapTable table = (OlapTable) db.getTable(partitionIdentifier.getTableId());
if (table == null) {
continue;
}
PhysicalPartition partition = table.getPhysicalPartition(partitionIdentifier.getPartitionId());
if (partition == null) {
continue;
}
CompactionJob job = new CompactionJob(db, table, partition, compactionTxnId,
Config.lake_compaction_allow_partial_success);
runningCompactions.put(partitionIdentifier, job);
} catch (Exception e) {
LOG.error("Rebuild running compactions on fe restart failed: {}", e.getMessage(), e);
} finally {
// Ensure that unlock is called safely without overwriting with a new locker
try {
locker.unLockDatabase(db, LockType.READ);
} catch (Exception e) {
LOG.error("Failed to unlock database: {}", e.getMessage(), e);
}
}
}
return runningCompactions;
}
In the suggested fix, we ensure that any failures during unlocking are also logged, minimizing the risk of issues being swallowed silently. Additionally, ensuring that the locker is consistently used aids in maintaining robust locking semantics.
fe/fe-core/src/main/java/com/starrocks/lake/compaction/CompactionScheduler.java
Show resolved
Hide resolved
4a717a5
to
39f4485
Compare
Signed-off-by: drake_wang <[email protected]> fix Signed-off-by: drake_wang <[email protected]>
39f4485
to
8b274cc
Compare
Quality Gate passedIssues Measures |
[Java-Extensions Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
[BE Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
Why I'm doing:
What I'm doing:
Fixes #54883
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: