Skip to content

Commit

Permalink
Concurrent ingestion and wal flush/compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
cswinter committed Aug 18, 2024
1 parent e62b887 commit d72ccef
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 55 deletions.
4 changes: 4 additions & 0 deletions src/disk_store/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ type PartitionID = u64;

#[derive(Clone)]
pub struct MetaStore {
// ID for the next WAL segment to be written
pub next_wal_id: u64,
// ID of the earliest WAL segment that has not been flushed into partitions (this WAL segment may not exist yet)
pub earliest_uncommited_wal_id: u64,
pub partitions: HashMap<TableName, HashMap<PartitionID, PartitionMetadata>>,
}

Expand Down Expand Up @@ -188,6 +191,7 @@ impl MetaStore {

Ok(MetaStore {
next_wal_id,
earliest_uncommited_wal_id: next_wal_id,
partitions,
})
}
Expand Down
80 changes: 48 additions & 32 deletions src/disk_store/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::ops::Range;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::Duration;
Expand Down Expand Up @@ -133,6 +134,7 @@ impl Storage {
} else {
MetaStore {
next_wal_id: 0,
earliest_uncommited_wal_id: 0,
partitions: HashMap::new(),
}
};
Expand Down Expand Up @@ -185,7 +187,9 @@ impl Storage {
subpartition_cols: Vec<Vec<Arc<Column>>>,
) {
for (metadata, cols) in partition.subpartitions.iter().zip(subpartition_cols) {
let table_dir = self.tables_path.join(sanitize_table_name(&partition.tablename));
let table_dir = self
.tables_path
.join(sanitize_table_name(&partition.tablename));
let cols = cols.iter().map(|col| &**col).collect::<Vec<_>>();
let data = PartitionSegment::serialize(&cols[..]);
self.perf_counter
Expand Down Expand Up @@ -216,40 +220,43 @@ impl Storage {
data.len() as u64
}

pub fn persist_partitions_delete_wal(
pub fn uncommited_wal_ids(&self) -> Range<u64> {
let meta_store = self.meta_store.read().unwrap();
meta_store.earliest_uncommited_wal_id..meta_store.next_wal_id
}

pub fn persist_partitions(
&self,
partitions: Vec<(PartitionMetadata, Vec<Vec<Arc<Column>>>)>,
) -> (Duration, Duration, Duration, Duration) {
// Lock meta store
let start_time_lock = std::time::Instant::now();
let mut meta_store = self.meta_store.write().unwrap();
let lock_time = start_time_lock.elapsed();
) -> (Duration, Duration) {
let mut total_write_time = Duration::default();
let mut total_lock_time = Duration::default();

// Write out new partition files
let start_time_write_partitions = std::time::Instant::now();
for (partition, subpartition_cols) in partitions {
let start_time = std::time::Instant::now();
self.write_subpartitions(&partition, subpartition_cols);
total_write_time += start_time.elapsed();

let start_time = std::time::Instant::now();
let mut meta_store = self.meta_store.write().unwrap();
meta_store
.partitions
.entry(partition.tablename.clone())
.or_default()
.insert(partition.id, partition);
total_lock_time += start_time.elapsed();
}
let write_time_partitions = start_time_write_partitions.elapsed();

// Atomically overwrite meta store file
let start_time_write_meta = std::time::Instant::now();
self.write_metastore(&meta_store);
let write_time_meta = start_time_write_meta.elapsed();
(total_write_time, total_lock_time)
}

// Delete WAL files
let start_time_delete_wal = std::time::Instant::now();
for file in self.writer.list(&self.wal_dir).unwrap() {
self.writer.delete(&file).unwrap();
/// Delete WAL segments with ids in the given range.
pub fn delete_wal_segments(&self, ids: Range<u64>) {
for id in ids {
let path = self.wal_dir.join(format!("{}.wal", id));
self.writer.delete(&path).unwrap();
}
let delete_time_wal = start_time_delete_wal.elapsed();

(lock_time, write_time_partitions, write_time_meta, delete_time_wal)
}

// Combine set of partitions into single new partition.
Expand Down Expand Up @@ -314,26 +321,33 @@ impl Storage {
to_delete
}

pub fn commit_compacts(
&self,
to_delete: Vec<(String, Vec<(u64, String)>)>,
) {
// Persist metastore
{
let meta_store = self.meta_store.read().unwrap();
self.write_metastore(&meta_store);
}

pub fn delete_orphaned_partitions(&self, to_delete: Vec<(String, Vec<(u64, String)>)>) {
// Delete old partition files
for (table, to_delete) in &to_delete {
for (id, key) in to_delete {
let table_dir = self.tables_path.join(sanitize_table_name(table));
let table_dir = self.tables_path.join(sanitize_table_name(table));
let path = table_dir.join(partition_filename(*id, key));
self.writer.delete(&path).unwrap();
}
}
}

pub fn persist_metastore(&self, earliest_uncommited_wal_id: u64) -> (Duration, Duration) {
let clone_start_time = std::time::Instant::now();
{
self.meta_store.write().unwrap().earliest_uncommited_wal_id =
earliest_uncommited_wal_id;
}
let meta_store = { self.meta_store.read().unwrap().clone() };
let clone_elapsed = clone_start_time.elapsed();

let write_start_time = std::time::Instant::now();
self.write_metastore(&meta_store);
let write_elapsed = write_start_time.elapsed();

(clone_elapsed, write_elapsed)
}

pub fn load_column(
&self,
partition: PartitionID,
Expand Down Expand Up @@ -367,7 +381,9 @@ fn partition_filename(id: PartitionID, subpartition_key: &str) -> String {
fn sanitize_table_name(table_name: &str) -> String {
let mut name = table_name.to_lowercase();
name.retain(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-' || c == '.');
name = name.trim_start_matches(|c| c == '-' || c == '.').to_string();
name = name
.trim_start_matches(|c| c == '-' || c == '.')
.to_string();
if name.len() > 189 {
name = name[..189].to_string();
}
Expand Down
4 changes: 2 additions & 2 deletions src/mem_store/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ impl Table {
partitions
}

pub fn snapshot_parts(&self, parts: &[PartitionID]) -> Vec<Arc<Partition>> {
pub fn snapshot_parts(&self, parts: &[PartitionID], snapshot_buffer: bool) -> Vec<Arc<Partition>> {
let partitions = self.partitions.read().unwrap();
let mut partitions: Vec<_> = parts.iter().map(|id| partitions[id].clone()).collect();
let offset = partitions.iter().map(|p| p.len()).sum::<usize>();
let buffer = self.buffer.lock().unwrap();
if buffer.len() > 0 {
if buffer.len() > 0 && snapshot_buffer {
partitions.push(Arc::new(
Partition::from_buffer(
self.name(),
Expand Down
72 changes: 51 additions & 21 deletions src/scheduler/inner_locustdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,25 @@ impl InnerLocustDB {

/// Creates new partition from currently open buffer in each table, persists partitions to disk, and deletes WAL.
pub(crate) fn wal_flush(&self) {
log::info!("Commencing WAL flush");
let start_time = Instant::now();
let mut time_write_partitions = Duration::default();
let mut time_meta_update = Duration::default();
let mut time_clone_metastore = Duration::default();
let mut time_persist_metastore = Duration::default();
let mut time_delete_partitions = Duration::default();
let mut time_delete_wal = Duration::default();

// Create new partitions from open buffers
// Before flushing table buffers, we acquire the wal_size lock to prevent any concurrent additions to table buffers,
// and record the range of unflushed WAL entries.
let (wal_size, wal_condvar) = &self.wal_size;
let mut wal_size = wal_size.lock().unwrap();
let uncommited_wal_ids = self
.storage
.as_ref()
.map(|s| s.uncommited_wal_ids())
.unwrap_or(0..0);
let tables = self.tables.read().unwrap();
let mut new_partitions = Vec::new();
let mut compactions = Vec::new();
Expand Down Expand Up @@ -319,11 +337,14 @@ impl InnerLocustDB {
compactions.push((table.name(), table.next_partition_id(), compaction));
}
}
*wal_size = 0;
wal_condvar.notify_all();
drop(wal_size);
let time_batching = start_time_batching.elapsed();

let mut persist_timings = None;
// Persist new partitions
if let Some(s) = self.storage.as_ref() {
persist_timings = Some(s.persist_partitions_delete_wal(new_partitions));
(time_write_partitions, time_meta_update) = s.persist_partitions(new_partitions);
}

let start_time_compaction = Instant::now();
Expand All @@ -335,7 +356,7 @@ impl InnerLocustDB {
// - create subpartitions
let colnames = tables[table].column_names(&parts);
let mut columns = Vec::with_capacity(colnames.len());
let data = tables[table].snapshot_parts(&parts);
let data = tables[table].snapshot_parts(&parts, false);
for column in &colnames {
let query = Query::read_column(table, column);
let (sender, receiver) = oneshot::channel();
Expand Down Expand Up @@ -375,7 +396,14 @@ impl InnerLocustDB {
let (metadata, subpartitions) = subpartition(&self.opts, columns.clone());
// write subpartitions to disk, update metastore unlinking old partitions, delete old partitions
if let Some(storage) = self.storage.as_ref() {
let to_delete = storage.prepare_compact(table, id, metadata, subpartitions, &parts, range.start);
let to_delete = storage.prepare_compact(
table,
id,
metadata,
subpartitions,
&parts,
range.start,
);
partitions_to_delete.push((table.to_string(), to_delete));
}

Expand All @@ -385,17 +413,23 @@ impl InnerLocustDB {
let time_compaction = start_time_compaction.elapsed();

if let Some(storage) = self.storage.as_ref() {
storage.commit_compacts(partitions_to_delete);
(time_clone_metastore, time_persist_metastore) =
storage.persist_metastore(uncommited_wal_ids.end);

let start_time = Instant::now();
storage.delete_orphaned_partitions(partitions_to_delete);
time_delete_partitions = start_time.elapsed();

let start_time = Instant::now();
storage.delete_wal_segments(uncommited_wal_ids);
time_delete_wal = start_time.elapsed();
}

let total_time = start_time.elapsed();
match persist_timings {
None =>
log::info!("Performed wal flush in {total_time:?} (batching: {time_batching:?}, compaction: {time_compaction:?})"),
Some((lock_time, write_time_partitions, write_time_meta, delete_time_wal)) => {
log::info!("Performed wal flush in {total_time:?} (batching: {time_batching:?}, compaction: {time_compaction:?}, lock: {lock_time:?}, write partitions: {write_time_partitions:?}, write meta: {write_time_meta:?}, delete wal: {delete_time_wal:?})");
}
}
log::info!("Performed wal flush in {total_time:?} (batching: {time_batching:?}, compaction: {time_compaction:?}, \
meta partitions: {time_meta_update:?}, write partitions: {time_write_partitions:?}, \
clone meta: {time_clone_metastore:?}, write meta: {time_persist_metastore:?}, delete wal: {time_delete_wal:?} \
delete partitions: {time_delete_partitions:?})",);
}

pub fn restore(&self, id: PartitionID, column: Column) {
Expand Down Expand Up @@ -528,17 +562,13 @@ impl InnerLocustDB {
}

fn enforce_wal_limit(&self) {
let (wal_size, wal_condvar) = &self.wal_size;
let mut wal_size = wal_size.lock().unwrap();
let (wal_size, _) = &self.wal_size;
while self.running.load(Ordering::SeqCst) {
if *wal_size < self.opts.max_wal_size_bytes {
(wal_size, _) = wal_condvar
.wait_timeout(wal_size, Duration::from_secs(1))
.unwrap();
} else {
let wal_size = { *wal_size.lock().unwrap() };
if wal_size > self.opts.max_wal_size_bytes {
self.wal_flush();
*wal_size = 0;
wal_condvar.notify_all();
} else {
thread::sleep(Duration::from_millis(1000));
}
}
}
Expand Down

0 comments on commit d72ccef

Please sign in to comment.