Skip to content

Commit

Permalink
happy path schema migration
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Mar 4, 2024
1 parent 7f9e289 commit 23838ee
Show file tree
Hide file tree
Showing 36 changed files with 1,516 additions and 198 deletions.
2 changes: 1 addition & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ impl<W: Wal> Connection<W> {

let mut vm = Vm::new(
builder,
pgm,
&pgm,
move |stmt_kind| {
let should_block = match stmt_kind {
StmtKind::Read | StmtKind::TxnBegin => config.block_reads,
Expand Down
9 changes: 9 additions & 0 deletions libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use libsql_replication::rpc::replication::NAMESPACE_METADATA_KEY;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -390,6 +391,14 @@ impl<T> Drop for TrackedConnection<T> {
}
}

impl<T> Deref for TrackedConnection<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

#[async_trait::async_trait]
impl<DB: Connection> Connection for TrackedConnection<DB> {
#[inline]
Expand Down
8 changes: 4 additions & 4 deletions libsql-server/src/connection/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ pub struct DescribeCol {
pub decltype: Option<String>,
}

pub struct Vm<B, F, S> {
pub struct Vm<'a, B, F, S> {
results: Vec<bool>,
builder: B,
program: Program,
program: &'a Program,
current_step: usize,
should_block: F,
update_stats: S,
}

impl<B, F, S> Vm<B, F, S>
impl<'a, B, F, S> Vm<'a, B, F, S>
where
B: QueryResultBuilder,
F: Fn(&StmtKind) -> (bool, Option<String>),
S: Fn(String, &rusqlite::Statement, Duration),
{
pub fn new(builder: B, program: Program, should_block: F, update_stats: S) -> Self {
pub fn new(builder: B, program: &'a Program, should_block: F, update_stats: S) -> Self {
Self {
results: Vec::with_capacity(program.steps().len()),
builder,
Expand Down
8 changes: 8 additions & 0 deletions libsql-server/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,12 @@ impl Database {
None
}
}

pub(crate) fn as_schema(&self) -> Option<&SchemaDatabase> {
if let Self::Schema(v) = self {
Some(v)
} else {
None
}
}
}
47 changes: 38 additions & 9 deletions libsql-server/src/database/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ use std::sync::Arc;
use crate::connection::program::Program;
use crate::connection::{MakeConnection, RequestContext};
use crate::namespace::NamespaceName;
use crate::schema_migration::SchedulerHandle;
use crate::schema::{perform_migration, SchedulerHandle};

use super::primary::PrimaryConnectionMaker;
use super::PrimaryConnection;

pub struct SchemaConnection {
migration_scheduler: SchedulerHandle,
schema: NamespaceName,
connection: PrimaryConnection,
connection: Arc<PrimaryConnection>,
}

impl SchemaConnection {
pub(crate) fn connection(&self) -> &PrimaryConnection {
&self.connection
}
}

#[async_trait::async_trait]
Expand All @@ -22,15 +28,38 @@ impl crate::connection::Connection for SchemaConnection {
&self,
pgm: Program,
ctx: RequestContext,
response_builder: B,
builder: B,
replication_index: Option<crate::replication::FrameNo>,
) -> crate::Result<B> {
self.migration_scheduler
.register_migration_task(self.schema.clone(), pgm.clone())
.await?;
self.connection
.execute_program(pgm, ctx, response_builder, replication_index)
if pgm.is_read_only() {
self.connection
.execute_program(pgm, ctx, builder, replication_index)
.await
} else {
let connection = self.connection.clone();
let pgm = Arc::new(pgm);
let pgm_clone = pgm.clone();
let ret = tokio::task::spawn_blocking(move || {
connection.with_raw(|conn| -> crate::Result<B> {
let mut txn = conn
.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
.unwrap();
let (ret, _) = perform_migration(&mut txn, &pgm_clone, true, builder);
txn.rollback().unwrap();
Ok(ret?)
})
})
.await
.unwrap();
// if dry run is successfull, enqueue
self.migration_scheduler
.register_migration_task(self.schema.clone(), pgm)
.await?;

// TODO here wait for dry run to be executed on all dbs

ret
}
}

async fn describe(
Expand Down Expand Up @@ -71,7 +100,7 @@ impl MakeConnection for SchemaDatabase {
type Connection = SchemaConnection;

async fn create(&self) -> crate::Result<Self::Connection, crate::error::Error> {
let connection = self.connection_maker.create().await?;
let connection = Arc::new(self.connection_maker.create().await?);
Ok(SchemaConnection {
migration_scheduler: self.migration_scheduler.clone(),
schema: self.schema.clone(),
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub enum Error {
SharedSchemaCreationError(String),

#[error("migration error: {0}")]
Migration(#[from] crate::schema_migration::Error),
Migration(#[from] crate::schema::Error),
}

impl AsRef<Self> for Error {
Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::rpc::replication_log::rpc::replication_log_server::ReplicationLog;
use crate::rpc::replication_log::ReplicationLogService;
use crate::rpc::replication_log_proxy::ReplicationLogProxyService;
use crate::rpc::run_rpc_server;
use crate::schema_migration::Scheduler;
use crate::schema::Scheduler;
use crate::stats::Stats;
use anyhow::Context as AnyhowContext;
use auth::Auth;
Expand Down Expand Up @@ -67,7 +67,7 @@ mod query;
mod query_analysis;
mod query_result_builder;
mod replication;
mod schema_migration;
mod schema;
mod stats;
#[cfg(test)]
mod test;
Expand Down
Loading

0 comments on commit 23838ee

Please sign in to comment.