diff --git a/src/storage/kv/compaction.rs b/src/storage/kv/compaction.rs index 5aebc28..2e8b6dc 100644 --- a/src/storage/kv/compaction.rs +++ b/src/storage/kv/compaction.rs @@ -68,7 +68,7 @@ impl StoreInner { // Lock the oracle to prevent operations during compaction let oracle = self.core.oracle.clone(); - let oracle_lock = oracle.write_lock.lock().await; + let oracle_lock = oracle.lock().await; // Async wait for lock // Rotate the commit log and get the new segment ID let mut clog = self.core.clog.as_ref().unwrap().write(); diff --git a/src/storage/kv/oracle.rs b/src/storage/kv/oracle.rs index 651cd7a..b420d0f 100644 --- a/src/storage/kv/oracle.rs +++ b/src/storage/kv/oracle.rs @@ -12,7 +12,6 @@ use ahash::{HashMap, HashMapExt, HashSet}; use async_channel::{bounded, Receiver, Sender}; use bytes::Bytes; use parking_lot::{Mutex, RwLock}; -use tokio::sync::Mutex as AsyncMutex; use vart::VariableSizeKey; use crate::storage::kv::{ @@ -27,7 +26,8 @@ use crate::storage::kv::{ /// It supports two isolation levels: SnapshotIsolation and SerializableSnapshotIsolation. pub(crate) struct Oracle { /// Write lock to ensure that only one transaction can commit at a time. - pub(crate) write_lock: AsyncMutex<()>, + writer: Sender<()>, + receiver: Option>, /// Isolation level of the transactions. isolation: IsolationLevel, } @@ -45,8 +45,12 @@ impl Oracle { } }; + let (tx, rx) = bounded(1); + tx.try_send(()).unwrap(); + Self { - write_lock: AsyncMutex::new(()), + writer: tx, + receiver: Some(rx), isolation, } } @@ -91,6 +95,29 @@ impl Oracle { } } } + + // Takes lock asynchronously + pub async fn lock(&self) -> CommitGuard { + if let Some(rx) = &self.receiver { + rx.recv().await.unwrap(); + } + CommitGuard { + tx: self.writer.clone(), + } + } +} + +pub(crate) struct CommitGuard { + tx: Sender<()>, +} + +impl Drop for CommitGuard { + fn drop(&mut self) { + // Decided to use send instead of try_send because: + // 1. drop is called in async context already + // 2. we must ensure tx is returned for next transaction + let _ = futures::executor::block_on(self.tx.send(())); + } } /// Enum representing the isolation level of a transaction. diff --git a/src/storage/kv/transaction.rs b/src/storage/kv/transaction.rs index 2348b21..0a2364a 100644 --- a/src/storage/kv/transaction.rs +++ b/src/storage/kv/transaction.rs @@ -475,7 +475,7 @@ impl Transaction { // Lock the oracle to serialize commits to the transaction log. let oracle = self.core.oracle.clone(); - let write_ch_lock = oracle.write_lock.lock().await; + let write_ch_lock = oracle.lock().await; // Async wait for lock // Prepare for the commit by getting a transaction ID. let (tx_id, commit_ts) = self.prepare_commit()?;