From cea53b29625f383ebdb17c1a97914b1ebe605753 Mon Sep 17 00:00:00 2001 From: losfair Date: Mon, 26 Sep 2022 13:22:35 +0000 Subject: [PATCH] Add t=0 special case for time2version. --- mvstore/src/main.rs | 1 + mvstore/src/server.rs | 74 ++++--------------------- mvstore/src/time2version.rs | 104 ++++++++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+), 64 deletions(-) create mode 100644 mvstore/src/time2version.rs diff --git a/mvstore/src/main.rs b/mvstore/src/main.rs index cdfdd03..94f3d4d 100644 --- a/mvstore/src/main.rs +++ b/mvstore/src/main.rs @@ -8,6 +8,7 @@ mod page; mod replica; mod server; mod stat; +mod time2version; mod util; mod write; diff --git a/mvstore/src/server.rs b/mvstore/src/server.rs index b61dcdf..606960f 100644 --- a/mvstore/src/server.rs +++ b/mvstore/src/server.rs @@ -1,7 +1,6 @@ use anyhow::{Context, Result}; use bytes::{Bytes, BytesMut}; use foundationdb::{ - future::FdbValues, options::{MutationType, StreamingMode, TransactionOption}, tuple::unpack, Database, FdbError, RangeOption, Transaction, @@ -35,6 +34,7 @@ use crate::{ lock::DistributedLock, page::Page, replica::ReplicaManager, + time2version::time2version, util::{decode_version, generate_suffix_versionstamp_atomic_op}, write::{WriteApplier, WriteApplierContext, WriteRequest, WriteResponse}, }; @@ -120,18 +120,6 @@ pub struct CommitResponse { pub changelog: HashMap>, } -#[derive(Serialize)] -pub struct TimeToVersionResponse { - pub after: Option, - pub not_after: Option, -} - -#[derive(Serialize)] -pub struct TimeToVersionPoint { - pub version: String, - pub time: u64, -} - #[derive(Deserialize)] pub struct AdminCreateNamespaceRequest { pub key: String, @@ -1256,63 +1244,21 @@ impl Server { .with_context(|| "missing t")? .parse::() .with_context(|| "invalid t")?; - let key = self.key_codec.construct_time2version_key(time_in_seconds); - let lower_bound = self.key_codec.construct_time2version_key(std::u64::MIN); - let upper_bound = self.key_codec.construct_time2version_key(std::u64::MAX); - let prefix = self.key_codec.construct_time2version_prefix(); + let txn = self.db.create_trx()?; if self.is_read_only() { txn.set_option(TransactionOption::ReadLockAware).unwrap(); } - let after = txn - .get_range( - &RangeOption { - limit: Some(1), - reverse: true, - mode: StreamingMode::Small, - ..RangeOption::from(lower_bound.clone()..key.clone()) - }, - 0, - true, - ) - .await?; - - let not_after = txn - .get_range( - &RangeOption { - limit: Some(1), - reverse: false, - mode: StreamingMode::Small, - ..RangeOption::from(key.clone()..upper_bound.clone()) - }, - 0, - true, - ) - .await?; - let map_it = |x: FdbValues| -> Option { - if x.len() == 0 || x[0].key().len() < prefix.len() { - None - } else { - let item = &x[0]; - let time_suffix = &item.key()[prefix.len()..]; - match unpack::(time_suffix) { - Ok(time_secs) => match <[u8; 10]>::try_from(item.value()) { - Ok(version) => Some(TimeToVersionPoint { - version: hex::encode(&version), - time: time_secs, - }), - Err(_) => None, - }, - Err(_) => None, - } - } - }; - let after = map_it(after); - let not_after = map_it(not_after); - - let body = serde_json::to_vec(&TimeToVersionResponse { after, not_after }) + let ttv_res = time2version( + &txn, + &self.key_codec, + time_in_seconds, + self.replica_manager.as_ref(), + ) + .await?; + let body = serde_json::to_vec(&ttv_res) .with_context(|| "cannot serialize time2version response")?; res = Response::builder() diff --git a/mvstore/src/time2version.rs b/mvstore/src/time2version.rs new file mode 100644 index 0000000..cf242e1 --- /dev/null +++ b/mvstore/src/time2version.rs @@ -0,0 +1,104 @@ +use std::time::SystemTime; + +use crate::{keys::KeyCodec, replica::ReplicaManager}; +use anyhow::Result; +use foundationdb::{ + future::FdbValues, options::StreamingMode, tuple::unpack, RangeOption, Transaction, +}; +use serde::Serialize; + +#[derive(Serialize)] +pub struct TimeToVersionResponse { + pub after: Option, + pub not_after: Option, +} + +#[derive(Serialize)] +pub struct TimeToVersionPoint { + pub version: String, + pub time: u64, +} + +pub async fn time2version( + txn: &Transaction, + key_codec: &KeyCodec, + time_in_seconds: u64, + rm: Option<&ReplicaManager>, +) -> Result { + // Special case: t=0 gives the current version + if time_in_seconds == 0 { + let version = if let Some(rm) = rm { + rm.replica_version(&txn).await? + } else { + txn.get_read_version().await? + }; + let next_version = version + 1; + let time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let after = Some(TimeToVersionPoint { + version: format!("{}ffff", hex::encode(&version.to_be_bytes())), + time, + }); + let not_after = Some(TimeToVersionPoint { + version: format!("{}0000", hex::encode(&next_version.to_be_bytes())), + time: time + 1, + }); + + return Ok(TimeToVersionResponse { after, not_after }); + } + + let key = key_codec.construct_time2version_key(time_in_seconds); + let lower_bound = key_codec.construct_time2version_key(std::u64::MIN); + let upper_bound = key_codec.construct_time2version_key(std::u64::MAX); + let prefix = key_codec.construct_time2version_prefix(); + + let after = txn + .get_range( + &RangeOption { + limit: Some(1), + reverse: true, + mode: StreamingMode::Small, + ..RangeOption::from(lower_bound.clone()..key.clone()) + }, + 0, + true, + ) + .await?; + + let not_after = txn + .get_range( + &RangeOption { + limit: Some(1), + reverse: false, + mode: StreamingMode::Small, + ..RangeOption::from(key.clone()..upper_bound.clone()) + }, + 0, + true, + ) + .await?; + let map_it = |x: FdbValues| -> Option { + if x.len() == 0 || x[0].key().len() < prefix.len() { + None + } else { + let item = &x[0]; + let time_suffix = &item.key()[prefix.len()..]; + match unpack::(time_suffix) { + Ok(time_secs) => match <[u8; 10]>::try_from(item.value()) { + Ok(version) => Some(TimeToVersionPoint { + version: hex::encode(&version), + time: time_secs, + }), + Err(_) => None, + }, + Err(_) => None, + } + } + }; + let after = map_it(after); + let not_after = map_it(not_after); + + Ok(TimeToVersionResponse { after, not_after }) +}