Skip to content

Commit

Permalink
Shared schema (#1080)
Browse files Browse the repository at this point in the history
* Rename SharedSchemaError to SharedSchemaCreationError

Signed-off-by: Piotr Jastrzebski <[email protected]>

* Improve NamespaceStore::exists

Signed-off-by: Piotr Jastrzebski <[email protected]>

* Metastore: Implement shared schema links

Signed-off-by: Piotr Jastrzebski <[email protected]>

---------

Signed-off-by: Piotr Jastrzebski <[email protected]>
  • Loading branch information
haaawk authored Feb 26, 2024
1 parent e60a4e3 commit 82cd274
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 22 deletions.
4 changes: 2 additions & 2 deletions libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub enum Error {
#[error("Unable to decode protobuf: {0}")]
ProstDecode(#[from] prost::DecodeError),
#[error("Shared schema error: {0}")]
SharedSchemaError(String),
SharedSchemaCreationError(String),
}

impl AsRef<Self> for Error {
Expand Down Expand Up @@ -180,7 +180,7 @@ impl IntoResponse for &Error {
MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
Ref(this) => this.as_ref().into_response(),
ProstDecode(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
SharedSchemaError(_) => self.format_err(StatusCode::BAD_REQUEST),
SharedSchemaCreationError(_) => self.format_err(StatusCode::BAD_REQUEST),
}
}
}
Expand Down
15 changes: 8 additions & 7 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ struct CreateNamespaceReq {
shared_schema: bool,
/// If some, this is a [NamespaceName] reference to a shared schema DB.
#[serde(default)]
shared_schema_name: Option<String>,
shared_schema_name: Option<NamespaceName>,
}

async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
Expand All @@ -300,15 +300,15 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
}
let shared_schema_name = if let Some(ns) = req.shared_schema_name {
if req.shared_schema {
return Err(Error::SharedSchemaError(
return Err(Error::SharedSchemaCreationError(
"shared schema database cannot reference another shared schema".to_string(),
));
}
let namespace = NamespaceName::from_string(ns)?;
if !app_state.namespaces.exists(&namespace).await {
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
// TODO: move this check into meta store
if !app_state.namespaces.exists(&ns) {
return Err(Error::NamespaceDoesntExist(ns.to_string()));
}
Some(namespace.to_string())
Some(ns)
} else {
None
};
Expand All @@ -334,7 +334,7 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
let mut config = (*store.get()).clone();

config.is_shared_schema = req.shared_schema;
config.shared_schema_name = shared_schema_name;
config.shared_schema_name = shared_schema_name.as_ref().map(|x| x.to_string());
if let Some(max_db_size) = req.max_db_size {
config.max_db_pages = max_db_size.as_u64() / LIBSQL_PAGE_SIZE;
}
Expand Down Expand Up @@ -379,6 +379,7 @@ async fn handle_fork_namespace<M: MakeNamespace, C>(
let mut to_config = (*to_store.get()).clone();
to_config.max_db_pages = from_config.max_db_pages;
to_config.heartbeat_url = from_config.heartbeat_url.clone();
to_config.shared_schema_name = from_config.shared_schema_name.clone();
to_store.store(to_config).await?;
Ok(())
}
Expand Down
59 changes: 49 additions & 10 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl MetaStoreInner {
Sqlite3WalManager::default(),
);
let conn = open_conn_active_checkpoint(&db_path, wal_manager.clone(), None, 1000, None)?;
conn.execute("PRAGMA foreign_keys=ON", ())?;
conn.execute(
"CREATE TABLE IF NOT EXISTS namespace_configs (
namespace TEXT NOT NULL PRIMARY KEY,
Expand All @@ -128,6 +129,17 @@ impl MetaStoreInner {
",
(),
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS shared_schema_links (
shared_schema_name TEXT NOT NULL,
namespace TEXT NOT NULL,
PRIMARY KEY (shared_schema_name, namespace),
FOREIGN KEY (shared_schema_name) REFERENCES namespace_configs (namespace) ON DELETE RESTRICT ON UPDATE RESTRICT,
FOREIGN KEY (namespace) REFERENCES namespace_configs (namespace) ON DELETE RESTRICT ON UPDATE RESTRICT
)
",
(),
)?;

let mut this = MetaStoreInner {
configs: Default::default(),
Expand Down Expand Up @@ -249,10 +261,27 @@ fn process(msg: ChangeMsg, inner: Arc<Mutex<MetaStoreInner>>) -> Result<()> {

let inner = &mut inner.lock();

inner.conn.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
if let Some(schema) = config.shared_schema_name.as_ref() {
let tx = inner.conn.transaction()?;
tx.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
tx.execute(
"DELETE FROM shared_schema_links WHERE namespace = ?",
rusqlite::params![namespace.as_str()],
)?;
tx.execute(
"INSERT OR REPLACE INTO shared_schema_links (shared_schema_name, namespace) VALUES (?1, ?2)",
rusqlite::params![schema.as_str(), namespace.as_str()],
)?;
tx.commit()?;
} else {
inner.conn.execute(
"INSERT OR REPLACE INTO namespace_configs (namespace, config) VALUES (?1, ?2)",
rusqlite::params![namespace.as_str(), config_encoded],
)?;
}

let configs = &mut inner.configs;

Expand Down Expand Up @@ -322,18 +351,28 @@ impl MetaStore {
tracing::debug!("removing namespace `{}` from meta store", namespace);

let mut guard = self.inner.lock();
guard.conn.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
if let Some(sender) = guard.configs.remove(&namespace) {
let r = if let Some(sender) = guard.configs.get(&namespace) {
tracing::debug!("removed namespace `{}` from meta store", namespace);
let config = sender.borrow().clone();
let tx = guard.conn.transaction()?;
if let Some(shared_schema) = config.config.shared_schema_name.as_deref() {
tx.execute(
"DELETE FROM shared_schema_links WHERE shared_schema_name = ? AND namespace = ?",
[shared_schema, namespace.as_str()],
)?;
}
tx.execute(
"DELETE FROM namespace_configs WHERE namespace = ?",
[namespace.as_str()],
)?;
tx.commit()?;
Ok(Some(config.config))
} else {
tracing::trace!("namespace `{}` not found in meta store", namespace);
Ok(None)
}
};
guard.configs.remove(&namespace);
r
}

// TODO: we need to either make sure that the metastore is restored
Expand Down
5 changes: 2 additions & 3 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,9 +458,8 @@ impl<M: MakeNamespace> NamespaceStore<M> {
})
}

pub async fn exists(&self, namespace: &NamespaceName) -> bool {
let e = self.inner.store.get(namespace).await;
e.is_some()
pub fn exists(&self, namespace: &NamespaceName) -> bool {
self.inner.metadata.exists(namespace)
}

pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> {
Expand Down

0 comments on commit 82cd274

Please sign in to comment.