Skip to content

Commit

Permalink
Metastore: Implement shared schema links
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Jastrzebski <[email protected]>
  • Loading branch information
haaawk committed Feb 25, 2024
1 parent 3323a50 commit f372af5
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 16 deletions.
12 changes: 6 additions & 6 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ struct CreateNamespaceReq {
shared_schema: bool,
/// If some, this is a [NamespaceName] reference to a shared schema DB.
#[serde(default)]
shared_schema_name: Option<String>,
shared_schema_name: Option<NamespaceName>,
}

async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
Expand All @@ -304,11 +304,11 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
"shared schema database cannot reference another shared schema".to_string(),
));
}
let namespace = NamespaceName::from_string(ns)?;
if !app_state.namespaces.exists(&namespace) {
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
// TODO: move this check into meta store
if !app_state.namespaces.exists(&ns) {
return Err(Error::NamespaceDoesntExist(ns.to_string()));
}
Some(namespace.to_string())
Some(ns)
} else {
None
};
Expand All @@ -334,7 +334,7 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
let mut config = (*store.get()).clone();

config.is_shared_schema = req.shared_schema;
config.shared_schema_name = shared_schema_name;
config.shared_schema_name = shared_schema_name.as_ref().map(|x| x.to_string());
if let Some(max_db_size) = req.max_db_size {
config.max_db_pages = max_db_size.as_u64() / LIBSQL_PAGE_SIZE;
}
Expand Down
55 changes: 45 additions & 10 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl MetaStoreInner {
Sqlite3WalManager::default(),
);
let conn = open_conn_active_checkpoint(&db_path, wal_manager.clone(), None, 1000, None)?;
conn.execute("PRAGMA foreign_keys=ON", ())?;
conn.execute(
"CREATE TABLE IF NOT EXISTS namespace_configs (
namespace TEXT NOT NULL PRIMARY KEY,
Expand All @@ -128,6 +129,17 @@ impl MetaStoreInner {
",
(),
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS shared_schema_links (
shared_schema_name TEXT NOT NULL,
namespace TEXT NOT NULL,
PRIMARY KEY (shared_schema_name, namespace),
FOREIGN KEY (shared_schema_name) REFERENCES namespace_configs (namespace) ON DELETE RESTRICT ON UPDATE RESTRICT,
FOREIGN KEY (namespace) REFERENCES namespace_configs (namespace) ON DELETE RESTRICT ON UPDATE RESTRICT
)
",
(),
)?;

let mut this = MetaStoreInner {
configs: Default::default(),
Expand Down Expand Up @@ -249,10 +261,23 @@ fn process(msg: ChangeMsg, inner: Arc<Mutex<MetaStoreInner>>) -> Result<()> {

let inner = &mut inner.lock();

inner.conn.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
if let Some(schema) = config.shared_schema_name.as_ref() {
let tx = inner.conn.transaction()?;
tx.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
tx.execute(
"INSERT OR REPLACE INTO shared_schema_links (shared_schema_name, namespace) VALUES (?1, ?2)",
rusqlite::params![schema.as_str(), namespace.as_str()],
)?;
tx.commit()?;
} else {
inner.conn.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
}

let configs = &mut inner.configs;

Expand Down Expand Up @@ -322,18 +347,28 @@ impl MetaStore {
tracing::debug!("removing namespace `{}` from meta store", namespace);

let mut guard = self.inner.lock();
guard.conn.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
if let Some(sender) = guard.configs.remove(&namespace) {
let r = if let Some(sender) = guard.configs.get(&namespace) {
tracing::debug!("removed namespace `{}` from meta store", namespace);
let config = sender.borrow().clone();
let tx = guard.conn.transaction()?;
if let Some(shared_schema) = config.config.shared_schema_name.as_deref() {
tx.execute(
"DELETE FROM shared_schema_links WHERE shared_schema_name = ? AND namespace = ?",
[shared_schema, namespace.as_str()],
)?;
}
tx.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
tx.commit()?;
Ok(Some(config.config))
} else {
tracing::trace!("namespace `{}` not found in meta store", namespace);
Ok(None)
}
};
guard.configs.remove(&namespace);
r
}

// TODO: we need to either make sure that the metastore is restored
Expand Down

0 comments on commit f372af5

Please sign in to comment.