Skip to content

Commit

Permalink
feat(storage): make it possible to disable read tx timeout (#6181)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Jan 23, 2024
1 parent 4e24ba7 commit ede3b8c
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 15 deletions.
2 changes: 1 addition & 1 deletion crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
.provider()?
// State root calculation can take a while, and we're sure no write transaction
// will be open in parallel. See https://github.com/paradigmxyz/reth/issues/6168.
.disable_backtrace_on_long_read_transaction();
.disable_long_read_transaction_safety();
let (state_root, trie_updates) = hashed_state
.state_root_with_updates(provider.tx_ref())
.map_err(Into::<DatabaseError>::into)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db/src/abstraction/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl DbTx for TxMock {
Ok(self._table.len())
}

fn disable_backtrace_on_long_read_transaction(&mut self) {}
fn disable_long_read_transaction_safety(&mut self) {}
}

impl DbTxMut for TxMock {
Expand Down
4 changes: 2 additions & 2 deletions crates/storage/db/src/abstraction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ pub trait DbTx: Send + Sync {
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError>;
/// Returns number of entries in the table.
fn entries<T: Table>(&self) -> Result<usize, DatabaseError>;
/// Disables backtrace recording for this read transaction when it's open for too long.
fn disable_backtrace_on_long_read_transaction(&mut self);
/// Disables long-lived read transaction safety guarantees.
fn disable_long_read_transaction_safety(&mut self);
}

/// Read write transaction that allows writing to database
Expand Down
66 changes: 60 additions & 6 deletions crates/storage/db/src/implementation/mdbx/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ impl<K: TransactionKind> Tx<K> {
let dbi_handle = handles.get_mut(table as usize).expect("should exist");
if dbi_handle.is_none() {
*dbi_handle = Some(
self.inner
.open_db(Some(T::NAME))
.map_err(|e| DatabaseError::InitCursor(e.into()))?
.dbi(),
self.inner.open_db(Some(T::NAME)).map_err(|e| DatabaseError::Open(e.into()))?.dbi(),
);
}

Expand Down Expand Up @@ -289,11 +286,14 @@ impl<K: TransactionKind> DbTx for Tx<K> {
.entries())
}

/// Disables backtrace recording for this read transaction when it's open for too long.
fn disable_backtrace_on_long_read_transaction(&mut self) {
/// Disables long-lived read transaction safety guarantees, such as backtrace recording and
/// timeout.
fn disable_long_read_transaction_safety(&mut self) {
if let Some(metrics_handler) = self.metrics_handler.as_mut() {
metrics_handler.record_backtrace = false;
}

self.inner.disable_timeout();
}
}

Expand Down Expand Up @@ -353,3 +353,57 @@ impl DbTxMut for Tx<RW> {
self.new_cursor()
}
}

#[cfg(test)]
mod tests {
use crate::{
database::Database,
mdbx::{tx::LONG_TRANSACTION_DURATION, DatabaseArguments},
tables,
transaction::DbTx,
DatabaseEnv, DatabaseEnvKind,
};
use reth_interfaces::db::DatabaseError;
use reth_libmdbx::MaxReadTransactionDuration;
use std::{sync::atomic::Ordering, thread::sleep, time::Duration};
use tempfile::tempdir;

#[test]
fn long_read_transaction_safety_disabled() {
const MAX_DURATION: Duration = Duration::from_secs(1);

let dir = tempdir().unwrap();
let args = DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(MAX_DURATION)));
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();

let mut tx = db.tx().unwrap();
tx.disable_long_read_transaction_safety();
sleep(MAX_DURATION.max(LONG_TRANSACTION_DURATION));

assert_eq!(
tx.get::<tables::Transactions>(0).err(),
Some(DatabaseError::Open(reth_libmdbx::Error::NotFound.to_err_code()))
); // Transaction is not timeout-ed
assert!(!tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is not recorded
}

#[test]
fn long_read_transaction_safety_enabled() {
const MAX_DURATION: Duration = Duration::from_secs(1);

let dir = tempdir().unwrap();
let args = DatabaseArguments::default()
.max_read_transaction_duration(Some(MaxReadTransactionDuration::Set(MAX_DURATION)));
let db = DatabaseEnv::open(dir.path(), DatabaseEnvKind::RW, args).unwrap().with_metrics();

let tx = db.tx().unwrap();
sleep(MAX_DURATION.max(LONG_TRANSACTION_DURATION));

assert_eq!(
tx.get::<tables::Transactions>(0).err(),
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.to_err_code()))
); // Transaction is timeout-ed
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is recorded
}
}
8 changes: 8 additions & 0 deletions crates/storage/libmdbx-rs/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,14 @@ where
pub fn cursor_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Cursor<K>> {
Cursor::new(self.clone(), dbi)
}

/// Disables a timeout for this read transaction.
#[cfg(feature = "read-tx-timeouts")]
pub fn disable_timeout(&self) {
if K::IS_READ_ONLY {
self.env().txn_manager().remove_active_read_transaction(self.txn());
}
}
}

impl<K> Clone for Transaction<K>
Expand Down
10 changes: 5 additions & 5 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,14 @@ impl<TX: DbTx> DatabaseProvider<TX> {
.collect::<Result<Vec<_>, DatabaseError>>()
}

/// Disables backtrace recording for the underlying read database transaction when it's open for
/// too long.
/// Disables long-lived read transaction safety guarantees for leaks prevention and
/// observability improvements.
///
/// CAUTION: In most of the cases, you want to keep backtraces on long read transactions
/// CAUTION: In most of the cases, you want the safety guarantees for long read transactions
/// enabled. Use this only if you're sure that no write transaction is open in parallel, meaning
/// that Reth as a node is offline and not progressing.
pub fn disable_backtrace_on_long_read_transaction(mut self) -> Self {
self.tx.disable_backtrace_on_long_read_transaction();
pub fn disable_long_read_transaction_safety(mut self) -> Self {
self.tx.disable_long_read_transaction_safety();
self
}

Expand Down

0 comments on commit ede3b8c

Please sign in to comment.