Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema migration #1110

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libsql-server/src/connection/libsql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ impl<W: Wal> Connection<W> {

let mut vm = Vm::new(
builder,
pgm,
&pgm,
move |stmt_kind| {
let should_block = match stmt_kind {
StmtKind::Read | StmtKind::TxnBegin => config.block_reads,
Expand Down
9 changes: 9 additions & 0 deletions libsql-server/src/connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use libsql_replication::rpc::replication::NAMESPACE_METADATA_KEY;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -390,6 +391,14 @@ impl<T> Drop for TrackedConnection<T> {
}
}

impl<T> Deref for TrackedConnection<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

#[async_trait::async_trait]
impl<DB: Connection> Connection for TrackedConnection<DB> {
#[inline]
Expand Down
8 changes: 4 additions & 4 deletions libsql-server/src/connection/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,22 +87,22 @@ pub struct DescribeCol {
pub decltype: Option<String>,
}

pub struct Vm<B, F, S> {
pub struct Vm<'a, B, F, S> {
results: Vec<bool>,
builder: B,
program: Program,
program: &'a Program,
current_step: usize,
should_block: F,
update_stats: S,
}

impl<B, F, S> Vm<B, F, S>
impl<'a, B, F, S> Vm<'a, B, F, S>
where
B: QueryResultBuilder,
F: Fn(&StmtKind) -> (bool, Option<String>),
S: Fn(String, &rusqlite::Statement, Duration),
{
pub fn new(builder: B, program: Program, should_block: F, update_stats: S) -> Self {
pub fn new(builder: B, program: &'a Program, should_block: F, update_stats: S) -> Self {
Self {
results: Vec::with_capacity(program.steps().len()),
builder,
Expand Down
8 changes: 8 additions & 0 deletions libsql-server/src/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,12 @@ impl Database {
None
}
}

pub(crate) fn as_schema(&self) -> Option<&SchemaDatabase> {
if let Self::Schema(v) = self {
Some(v)
} else {
None
}
}
}
47 changes: 38 additions & 9 deletions libsql-server/src/database/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,21 @@ use std::sync::Arc;
use crate::connection::program::Program;
use crate::connection::{MakeConnection, RequestContext};
use crate::namespace::NamespaceName;
use crate::schema_migration::SchedulerHandle;
use crate::schema::{perform_migration, SchedulerHandle};

use super::primary::PrimaryConnectionMaker;
use super::PrimaryConnection;

pub struct SchemaConnection {
migration_scheduler: SchedulerHandle,
schema: NamespaceName,
connection: PrimaryConnection,
connection: Arc<PrimaryConnection>,
}

impl SchemaConnection {
pub(crate) fn connection(&self) -> &PrimaryConnection {
&self.connection
}
}

#[async_trait::async_trait]
Expand All @@ -22,15 +28,38 @@ impl crate::connection::Connection for SchemaConnection {
&self,
pgm: Program,
ctx: RequestContext,
response_builder: B,
builder: B,
replication_index: Option<crate::replication::FrameNo>,
) -> crate::Result<B> {
self.migration_scheduler
.register_migration_task(self.schema.clone(), pgm.clone())
.await?;
self.connection
.execute_program(pgm, ctx, response_builder, replication_index)
if pgm.is_read_only() {
self.connection
.execute_program(pgm, ctx, builder, replication_index)
.await
} else {
let connection = self.connection.clone();
let pgm = Arc::new(pgm);
let pgm_clone = pgm.clone();
let ret = tokio::task::spawn_blocking(move || {
connection.with_raw(|conn| -> crate::Result<B> {
let mut txn = conn
.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)
.unwrap();
let (ret, _) = perform_migration(&mut txn, &pgm_clone, true, builder);
txn.rollback().unwrap();
Ok(ret?)
})
})
.await
.unwrap();
// if dry run is successfull, enqueue
self.migration_scheduler
.register_migration_task(self.schema.clone(), pgm)
.await?;

// TODO here wait for dry run to be executed on all dbs

ret
}
}

async fn describe(
Expand Down Expand Up @@ -71,7 +100,7 @@ impl MakeConnection for SchemaDatabase {
type Connection = SchemaConnection;

async fn create(&self) -> crate::Result<Self::Connection, crate::error::Error> {
let connection = self.connection_maker.create().await?;
let connection = Arc::new(self.connection_maker.create().await?);
Ok(SchemaConnection {
migration_scheduler: self.migration_scheduler.clone(),
schema: self.schema.clone(),
Expand Down
2 changes: 1 addition & 1 deletion libsql-server/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ pub enum Error {
SharedSchemaCreationError(String),

#[error("migration error: {0}")]
Migration(#[from] crate::schema_migration::Error),
Migration(#[from] crate::schema::Error),
}

impl AsRef<Self> for Error {
Expand Down
4 changes: 2 additions & 2 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::rpc::replication_log::rpc::replication_log_server::ReplicationLog;
use crate::rpc::replication_log::ReplicationLogService;
use crate::rpc::replication_log_proxy::ReplicationLogProxyService;
use crate::rpc::run_rpc_server;
use crate::schema_migration::Scheduler;
use crate::schema::Scheduler;
use crate::stats::Stats;
use anyhow::Context as AnyhowContext;
use auth::Auth;
Expand Down Expand Up @@ -67,7 +67,7 @@ mod query;
mod query_analysis;
mod query_result_builder;
mod replication;
mod schema_migration;
mod schema;
mod stats;
#[cfg(test)]
mod test;
Expand Down
Loading
Loading