Skip to content

Commit

Permalink
limit the number of retries of the busy handler (#963)
Browse files Browse the repository at this point in the history
* limit the amount of retries for the lock stealer

* use mutex for `is_stolen`

* unconditionally update state after execute

* shorten watch channel borrow

* fmt
  • Loading branch information
MarinPostma authored Jan 30, 2024
1 parent 552d052 commit f0f8730
Showing 1 changed file with 94 additions and 63 deletions.
157 changes: 94 additions & 63 deletions libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::ffi::{c_int, c_void};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use libsql_sys::wal::wrapper::{WalWrapper, WrapWal, WrappedWal};
Expand Down Expand Up @@ -381,7 +380,7 @@ struct TxnSlot<T> {
/// Time at which the transaction can be stolen
created_at: tokio::time::Instant,
/// The transaction lock was stolen
is_stolen: AtomicBool,
is_stolen: parking_lot::Mutex<bool>,
txn_timeout: Duration,
}

Expand All @@ -407,7 +406,7 @@ impl<T> TxnSlot<T> {

impl<T> std::fmt::Debug for TxnSlot<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let stolen = self.is_stolen.load(Ordering::Relaxed);
let stolen = self.is_stolen.lock();
let time_left = self.expires_at().duration_since(Instant::now());
write!(
f,
Expand Down Expand Up @@ -451,15 +450,24 @@ impl<T> Default for TxnState<T> {
/// - If the handler waits until the txn timeout and isn't notified of the termination of the txn, it will attempt to steal the lock.
/// This is done by calling rollback on the slot's txn, and marking the slot as stolen.
/// - When a connection notices that its slot has been stolen, it returns a timed-out error to the next request.
unsafe extern "C" fn busy_handler<T: Wal>(state: *mut c_void, _retries: c_int) -> c_int {
const MAX_BUSY_RETRIES: c_int = 512;

unsafe extern "C" fn busy_handler<T: Wal>(state: *mut c_void, retries: c_int) -> c_int {
let state = &*(state as *mut TxnState<T>);
let lock = state.slot.read();
// we take a reference to the slot we will attempt to steal. this is to make sure that we
// actually steal the correct lock.
let slot = match &*lock {
Some(slot) => slot.clone(),
// fast path: there is no slot, try to acquire the lock again
None => return 1,
None if retries < 512 => {
std::thread::sleep(std::time::Duration::from_millis(10));
return 1;
}
None => {
tracing::info!("Failed to steal connection lock after {MAX_BUSY_RETRIES} retries.");
return 0;
}
};

tokio::runtime::Handle::current().block_on(async move {
Expand All @@ -484,7 +492,17 @@ unsafe extern "C" fn busy_handler<T: Wal>(state: *mut c_void, _retries: c_int) -
if Arc::ptr_eq(s, &slot) {
// We check that the slot wasn't already stolen, and that there is still a slot.
// The ordering is relaxed because the atomic is only set under the slot lock.
if slot.is_stolen.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed).is_ok() {
let can_steal = {
let mut can_steal = false;
let mut is_stolen = slot.is_stolen.lock();
if !*is_stolen {
can_steal = true;
*is_stolen = true;
}
can_steal
};

if can_steal {
// The connection holding the current txn will set itself as stolen when it
// detects a timeout, so if we arrive to this point, then there is
// necessarily a slot, and this slot has to be the one we attempted to
Expand All @@ -505,7 +523,6 @@ unsafe extern "C" fn busy_handler<T: Wal>(state: *mut c_void, _retries: c_int) -
1
}
}

})
}

Expand Down Expand Up @@ -584,13 +601,12 @@ impl<W: Wal> Connection<W> {
pgm: Program,
mut builder: B,
) -> Result<B> {
use rusqlite::TransactionState as Tx;

let (state, txn_timeout) = {
let lock = this.lock();
let txn_timeout = lock.config_store.get().txn_timeout.unwrap_or(TXN_TIMEOUT);
(lock.state.clone(), txn_timeout)
};
let txn_timeout = this
.lock()
.config_store
.get()
.txn_timeout
.unwrap_or(TXN_TIMEOUT);

let mut results = Vec::with_capacity(pgm.steps.len());
builder.init(&this.lock().builder_config)?;
Expand All @@ -603,12 +619,17 @@ impl<W: Wal> Connection<W> {
for step in pgm.steps() {
let mut lock = this.lock();

if let Some(slot) = &lock.slot {
if slot.is_stolen.load(Ordering::Relaxed) || Instant::now() > slot.expires_at() {
// we mark ourselves as stolen to notify any waiting lock thief.
slot.is_stolen.store(true, Ordering::Relaxed);
lock.rollback();
has_timeout = true;
if !has_timeout {
if let Some(slot) = &lock.slot {
let mut is_stolen = slot.is_stolen.lock();
if *is_stolen || Instant::now() > slot.expires_at() {
// we mark ourselves as stolen to notify any waiting lock thief.
if !*is_stolen {
lock.rollback();
}
*is_stolen = true;
has_timeout = true;
}
}
}

Expand All @@ -621,61 +642,71 @@ impl<W: Wal> Connection<W> {
continue;
}

let res = lock.execute_step(step, &results, &mut builder)?;

let new_state = lock.conn.transaction_state(Some(DatabaseName::Main))?;
match (previous_state, new_state) {
// lock was upgraded, claim the slot
(Tx::None | Tx::Read, Tx::Write) => {
let slot = Arc::new(TxnSlot {
conn: this.clone(),
created_at: Instant::now(),
is_stolen: AtomicBool::new(false),
txn_timeout,
});

lock.slot.replace(slot.clone());
state.slot.write().replace(slot);
}
// lock was downgraded, notify a waiter
(Tx::Write, Tx::None | Tx::Read) => {
let old_slot = lock
.slot
.take()
.expect("there should be a slot right after downgrading a txn");
let mut maybe_state_slot = state.slot.write();
// We need to make sure that the state slot is our slot before removing it.
if let Some(ref state_slot) = *maybe_state_slot {
if Arc::ptr_eq(state_slot, &old_slot) {
maybe_state_slot.take();
}
}

drop(maybe_state_slot);

state.notify.notify_waiters();
}
// nothing to do
(_, _) => (),
}

previous_state = new_state;
let ret = lock.execute_step(step, &results, &mut builder);
// /!\ always make sure that the state is updated before returning
previous_state = lock.update_state(this.clone(), previous_state, txn_timeout)?;
let res = ret?;

results.push(res);
}

{
let mut lock = this.lock();
let is_autocommit = lock.conn.is_autocommit();
builder.finish(
*(lock.current_frame_no_receiver.borrow_and_update()),
is_autocommit,
)?;
let current_fno = *lock.current_frame_no_receiver.borrow_and_update();
builder.finish(current_fno, is_autocommit)?;
}

Ok(builder)
}

/// Reconciles the transaction-slot bookkeeping with the connection's current transaction state.
///
/// The transaction state of the main database is queried and compared with `previous_state`:
///
/// - `None`/`Read` -> `Write`: a fresh [`TxnSlot`] holding `arc_this` and `txn_timeout` is
///   created and stored both on this connection and in the shared state.
/// - `Write` -> `None`/`Read`: this connection's slot is taken, removed from the shared state if
///   it is still the one registered there, and waiters are notified.
/// - Any other transition leaves the slots untouched.
///
/// Returns the transaction state that was just observed.
///
/// # Errors
///
/// Propagates the error returned when querying the transaction state.
///
/// # Panics
///
/// Panics if a downgrade from `Write` is observed while this connection holds no slot.
fn update_state(
    &mut self,
    arc_this: Arc<Mutex<Self>>,
    previous_state: TransactionState,
    txn_timeout: Duration,
) -> Result<TransactionState> {
    use rusqlite::TransactionState as Tx;

    let new_state = self.conn.transaction_state(Some(DatabaseName::Main))?;
    let transition = (previous_state, new_state);

    if matches!(transition, (Tx::None | Tx::Read, Tx::Write)) {
        // The lock was upgraded: claim the slot, locally first and then in the shared state.
        let claimed = Arc::new(TxnSlot {
            conn: arc_this,
            created_at: Instant::now(),
            is_stolen: false.into(),
            txn_timeout,
        });

        self.slot = Some(claimed.clone());
        *self.state.slot.write() = Some(claimed);
    } else if matches!(transition, (Tx::Write, Tx::None | Tx::Read)) {
        // The lock was downgraded: give the slot back and wake up a waiter.
        let released = self
            .slot
            .take()
            .expect("there should be a slot right after downgrading a txn");

        {
            let mut shared = self.state.slot.write();
            // Only clear the shared slot if it is still ours; it may have been replaced since.
            let is_ours = matches!(&*shared, Some(current) if Arc::ptr_eq(current, &released));
            if is_ours {
                shared.take();
            }
            // The write guard is released here, before waiters are notified.
        }

        self.state.notify.notify_waiters();
    }
    // Every other transition requires no bookkeeping.

    Ok(new_state)
}

fn execute_step(
&mut self,
step: &Step,
Expand Down

0 comments on commit f0f8730

Please sign in to comment.