Skip to content

Commit

Permalink
shared schema db: save/validate shared schema link on database create…
Browse files Browse the repository at this point in the history
…/destroy
  • Loading branch information
Horusiath committed Feb 21, 2024
1 parent e6d075b commit 81d5de0
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 38 deletions.
7 changes: 5 additions & 2 deletions libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ pub enum Error {
#[error("Unable to decode protobuf: {0}")]
ProstDecode(#[from] prost::DecodeError),
#[error("Shared schema error: {0}")]
SharedSchemaError(String),
SharedSchemaCreationError(String),
#[error("Cannot remove shared schema database `{0}`: there are {1} databases using it")]
SharedSchemaDestroyError(String, u32),
}

impl AsRef<Self> for Error {
Expand Down Expand Up @@ -180,7 +182,8 @@ impl IntoResponse for &Error {
MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
Ref(this) => this.as_ref().into_response(),
ProstDecode(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
SharedSchemaError(_) => self.format_err(StatusCode::BAD_REQUEST),
SharedSchemaCreationError(_) => self.format_err(StatusCode::BAD_REQUEST),
SharedSchemaDestroyError(_, _) => self.format_err(StatusCode::BAD_REQUEST),
}
}
}
Expand Down
17 changes: 12 additions & 5 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,15 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
}
let shared_schema_name = if let Some(ns) = req.shared_schema_name {
if req.shared_schema {
return Err(Error::SharedSchemaError(
return Err(Error::SharedSchemaCreationError(
"shared schema database cannot reference another shared schema".to_string(),
));
}
let namespace = NamespaceName::from_string(ns)?;
if !app_state.namespaces.exists(&namespace).await {
if !app_state.namespaces.exists(&namespace) {
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
}
Some(namespace.to_string())
Some(namespace)
} else {
None
};
Expand All @@ -330,11 +330,11 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
.create(namespace.clone(), dump, bottomless_db_id)
.await?;

let store = app_state.namespaces.config_store(namespace).await?;
let store = app_state.namespaces.config_store(namespace.clone()).await?;
let mut config = (*store.get()).clone();

config.is_shared_schema = req.shared_schema;
config.shared_schema_name = shared_schema_name;
config.shared_schema_name = shared_schema_name.as_ref().map(NamespaceName::to_string);
if let Some(max_db_size) = req.max_db_size {
config.max_db_pages = max_db_size.as_u64() / LIBSQL_PAGE_SIZE;
}
Expand All @@ -353,6 +353,13 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
config.jwt_key = req.jwt_key;
store.store(config).await?;

if let Some(shared_schema) = shared_schema_name {
app_state
.namespaces
.shared_schema_link(shared_schema, namespace)
.await?;
}

Ok(())
}

Expand Down
110 changes: 82 additions & 28 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use crate::{

use super::NamespaceName;

type ChangeMsg = (NamespaceName, Arc<DatabaseConfig>);
pub enum ChangeMsg {
ChangeConfig(NamespaceName, Arc<DatabaseConfig>),
Link(NamespaceName, NamespaceName),
}
type WalManager = WalWrapper<Option<BottomlessWalWrapper>, Sqlite3WalManager>;
type Connection = libsql_sys::Connection<WrappedWal<Option<BottomlessWalWrapper>, Sqlite3Wal>>;

Expand Down Expand Up @@ -127,6 +130,15 @@ impl MetaStoreInner {
",
(),
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS shared_schema_links (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
shared_schema_name TEXT NOT NULL,
db_name TEXT NOT NULL
)
",
(),
)?;

let mut this = MetaStoreInner {
configs: Default::default(),
Expand Down Expand Up @@ -239,34 +251,41 @@ impl MetaStoreInner {
}
}

/// Handles config change updates by inserting them into the database and in-memory
/// cache of configs.
fn process(msg: ChangeMsg, inner: Arc<Mutex<MetaStoreInner>>) -> Result<()> {
let (namespace, config) = msg;
match msg {
ChangeMsg::ChangeConfig(namespace, config) => {
let config_encoded = metadata::DatabaseConfig::from(&*config).encode_to_vec();

let config_encoded = metadata::DatabaseConfig::from(&*config).encode_to_vec();
let inner = &mut inner.lock();

let inner = &mut inner.lock();
inner.conn.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;

inner.conn.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
let configs = &mut inner.configs;

let configs = &mut inner.configs;
if let Some(config_watch) = configs.get_mut(&namespace) {
let new_version = config_watch.borrow().version.wrapping_add(1);

if let Some(config_watch) = configs.get_mut(&namespace) {
let new_version = config_watch.borrow().version.wrapping_add(1);

config_watch.send_modify(|c| {
*c = InnerConfig {
version: new_version,
config,
};
});
} else {
let (tx, _) = watch::channel(InnerConfig { version: 0, config });
configs.insert(namespace, tx);
config_watch.send_modify(|c| {
*c = InnerConfig {
version: new_version,
config,
};
});
} else {
let (tx, _) = watch::channel(InnerConfig { version: 0, config });
configs.insert(namespace, tx);
}
}
ChangeMsg::Link(parent, child) => {
let inner = &mut inner.lock();
inner.conn.execute(
"INSERT OR REPLACE INTO shared_schema_links (shared_schema_name, db_name) VALUES (?1, ?2)",
rusqlite::params![parent.as_str(), child.as_str()],
)?;
}
}

Ok(())
Expand Down Expand Up @@ -321,13 +340,32 @@ impl MetaStore {
tracing::debug!("removing namespace `{}` from meta store", namespace);

let mut guard = self.inner.lock();
guard.conn.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
if let Some(sender) = guard.configs.remove(&namespace) {
tracing::debug!("removed namespace `{}` from meta store", namespace);
let config = sender.borrow().clone();
if let Some(shared_schema) = config.config.shared_schema_name.as_deref() {
guard.conn.execute(
"DELETE FROM shared_schema_links WHERE shared_schema_name = ? AND db_name = ?",
[shared_schema, namespace.as_str()],
)?;
}
if config.config.is_shared_schema {
let res: rusqlite::Result<u32> = guard.conn.query_row(
"SELECT COUNT(1) FROM shared_schema_links WHERE shared_schema_name = ?",
[namespace.as_str()],
|row| row.get(0),
);
match res {
Ok(0) | Err(rusqlite::Error::QueryReturnedNoRows) => { /* ok */ }
Ok(n) => return Err(Error::SharedSchemaDestroyError(namespace.to_string(), n)),
Err(e) => return Err(e.into()),
}
}
guard.conn.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;

Ok(Some(config.config))
} else {
tracing::trace!("namespace `{}` not found in meta store", namespace);
Expand Down Expand Up @@ -409,6 +447,22 @@ impl MetaStoreHandle {
}
}

pub async fn link(&self, shared_schema: NamespaceName, linked_db: NamespaceName) -> Result<()> {
match &self.inner {
HandleState::Internal(_) => {
//FIXME: what is this even for?
}
HandleState::External(changes_tx, _) => {
changes_tx
.send(ChangeMsg::Link(shared_schema, linked_db))
.await
.map_err(|e| Error::MetaStoreUpdateFailure(e.into()))?;
}
};

Ok(())
}

pub async fn store(&self, new_config: impl Into<Arc<DatabaseConfig>>) -> Result<()> {
match &self.inner {
HandleState::Internal(config) => {
Expand All @@ -421,7 +475,7 @@ impl MetaStoreHandle {
let changed = c.changed();

changes_tx
.send((self.namespace.clone(), new_config))
.send(ChangeMsg::ChangeConfig(self.namespace.clone(), new_config))
.await
.map_err(|e| Error::MetaStoreUpdateFailure(e.into()))?;

Expand Down
25 changes: 22 additions & 3 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,28 @@ impl<M: MakeNamespace> NamespaceStore<M> {
})
}

pub async fn exists(&self, namespace: &NamespaceName) -> bool {
let e = self.inner.store.get(namespace).await;
e.is_some()
pub(crate) async fn shared_schema_link(
&self,
shared_schema_db: NamespaceName,
linked_db: NamespaceName,
) -> crate::Result<()> {
let ns = self
.inner
.make_namespace
.create(
shared_schema_db.clone(),
RestoreOption::Latest,
NamespaceBottomlessDbId::NotProvided,
self.make_reset_cb(),
&self.inner.metadata,
)
.await?;
ns.db_config_store.link(shared_schema_db, linked_db).await?;
Ok(())
}

pub fn exists(&self, namespace: &NamespaceName) -> bool {
self.inner.metadata.exists(namespace)
}

pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> {
Expand Down

0 comments on commit 81d5de0

Please sign in to comment.