Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce manifest file for fast disk cache clear #780

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions foyer-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@
#[arg(short, long)]
dir: Option<String>,

/// Setup path for the manifest file.
///
/// The manifest file is required with `DirectFile` device.
#[arg(short, long)]
manifest: Option<String>,

/// In-memory cache capacity.
#[arg(long, default_value_t = ByteSize::gib(1))]
mem: ByteSize,
Expand Down Expand Up @@ -484,6 +490,10 @@
_ => unreachable!(),
};

if let Some(path) = &args.manifest {
builder = builder.with_manifest_file_path(path);

Check warning on line 494 in foyer-bench/src/main.rs

View check run for this annotation

Codecov / codecov/patch

foyer-bench/src/main.rs#L494

Added line #L494 was not covered by tests
}

builder = builder
.with_flush(args.flush)
.with_recover_mode(args.recover_mode)
Expand Down
4 changes: 4 additions & 0 deletions foyer-storage/src/device/direct_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ pub struct DirectFsDeviceConfig {
}

impl DirectFsDeviceConfig {
pub fn dir(&self) -> &PathBuf {
&self.dir
}

fn verify(&self) -> Result<()> {
if self.file_size == 0 || self.file_size % ALIGN != 0 {
return Err(anyhow::anyhow!(
Expand Down
14 changes: 10 additions & 4 deletions foyer-storage/src/large/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,19 @@
}

pub fn entry(&mut self, entry: CacheEntry<K, V, S>, compression: &Compression, sequence: Sequence) -> bool {
tracing::trace!("[batch]: append entry with sequence: {sequence}");
tracing::trace!("[lodc batch]: append entry with sequence: {sequence}");

self.may_init();

if entry.is_outdated() {
tracing::trace!("[lodc batch]: skip outdated entry");
return false;
}

let pos = self.len;

if pos + EntryHeader::serialized_len() >= self.buffer.len() {
tracing::trace!("[lodc batch]: entry out of buffer capacity, skip");

Check warning on line 126 in foyer-storage/src/large/batch.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/batch.rs#L126

Added line #L126 was not covered by tests
// Only handle start position overflow. End position overflow will be handled by serde.
return false;
}
Expand Down Expand Up @@ -157,15 +159,17 @@
self.advance(aligned);

let group = self.groups.last_mut().unwrap();
group.indices.push(HashedEntryAddress {
let addr = HashedEntryAddress {
hash: entry.hash(),
address: EntryAddress {
region: RegionId::MAX,
offset: group.region.offset as u32 + group.region.len as u32,
len: header.entry_len() as _,
sequence,
},
});
};
tracing::trace!("[lodc batch]: entry addr: {addr:?}");
group.indices.push(addr);
group.entries.push(entry);
group.region.len += aligned;
group.range.end += aligned;
Expand Down Expand Up @@ -225,6 +229,8 @@
}

pub fn rotate(&mut self) -> Option<Batch<K, V, S>> {
tracing::trace!("[lodc batch]: rotate");

if self.is_empty() {
return None;
}
Expand Down Expand Up @@ -351,7 +357,7 @@
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RegionHandle")
.field("offset", &self.offset)
.field("size", &self.len)
.field("len", &self.len)

Check warning on line 360 in foyer-storage/src/large/batch.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/batch.rs#L360

Added line #L360 was not covered by tests
.field("is_full", &self.is_full)
.finish()
}
Expand Down
51 changes: 32 additions & 19 deletions foyer-storage/src/large/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
device::{monitor::DeviceStats, Dev, DevExt, MonitoredDevice, RegionId},
error::{Error, Result},
large::{
reclaimer::RegionCleaner,
serde::{AtomicSequence, EntryHeader},
tombstone::{Tombstone, TombstoneLog, TombstoneLogConfig},
},
manifest::Manifest,
picker::{EvictionPicker, ReinsertionPicker},
region::RegionManager,
runtime::Runtime,
Expand All @@ -66,6 +66,7 @@
{
pub name: String,
pub device: MonitoredDevice,
pub manifest: Manifest,
pub regions: Range<RegionId>,
pub compression: Compression,
pub flush: bool,
Expand Down Expand Up @@ -95,6 +96,7 @@
f.debug_struct("GenericStoreConfig")
.field("name", &self.name)
.field("device", &self.device)
.field("manifest", &self.manifest)

Check warning on line 99 in foyer-storage/src/large/generic.rs

View check run for this annotation

Codecov / codecov/patch

foyer-storage/src/large/generic.rs#L99

Added line #L99 was not covered by tests
.field("compression", &self.compression)
.field("flush", &self.flush)
.field("indexer_shards", &self.indexer_shards)
Expand Down Expand Up @@ -142,7 +144,8 @@
{
indexer: Indexer,
device: MonitoredDevice,
region_manager: RegionManager,
manifest: Manifest,
_region_manager: RegionManager,

flushers: Vec<Flusher<K, V, S>>,
reclaimers: Vec<Reclaimer>,
Expand All @@ -152,8 +155,6 @@

statistics: Arc<Statistics>,

flush: bool,

sequence: AtomicSequence,

runtime: Runtime,
Expand Down Expand Up @@ -249,6 +250,7 @@

let reclaimers = join_all((0..config.reclaimers).map(|_| async {
Reclaimer::open(
config.manifest.clone(),
region_manager.clone(),
reclaim_semaphore.clone(),
config.reinsertion_picker.clone(),
Expand All @@ -266,13 +268,13 @@
inner: Arc::new(GenericStoreInner {
indexer,
device,
region_manager,
manifest: config.manifest,
_region_manager: region_manager,
flushers,
reclaimers,
submit_queue_size,
submit_queue_size_threshold: config.submit_queue_size_threshold,
statistics: stats,
flush: config.flush,
sequence,
runtime: config.runtime,
active: AtomicBool::new(true),
Expand Down Expand Up @@ -430,6 +432,8 @@
tombstone: Tombstone { hash: 0, sequence },
stats: None,
});

// Wait all inflight data to finish.
self.wait().await;

// Clear indices.
Expand All @@ -438,16 +442,11 @@
// otherwise the indices of the latest batch cannot be cleared.
self.inner.indexer.clear();

// Clean regions.
try_join_all((0..self.inner.region_manager.regions() as RegionId).map(|id| {
let region = self.inner.region_manager.region(id).clone();
async move {
let res = RegionCleaner::clean(&region, self.inner.flush).await;
region.stats().reset();
res
}
}))
.await?;
// Update manifest watermark to prevent stale regions to be read in future.
self.inner
.manifest
.update_sequence_watermark(self.inner.sequence.fetch_add(1, Ordering::Relaxed))
.await?;

Ok(())
}
Expand Down Expand Up @@ -534,8 +533,8 @@
Monitored::open(
MonitoredConfig {
config: DirectFsDeviceOptions::new(dir)
.with_capacity(ByteSize::kib(64).as_u64() as _)
.with_file_size(ByteSize::kib(16).as_u64() as _)
.with_capacity(ByteSize::kib(80).as_u64() as _)
.with_file_size(ByteSize::kib(20).as_u64() as _)
.into(),
metrics: Arc::new(Metrics::new("test")),
},
Expand All @@ -554,11 +553,18 @@
dir: impl AsRef<Path>,
reinsertion_picker: Arc<dyn ReinsertionPicker<Key = u64>>,
) -> GenericLargeStorage<u64, Vec<u8>, RandomState> {
let dir = dir.as_ref();
let runtime = Runtime::new(None, None, Handle::current());
let device = device_for_test(dir).await;
let manifest = Manifest::open(dir.join(Manifest::DEFAULT_FILENAME), true, runtime.clone())
.await
.unwrap();

let regions = 0..device.regions() as RegionId;
let config = GenericLargeStorageConfig {
name: "test".to_string(),
device,
manifest,
regions,
compression: Compression::None,
flush: true,
Expand All @@ -574,7 +580,7 @@
buffer_pool_size: 16 * 1024 * 1024,
submit_queue_size_threshold: 16 * 1024 * 1024 * 2,
statistics: Arc::<Statistics>::default(),
runtime: Runtime::new(None, None, Handle::current()),
runtime,
marker: PhantomData,
};
GenericLargeStorage::open(config).await.unwrap()
Expand All @@ -584,11 +590,18 @@
dir: impl AsRef<Path>,
path: impl AsRef<Path>,
) -> GenericLargeStorage<u64, Vec<u8>, RandomState> {
let dir = dir.as_ref();
let runtime = Runtime::new(None, None, Handle::current());
let device = device_for_test(dir).await;
let manifest = Manifest::open(dir.join(Manifest::DEFAULT_FILENAME), true, runtime.clone())
.await
.unwrap();

let regions = 0..device.regions() as RegionId;
let config = GenericLargeStorageConfig {
name: "test".to_string(),
device,
manifest,
regions,
compression: Compression::None,
flush: true,
Expand Down
11 changes: 11 additions & 0 deletions foyer-storage/src/large/reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::{
scanner::RegionScanner,
serde::Sequence,
},
manifest::Manifest,
picker::ReinsertionPicker,
region::{Region, RegionManager},
runtime::Runtime,
Expand All @@ -46,6 +47,7 @@ pub struct Reclaimer {
impl Reclaimer {
#[expect(clippy::too_many_arguments)]
pub fn open<K, V, S>(
manifest: Manifest,
region_manager: RegionManager,
reclaim_semaphore: Arc<Semaphore>,
reinsertion_picker: Arc<dyn ReinsertionPicker<Key = K>>,
Expand All @@ -64,6 +66,7 @@ impl Reclaimer {
let (wait_tx, wait_rx) = mpsc::unbounded_channel();

let runner = ReclaimRunner {
manifest,
region_manager,
reclaim_semaphore,
indexer,
Expand Down Expand Up @@ -99,7 +102,10 @@ where
{
reinsertion_picker: Arc<dyn ReinsertionPicker<Key = K>>,

manifest: Manifest,

region_manager: RegionManager,

reclaim_semaphore: Arc<Semaphore>,

indexer: Indexer,
Expand Down Expand Up @@ -171,6 +177,7 @@ where

tracing::debug!("[reclaimer]: Start reclaiming region {id}.");

let watermark = self.manifest.sequence_watermark().await;
let mut scanner = RegionScanner::new(region.clone(), self.metrics.clone());
let mut picked_count = 0;
let mut unpicked = vec![];
Expand All @@ -193,6 +200,10 @@ where
}
Ok(Some((info, key))) => (info, key),
};
if info.sequence < watermark {
unpicked.push(info.hash);
continue;
}
if self.reinsertion_picker.pick(&self.stats, &key) {
let buffer = match region.read(info.addr.offset as _, info.addr.len as _).await {
Err(e) => {
Expand Down
9 changes: 8 additions & 1 deletion foyer-storage/src/large/recover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ impl RecoverRunner {
// Recover regions concurrently.
let semaphore = Arc::new(Semaphore::new(config.recover_concurrency));
let mode = config.recover_mode;
let watermark = config.manifest.sequence_watermark().await;
let handles = regions.map(|id| {
let semaphore = semaphore.clone();
let region = region_manager.region(id).clone();
Expand Down Expand Up @@ -145,6 +146,7 @@ impl RecoverRunner {
tracing::trace!("[recover runner]: hash {hash} has versions: {versions:?}");
match versions.pop() {
None => None,
Some((sequence, _)) if sequence < watermark => None,
Some((_, EntryAddressOrTombstone::Tombstone)) => None,
Some((_, EntryAddressOrTombstone::EntryAddress(address))) => {
Some(HashedEntryAddress { hash, address })
Expand All @@ -167,8 +169,9 @@ impl RecoverRunner {
);

// Update components.
let seq = latest_sequence + 1;
indexer.insert_batch(indices);
sequence.store(latest_sequence + 1, Ordering::Release);
sequence.store(seq, Ordering::Release);
for region in clean_regions {
region_manager.mark_clean(region).await;
}
Expand All @@ -177,6 +180,10 @@ impl RecoverRunner {
}
region_manager.reclaim_semaphore().add_permits(permits);
region_manager.reclaim_semaphore_countdown().reset(countdown);
if watermark > seq {
// Update the manifest sequence watermark with the smallest possible value.
config.manifest.update_sequence_watermark(seq).await?;
}

// Note: About reclaim semaphore permits and countdown:
//
Expand Down
1 change: 1 addition & 0 deletions foyer-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod engine;
mod error;
mod io_buffer_pool;
mod large;
mod manifest;
mod picker;
mod region;
mod runtime;
Expand Down
Loading
Loading