From f0f873099fae9e0ee0c22fde8448b5887f7fea65 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 30 Jan 2024 23:34:12 +0100 Subject: [PATCH] limit the number of retries of the busy handler (#963) * limit the amount of retries for the lock stealer * use mutex for `is_stolen` * unconditionally update state after execute * shorten watch channel borrow * fmt --- libsql-server/src/connection/libsql.rs | 157 +++++++++++++++---------- 1 file changed, 94 insertions(+), 63 deletions(-) diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index 382c34db1c..7eae25eafb 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -1,6 +1,5 @@ use std::ffi::{c_int, c_void}; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use libsql_sys::wal::wrapper::{WalWrapper, WrapWal, WrappedWal}; @@ -381,7 +380,7 @@ struct TxnSlot { /// Time at which the transaction can be stolen created_at: tokio::time::Instant, /// The transaction lock was stolen - is_stolen: AtomicBool, + is_stolen: parking_lot::Mutex, txn_timeout: Duration, } @@ -407,7 +406,7 @@ impl TxnSlot { impl std::fmt::Debug for TxnSlot { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let stolen = self.is_stolen.load(Ordering::Relaxed); + let stolen = self.is_stolen.lock(); let time_left = self.expires_at().duration_since(Instant::now()); write!( f, @@ -451,7 +450,9 @@ impl Default for TxnState { /// - If the handler waits until the txn timeout and isn't notified of the termination of the txn, it will attempt to steal the lock. /// This is done by calling rollback on the slot's txn, and marking the slot as stolen. /// - When a connection notices that it's slot has been stolen, it returns a timedout error to the next request. -unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_int) -> c_int { +const MAX_BUSY_RETRIES: c_int = 512; + +unsafe extern "C" fn busy_handler(state: *mut c_void, retries: c_int) -> c_int { let state = &*(state as *mut TxnState); let lock = state.slot.read(); // we take a reference to the slot we will attempt to steal. this is to make sure that we @@ -459,7 +460,14 @@ unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_int) - let slot = match &*lock { Some(slot) => slot.clone(), // fast path: there is no slot, try to acquire the lock again - None => return 1, + None if retries < 512 => { + std::thread::sleep(std::time::Duration::from_millis(10)); + return 1; + } + None => { + tracing::info!("Failed to steal connection lock after {MAX_BUSY_RETRIES} retries."); + return 0; + } }; tokio::runtime::Handle::current().block_on(async move { @@ -484,7 +492,17 @@ unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_int) - if Arc::ptr_eq(s, &slot) { // We check that slot wasn't already stolen, and that their is still a slot. // The ordering is relaxed because the atomic is only set under the slot lock. - if slot.is_stolen.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed).is_ok() { + let can_steal = { + let mut can_steal = false; + let mut is_stolen = slot.is_stolen.lock(); + if !*is_stolen { + can_steal = true; + *is_stolen = true; + } + can_steal + }; + + if can_steal { // The connection holding the current txn will set itself as stolen when it // detects a timeout, so if we arrive to this point, then there is // necessarily a slot, and this slot has to be the one we attempted to @@ -505,7 +523,6 @@ unsafe extern "C" fn busy_handler(state: *mut c_void, _retries: c_int) - 1 } } - }) } @@ -584,13 +601,12 @@ impl Connection { pgm: Program, mut builder: B, ) -> Result { - use rusqlite::TransactionState as Tx; - - let (state, txn_timeout) = { - let lock = this.lock(); - let txn_timeout = lock.config_store.get().txn_timeout.unwrap_or(TXN_TIMEOUT); - (lock.state.clone(), txn_timeout) - }; + let txn_timeout = this + .lock() + .config_store + .get() + .txn_timeout + .unwrap_or(TXN_TIMEOUT); let mut results = Vec::with_capacity(pgm.steps.len()); builder.init(&this.lock().builder_config)?; @@ -603,12 +619,17 @@ impl Connection { for step in pgm.steps() { let mut lock = this.lock(); - if let Some(slot) = &lock.slot { - if slot.is_stolen.load(Ordering::Relaxed) || Instant::now() > slot.expires_at() { - // we mark ourselves as stolen to notify any waiting lock thief. - slot.is_stolen.store(true, Ordering::Relaxed); - lock.rollback(); - has_timeout = true; + if !has_timeout { + if let Some(slot) = &lock.slot { + let mut is_stolen = slot.is_stolen.lock(); + if *is_stolen || Instant::now() > slot.expires_at() { + // we mark ourselves as stolen to notify any waiting lock thief. + if !*is_stolen { + lock.rollback(); + } + *is_stolen = true; + has_timeout = true; + } } } @@ -621,45 +642,10 @@ impl Connection { continue; } - let res = lock.execute_step(step, &results, &mut builder)?; - - let new_state = lock.conn.transaction_state(Some(DatabaseName::Main))?; - match (previous_state, new_state) { - // lock was upgraded, claim the slot - (Tx::None | Tx::Read, Tx::Write) => { - let slot = Arc::new(TxnSlot { - conn: this.clone(), - created_at: Instant::now(), - is_stolen: AtomicBool::new(false), - txn_timeout, - }); - - lock.slot.replace(slot.clone()); - state.slot.write().replace(slot); - } - // lock was downgraded, notify a waiter - (Tx::Write, Tx::None | Tx::Read) => { - let old_slot = lock - .slot - .take() - .expect("there should be a slot right after downgrading a txn"); - let mut maybe_state_slot = state.slot.write(); - // We need to make sure that the state slot is our slot before removing it. - if let Some(ref state_slot) = *maybe_state_slot { - if Arc::ptr_eq(state_slot, &old_slot) { - maybe_state_slot.take(); - } - } - - drop(maybe_state_slot); - - state.notify.notify_waiters(); - } - // nothing to do - (_, _) => (), - } - - previous_state = new_state; + let ret = lock.execute_step(step, &results, &mut builder); + // /!\ always make sure that the state is updated before returning + previous_state = lock.update_state(this.clone(), previous_state, txn_timeout)?; + let res = ret?; results.push(res); } @@ -667,15 +653,60 @@ impl Connection { { let mut lock = this.lock(); let is_autocommit = lock.conn.is_autocommit(); - builder.finish( - *(lock.current_frame_no_receiver.borrow_and_update()), - is_autocommit, - )?; + let current_fno = *lock.current_frame_no_receiver.borrow_and_update(); + builder.finish(current_fno, is_autocommit)?; } Ok(builder) } + fn update_state( + &mut self, + arc_this: Arc>, + previous_state: TransactionState, + txn_timeout: Duration, + ) -> Result { + use rusqlite::TransactionState as Tx; + + let new_state = self.conn.transaction_state(Some(DatabaseName::Main))?; + match (previous_state, new_state) { + // lock was upgraded, claim the slot + (Tx::None | Tx::Read, Tx::Write) => { + let slot = Arc::new(TxnSlot { + conn: arc_this, + created_at: Instant::now(), + is_stolen: false.into(), + txn_timeout, + }); + + self.slot.replace(slot.clone()); + self.state.slot.write().replace(slot); + } + // lock was downgraded, notify a waiter + (Tx::Write, Tx::None | Tx::Read) => { + let old_slot = self + .slot + .take() + .expect("there should be a slot right after downgrading a txn"); + let mut maybe_state_slot = self.state.slot.write(); + // We need to make sure that the state slot is our slot before removing it. + if let Some(ref state_slot) = *maybe_state_slot { + if Arc::ptr_eq(state_slot, &old_slot) { + maybe_state_slot.take(); + } + } + + drop(maybe_state_slot); + + self.state.notify.notify_waiters(); + } + // nothing to do + (_, _) => (), + } + + Ok(new_state) + } + fn execute_step( &mut self, step: &Step,