libsql: add durable_frame_num caching and metadata file #1830

Merged 12 commits on Nov 22, 2024
1 change: 1 addition & 0 deletions Cargo.lock


3 changes: 3 additions & 0 deletions libsql/Cargo.toml
@@ -43,6 +43,8 @@ fallible-iterator = { version = "0.3", optional = true }
libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }

crc32fast = { version = "1", optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] }
@@ -105,6 +107,7 @@ sync = [
"dep:tokio",
"dep:futures",
"dep:serde_json",
"dep:crc32fast",
]
hrana = [
"parser",
16 changes: 9 additions & 7 deletions libsql/src/local/database.rs
@@ -144,11 +144,11 @@ impl Database {
endpoint
};
let mut db = Database::open(&db_path, flags)?;
db.sync_ctx = Some(tokio::sync::Mutex::new(SyncContext::new(
connector,
endpoint,
Some(auth_token),
)));

let sync_ctx =
SyncContext::new(connector, db_path.into(), endpoint, Some(auth_token)).await?;
db.sync_ctx = Some(tokio::sync::Mutex::new(sync_ctx));

Ok(db)
}

@@ -388,7 +388,7 @@ impl Database {
#[cfg(feature = "sync")]
/// Push WAL frames to remote.
pub async fn push(&self) -> Result<crate::database::Replicated> {
let sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

let page_size = {
@@ -402,7 +402,7 @@

let max_frame_no = conn.wal_frame_count();

let generation = 1; // TODO: Probe from WAL.
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num() + 1;
Collaborator:
Now that we cache the durable frame number, we need to be prepared for the scenario where the remote database got deleted. In other words, the durable frame number we have here might be higher than what is actually on the server side.
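
(Not part of this diff.) A minimal sketch of how the client could reconcile a cached value that is ahead of the server, assuming the push response reports the server's current durable frame number; the function and field names below are illustrative, not API from this PR:

// Illustrative only: `server_durable_frame_num` is assumed to come from the
// server's push response; this helper is not part of the PR.
fn reconcile_durable_frame_num(cached: u32, server_durable_frame_num: u32) -> u32 {
    if server_durable_frame_num < cached {
        // The remote database was likely deleted and recreated; trust the
        // server and resume pushing from its durable frame number.
        tracing::warn!(
            cached,
            server_durable_frame_num,
            "cached durable frame number is ahead of the server; resetting"
        );
        return server_durable_frame_num;
    }
    cached
}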

let end_frame_no = max_frame_no;

@@ -423,6 +423,8 @@
frame_no += 1;
}

// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
// than what we have stored here.
let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::database::Replicated {
frame_no: None,
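Regarding the underflow TODO in the hunk above, a minimal sketch of a guard using the same start_frame_no/end_frame_no variables; this is not part of this PR:

// If the cached durable frame number is ahead of the local WAL (for example
// after the remote database was recreated), end_frame_no can be smaller than
// start_frame_no, and the plain subtraction would panic in debug builds or
// wrap around in release builds. checked_sub avoids both.
let frame_count = end_frame_no
    .checked_sub(start_frame_no)
    .map(|n| n + 1)
    .unwrap_or(0);
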
223 changes: 211 additions & 12 deletions libsql/src/sync.rs
@@ -1,33 +1,55 @@
use crate::{util::ConnectorService, Result};

use std::path::Path;

use bytes::Bytes;
use hyper::Body;
use tokio::io::AsyncWriteExt as _;
use uuid::Uuid;

const METADATA_VERSION: u32 = 0;

const DEFAULT_MAX_RETRIES: usize = 5;

pub struct SyncContext {
db_path: String,
client: hyper::Client<ConnectorService, Body>,
sync_url: String,
auth_token: Option<String>,
max_retries: usize,
/// Represents the max_frame_no from the server.
durable_frame_num: u32,
client: hyper::Client<ConnectorService, Body>,
/// Represents the current checkpoint generation.
generation: u32,
}

impl SyncContext {
pub fn new(connector: ConnectorService, sync_url: String, auth_token: Option<String>) -> Self {
// TODO(lucio): add custom connector + tls support here
pub async fn new(
connector: ConnectorService,
db_path: String,
sync_url: String,
auth_token: Option<String>,
) -> Result<Self> {
let client = hyper::client::Client::builder().build::<_, hyper::Body>(connector);

Self {
let mut me = Self {
db_path,
sync_url,
auth_token,
durable_frame_num: 0,
max_retries: DEFAULT_MAX_RETRIES,
client,
}
durable_frame_num: 0,
generation: 1,
};

me.read_metadata().await?;

Ok(me)
}

#[tracing::instrument(skip(self, frame))]
pub(crate) async fn push_one_frame(
&self,
&mut self,
frame: Bytes,
generation: u32,
frame_no: u32,
@@ -39,9 +61,18 @@ impl SyncContext {
frame_no,
frame_no + 1
);
let max_frame_no = self.push_with_retry(uri, frame, self.max_retries).await?;
tracing::debug!("pushing frame");

Ok(max_frame_no)
let durable_frame_num = self.push_with_retry(uri, frame, self.max_retries).await?;

tracing::debug!(?durable_frame_num, "frame successfully pushed");

// Update our last known max_frame_no from the server.
self.durable_frame_num = durable_frame_num;

self.write_metadata().await?;
Contributor:
Do we have to write metadata on every frame? Maybe we could do that after each batch?

Contributor Author:

Contributor:
What does this mean?

Contributor:
I don't see this being fixed in commit de24d82

Contributor Author:
Sorry, I made a mistake: it wasn't done in that commit, but it is done here: 26ac07e
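
For readers following the thread above: a hedged sketch of what writing the metadata once per batch could look like. This is not the code from commit 26ac07e; push_one_frame_no_persist is a hypothetical variant of push_one_frame that updates durable_frame_num in memory without touching the metadata file.

// Illustrative batching sketch, not the actual change from this PR.
async fn push_frames_batched(
    sync_ctx: &mut SyncContext,
    generation: u32,
    frames: Vec<(u32, bytes::Bytes)>, // (frame_no, frame) pairs already read from the WAL
) -> crate::Result<()> {
    for (frame_no, frame) in frames {
        // Hypothetical variant that skips the per-frame metadata write.
        sync_ctx
            .push_one_frame_no_persist(frame, generation, frame_no)
            .await?;
    }
    // Persist durable_frame_num and generation once for the whole batch.
    sync_ctx.write_metadata().await
}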


Ok(durable_frame_num)
}

async fn push_with_retry(&self, uri: String, frame: Bytes, max_retries: usize) -> Result<u32> {
@@ -62,9 +93,6 @@ impl SyncContext {
None => {}
}

// TODO(lucio): convert this to use bytes to make this clone cheap, it should be
// to possible use BytesMut when reading frames from the WAL and efficiently use Bytes
// from that.
let req = req.body(frame.clone().into()).expect("valid body");

let res = self.client.request(req).await.unwrap();
@@ -93,4 +121,175 @@ impl SyncContext {
pub(crate) fn durable_frame_num(&self) -> u32 {
self.durable_frame_num
}

pub(crate) fn generation(&self) -> u32 {
self.generation
}

async fn write_metadata(&mut self) -> Result<()> {
let path = format!("{}-info", self.db_path);

let mut metadata = MetadataJson {
hash: 0,
version: METADATA_VERSION,
durable_frame_num: self.durable_frame_num,
generation: self.generation,
};

metadata.set_hash();

let contents = serde_json::to_vec(&metadata).unwrap();

atomic_write(path, &contents[..]).await.unwrap();

Ok(())
}

async fn read_metadata(&mut self) -> Result<()> {
let path = format!("{}-info", self.db_path);

if !std::fs::exists(&path).unwrap() {
tracing::debug!("no metadata info file found");
return Ok(());
}

let contents = tokio::fs::read(&path).await.unwrap();

let metadata = serde_json::from_slice::<MetadataJson>(&contents[..]).unwrap();

metadata.verify_hash()?;

// TODO(lucio): convert this into a proper error
assert_eq!(
metadata.version, METADATA_VERSION,
"Reading metadata from a different version than expected"
);

self.durable_frame_num = metadata.durable_frame_num;
self.generation = metadata.generation;

Ok(())
}
}

#[derive(serde::Serialize, serde::Deserialize)]
struct MetadataJson {
hash: u32,
version: u32,
durable_frame_num: u32,
generation: u32,
}

impl MetadataJson {
fn calculate_hash(&self) -> u32 {
let mut hasher = crc32fast::Hasher::new();

// Hash each field in a consistent order
hasher.update(&self.version.to_le_bytes());
hasher.update(&self.durable_frame_num.to_le_bytes());
hasher.update(&self.generation.to_le_bytes());

hasher.finalize()
}

fn set_hash(&mut self) {
self.hash = self.calculate_hash();
}

fn verify_hash(&self) -> Result<()> {
let calculated_hash = self.calculate_hash();

if self.hash == calculated_hash {
Ok(())
} else {
// TODO(lucio): convert this into a proper error rather than
// a panic.
panic!(
"metadata hash mismatch, expected={}, got={}",
self.hash, calculated_hash
);
}
}
}

async fn atomic_write<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
// Create a temporary file in the same directory as the target file
let directory = path.as_ref().parent().unwrap();

let temp_name = format!(".tmp.{}", Uuid::new_v4());
let temp_path = directory.join(temp_name);

// Write data to temporary file
let mut temp_file = tokio::fs::File::create(&temp_path).await.unwrap();

temp_file.write_all(data).await.unwrap();

// Ensure all data is flushed to disk
temp_file.sync_all().await.unwrap();

// Close the file explicitly
drop(temp_file);

// Atomically rename temporary file to target file
tokio::fs::rename(&temp_path, &path).await.unwrap();

Ok(())
}

// TODO(lucio): for these tests to work we need proper error handling, which
// will be done in a follow-up.
#[cfg(test)]
mod tests {
use super::*;

#[test]
#[ignore]
fn test_hash_verification() {
let mut metadata = MetadataJson {
hash: 0,
version: 1,
durable_frame_num: 100,
generation: 5,
};

assert!(metadata.verify_hash().is_err());

metadata.set_hash();

assert!(metadata.verify_hash().is_ok());
}

#[test]
#[ignore]
fn test_hash_tampering() {
let mut metadata = MetadataJson {
hash: 0,
version: 1,
durable_frame_num: 100,
generation: 5,
};

// Create metadata with hash
metadata.set_hash();

// Tamper with a field
metadata.version = 2;

// Verify should fail
assert!(metadata.verify_hash().is_err());

metadata.version = 1;
metadata.generation = 42;

assert!(metadata.verify_hash().is_err());

metadata.generation = 5;
metadata.durable_frame_num = 42;

assert!(metadata.verify_hash().is_err());

metadata.durable_frame_num = 100;

assert!(metadata.verify_hash().is_ok());
}
}
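
On the error-handling TODOs above (the panic in verify_hash and the assert_eq! on METADATA_VERSION in read_metadata): a hedged sketch of returning errors instead, assuming a hypothetical crate::Error::Metadata variant rather than whatever the follow-up actually introduces. With something like this in place, the two #[ignore]d tests could be enabled, since they expect a hash mismatch to produce an Err.

impl MetadataJson {
    // Hypothetical non-panicking check covering both the hash and the version;
    // the crate::Error::Metadata variant is assumed and not part of this PR.
    fn try_verify(&self) -> Result<()> {
        let calculated_hash = self.calculate_hash();
        if self.hash != calculated_hash {
            return Err(crate::Error::Metadata(format!(
                "metadata hash mismatch, expected={}, got={}",
                self.hash, calculated_hash
            )));
        }
        if self.version != METADATA_VERSION {
            return Err(crate::Error::Metadata(format!(
                "unexpected metadata version {}, expected {}",
                self.version, METADATA_VERSION
            )));
        }
        Ok(())
    }
}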