Skip to content

Commit

Permalink
Add t=0 special case for time2version.
Browse files Browse the repository at this point in the history
  • Loading branch information
losfair committed Sep 26, 2022
1 parent d03e698 commit cea53b2
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 64 deletions.
1 change: 1 addition & 0 deletions mvstore/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod page;
mod replica;
mod server;
mod stat;
mod time2version;
mod util;
mod write;

Expand Down
74 changes: 10 additions & 64 deletions mvstore/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use anyhow::{Context, Result};
use bytes::{Bytes, BytesMut};
use foundationdb::{
future::FdbValues,
options::{MutationType, StreamingMode, TransactionOption},
tuple::unpack,
Database, FdbError, RangeOption, Transaction,
Expand Down Expand Up @@ -35,6 +34,7 @@ use crate::{
lock::DistributedLock,
page::Page,
replica::ReplicaManager,
time2version::time2version,
util::{decode_version, generate_suffix_versionstamp_atomic_op},
write::{WriteApplier, WriteApplierContext, WriteRequest, WriteResponse},
};
Expand Down Expand Up @@ -120,18 +120,6 @@ pub struct CommitResponse {
pub changelog: HashMap<String, Vec<u32>>,
}

#[derive(Serialize)]
pub struct TimeToVersionResponse {
pub after: Option<TimeToVersionPoint>,
pub not_after: Option<TimeToVersionPoint>,
}

#[derive(Serialize)]
pub struct TimeToVersionPoint {
pub version: String,
pub time: u64,
}

#[derive(Deserialize)]
pub struct AdminCreateNamespaceRequest {
pub key: String,
Expand Down Expand Up @@ -1256,63 +1244,21 @@ impl Server {
.with_context(|| "missing t")?
.parse::<u64>()
.with_context(|| "invalid t")?;
let key = self.key_codec.construct_time2version_key(time_in_seconds);
let lower_bound = self.key_codec.construct_time2version_key(std::u64::MIN);
let upper_bound = self.key_codec.construct_time2version_key(std::u64::MAX);
let prefix = self.key_codec.construct_time2version_prefix();

let txn = self.db.create_trx()?;

if self.is_read_only() {
txn.set_option(TransactionOption::ReadLockAware).unwrap();
}

let after = txn
.get_range(
&RangeOption {
limit: Some(1),
reverse: true,
mode: StreamingMode::Small,
..RangeOption::from(lower_bound.clone()..key.clone())
},
0,
true,
)
.await?;

let not_after = txn
.get_range(
&RangeOption {
limit: Some(1),
reverse: false,
mode: StreamingMode::Small,
..RangeOption::from(key.clone()..upper_bound.clone())
},
0,
true,
)
.await?;
let map_it = |x: FdbValues| -> Option<TimeToVersionPoint> {
if x.len() == 0 || x[0].key().len() < prefix.len() {
None
} else {
let item = &x[0];
let time_suffix = &item.key()[prefix.len()..];
match unpack::<u64>(time_suffix) {
Ok(time_secs) => match <[u8; 10]>::try_from(item.value()) {
Ok(version) => Some(TimeToVersionPoint {
version: hex::encode(&version),
time: time_secs,
}),
Err(_) => None,
},
Err(_) => None,
}
}
};
let after = map_it(after);
let not_after = map_it(not_after);

let body = serde_json::to_vec(&TimeToVersionResponse { after, not_after })
let ttv_res = time2version(
&txn,
&self.key_codec,
time_in_seconds,
self.replica_manager.as_ref(),
)
.await?;
let body = serde_json::to_vec(&ttv_res)
.with_context(|| "cannot serialize time2version response")?;

res = Response::builder()
Expand Down
104 changes: 104 additions & 0 deletions mvstore/src/time2version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use std::time::SystemTime;

use crate::{keys::KeyCodec, replica::ReplicaManager};
use anyhow::Result;
use foundationdb::{
future::FdbValues, options::StreamingMode, tuple::unpack, RangeOption, Transaction,
};
use serde::Serialize;

#[derive(Serialize)]
pub struct TimeToVersionResponse {
pub after: Option<TimeToVersionPoint>,
pub not_after: Option<TimeToVersionPoint>,
}

#[derive(Serialize)]
pub struct TimeToVersionPoint {
pub version: String,
pub time: u64,
}

pub async fn time2version(
txn: &Transaction,
key_codec: &KeyCodec,
time_in_seconds: u64,
rm: Option<&ReplicaManager>,
) -> Result<TimeToVersionResponse> {
// Special case: t=0 gives the current version
if time_in_seconds == 0 {
let version = if let Some(rm) = rm {
rm.replica_version(&txn).await?
} else {
txn.get_read_version().await?
};
let next_version = version + 1;
let time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let after = Some(TimeToVersionPoint {
version: format!("{}ffff", hex::encode(&version.to_be_bytes())),
time,
});
let not_after = Some(TimeToVersionPoint {
version: format!("{}0000", hex::encode(&next_version.to_be_bytes())),
time: time + 1,
});

return Ok(TimeToVersionResponse { after, not_after });
}

let key = key_codec.construct_time2version_key(time_in_seconds);
let lower_bound = key_codec.construct_time2version_key(std::u64::MIN);
let upper_bound = key_codec.construct_time2version_key(std::u64::MAX);
let prefix = key_codec.construct_time2version_prefix();

let after = txn
.get_range(
&RangeOption {
limit: Some(1),
reverse: true,
mode: StreamingMode::Small,
..RangeOption::from(lower_bound.clone()..key.clone())
},
0,
true,
)
.await?;

let not_after = txn
.get_range(
&RangeOption {
limit: Some(1),
reverse: false,
mode: StreamingMode::Small,
..RangeOption::from(key.clone()..upper_bound.clone())
},
0,
true,
)
.await?;
let map_it = |x: FdbValues| -> Option<TimeToVersionPoint> {
if x.len() == 0 || x[0].key().len() < prefix.len() {
None
} else {
let item = &x[0];
let time_suffix = &item.key()[prefix.len()..];
match unpack::<u64>(time_suffix) {
Ok(time_secs) => match <[u8; 10]>::try_from(item.value()) {
Ok(version) => Some(TimeToVersionPoint {
version: hex::encode(&version),
time: time_secs,
}),
Err(_) => None,
},
Err(_) => None,
}
}
};
let after = map_it(after);
let not_after = map_it(not_after);

Ok(TimeToVersionResponse { after, not_after })
}

0 comments on commit cea53b2

Please sign in to comment.