From e6630544a3b5b951ab6a55237200b74a6ac76625 Mon Sep 17 00:00:00 2001 From: Heyang Zhou Date: Wed, 7 Sep 2022 20:46:37 +0800 Subject: [PATCH] Add mv_time2version (#53) --- mvclient/src/lib.rs | 33 +++++++++++++++++++++++ mvfs/src/lib.rs | 1 + mvfs/src/vfs.rs | 13 ++++++++- mvsqlite/src/lib.rs | 64 ++++++++++++++++++++++++++++++++++++++------- 4 files changed, 101 insertions(+), 10 deletions(-) diff --git a/mvclient/src/lib.rs b/mvclient/src/lib.rs index f8869c2..9fe4b0b 100644 --- a/mvclient/src/lib.rs +++ b/mvclient/src/lib.rs @@ -138,6 +138,18 @@ pub struct NamespaceCommitIntent { pub requests: Vec, } +#[derive(Deserialize)] +pub struct TimeToVersionResponse { + pub after: Option, + pub not_after: Option, +} + +#[derive(Deserialize)] +pub struct TimeToVersionPoint { + pub version: String, + pub time: u64, +} + impl MultiVersionClient { pub fn new(config: MultiVersionClientConfig, client: reqwest::Client) -> Result> { Ok(Arc::new(Self { client, config })) @@ -292,6 +304,27 @@ impl MultiVersionClient { })); } } + + pub async fn time2version(self: &Arc, timestamp: u64) -> Result { + let mut url = self.config.random_data_plane().clone(); + url.set_path("/time2version"); + url.query_pairs_mut() + .append_pair("t", ×tamp.to_string()); + + let mut boff = RandomizedExponentialBackoff::default(); + let res: TimeToVersionResponse = loop { + let resp = request_and_check(self.client.get(url.clone()).decorate(self)).await?; + match resp { + Some((_, body)) => break serde_json::from_slice(&body)?, + None => { + boff.wait().await; + continue; + } + } + }; + + Ok(res) + } } pub struct Transaction { diff --git a/mvfs/src/lib.rs b/mvfs/src/lib.rs index 2e06672..a9ba697 100644 --- a/mvfs/src/lib.rs +++ b/mvfs/src/lib.rs @@ -1,3 +1,4 @@ pub mod types; pub mod vfs; +pub use mvclient; pub use vfs::{Connection, MultiVersionVfs}; diff --git a/mvfs/src/vfs.rs b/mvfs/src/vfs.rs index 8ded804..9a4eb54 100644 --- a/mvfs/src/vfs.rs +++ b/mvfs/src/vfs.rs @@ -8,7 +8,9 @@ use std::{ }, }; -use mvclient::{CommitOutput, MultiVersionClient, MultiVersionClientConfig, Transaction}; +use mvclient::{ + CommitOutput, MultiVersionClient, MultiVersionClientConfig, TimeToVersionResponse, Transaction, +}; use crate::types::LockKind; const TRANSITION_HISTORY_SIZE: usize = 10; @@ -323,6 +325,15 @@ impl Connection { self.force_flush_write_buffer().await; } } + + pub async fn time2version(&mut self, timestamp: u64) -> TimeToVersionResponse { + let res = self + .client + .time2version(timestamp) + .await + .expect("unrecoverable time2version failure"); + res + } } impl Connection { diff --git a/mvsqlite/src/lib.rs b/mvsqlite/src/lib.rs index 9ed4cd4..e62c683 100644 --- a/mvsqlite/src/lib.rs +++ b/mvsqlite/src/lib.rs @@ -140,6 +140,8 @@ pub extern "C" fn init_mvsqlite() { #[no_mangle] pub unsafe extern "C" fn init_mvsqlite_connection(db: *mut sqlite_c::sqlite3) { let mv_last_known_version_name = b"mv_last_known_version\0"; + let mv_time2version_name = b"mv_time2version\0"; + let ret = sqlite_c::sqlite3_create_function_v2( db, mv_last_known_version_name.as_ptr() as *const i8, @@ -152,6 +154,19 @@ pub unsafe extern "C" fn init_mvsqlite_connection(db: *mut sqlite_c::sqlite3) { None, ); assert_eq!(ret, sqlite_c::SQLITE_OK); + + let ret = sqlite_c::sqlite3_create_function_v2( + db, + mv_time2version_name.as_ptr() as *const i8, + 2, + sqlite_c::SQLITE_UTF8 | sqlite_c::SQLITE_DIRECTONLY, + std::ptr::null_mut(), + Some(mv_time2version), + None, + None, + None, + ); + assert_eq!(ret, sqlite_c::SQLITE_OK); } unsafe extern "C" fn mv_last_known_version( @@ -166,13 +181,44 @@ unsafe extern "C" fn mv_last_known_version( .to_str() .unwrap(); let conn = get_conn(db, selected_db); - let version = conn.inner.last_known_version().unwrap_or_default(); - tracing::info!(selected_db, version, "last known version"); - let version = CString::new(version).unwrap(); - sqlite_c::sqlite3_result_text( - ctx, - version.as_ptr(), - version.as_bytes().len() as i32, - crate::sqlite_misc::SQLITE_TRANSIENT(), - ); + + if let Some(version) = conn.inner.last_known_version() { + let version = CString::new(version).unwrap(); + sqlite_c::sqlite3_result_text( + ctx, + version.as_ptr(), + version.as_bytes().len() as i32, + crate::sqlite_misc::SQLITE_TRANSIENT(), + ); + } else { + sqlite_c::sqlite3_result_null(ctx); + } +} + +unsafe extern "C" fn mv_time2version( + ctx: *mut sqlite_c::sqlite3_context, + argc: std::os::raw::c_int, + argv: *mut *mut sqlite_c::sqlite3_value, +) { + assert_eq!(argc, 2); + let db = sqlite_c::sqlite3_context_db_handle(ctx); + let selected_db = sqlite_c::sqlite3_value_text(*argv.add(0)); + let selected_db = std::ffi::CStr::from_ptr(selected_db as *const i8) + .to_str() + .unwrap(); + let mut conn = get_conn(db, selected_db); + let timestamp_secs = sqlite_c::sqlite3_value_int64(*argv.add(1)); + let io = conn.io.clone(); + let info = io.run(conn.inner.time2version(timestamp_secs as u64)); + if let Some(after) = &info.after { + let version = CString::new(after.version.as_str()).unwrap(); + sqlite_c::sqlite3_result_text( + ctx, + version.as_ptr(), + version.as_bytes().len() as i32, + crate::sqlite_misc::SQLITE_TRANSIENT(), + ); + } else { + sqlite_c::sqlite3_result_null(ctx); + } }