Skip to content

Commit

Permalink
Metastore: Implement shared schema links
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Jastrzebski <[email protected]>
  • Loading branch information
haaawk committed Feb 24, 2024
1 parent b4bb21a commit fb96070
Showing 1 changed file with 81 additions and 7 deletions.
88 changes: 81 additions & 7 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use super::NamespaceName;

enum ChangeMsg {
ChangeConfig(NamespaceName, Arc<DatabaseConfig>),
// Namespace Name, Namespace Config, Namespace Schema Name
ChangeConfigAndSchema(NamespaceName, Arc<DatabaseConfig>, NamespaceName),
}
type WalManager = WalWrapper<Option<BottomlessWalWrapper>, Sqlite3WalManager>;
type Connection = libsql_sys::Connection<WrappedWal<Option<BottomlessWalWrapper>, Sqlite3Wal>>;
Expand Down Expand Up @@ -130,6 +132,17 @@ impl MetaStoreInner {
",
(),
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS shared_schema_links (
shared_schema_name TEXT NOT NULL,
db_name TEXT NOT NULL,
PRIMARY KEY (shared_schema_name, db_name),
FOREIGN KEY (shared_schema_name) REFERENCES namespace_configs (namespace),
FOREIGN KEY (db_name) REFERENCES namespace_configs (namespace)
)
",
(),
)?;

let mut this = MetaStoreInner {
configs: Default::default(),
Expand Down Expand Up @@ -258,6 +271,41 @@ fn process(msg: ChangeMsg, inner: Arc<Mutex<MetaStoreInner>>) -> Result<()> {

let configs = &mut inner.configs;

if let Some(config_watch) = configs.get_mut(&namespace) {
let new_version = config_watch.borrow().version.wrapping_add(1);

config_watch.send_modify(|c| {
*c = InnerConfig {
version: new_version,
config,
};
});
} else {
let (tx, _) = watch::channel(InnerConfig { version: 0, config });
configs.insert(namespace, tx);
}
}
ChangeMsg::ChangeConfigAndSchema(namespace, config, schema) => {
let config_encoded = metadata::DatabaseConfig::from(&*config).encode_to_vec();

let inner = &mut inner.lock();

let tx = inner.conn.transaction()?;

tx.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;

tx.execute(
"INSERT OR REPLACE INTO shared_schema_links (shared_schema_name, db_name) VALUES (?1, ?2)",
rusqlite::params![schema.as_str(), namespace.as_str()],
)?;

tx.commit()?;

let configs = &mut inner.configs;

if let Some(config_watch) = configs.get_mut(&namespace) {
let new_version = config_watch.borrow().version.wrapping_add(1);

Expand Down Expand Up @@ -326,18 +374,40 @@ impl MetaStore {
tracing::debug!("removing namespace `{}` from meta store", namespace);

let mut guard = self.inner.lock();
guard.conn.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
if let Some(sender) = guard.configs.remove(&namespace) {
let r = if let Some(sender) = guard.configs.get(&namespace) {
tracing::debug!("removed namespace `{}` from meta store", namespace);
let config = sender.borrow().clone();
let tx = guard.conn.transaction()?;
if let Some(shared_schema) = config.config.shared_schema_name.as_deref() {
tx.execute(
"DELETE FROM shared_schema_links WHERE shared_schema_name = ? AND db_name = ?",
[shared_schema, namespace.as_str()],
)?;
}
if config.config.is_shared_schema {
let res: rusqlite::Result<u32> = tx.query_row(
"SELECT COUNT(*) FROM shared_schema_links WHERE shared_schema_name = ?",
[namespace.as_str()],
|row| row.get(0),
);
match res {
Ok(0) | Err(rusqlite::Error::QueryReturnedNoRows) => { /* ok */ }
Ok(n) => return Err(Error::SharedSchemaDestroyError(namespace.to_string(), n)),
Err(e) => return Err(e.into()),
}
}
tx.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
tx.commit()?;
Ok(Some(config.config))
} else {
tracing::trace!("namespace `{}` not found in meta store", namespace);
Ok(None)
}
};
guard.configs.remove(&namespace);
r
}

// TODO: we need to either make sure that the metastore is restored
Expand Down Expand Up @@ -371,7 +441,11 @@ impl MetaStore {
let changed = rx.changed();

self.changes_tx
.send(ChangeMsg::ChangeConfig(namespace.clone(), config.into()))
.send(ChangeMsg::ChangeConfigAndSchema(
namespace.clone(),
config.into(),
schema,
))
.await
.map_err(|e| Error::MetaStoreUpdateFailure(e.into()))?;

Expand Down

0 comments on commit fb96070

Please sign in to comment.