Skip to content

Commit

Permalink
chore: remove async mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Nov 23, 2024
1 parent fa052f5 commit 221c91b
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/storage/kv/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl StoreInner {

// Lock the oracle to prevent operations during compaction
let oracle = self.core.oracle.clone();
let oracle_lock = oracle.write_lock.lock().await;
let oracle_lock = oracle.lock().await; // Async wait for lock

// Rotate the commit log and get the new segment ID
let mut clog = self.core.clog.as_ref().unwrap().write();
Expand Down
33 changes: 30 additions & 3 deletions src/storage/kv/oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use ahash::{HashMap, HashMapExt, HashSet};
use async_channel::{bounded, Receiver, Sender};
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
use tokio::sync::Mutex as AsyncMutex;
use vart::VariableSizeKey;

use crate::storage::kv::{
Expand All @@ -27,7 +26,8 @@ use crate::storage::kv::{
/// It supports two isolation levels: SnapshotIsolation and SerializableSnapshotIsolation.
pub(crate) struct Oracle {
/// Write lock to ensure that only one transaction can commit at a time.
pub(crate) write_lock: AsyncMutex<()>,
writer: Sender<()>,
receiver: Option<Receiver<()>>,
/// Isolation level of the transactions.
isolation: IsolationLevel,
}
Expand All @@ -45,8 +45,12 @@ impl Oracle {
}
};

let (tx, rx) = bounded(1);
tx.try_send(()).unwrap();

Self {
write_lock: AsyncMutex::new(()),
writer: tx,
receiver: Some(rx),
isolation,
}
}
Expand Down Expand Up @@ -91,6 +95,29 @@ impl Oracle {
}
}
}

// Takes lock asynchronously
pub async fn lock(&self) -> CommitGuard {
if let Some(rx) = &self.receiver {
rx.recv().await.unwrap();
}
CommitGuard {
tx: self.writer.clone(),
}
}
}

pub(crate) struct CommitGuard {
tx: Sender<()>,
}

impl Drop for CommitGuard {
fn drop(&mut self) {
// Decided to use send instead of try_send because:
// 1. drop is called in async context already
// 2. we must ensure tx is returned for next transaction
let _ = futures::executor::block_on(self.tx.send(()));
}
}

/// Enum representing the isolation level of a transaction.
Expand Down
2 changes: 1 addition & 1 deletion src/storage/kv/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ impl Transaction {

// Lock the oracle to serialize commits to the transaction log.
let oracle = self.core.oracle.clone();
let write_ch_lock = oracle.write_lock.lock().await;
let write_ch_lock = oracle.lock().await; // Async wait for lock

// Prepare for the commit by getting a transaction ID.
let (tx_id, commit_ts) = self.prepare_commit()?;
Expand Down

0 comments on commit 221c91b

Please sign in to comment.