Skip to content

Commit

Permalink
Add mv_time2version (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair authored Sep 7, 2022
1 parent b9f488d commit e663054
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 10 deletions.
33 changes: 33 additions & 0 deletions mvclient/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ pub struct NamespaceCommitIntent {
pub requests: Vec<CommitRequest>,
}

#[derive(Deserialize)]
pub struct TimeToVersionResponse {
pub after: Option<TimeToVersionPoint>,
pub not_after: Option<TimeToVersionPoint>,
}

#[derive(Deserialize)]
pub struct TimeToVersionPoint {
pub version: String,
pub time: u64,
}

impl MultiVersionClient {
pub fn new(config: MultiVersionClientConfig, client: reqwest::Client) -> Result<Arc<Self>> {
Ok(Arc::new(Self { client, config }))
Expand Down Expand Up @@ -292,6 +304,27 @@ impl MultiVersionClient {
}));
}
}

pub async fn time2version(self: &Arc<Self>, timestamp: u64) -> Result<TimeToVersionResponse> {
let mut url = self.config.random_data_plane().clone();
url.set_path("/time2version");
url.query_pairs_mut()
.append_pair("t", &timestamp.to_string());

let mut boff = RandomizedExponentialBackoff::default();
let res: TimeToVersionResponse = loop {
let resp = request_and_check(self.client.get(url.clone()).decorate(self)).await?;
match resp {
Some((_, body)) => break serde_json::from_slice(&body)?,
None => {
boff.wait().await;
continue;
}
}
};

Ok(res)
}
}

pub struct Transaction {
Expand Down
1 change: 1 addition & 0 deletions mvfs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod types;
pub mod vfs;
pub use mvclient;
pub use vfs::{Connection, MultiVersionVfs};
13 changes: 12 additions & 1 deletion mvfs/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use std::{
},
};

use mvclient::{CommitOutput, MultiVersionClient, MultiVersionClientConfig, Transaction};
use mvclient::{
CommitOutput, MultiVersionClient, MultiVersionClientConfig, TimeToVersionResponse, Transaction,
};

use crate::types::LockKind;
const TRANSITION_HISTORY_SIZE: usize = 10;
Expand Down Expand Up @@ -323,6 +325,15 @@ impl Connection {
self.force_flush_write_buffer().await;
}
}

pub async fn time2version(&mut self, timestamp: u64) -> TimeToVersionResponse {
let res = self
.client
.time2version(timestamp)
.await
.expect("unrecoverable time2version failure");
res
}
}

impl Connection {
Expand Down
64 changes: 55 additions & 9 deletions mvsqlite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ pub extern "C" fn init_mvsqlite() {
#[no_mangle]
pub unsafe extern "C" fn init_mvsqlite_connection(db: *mut sqlite_c::sqlite3) {
let mv_last_known_version_name = b"mv_last_known_version\0";
let mv_time2version_name = b"mv_time2version\0";

let ret = sqlite_c::sqlite3_create_function_v2(
db,
mv_last_known_version_name.as_ptr() as *const i8,
Expand All @@ -152,6 +154,19 @@ pub unsafe extern "C" fn init_mvsqlite_connection(db: *mut sqlite_c::sqlite3) {
None,
);
assert_eq!(ret, sqlite_c::SQLITE_OK);

let ret = sqlite_c::sqlite3_create_function_v2(
db,
mv_time2version_name.as_ptr() as *const i8,
2,
sqlite_c::SQLITE_UTF8 | sqlite_c::SQLITE_DIRECTONLY,
std::ptr::null_mut(),
Some(mv_time2version),
None,
None,
None,
);
assert_eq!(ret, sqlite_c::SQLITE_OK);
}

unsafe extern "C" fn mv_last_known_version(
Expand All @@ -166,13 +181,44 @@ unsafe extern "C" fn mv_last_known_version(
.to_str()
.unwrap();
let conn = get_conn(db, selected_db);
let version = conn.inner.last_known_version().unwrap_or_default();
tracing::info!(selected_db, version, "last known version");
let version = CString::new(version).unwrap();
sqlite_c::sqlite3_result_text(
ctx,
version.as_ptr(),
version.as_bytes().len() as i32,
crate::sqlite_misc::SQLITE_TRANSIENT(),
);

if let Some(version) = conn.inner.last_known_version() {
let version = CString::new(version).unwrap();
sqlite_c::sqlite3_result_text(
ctx,
version.as_ptr(),
version.as_bytes().len() as i32,
crate::sqlite_misc::SQLITE_TRANSIENT(),
);
} else {
sqlite_c::sqlite3_result_null(ctx);
}
}

unsafe extern "C" fn mv_time2version(
ctx: *mut sqlite_c::sqlite3_context,
argc: std::os::raw::c_int,
argv: *mut *mut sqlite_c::sqlite3_value,
) {
assert_eq!(argc, 2);
let db = sqlite_c::sqlite3_context_db_handle(ctx);
let selected_db = sqlite_c::sqlite3_value_text(*argv.add(0));
let selected_db = std::ffi::CStr::from_ptr(selected_db as *const i8)
.to_str()
.unwrap();
let mut conn = get_conn(db, selected_db);
let timestamp_secs = sqlite_c::sqlite3_value_int64(*argv.add(1));
let io = conn.io.clone();
let info = io.run(conn.inner.time2version(timestamp_secs as u64));
if let Some(after) = &info.after {
let version = CString::new(after.version.as_str()).unwrap();
sqlite_c::sqlite3_result_text(
ctx,
version.as_ptr(),
version.as_bytes().len() as i32,
crate::sqlite_misc::SQLITE_TRANSIENT(),
);
} else {
sqlite_c::sqlite3_result_null(ctx);
}
}

0 comments on commit e663054

Please sign in to comment.