diff --git a/libsql-server/src/connection/mod.rs b/libsql-server/src/connection/mod.rs
index fc0835c426..e1ea914c49 100644
--- a/libsql-server/src/connection/mod.rs
+++ b/libsql-server/src/connection/mod.rs
@@ -1,3 +1,4 @@
+use async_trait::async_trait;
 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use std::sync::Arc;
 use tokio::time::{Duration, Instant};
@@ -30,7 +31,7 @@ const TXN_TIMEOUT: Duration = Duration::from_secs(5);
 const TXN_TIMEOUT: Duration = Duration::from_millis(100);
 
 #[async_trait::async_trait]
-pub trait Connection: Send + Sync + 'static {
+pub trait Connection: Send + Sync + Sized + 'static {
     /// Executes a query program
     async fn execute_program<B: QueryResultBuilder>(
         &self,
@@ -172,6 +173,13 @@ pub trait MakeConnection: Send + Sync + 'static {
             max_concurrent_requests,
         )
     }
+
+    fn shared_schema(self, is_shared_schema: bool) -> MakeSharedSchemaConnection<Self>
+    where
+        Self: Sized,
+    {
+        MakeSharedSchemaConnection::new(self, is_shared_schema)
+    }
 }
 
 #[async_trait::async_trait]
@@ -189,6 +197,89 @@ where
     }
 }
 
+pub struct MakeSharedSchemaConnection<F> {
+    inner: F,
+    is_shared_schema: bool,
+}
+
+impl<F> MakeSharedSchemaConnection<F> {
+    pub fn new(inner: F, is_shared_schema: bool) -> Self {
+        MakeSharedSchemaConnection {
+            inner,
+            is_shared_schema,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl<F: MakeConnection> MakeConnection for MakeSharedSchemaConnection<F> {
+    type Connection = PrimaryConnection<F::Connection>;
+
+    async fn create(&self) -> Result<Self::Connection> {
+        let conn = self.inner.create().await?;
+        Ok(if self.is_shared_schema {
+            PrimaryConnection::SharedSchema(SharedSchemaConnection::new(conn))
+        } else {
+            PrimaryConnection::Tracked(conn)
+        })
+    }
+}
+
+#[derive(Debug)]
+pub struct SharedSchemaConnection<DB> {
+    inner: DB,
+}
+
+impl<DB> SharedSchemaConnection<DB> {
+    fn new(inner: DB) -> Self {
+        SharedSchemaConnection { inner }
+    }
+}
+
+#[async_trait::async_trait]
+impl<DB: Connection> Connection for SharedSchemaConnection<DB> {
+    async fn execute_program<B: QueryResultBuilder>(
+        &self,
+        pgm: Program,
+        auth: Authenticated,
+        response_builder: B,
+        replication_index: Option<FrameNo>,
+    ) -> Result<B> {
+        //TODO: execute multi-database execution procedure
+        todo!()
+    }
+
+    #[inline]
+    async fn describe(
+        &self,
+        sql: String,
+        auth: Authenticated,
+        replication_index: Option<FrameNo>,
+    ) -> Result<Result<DescribeResponse>> {
+        self.inner.describe(sql, auth, replication_index).await
+    }
+
+    #[inline]
+    async fn is_autocommit(&self) -> Result<bool> {
+        self.inner.is_autocommit().await
+    }
+
+    #[inline]
+    async fn checkpoint(&self) -> Result<()> {
+        self.inner.checkpoint().await
+    }
+
+    #[inline]
+    async fn vacuum_if_needed(&self) -> Result<()> {
+        self.inner.vacuum_if_needed().await
+    }
+
+    #[inline]
+    fn diagnostics(&self) -> String {
+        self.inner.diagnostics()
+    }
+}
+
 pub struct MakeThrottledConnection<F> {
     semaphore: Arc<Semaphore>,
     connection_maker: F,
@@ -309,6 +400,86 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
     }
 }
 
+pub enum PrimaryConnection<DB> {
+    Tracked(DB),
+    SharedSchema(SharedSchemaConnection<DB>),
+}
+
+impl<DB> PrimaryConnection<TrackedConnection<DB>> {
+    pub fn idle_time(&self) -> Duration {
+        let now = crate::connection::now_millis();
+        let atime = match self {
+            PrimaryConnection::Tracked(conn) => conn.atime.load(Ordering::Relaxed),
+            PrimaryConnection::SharedSchema(conn) => conn.inner.atime.load(Ordering::Relaxed),
+        };
+        Duration::from_millis(now.saturating_sub(atime))
+    }
+}
+
+#[async_trait]
+impl<DB: Connection> Connection for PrimaryConnection<DB> {
+    async fn execute_program<B: QueryResultBuilder>(
+        &self,
+        pgm: Program,
+        auth: Authenticated,
+        response_builder: B,
+        replication_index: Option<FrameNo>,
+    ) -> Result<B> {
+        match self {
+            PrimaryConnection::Tracked(conn) => {
+                conn.execute_program(pgm, auth, response_builder, replication_index)
+                    .await
+            }
+            PrimaryConnection::SharedSchema(conn) => {
+                conn.execute_program(pgm, auth, response_builder, replication_index)
+                    .await
+            }
+        }
+    }
+
+    async fn describe(
+        &self,
+        sql: String,
+        auth: Authenticated,
+        replication_index: Option<FrameNo>,
+    ) -> Result<Result<DescribeResponse>> {
+        match self {
+            PrimaryConnection::Tracked(conn) => conn.describe(sql, auth, replication_index).await,
+            PrimaryConnection::SharedSchema(conn) => {
+                conn.describe(sql, auth, replication_index).await
+            }
+        }
+    }
+
+    async fn is_autocommit(&self) -> Result<bool> {
+        match self {
+            PrimaryConnection::Tracked(conn) => conn.is_autocommit().await,
+            PrimaryConnection::SharedSchema(conn) => conn.is_autocommit().await,
+        }
+    }
+
+    async fn checkpoint(&self) -> Result<()> {
+        match self {
+            PrimaryConnection::Tracked(conn) => conn.checkpoint().await,
+            PrimaryConnection::SharedSchema(conn) => conn.checkpoint().await,
+        }
+    }
+
+    async fn vacuum_if_needed(&self) -> Result<()> {
+        match self {
+            PrimaryConnection::Tracked(conn) => conn.vacuum_if_needed().await,
+            PrimaryConnection::SharedSchema(conn) => conn.vacuum_if_needed().await,
+        }
+    }
+
+    fn diagnostics(&self) -> String {
+        match self {
+            PrimaryConnection::Tracked(conn) => conn.diagnostics(),
+            PrimaryConnection::SharedSchema(conn) => conn.diagnostics(),
+        }
+    }
+}
+
 #[derive(Debug)]
 pub struct TrackedConnection<DB> {
     inner: DB,
diff --git a/libsql-server/src/database.rs b/libsql-server/src/database.rs
index 4f9ebc2d6b..f6701372df 100644
--- a/libsql-server/src/database.rs
+++ b/libsql-server/src/database.rs
@@ -1,14 +1,12 @@
-use std::sync::Arc;
-
-use async_trait::async_trait;
-
 use crate::connection::libsql::LibSqlConnection;
 use crate::connection::write_proxy::{RpcStream, WriteProxyConnection};
 use crate::connection::{Connection, MakeConnection, TrackedConnection};
 use crate::namespace::replication_wal::{ReplicationWal, ReplicationWalManager};
+use async_trait::async_trait;
+use std::sync::Arc;
 
-pub type PrimaryConnection = TrackedConnection<LibSqlConnection<ReplicationWal>>;
-
+pub type PrimaryConnection =
+    crate::connection::PrimaryConnection<TrackedConnection<LibSqlConnection<ReplicationWal>>>;
 pub type Result<T> = anyhow::Result<T>;
 
 #[async_trait]
diff --git a/libsql-server/src/error.rs b/libsql-server/src/error.rs
index 9ef8780cce..9e620ea5e4 100644
--- a/libsql-server/src/error.rs
+++ b/libsql-server/src/error.rs
@@ -102,7 +102,9 @@ pub enum Error {
     #[error("Unable to decode protobuf: {0}")]
     ProstDecode(#[from] prost::DecodeError),
     #[error("Shared schema error: {0}")]
-    SharedSchemaError(String),
+    SharedSchemaCreationError(String),
+    #[error("Cannot remove shared schema database `{0}`: there are {1} databases using it")]
+    SharedSchemaDestroyError(String, u32),
 }
 
 impl AsRef<Self> for Error {
@@ -180,7 +182,8 @@ impl IntoResponse for &Error {
             MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
             Ref(this) => this.as_ref().into_response(),
             ProstDecode(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
-            SharedSchemaError(_) => self.format_err(StatusCode::BAD_REQUEST),
+            SharedSchemaCreationError(_) => self.format_err(StatusCode::BAD_REQUEST),
+            SharedSchemaDestroyError(_, _) => self.format_err(StatusCode::BAD_REQUEST),
         }
     }
 }
diff --git a/libsql-server/src/http/admin/mod.rs b/libsql-server/src/http/admin/mod.rs
index 6867d14500..dbdcf9e1f1 100644
--- a/libsql-server/src/http/admin/mod.rs
+++ b/libsql-server/src/http/admin/mod.rs
@@ -300,15 +300,15 @@ async fn handle_create_namespace(
     }
     let shared_schema_name = if let Some(ns) = req.shared_schema_name {
         if req.shared_schema {
-            return Err(Error::SharedSchemaError(
+            return Err(Error::SharedSchemaCreationError(
                 "shared schema database cannot reference another shared schema".to_string(),
             ));
         }
         let namespace = NamespaceName::from_string(ns)?;
-        if !app_state.namespaces.exists(&namespace).await {
+        if !app_state.namespaces.exists(&namespace) {
             return Err(Error::NamespaceDoesntExist(namespace.to_string()));
         }
-        Some(namespace.to_string())
+        Some(namespace)
     } else {
         None
     };
@@ -330,11 +330,11 @@ async fn handle_create_namespace(
         .create(namespace.clone(), dump, bottomless_db_id)
         .await?;
 
-    let store = app_state.namespaces.config_store(namespace).await?;
+    let store = app_state.namespaces.config_store(namespace.clone()).await?;
 
     let mut config = (*store.get()).clone();
     config.is_shared_schema = req.shared_schema;
-    config.shared_schema_name = shared_schema_name;
+    config.shared_schema_name = shared_schema_name.as_ref().map(NamespaceName::to_string);
     if let Some(max_db_size) = req.max_db_size {
         config.max_db_pages = max_db_size.as_u64() / LIBSQL_PAGE_SIZE;
     }
@@ -353,6 +353,13 @@ async fn handle_create_namespace(
     config.jwt_key = req.jwt_key;
     store.store(config).await?;
 
+    if let Some(shared_schema) = shared_schema_name {
+        app_state
+            .namespaces
+            .shared_schema_link(shared_schema, namespace)
+            .await?;
+    }
+
     Ok(())
 }
 
diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs
index 8c3b63c8e1..cfa896a7be 100644
--- a/libsql-server/src/namespace/meta_store.rs
+++ b/libsql-server/src/namespace/meta_store.rs
@@ -1,4 +1,5 @@
 #![allow(clippy::mutable_key_type)]
+use hashbrown::HashSet;
 use std::path::Path;
 use std::sync::Arc;
 use std::{collections::HashMap, fs::read_dir};
@@ -24,7 +25,10 @@ use crate::{
 
 use super::NamespaceName;
 
-type ChangeMsg = (NamespaceName, Arc<DatabaseConfig>);
+pub enum ChangeMsg {
+    ChangeConfig(NamespaceName, Arc<DatabaseConfig>),
+    Link(NamespaceName, NamespaceName),
+}
 type WalManager = WalWrapper<Option<BottomlessWalWrapper>, Sqlite3WalManager>;
 type Connection = libsql_sys::Connection<WrappedWal<Option<BottomlessWalWrapper>, Sqlite3Wal>>;
 
@@ -41,10 +45,26 @@ pub struct MetaStoreHandle {
 
 #[derive(Debug, Clone)]
 enum HandleState {
-    Internal(Arc<Mutex<Arc<DatabaseConfig>>>),
+    Internal(Arc<Mutex<MetaState>>),
     External(mpsc::Sender<ChangeMsg>, Receiver<InnerConfig>),
 }
 
+#[derive(Debug, Default)]
+struct MetaState {
+    config: Arc<DatabaseConfig>,
+    shared_schemas: HashMap<NamespaceName, HashSet<NamespaceName>>,
+}
+
+impl MetaState {
+    #[allow(dead_code)] // only used in tests
+    fn new(config: Arc<DatabaseConfig>) -> Self {
+        MetaState {
+            config,
+            shared_schemas: HashMap::default(),
+        }
+    }
+}
+
 #[derive(Debug, Default, Clone)]
 struct InnerConfig {
     /// Version of this config _per_ each running process of sqld, this means
@@ -127,6 +147,15 @@ impl MetaStoreInner {
             ",
             (),
         )?;
+        conn.execute(
+            "CREATE TABLE IF NOT EXISTS shared_schema_links (
+                id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+                shared_schema_name TEXT NOT NULL,
+                db_name TEXT NOT NULL
+            )
+            ",
+            (),
+        )?;
 
         let mut this = MetaStoreInner {
             configs: Default::default(),
@@ -239,34 +268,41 @@ impl MetaStoreInner {
     }
 }
 
-/// Handles config change updates by inserting them into the database and in-memory
-/// cache of configs.
 fn process(msg: ChangeMsg, inner: Arc<Mutex<MetaStoreInner>>) -> Result<()> {
-    let (namespace, config) = msg;
-
-    let config_encoded = metadata::DatabaseConfig::from(&*config).encode_to_vec();
+    match msg {
+        ChangeMsg::ChangeConfig(namespace, config) => {
+            let config_encoded = metadata::DatabaseConfig::from(&*config).encode_to_vec();
 
-    let inner = &mut inner.lock();
+            let inner = &mut inner.lock();
 
-    inner.conn.execute(
-        "INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
-        rusqlite::params![namespace.as_str(), config_encoded],
-    )?;
+            inner.conn.execute(
+                "INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
+                rusqlite::params![namespace.as_str(), config_encoded],
+            )?;
 
-    let configs = &mut inner.configs;
+            let configs = &mut inner.configs;
 
-    if let Some(config_watch) = configs.get_mut(&namespace) {
-        let new_version = config_watch.borrow().version.wrapping_add(1);
+            if let Some(config_watch) = configs.get_mut(&namespace) {
+                let new_version = config_watch.borrow().version.wrapping_add(1);
 
-        config_watch.send_modify(|c| {
-            *c = InnerConfig {
-                version: new_version,
-                config,
-            };
-        });
-    } else {
-        let (tx, _) = watch::channel(InnerConfig { version: 0, config });
-        configs.insert(namespace, tx);
+                config_watch.send_modify(|c| {
+                    *c = InnerConfig {
+                        version: new_version,
+                        config,
+                    };
+                });
+            } else {
+                let (tx, _) = watch::channel(InnerConfig { version: 0, config });
+                configs.insert(namespace, tx);
+            }
+        }
+        ChangeMsg::Link(parent, child) => {
+            let inner = &mut inner.lock();
+            inner.conn.execute(
+                "INSERT OR REPLACE INTO shared_schema_links (shared_schema_name, db_name) VALUES (?1, ?2)",
+                rusqlite::params![parent.as_str(), child.as_str()],
+            )?;
+        }
     }
 
     Ok(())
@@ -321,13 +357,32 @@ impl MetaStore {
         tracing::debug!("removing namespace `{}` from meta store", namespace);
 
         let mut guard = self.inner.lock();
-        guard.conn.execute(
-            "DELETE FROM namespace_configs WHERE namespace = ?",
-            [namespace.as_str()],
-        )?;
         if let Some(sender) = guard.configs.remove(&namespace) {
             tracing::debug!("removed namespace `{}` from meta store", namespace);
             let config = sender.borrow().clone();
+            if let Some(shared_schema) = config.config.shared_schema_name.as_deref() {
+                guard.conn.execute(
+                    "DELETE FROM shared_schema_links WHERE shared_schema_name = ? AND db_name = ?",
+                    [shared_schema, namespace.as_str()],
+                )?;
+            }
+            if config.config.is_shared_schema {
+                let res: rusqlite::Result<u32> = guard.conn.query_row(
+                    "SELECT COUNT(1) FROM shared_schema_links WHERE shared_schema_name = ?",
+                    [namespace.as_str()],
+                    |row| row.get(0),
+                );
+                match res {
+                    Ok(0) | Err(rusqlite::Error::QueryReturnedNoRows) => { /* ok */ }
+                    Ok(n) => return Err(Error::SharedSchemaDestroyError(namespace.to_string(), n)),
+                    Err(e) => return Err(e.into()),
+                }
+            }
+            guard.conn.execute(
+                "DELETE FROM namespace_configs WHERE namespace = ?",
+                [namespace.as_str()],
+            )?;
+
             Ok(Some(config.config))
         } else {
             tracing::trace!("namespace `{}` not found in meta store", namespace);
@@ -384,20 +439,20 @@ impl MetaStoreHandle {
 
         Ok(Self {
             namespace: NamespaceName("testmetastore".into()),
-            inner: HandleState::Internal(Arc::new(Mutex::new(Arc::new(config)))),
+            inner: HandleState::Internal(Arc::new(Mutex::new(MetaState::new(Arc::new(config))))),
         })
     }
 
     pub fn internal() -> Self {
         MetaStoreHandle {
             namespace: NamespaceName("testmetastore".into()),
-            inner: HandleState::Internal(Arc::new(Mutex::new(Arc::new(DatabaseConfig::default())))),
+            inner: HandleState::Internal(Arc::new(Mutex::new(MetaState::default()))),
         }
     }
 
     pub fn get(&self) -> Arc<DatabaseConfig> {
         match &self.inner {
-            HandleState::Internal(config) => config.lock().clone(),
+            HandleState::Internal(config) => config.lock().config.clone(),
             HandleState::External(_, config) => config.borrow().clone().config,
         }
     }
@@ -409,10 +464,32 @@ impl MetaStoreHandle {
         }
     }
 
+    pub async fn link(&self, shared_schema: NamespaceName, linked_db: NamespaceName) -> Result<()> {
+        match &self.inner {
+            HandleState::Internal(configs) => {
+                let mut guard = configs.lock();
+                guard
+                    .shared_schemas
+                    .entry(shared_schema)
+                    .or_default()
+                    .insert(linked_db);
+            }
+            HandleState::External(changes_tx, _) => {
+                changes_tx
+                    .send(ChangeMsg::Link(shared_schema, linked_db))
+                    .await
+                    .map_err(|e| Error::MetaStoreUpdateFailure(e.into()))?;
+            }
+        };
+
+        Ok(())
+    }
+
     pub async fn store(&self, new_config: impl Into<Arc<DatabaseConfig>>) -> Result<()> {
         match &self.inner {
             HandleState::Internal(config) => {
-                *config.lock() = new_config.into();
+                let mut lock = config.lock();
+                lock.config = new_config.into();
             }
             HandleState::External(changes_tx, config) => {
                 let new_config = new_config.into();
@@ -421,7 +498,7 @@ impl MetaStoreHandle {
                 let changed = c.changed();
 
                 changes_tx
-                    .send((self.namespace.clone(), new_config))
+                    .send(ChangeMsg::ChangeConfig(self.namespace.clone(), new_config))
                     .await
                     .map_err(|e| Error::MetaStoreUpdateFailure(e.into()))?;
 
diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs
index 77ee8e7a94..f5728f2e95 100644
--- a/libsql-server/src/namespace/mod.rs
+++ b/libsql-server/src/namespace/mod.rs
@@ -458,9 +458,28 @@ impl NamespaceStore {
         })
     }
 
-    pub async fn exists(&self, namespace: &NamespaceName) -> bool {
-        let e = self.inner.store.get(namespace).await;
-        e.is_some()
+    pub(crate) async fn shared_schema_link(
+        &self,
+        shared_schema_db: NamespaceName,
+        linked_db: NamespaceName,
+    ) -> crate::Result<()> {
+        let ns = self
+            .inner
+            .make_namespace
+            .create(
+                shared_schema_db.clone(),
+                RestoreOption::Latest,
+                NamespaceBottomlessDbId::NotProvided,
+                self.make_reset_cb(),
+                &self.inner.metadata,
+            )
+            .await?;
+        ns.db_config_store.link(shared_schema_db, linked_db).await?;
+        Ok(())
+    }
+
+    pub fn exists(&self, namespace: &NamespaceName) -> bool {
+        self.inner.metadata.exists(namespace)
     }
 
     pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> {
@@ -1078,18 +1097,24 @@ impl Namespace {
 
         tokio::fs::create_dir_all(&db_path).await?;
 
-        let bottomless_db_id = match bottomless_db_id {
+        let (bottomless_db_id, is_shared_schema_db) = match bottomless_db_id {
             NamespaceBottomlessDbId::Namespace(ref db_id) => {
                 let config = &*(meta_store_handle.get()).clone();
+                let is_shared_schema_db = config.is_shared_schema;
                 let config = DatabaseConfig {
                     bottomless_db_id: Some(db_id.clone()),
                     ..config.clone()
                 };
                 meta_store_handle.store(config).await?;
-                bottomless_db_id
+                (bottomless_db_id, is_shared_schema_db)
             }
             NamespaceBottomlessDbId::NotProvided => {
-                NamespaceBottomlessDbId::from_config(&meta_store_handle.get())
+                let config = &meta_store_handle.get();
+                let is_shared_schema_db = config.is_shared_schema;
+                (
+                    NamespaceBottomlessDbId::from_config(config),
+                    is_shared_schema_db,
+                )
             }
         };
 
@@ -1165,6 +1190,7 @@ impl Namespace {
             config.max_total_response_size,
             config.max_concurrent_requests,
         )
+        .shared_schema(is_shared_schema_db)
         .into();
 
         // this must happen after we create the connection maker. The connection maker old on a
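
A note on the connection/mod.rs hunks: the patch is an enum-dispatch wrapper. The connection maker decides once, at `create` time, whether to hand back the plain tracked connection or the shared-schema wrapper, and the enum forwards every `Connection` method to whichever variant it holds, so `MakeConnection::Connection` stays a single concrete type and no `dyn Connection` is needed. Below is a minimal, self-contained sketch of that shape; the trait and the `Tracked`, `SharedSchema`, `Primary`, and `make` names are toy stand-ins invented for illustration, not the crate's real API.

```rust
use async_trait::async_trait;

// Toy stand-in for the crate's `Connection` trait (the real one has more methods).
#[async_trait]
trait Connection: Send + Sync + 'static {
    async fn is_autocommit(&self) -> anyhow::Result<bool>;
}

// Stand-in for the tracked primary connection.
struct Tracked;

#[async_trait]
impl Connection for Tracked {
    async fn is_autocommit(&self) -> anyhow::Result<bool> {
        Ok(true)
    }
}

// Wrapper that will eventually route statements across the linked databases;
// for now it only delegates, like `SharedSchemaConnection` in the diff.
struct SharedSchema<DB>(DB);

#[async_trait]
impl<DB: Connection> Connection for SharedSchema<DB> {
    async fn is_autocommit(&self) -> anyhow::Result<bool> {
        self.0.is_autocommit().await
    }
}

// The enum the maker hands out: one concrete type, two behaviours.
enum Primary<DB> {
    Tracked(DB),
    SharedSchema(SharedSchema<DB>),
}

#[async_trait]
impl<DB: Connection> Connection for Primary<DB> {
    async fn is_autocommit(&self) -> anyhow::Result<bool> {
        match self {
            Primary::Tracked(conn) => conn.is_autocommit().await,
            Primary::SharedSchema(conn) => conn.is_autocommit().await,
        }
    }
}

// Roughly what `MakeSharedSchemaConnection::create` boils down to.
fn make(is_shared_schema: bool) -> Primary<Tracked> {
    if is_shared_schema {
        Primary::SharedSchema(SharedSchema(Tracked))
    } else {
        Primary::Tracked(Tracked)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let conn = make(true);
    println!("autocommit: {}", conn.is_autocommit().await?);
    Ok(())
}
```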
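
The meta_store.rs hunks change the channel message from a bare `(NamespaceName, Arc<DatabaseConfig>)` tuple into the `ChangeMsg` enum, so the single task that owns the metadata connection can also record schema links; adding behaviour means adding a variant rather than another lock. A reduced model of that single-consumer pattern, with string and integer payloads standing in for the real namespace and config types:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
enum ChangeMsg {
    ChangeConfig(String, u32), // namespace plus a toy config version
    Link(String, String),      // shared-schema namespace, linked database
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<ChangeMsg>(256);

    // Single consumer: the only task that touches the metadata store,
    // mirroring how `process` handles every message sent by the handles.
    let writer = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                ChangeMsg::ChangeConfig(ns, version) => {
                    println!("store config v{version} for `{ns}`")
                }
                ChangeMsg::Link(parent, child) => println!("link `{child}` -> `{parent}`"),
            }
        }
    });

    tx.send(ChangeMsg::ChangeConfig("tenant-1".into(), 1))
        .await
        .unwrap();
    tx.send(ChangeMsg::Link("schema".into(), "tenant-1".into()))
        .await
        .unwrap();
    drop(tx); // close the channel so the writer task exits
    writer.await.unwrap();
}
```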
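
On the storage side the bookkeeping is plain SQL: linking inserts a row into `shared_schema_links`, and `MetaStore::remove` now refuses to drop a shared-schema namespace while any row still references it. A standalone sketch of that guard against an in-memory rusqlite database; the table definition and queries are taken from the diff, while `destroy_schema` and the sample namespace names are invented for illustration:

```rust
use rusqlite::Connection;

// Invented helper mirroring the check the patch adds to `MetaStore::remove`.
fn destroy_schema(conn: &Connection, ns: &str) -> anyhow::Result<()> {
    let in_use: u32 = conn.query_row(
        "SELECT COUNT(1) FROM shared_schema_links WHERE shared_schema_name = ?",
        [ns],
        |row| row.get(0),
    )?;
    if in_use > 0 {
        anyhow::bail!("cannot remove `{ns}`: {in_use} databases still use it");
    }
    conn.execute("DELETE FROM namespace_configs WHERE namespace = ?", [ns])?;
    Ok(())
}

fn main() -> anyhow::Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE namespace_configs (namespace TEXT PRIMARY KEY, config BLOB);
         CREATE TABLE IF NOT EXISTS shared_schema_links (
             id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
             shared_schema_name TEXT NOT NULL,
             db_name TEXT NOT NULL
         );",
    )?;
    conn.execute(
        "INSERT INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
        rusqlite::params!["schema", Vec::<u8>::new()],
    )?;
    // Linking a tenant database to the shared schema, as `ChangeMsg::Link` does.
    conn.execute(
        "INSERT OR REPLACE INTO shared_schema_links (shared_schema_name, db_name) VALUES (?1, ?2)",
        rusqlite::params!["schema", "tenant-1"],
    )?;

    assert!(destroy_schema(&conn, "schema").is_err()); // still linked: refused
    conn.execute(
        "DELETE FROM shared_schema_links WHERE db_name = ?",
        ["tenant-1"],
    )?;
    destroy_schema(&conn, "schema")?; // no links left: allowed
    Ok(())
}
```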