From 88d953e8670ec6fc056c8f0d1826c594b71055dc Mon Sep 17 00:00:00 2001 From: ad hoc Date: Thu, 29 Feb 2024 00:45:20 +0100 Subject: [PATCH] factorize program Execution in Vm --- libsql-server/src/connection/libsql.rs | 380 +++++++----------------- libsql-server/src/connection/program.rs | 259 ++++++++++++++++ 2 files changed, 365 insertions(+), 274 deletions(-) diff --git a/libsql-server/src/connection/libsql.rs b/libsql-server/src/connection/libsql.rs index 653e0963bb2..efc0f0442e0 100644 --- a/libsql-server/src/connection/libsql.rs +++ b/libsql-server/src/connection/libsql.rs @@ -1,11 +1,12 @@ use std::ffi::{c_int, c_void}; +use std::ops::Deref; use std::path::{Path, PathBuf}; use std::sync::Arc; use libsql_sys::wal::wrapper::{WrapWal, WrappedWal}; use libsql_sys::wal::{BusyHandler, CheckpointCallback, Wal, WalManager}; use libsql_sys::EncryptionConfig; -use metrics::{histogram, increment_counter}; +use metrics::histogram; use once_cell::sync::Lazy; use parking_lot::{Mutex, RwLock}; use rusqlite::ffi::SQLITE_BUSY; @@ -17,19 +18,17 @@ use crate::auth::Permission; use crate::connection::TXN_TIMEOUT; use crate::error::Error; use crate::metrics::{ - DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, READ_QUERY_COUNT, VACUUM_COUNT, WAL_CHECKPOINT_COUNT, - WRITE_QUERY_COUNT, WRITE_TXN_DURATION, + DESCRIBE_COUNT, PROGRAM_EXEC_COUNT, VACUUM_COUNT, WAL_CHECKPOINT_COUNT, WRITE_TXN_DURATION, }; use crate::namespace::meta_store::MetaStoreHandle; -use crate::query::Query; use crate::query_analysis::{StmtKind, TxnStatus}; use crate::query_result_builder::{QueryBuilderConfig, QueryResultBuilder}; use crate::replication::FrameNo; use crate::stats::Stats; use crate::Result; -use super::program::{Cond, DescribeCol, DescribeParam, DescribeResponse}; -use super::{MakeConnection, Program, RequestContext, Step}; +use super::program::{DescribeCol, DescribeParam, DescribeResponse, Vm}; +use super::{MakeConnection, Program, RequestContext}; pub struct MakeLibSqlConn { db_path: PathBuf, @@ -337,6 +336,14 @@ where .transaction_state(Some(DatabaseName::Main))? .into()) } + + pub fn with_raw(&self, f: F) -> R + where + F: FnOnce(&mut libsql_sys::Connection) -> R, + { + let mut inner = self.inner.lock(); + f(&mut inner.conn) + } } #[cfg(test)] @@ -533,17 +540,6 @@ unsafe extern "C" fn busy_handler(state: *mut c_void, retries: c_int) -> }) } -fn value_size(val: &rusqlite::types::ValueRef) -> usize { - use rusqlite::types::ValueRef; - match val { - ValueRef::Null => 0, - ValueRef::Integer(_) => 8, - ValueRef::Real(_) => 8, - ValueRef::Text(s) => s.len(), - ValueRef::Blob(b) => b.len(), - } -} - impl From for TxnStatus { fn from(value: TransactionState) -> Self { match value { @@ -553,6 +549,55 @@ impl From for TxnStatus { } } } +fn update_stats(stats: &Stats, sql: String, stmt: &rusqlite::Statement, elapsed: Duration) { + histogram!("libsql_server_statement_execution_time", elapsed); + let elapsed = elapsed.as_millis() as u64; + let rows_read = stmt.get_status(StatementStatus::RowsRead) as u64; + let rows_written = stmt.get_status(StatementStatus::RowsWritten) as u64; + + if rows_read >= 10_000 || rows_written >= 1_000 { + let sql = if sql.len() >= 512 { + &sql[..512] + } else { + &sql[..] + }; + + tracing::info!( + "high read ({}) or write ({}) query: {}", + rows_read, + rows_written, + sql + ); + } + + let mem_used = stmt.get_status(StatementStatus::MemUsed) as u64; + histogram!("libsql_server_statement_mem_used_bytes", mem_used as f64); + let rows_read = if rows_read == 0 && rows_written == 0 { + 1 + } else { + rows_read + }; + stats.inc_rows_read(rows_read); + stats.inc_rows_written(rows_written); + let weight = rows_read + rows_written; + if stats.qualifies_as_top_query(weight) { + stats.add_top_query(crate::stats::TopQuery::new( + sql.clone(), + rows_read, + rows_written, + )); + } + if stats.qualifies_as_slowest_query(elapsed) { + stats.add_slowest_query(crate::stats::SlowestQuery::new( + sql.clone(), + elapsed, + rows_read, + rows_written, + )); + } + + stats.update_query_metrics(rows_read, rows_written, mem_used, elapsed) +} impl Connection { fn new>( @@ -614,22 +659,49 @@ impl Connection { pgm: Program, mut builder: B, ) -> Result { - let txn_timeout = this - .lock() - .config_store - .get() - .txn_timeout - .unwrap_or(TXN_TIMEOUT); + let (config, stats) = { + let lock = this.lock(); + let config = lock.config_store.get(); + let stats = lock.stats.clone(); + + (config, stats) + }; + + let txn_timeout = config.txn_timeout.unwrap_or(TXN_TIMEOUT); - let mut results = Vec::with_capacity(pgm.steps.len()); builder.init(&this.lock().builder_config)?; let mut previous_state = this .lock() .conn .transaction_state(Some(DatabaseName::Main))?; + let mut vm = Vm::new( + builder, + pgm, + move |stmt_kind| { + let should_block = match stmt_kind { + StmtKind::Read | StmtKind::TxnBegin => config.block_reads, + StmtKind::Write => config.block_reads || config.block_writes, + StmtKind::DDL => { + config.block_reads || config.block_writes || config.block_ddl() + } + StmtKind::TxnEnd + | StmtKind::Release + | StmtKind::Savepoint + | StmtKind::Detach + | StmtKind::Attach(_) => false, + }; + + ( + should_block, + should_block.then(|| config.block_reason.clone()).flatten(), + ) + }, + move |sql, stmt, elapsed| update_stats(&stats, sql, stmt, elapsed), + ); + let mut has_timeout = false; - for step in pgm.steps() { + while !vm.finished() { let mut lock = this.lock(); if !has_timeout { @@ -649,28 +721,28 @@ impl Connection { // once there was a timeout, invalidate all the program steps if has_timeout { lock.slot = None; - builder.begin_step()?; - builder.step_error(Error::LibSqlTxTimeout)?; - builder.finish_step(0, None)?; + vm.builder().begin_step()?; + vm.builder().step_error(Error::LibSqlTxTimeout)?; + vm.builder().finish_step(0, None)?; + vm.advance(); continue; } - let ret = lock.execute_step(step, &results, &mut builder); + let conn = lock.conn.deref(); + let ret = vm.step(conn); // /!\ always make sure that the state is updated before returning previous_state = lock.update_state(this.clone(), previous_state, txn_timeout)?; - let res = ret?; - - results.push(res); + ret?; } { let mut lock = this.lock(); let is_autocommit = lock.conn.is_autocommit(); let current_fno = *lock.current_frame_no_receiver.borrow_and_update(); - builder.finish(current_fno, is_autocommit)?; + vm.builder().finish(current_fno, is_autocommit)?; } - Ok(builder) + Ok(vm.into_builder()) } fn update_state( @@ -720,174 +792,6 @@ impl Connection { Ok(new_state) } - fn execute_step( - &mut self, - step: &Step, - results: &[bool], - builder: &mut impl QueryResultBuilder, - ) -> Result { - builder.begin_step()?; - - let mut enabled = match step.cond.as_ref() { - Some(cond) => match eval_cond(cond, results, self.is_autocommit()) { - Ok(enabled) => enabled, - Err(e) => { - builder.step_error(e).unwrap(); - false - } - }, - None => true, - }; - - let (affected_row_count, last_insert_rowid) = if enabled { - match self.execute_query(&step.query, builder) { - // builder error interrupt the execution of query. we should exit immediately. - Err(e @ Error::BuilderError(_)) => return Err(e), - Err(mut e) => { - if let Error::RusqliteError(err) = e { - let extended_code = - unsafe { rusqlite::ffi::sqlite3_extended_errcode(self.conn.handle()) }; - - e = Error::RusqliteErrorExtended(err, extended_code as i32); - }; - - builder.step_error(e)?; - enabled = false; - (0, None) - } - Ok(x) => x, - } - } else { - (0, None) - }; - - builder.finish_step(affected_row_count, last_insert_rowid)?; - - Ok(enabled) - } - - fn prepare_attach_query(&self, attached: &str, attached_alias: &str) -> Result { - let attached = attached.strip_prefix('"').unwrap_or(attached); - let attached = attached.strip_suffix('"').unwrap_or(attached); - if attached.contains('/') { - return Err(Error::Internal(format!( - "Invalid attached database name: {:?}", - attached - ))); - } - let path = PathBuf::from(self.conn.path().unwrap_or(".")); - let dbs_path = path - .parent() - .unwrap_or_else(|| std::path::Path::new("..")) - .parent() - .unwrap_or_else(|| std::path::Path::new("..")) - .canonicalize() - .unwrap_or_else(|_| std::path::PathBuf::from("..")); - let query = format!( - "ATTACH DATABASE 'file:{}?mode=ro' AS \"{attached_alias}\"", - dbs_path.join(attached).join("data").display() - ); - tracing::trace!("ATTACH rewritten to: {query}"); - Ok(query) - } - - fn execute_query( - &self, - query: &Query, - builder: &mut impl QueryResultBuilder, - ) -> Result<(u64, Option)> { - tracing::debug!("executing query: {}", query.stmt.stmt); - - increment_counter!("libsql_server_libsql_query_execute"); - - let start = Instant::now(); - let config = self.config_store.get(); - - let blocked = match query.stmt.kind { - StmtKind::Read | StmtKind::TxnBegin => config.block_reads, - StmtKind::Write => config.block_reads || config.block_writes, - StmtKind::DDL => config.block_reads || config.block_writes || config.block_ddl(), - StmtKind::TxnEnd - | StmtKind::Release - | StmtKind::Savepoint - | StmtKind::Detach - | StmtKind::Attach(_) => false, - }; - if blocked { - return Err(Error::Blocked(config.block_reason.clone())); - } - - let mut stmt = if matches!(query.stmt.kind, StmtKind::Attach(_)) { - match &query.stmt.attach_info { - Some((attached, attached_alias)) => { - let query = self.prepare_attach_query(attached, attached_alias)?; - self.conn.prepare(&query)? - } - None => { - return Err(Error::Internal(format!( - "Failed to ATTACH: {:?}", - query.stmt.attach_info - ))) - } - } - } else { - self.conn.prepare(&query.stmt.stmt)? - }; - - if stmt.readonly() { - READ_QUERY_COUNT.increment(1); - } else { - WRITE_QUERY_COUNT.increment(1); - } - - let cols = stmt.columns(); - let cols_count = cols.len(); - builder.cols_description(cols.iter())?; - drop(cols); - - query - .params - .bind(&mut stmt) - .map_err(Error::LibSqlInvalidQueryParams)?; - - let mut qresult = stmt.raw_query(); - - let mut values_total_bytes = 0; - builder.begin_rows()?; - while let Some(row) = qresult.next()? { - builder.begin_row()?; - for i in 0..cols_count { - let val = row.get_ref(i)?; - values_total_bytes += value_size(&val); - builder.add_row_value(val)?; - } - builder.finish_row()?; - } - histogram!("libsql_server_returned_bytes", values_total_bytes as f64); - - builder.finish_rows()?; - - // sqlite3_changes() is only modified for INSERT, UPDATE or DELETE; it is not reset for SELECT, - // but we want to return 0 in that case. - let affected_row_count = match query.stmt.is_iud { - true => self.conn.changes(), - false => 0, - }; - - // sqlite3_last_insert_rowid() only makes sense for INSERTs into a rowid table. we can't detect - // a rowid table, but at least we can detect an INSERT - let last_insert_rowid = match query.stmt.is_insert { - true => Some(self.conn.last_insert_rowid()), - false => None, - }; - - drop(qresult); - - self.update_stats(query.stmt.stmt.clone(), &stmt, Instant::now() - start); - - Ok((affected_row_count, last_insert_rowid)) - } - fn rollback(&self) { if let Err(e) = self.conn.execute("ROLLBACK", ()) { tracing::error!("failed to rollback: {e}"); @@ -921,58 +825,6 @@ impl Connection { Ok(()) } - fn update_stats(&self, sql: String, stmt: &rusqlite::Statement, elapsed: Duration) { - histogram!("libsql_server_statement_execution_time", elapsed); - let elapsed = elapsed.as_millis() as u64; - let rows_read = stmt.get_status(StatementStatus::RowsRead) as u64; - let rows_written = stmt.get_status(StatementStatus::RowsWritten) as u64; - - if rows_read >= 10_000 || rows_written >= 1_000 { - let sql = if sql.len() >= 512 { - &sql[..512] - } else { - &sql[..] - }; - - tracing::info!( - "high read ({}) or write ({}) query: {}", - rows_read, - rows_written, - sql - ); - } - - let mem_used = stmt.get_status(StatementStatus::MemUsed) as u64; - histogram!("libsql_server_statement_mem_used_bytes", mem_used as f64); - let rows_read = if rows_read == 0 && rows_written == 0 { - 1 - } else { - rows_read - }; - self.stats.inc_rows_read(rows_read); - self.stats.inc_rows_written(rows_written); - let weight = rows_read + rows_written; - if self.stats.qualifies_as_top_query(weight) { - self.stats.add_top_query(crate::stats::TopQuery::new( - sql.clone(), - rows_read, - rows_written, - )); - } - if self.stats.qualifies_as_slowest_query(elapsed) { - self.stats - .add_slowest_query(crate::stats::SlowestQuery::new( - sql.clone(), - elapsed, - rows_read, - rows_written, - )); - } - - self.stats - .update_query_metrics(rows_read, rows_written, mem_used, elapsed) - } - fn describe(&self, sql: &str) -> crate::Result { let stmt = self.conn.prepare(sql)?; @@ -1008,26 +860,6 @@ impl Connection { } } -fn eval_cond(cond: &Cond, results: &[bool], is_autocommit: bool) -> Result { - let get_step_res = |step: usize| -> Result { - let res = results.get(step).ok_or(Error::InvalidBatchStep(step))?; - Ok(*res) - }; - - Ok(match cond { - Cond::Ok { step } => get_step_res(*step)?, - Cond::Err { step } => !get_step_res(*step)?, - Cond::Not { cond } => !eval_cond(cond, results, is_autocommit)?, - Cond::And { conds } => conds.iter().try_fold(true, |x, cond| { - eval_cond(cond, results, is_autocommit).map(|y| x & y) - })?, - Cond::Or { conds } => conds.iter().try_fold(false, |x, cond| { - eval_cond(cond, results, is_autocommit).map(|y| x | y) - })?, - Cond::IsAutocommit => is_autocommit, - }) -} - fn check_program_auth(ctx: &RequestContext, pgm: &Program) -> Result<()> { for step in pgm.steps() { match step.query.stmt.kind { diff --git a/libsql-server/src/connection/program.rs b/libsql-server/src/connection/program.rs index 4e747f9a1ca..54f3640e326 100644 --- a/libsql-server/src/connection/program.rs +++ b/libsql-server/src/connection/program.rs @@ -1,6 +1,14 @@ +use std::path::PathBuf; use std::sync::Arc; +use std::time::{Duration, Instant}; +use metrics::{histogram, increment_counter}; + +use crate::error::Error; +use crate::metrics::{READ_QUERY_COUNT, WRITE_QUERY_COUNT}; use crate::query::Query; +use crate::query_analysis::StmtKind; +use crate::query_result_builder::QueryResultBuilder; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct Program { @@ -78,3 +86,254 @@ pub struct DescribeCol { pub name: String, pub decltype: Option, } + +pub struct Vm { + results: Vec, + builder: B, + program: Program, + current_step: usize, + should_block: F, + update_stats: S, +} + +impl Vm +where + B: QueryResultBuilder, + F: Fn(&StmtKind) -> (bool, Option), + S: Fn(String, &rusqlite::Statement, Duration), +{ + pub fn new(builder: B, program: Program, should_block: F, update_stats: S) -> Self { + Self { + results: Vec::with_capacity(program.steps().len()), + builder, + program, + current_step: 0, + should_block, + update_stats, + } + } + + #[inline] + fn current_step(&self) -> &Step { + &self.program.steps()[self.current_step] + } + + pub fn step(&mut self, conn: &rusqlite::Connection) -> crate::Result { + match self.try_step(conn) { + Ok(res) => { + self.results.push(res); + self.current_step += 1; + Ok(self.current_step < self.program.steps().len()) + } + Err(e) => Err(e), + } + } + + fn try_step(&mut self, conn: &rusqlite::Connection) -> crate::Result { + self.builder.begin_step()?; + let mut enabled = match self.current_step().cond.as_ref() { + Some(cond) => match eval_cond(cond, &self.results, conn.is_autocommit()) { + Ok(enabled) => enabled, + Err(e) => { + self.builder.step_error(e).unwrap(); + false + } + }, + None => true, + }; + + let (affected_row_count, last_insert_rowid) = if enabled { + match self.execute_query(conn) { + // builder error interrupt the execution of query. we should exit immediately. + Err(e @ Error::BuilderError(_)) => return Err(e), + Err(mut e) => { + if let Error::RusqliteError(err) = e { + let extended_code = + unsafe { rusqlite::ffi::sqlite3_extended_errcode(conn.handle()) }; + + e = Error::RusqliteErrorExtended(err, extended_code as i32); + }; + + self.builder.step_error(e)?; + enabled = false; + (0, None) + } + Ok(x) => x, + } + } else { + (0, None) + }; + + self.builder + .finish_step(affected_row_count, last_insert_rowid)?; + + Ok(enabled) + } + + fn prepare_attach_query( + &self, + conn: &rusqlite::Connection, + attached: &str, + attached_alias: &str, + ) -> crate::Result { + // all this is crap. take the namesapce store here, and lookup the namespace, then get the + // path. + let attached = attached.strip_prefix('"').unwrap_or(attached); + let attached = attached.strip_suffix('"').unwrap_or(attached); + if attached.contains('/') { + return Err(Error::Internal(format!( + "Invalid attached database name: {attached:?}" + ))); + } + let path = PathBuf::from(conn.path().unwrap_or(".")); + let dbs_path = path + .parent() + .unwrap_or_else(|| std::path::Path::new("..")) + .parent() + .unwrap_or_else(|| std::path::Path::new("..")) + .canonicalize() + .unwrap_or_else(|_| std::path::PathBuf::from("..")); + let query = format!( + "ATTACH DATABASE 'file:{}?mode=ro' AS \"{attached_alias}\"", + dbs_path.join(attached).join("data").display() + ); + tracing::trace!("ATTACH rewritten to: {query}"); + Ok(query) + } + + fn execute_query(&mut self, conn: &rusqlite::Connection) -> crate::Result<(u64, Option)> { + tracing::debug!("executing query: {}", self.current_step().query.stmt.stmt); + + increment_counter!("libsql_server_libsql_query_execute"); + + let start = Instant::now(); + let (blocked, reason) = (self.should_block)(&self.current_step().query.stmt.kind); + if blocked { + return Err(Error::Blocked(reason)); + } + + let mut stmt = if matches!(self.current_step().query.stmt.kind, StmtKind::Attach(_)) { + match &self.current_step().query.stmt.attach_info { + Some((attached, attached_alias)) => { + // nope nope nope: only builder error should return + let query = self.prepare_attach_query(conn, attached, attached_alias)?; + conn.prepare(&query)? + } + None => { + return Err(Error::Internal(format!( + "Failed to ATTACH: {:?}", + self.current_step().query.stmt.attach_info + ))) + } + } + } else { + conn.prepare(&self.current_step().query.stmt.stmt)? + }; + + if stmt.readonly() { + READ_QUERY_COUNT.increment(1); + } else { + WRITE_QUERY_COUNT.increment(1); + } + + let cols = stmt.columns(); + let cols_count = cols.len(); + self.builder.cols_description(cols.iter())?; + drop(cols); + + self.current_step() + .query + .params + .bind(&mut stmt) + .map_err(Error::LibSqlInvalidQueryParams)?; + + let mut qresult = stmt.raw_query(); + + let mut values_total_bytes = 0; + self.builder.begin_rows()?; + while let Some(row) = qresult.next()? { + self.builder.begin_row()?; + for i in 0..cols_count { + let val = row.get_ref(i)?; + values_total_bytes += value_size(&val); + self.builder.add_row_value(val)?; + } + self.builder.finish_row()?; + } + histogram!("libsql_server_returned_bytes", values_total_bytes as f64); + + self.builder.finish_rows()?; + + // sqlite3_changes() is only modified for INSERT, UPDATE or DELETE; it is not reset for SELECT, + // but we want to return 0 in that case. + let affected_row_count = match self.current_step().query.stmt.is_iud { + true => conn.changes(), + false => 0, + }; + + // sqlite3_last_insert_rowid() only makes sense for INSERTs into a rowid table. we can't detect + // a rowid table, but at least we can detect an INSERT + let last_insert_rowid = match self.current_step().query.stmt.is_insert { + true => Some(conn.last_insert_rowid()), + false => None, + }; + + drop(qresult); + + (self.update_stats)( + self.current_step().query.stmt.stmt.clone(), + &stmt, + Instant::now() - start, + ); + + Ok((affected_row_count, last_insert_rowid)) + } + + pub fn builder(&mut self) -> &mut B { + &mut self.builder + } + + /// advance the program without executing the step + pub(crate) fn advance(&mut self) { + self.current_step += 1; + } + + pub(crate) fn finished(&self) -> bool { + self.current_step >= self.program.steps().len() + } + + pub(crate) fn into_builder(self) -> B { + self.builder + } +} + +fn eval_cond(cond: &Cond, results: &[bool], is_autocommit: bool) -> crate::Result { + let get_step_res = |step: usize| -> crate::Result { + let res = results.get(step).ok_or(Error::InvalidBatchStep(step))?; + Ok(*res) + }; + + Ok(match cond { + Cond::Ok { step } => get_step_res(*step)?, + Cond::Err { step } => !get_step_res(*step)?, + Cond::Not { cond } => !eval_cond(cond, results, is_autocommit)?, + Cond::And { conds } => conds.iter().try_fold(true, |x, cond| { + eval_cond(cond, results, is_autocommit).map(|y| x & y) + })?, + Cond::Or { conds } => conds.iter().try_fold(false, |x, cond| { + eval_cond(cond, results, is_autocommit).map(|y| x | y) + })?, + Cond::IsAutocommit => is_autocommit, + }) +} + +fn value_size(val: &rusqlite::types::ValueRef) -> usize { + use rusqlite::types::ValueRef; + match val { + ValueRef::Null => 0, + ValueRef::Integer(_) => 8, + ValueRef::Real(_) => 8, + ValueRef::Text(s) => s.len(), + ValueRef::Blob(b) => b.len(), + } +}