Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsql: add durable_frame_num caching and metadata file #1830

Merged
merged 12 commits into from
Nov 22, 2024
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ fallible-iterator = { version = "0.3", optional = true }
libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }

crc32fast = { version = "1", optional = true }
chrono = { version = "0.4", optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] }
Expand Down Expand Up @@ -105,6 +108,10 @@ sync = [
"dep:tokio",
"dep:futures",
"dep:serde_json",
"dep:crc32fast",
"dep:chrono",
"dep:uuid",
"tokio/fs"
]
hrana = [
"parser",
Expand Down
7 changes: 7 additions & 0 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ cfg_core! {
}

cfg_replication_or_sync! {

pub type FrameNo = u64;

#[derive(Debug)]
// TODO(lucio): remove this once we use these fields in our sync code
#[allow(dead_code)]
haaawk marked this conversation as resolved.
Show resolved Hide resolved
pub struct Replicated {
pub(crate) frame_no: Option<FrameNo>,
pub(crate) frames_synced: usize,
Expand All @@ -47,12 +50,16 @@ cfg_replication_or_sync! {
/// where in the log you might be. Beware that this value can be reset to a lower value by the
/// server in certain situations. Please use `frames_synced` if you want to track the amount of
/// work a sync has done.
// TODO(lucio): remove this once we use these fields in our sync code
#[allow(dead_code)]
pub fn frame_no(&self) -> Option<FrameNo> {
self.frame_no
}

/// The count of frames synced during this call of `sync`. A frame is a 4kB frame from the
/// libsql write ahead log.
// TODO(lucio): remove this once we use these fields in our sync code
#[allow(dead_code)]
pub fn frames_synced(&self) -> usize {
self.frames_synced
}
Expand Down
4 changes: 2 additions & 2 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Builder<()> {
}
}

cfg_replication_or_remote! {
cfg_replication_or_remote_or_sync! {
/// Remote configuration type used in [`Builder`].
pub struct Remote {
url: String,
Expand Down Expand Up @@ -505,7 +505,7 @@ cfg_remote! {
}
}

cfg_replication_or_remote! {
cfg_replication_or_remote_or_sync! {
impl Remote {
fn connector<C>(mut self, connector: C) -> Remote
where
Expand Down
9 changes: 9 additions & 0 deletions libsql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub enum Error {
TransactionalBatchError(String),
#[error("Invalid blob size, expected {0}")]
InvalidBlobSize(usize),
#[error("sync error: {0}")]
Sync(crate::BoxError),
}

#[cfg(feature = "hrana")]
Expand All @@ -64,6 +66,13 @@ impl From<crate::hrana::HranaError> for Error {
}
}

#[cfg(feature = "sync")]
impl From<crate::sync::SyncError> for Error {
fn from(e: crate::sync::SyncError) -> Self {
Error::Sync(e.into())
}
}

impl From<std::convert::Infallible> for Error {
fn from(_: std::convert::Infallible) -> Self {
unreachable!()
Expand Down
18 changes: 11 additions & 7 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ impl Database {
endpoint
};
let mut db = Database::open(&db_path, flags)?;
db.sync_ctx = Some(tokio::sync::Mutex::new(SyncContext::new(
connector,
endpoint,
Some(auth_token),
)));

let sync_ctx =
SyncContext::new(connector, db_path.into(), endpoint, Some(auth_token)).await?;
db.sync_ctx = Some(tokio::sync::Mutex::new(sync_ctx));

Ok(db)
}

Expand Down Expand Up @@ -388,7 +388,7 @@ impl Database {
#[cfg(feature = "sync")]
/// Push WAL frames to remote.
pub async fn push(&self) -> Result<crate::database::Replicated> {
let sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

let page_size = {
Expand All @@ -402,7 +402,7 @@ impl Database {

let max_frame_no = conn.wal_frame_count();

let generation = 1; // TODO: Probe from WAL.
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num() + 1;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we cache durable frame number, we need to be prepared for the scenario where remote database got deleted. IOW. the durable frame number we have here might be higher than what is actually on server side.

let end_frame_no = max_frame_no;

Expand All @@ -423,6 +423,10 @@ impl Database {
frame_no += 1;
}

sync_ctx.write_metadata().await?;

// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
// than what we have stored here.
let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::database::Replicated {
frame_no: None,
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ macro_rules! cfg_core {
}
}

macro_rules! cfg_replication_or_remote {
macro_rules! cfg_replication_or_remote_or_sync {
($($item:item)*) => {
$(
#[cfg(any(feature = "replication", feature = "sync", feature = "remote"))]
Expand Down
Loading
Loading