Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shared schema execution path #1067

Closed
wants to merge 9 commits into from
173 changes: 172 additions & 1 deletion libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use async_trait::async_trait;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -30,7 +31,7 @@ const TXN_TIMEOUT: Duration = Duration::from_secs(5);
const TXN_TIMEOUT: Duration = Duration::from_millis(100);

#[async_trait::async_trait]
pub trait Connection: Send + Sync + 'static {
pub trait Connection: Send + Sync + Sized + 'static {
/// Executes a query program
async fn execute_program<B: QueryResultBuilder>(
&self,
Expand Down Expand Up @@ -172,6 +173,13 @@ pub trait MakeConnection: Send + Sync + 'static {
max_concurrent_requests,
)
}

fn shared_schema(self, is_shared_schema: bool) -> MakeSharedSchemaConnection<Self>
where
Self: Sized,
{
MakeSharedSchemaConnection::new(self, is_shared_schema)
}
}

#[async_trait::async_trait]
Expand All @@ -189,6 +197,89 @@ where
}
}

pub struct MakeSharedSchemaConnection<F> {
inner: F,
is_shared_schema: bool,
}

impl<F> MakeSharedSchemaConnection<F> {
pub fn new(inner: F, is_shared_schema: bool) -> Self {
MakeSharedSchemaConnection {
inner,
is_shared_schema,
}
}
}

#[async_trait::async_trait]
impl<F: MakeConnection> MakeConnection for MakeSharedSchemaConnection<F> {
type Connection = PrimaryConnection<F::Connection>;

async fn create(&self) -> Result<Self::Connection, Error> {
let conn = self.inner.create().await?;
Ok(if self.is_shared_schema {
PrimaryConnection::SharedSchema(SharedSchemaConnection::new(conn))
} else {
PrimaryConnection::Tracked(conn)
})
}
}

#[derive(Debug)]
pub struct SharedSchemaConnection<DB> {
inner: DB,
}

impl<DB> SharedSchemaConnection<DB> {
fn new(inner: DB) -> Self {
SharedSchemaConnection { inner }
}
}

#[async_trait::async_trait]
impl<DB: Connection> Connection for SharedSchemaConnection<DB> {
async fn execute_program<B: QueryResultBuilder>(
&self,
pgm: Program,
auth: Authenticated,
response_builder: B,
replication_index: Option<FrameNo>,
) -> Result<B> {
//TODO: execute multi-database execution procedure
todo!()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part will be responsible for executing multi-database execution. We'll need to modify current connection state with data necessary for driving the execution process, dry_runs etc.

}

#[inline]
async fn describe(
&self,
sql: String,
auth: Authenticated,
replication_index: Option<FrameNo>,
) -> Result<Result<DescribeResponse>> {
self.inner.describe(sql, auth, replication_index).await
}

#[inline]
async fn is_autocommit(&self) -> Result<bool> {
self.inner.is_autocommit()
}

#[inline]
async fn checkpoint(&self) -> Result<()> {
self.inner.checkpoint()
}

#[inline]
async fn vacuum_if_needed(&self) -> Result<()> {
self.inner.vacuum_if_needed()
}

#[inline]
fn diagnostics(&self) -> String {
self.inner.diagnostics()
}
}

pub struct MakeThrottledConnection<F> {
semaphore: Arc<Semaphore>,
connection_maker: F,
Expand Down Expand Up @@ -309,6 +400,86 @@ impl<F: MakeConnection> MakeConnection for MakeThrottledConnection<F> {
}
}

pub enum PrimaryConnection<DB> {
Tracked(DB),
SharedSchema(SharedSchemaConnection<DB>),
}

impl<DB> PrimaryConnection<TrackedConnection<DB>> {
pub fn idle_time(&self) -> Duration {
let now = crate::connection::now_millis();
let atime = match self {
PrimaryConnection::Tracked(conn) => conn.atime.load(Ordering::Relaxed),
PrimaryConnection::SharedSchema(conn) => conn.inner.atime.load(Ordering::Relaxed),
};
Duration::from_millis(now.saturating_sub(atime))
}
}

#[async_trait]
impl<DB: Connection> Connection for PrimaryConnection<DB> {
async fn execute_program<B: QueryResultBuilder>(
&self,
pgm: Program,
auth: Authenticated,
response_builder: B,
replication_index: Option<FrameNo>,
) -> Result<B> {
match self {
PrimaryConnection::Tracked(conn) => {
conn.execute_program(pgm, auth, response_builder, replication_index)
.await
}
PrimaryConnection::SharedSchema(conn) => {
conn.execute_program(pgm, auth, response_builder, replication_index)
.await
}
}
}

async fn describe(
&self,
sql: String,
auth: Authenticated,
replication_index: Option<FrameNo>,
) -> Result<Result<DescribeResponse>> {
match self {
PrimaryConnection::Tracked(conn) => conn.describe(sql, auth, replication_index).await,
PrimaryConnection::SharedSchema(conn) => {
conn.describe(sql, auth, replication_index).await
}
}
}

async fn is_autocommit(&self) -> Result<bool> {
match self {
PrimaryConnection::Tracked(conn) => conn.is_autocommit().await,
PrimaryConnection::SharedSchema(conn) => conn.is_autocommit().await,
}
}

async fn checkpoint(&self) -> Result<()> {
match self {
PrimaryConnection::Tracked(conn) => conn.checkpoint().await,
PrimaryConnection::SharedSchema(conn) => conn.checkpoint().await,
}
}

async fn vacuum_if_needed(&self) -> Result<()> {
match self {
PrimaryConnection::Tracked(conn) => conn.vacuum_if_needed().await,
PrimaryConnection::SharedSchema(conn) => conn.vacuum_if_needed().await,
}
}

fn diagnostics(&self) -> String {
match self {
PrimaryConnection::Tracked(conn) => conn.diagnostics(),
PrimaryConnection::SharedSchema(conn) => conn.diagnostics(),
}
}
}

#[derive(Debug)]
pub struct TrackedConnection<DB> {
inner: DB,
Expand Down
10 changes: 4 additions & 6 deletions libsql-server/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use std::sync::Arc;

use async_trait::async_trait;

use crate::connection::libsql::LibSqlConnection;
use crate::connection::write_proxy::{RpcStream, WriteProxyConnection};
use crate::connection::{Connection, MakeConnection, TrackedConnection};
use crate::namespace::replication_wal::{ReplicationWal, ReplicationWalManager};
use async_trait::async_trait;
use std::sync::Arc;

pub type PrimaryConnection = TrackedConnection<LibSqlConnection<ReplicationWal>>;

pub type PrimaryConnection =
crate::connection::PrimaryConnection<TrackedConnection<LibSqlConnection<ReplicationWal>>>;
pub type Result<T> = anyhow::Result<T>;

#[async_trait]
Expand Down
7 changes: 5 additions & 2 deletions libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ pub enum Error {
#[error("Unable to decode protobuf: {0}")]
ProstDecode(#[from] prost::DecodeError),
#[error("Shared schema error: {0}")]
SharedSchemaError(String),
SharedSchemaCreationError(String),
#[error("Cannot remove shared schema database `{0}`: there are {1} databases using it")]
SharedSchemaDestroyError(String, u32),
}

impl AsRef<Self> for Error {
Expand Down Expand Up @@ -180,7 +182,8 @@ impl IntoResponse for &Error {
MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
Ref(this) => this.as_ref().into_response(),
ProstDecode(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR),
SharedSchemaError(_) => self.format_err(StatusCode::BAD_REQUEST),
SharedSchemaCreationError(_) => self.format_err(StatusCode::BAD_REQUEST),
SharedSchemaDestroyError(_, _) => self.format_err(StatusCode::BAD_REQUEST),
}
}
}
Expand Down
17 changes: 12 additions & 5 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,15 +300,15 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
}
let shared_schema_name = if let Some(ns) = req.shared_schema_name {
if req.shared_schema {
return Err(Error::SharedSchemaError(
return Err(Error::SharedSchemaCreationError(
"shared schema database cannot reference another shared schema".to_string(),
));
}
let namespace = NamespaceName::from_string(ns)?;
if !app_state.namespaces.exists(&namespace).await {
if !app_state.namespaces.exists(&namespace) {
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
}
Some(namespace.to_string())
Some(namespace)
} else {
None
};
Expand All @@ -330,11 +330,11 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
.create(namespace.clone(), dump, bottomless_db_id)
.await?;

let store = app_state.namespaces.config_store(namespace).await?;
let store = app_state.namespaces.config_store(namespace.clone()).await?;
let mut config = (*store.get()).clone();

config.is_shared_schema = req.shared_schema;
config.shared_schema_name = shared_schema_name;
config.shared_schema_name = shared_schema_name.as_ref().map(NamespaceName::to_string);
if let Some(max_db_size) = req.max_db_size {
config.max_db_pages = max_db_size.as_u64() / LIBSQL_PAGE_SIZE;
}
Expand All @@ -353,6 +353,13 @@ async fn handle_create_namespace<M: MakeNamespace, C: Connector>(
config.jwt_key = req.jwt_key;
store.store(config).await?;

if let Some(shared_schema) = shared_schema_name {
app_state
.namespaces
.shared_schema_link(shared_schema, namespace)
.await?;
}

Ok(())
}

Expand Down
Loading
Loading