From 4b2b70e2099d5de01537ca41b4896920c91ddd37 Mon Sep 17 00:00:00 2001 From: Clemens Winter Date: Sun, 18 Aug 2024 18:43:55 -0700 Subject: [PATCH] Only copy referenced columns in queried buffers --- src/ingest/buffer.rs | 14 +++++++++++++- src/locustdb.rs | 3 ++- src/mem_store/partition.rs | 12 ++---------- src/mem_store/table.rs | 18 +++++++++++++----- src/scheduler/inner_locustdb.rs | 6 +++--- 5 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/ingest/buffer.rs b/src/ingest/buffer.rs index 37dc6c5b..f4639d59 100644 --- a/src/ingest/buffer.rs +++ b/src/ingest/buffer.rs @@ -96,10 +96,22 @@ impl Buffer { } pub fn heap_size_of_children(&self) -> usize { - self.buffer.values().map(|v| { + self.buffer + .values() + .map(|v| { // Currently does not take into account the memory of String. v.heap_size_of_children() }) .sum() } + + pub fn filter(&self, columns: &[String]) -> Buffer { + Buffer { + buffer: columns + .iter() + .filter_map(|name| self.buffer.get(name).map(|col| (name.clone(), col.clone()))) + .collect(), + length: self.length, + } + } } diff --git a/src/locustdb.rs b/src/locustdb.rs index ecab740a..68ce9e2f 100644 --- a/src/locustdb.rs +++ b/src/locustdb.rs @@ -50,7 +50,8 @@ impl LocustDB { Err(err) => return Ok(Err(err)), }; - let mut data = match self.inner_locustdb.snapshot(&query.table) { + let referenced_cols: Vec<_> = query.find_referenced_cols().into_iter().collect(); + let mut data = match self.inner_locustdb.snapshot(&query.table, Some(&referenced_cols[..])) { Some(data) => data, None => { return Ok(Err(QueryError::NotImplemented(format!( diff --git a/src/mem_store/partition.rs b/src/mem_store/partition.rs index a8bc93bf..d492cecf 100644 --- a/src/mem_store/partition.rs +++ b/src/mem_store/partition.rs @@ -72,11 +72,7 @@ impl Partition { for name in md.column_name_to_subpartition_index.keys() { cols.insert( name.clone(), - ColumnHandle::non_resident( - table, - md.id, - name.clone(), - ), + ColumnHandle::non_resident(table, md.id, name.clone()), ); } Partition { @@ -284,11 +280,7 @@ impl ColumnHandle { } } - fn non_resident( - table: &str, - id: PartitionID, - name: String, - ) -> ColumnHandle { + fn non_resident(table: &str, id: PartitionID, name: String) -> ColumnHandle { ColumnHandle { key: ColumnLocator::new(table, id, &name), name, diff --git a/src/mem_store/table.rs b/src/mem_store/table.rs index 7495211f..a15d0432 100644 --- a/src/mem_store/table.rs +++ b/src/mem_store/table.rs @@ -54,18 +54,22 @@ impl Table { &self.name } - pub fn snapshot(&self) -> Vec> { + pub fn snapshot(&self, column_filter: Option<&[String]>) -> Vec> { let frozen_buffer = self.frozen_buffer.lock().unwrap(); let partitions = self.partitions.read().unwrap(); let buffer = self.buffer.lock().unwrap(); let mut partitions: Vec<_> = partitions.values().cloned().collect(); let mut offset = partitions.iter().map(|p| p.len()).sum::(); if frozen_buffer.len() > 0 { + let buffer = match column_filter { + Some(columns) => frozen_buffer.filter(columns), + None => frozen_buffer.clone(), + }; partitions.push(Arc::new( Partition::from_buffer( self.name(), u64::MAX, - frozen_buffer.clone(), + buffer, self.lru.clone(), offset, ) @@ -74,11 +78,15 @@ impl Table { offset += frozen_buffer.len(); } if buffer.len() > 0 { + let buffer = match column_filter { + Some(columns) => buffer.filter(columns), + None => buffer.clone(), + }; partitions.push(Arc::new( Partition::from_buffer( self.name(), u64::MAX, - buffer.clone(), + buffer, self.lru.clone(), offset, ) @@ -335,7 +343,7 @@ impl Table { size_bytes: 0, columns: HashMap::default(), }; - let partitions = self.snapshot(); + let partitions = self.snapshot(None); for partition in partitions { partition.mem_tree(&mut tree.columns, if depth == 1 { 1 } else { depth - 1 }); tree.rows += partition.len(); @@ -348,7 +356,7 @@ impl Table { } pub fn stats(&self) -> TableStats { - let partitions = self.snapshot(); + let partitions = self.snapshot(None); let size_per_column = Table::size_per_column(&partitions); let buffer = self.buffer.lock().unwrap(); TableStats { diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs index 067b9f06..916987dc 100644 --- a/src/scheduler/inner_locustdb.rs +++ b/src/scheduler/inner_locustdb.rs @@ -116,14 +116,14 @@ impl InnerLocustDB { thread::spawn(move || cloned.enforce_wal_limit()); } - pub fn snapshot(&self, table: &str) -> Option>> { + pub fn snapshot(&self, table: &str, column_filter: Option<&[String]>) -> Option>> { let tables = self.tables.read().unwrap(); - tables.get(table).map(|t| t.snapshot()) + tables.get(table).map(|t| t.snapshot(column_filter)) } pub fn full_snapshot(&self) -> Vec>> { let tables = self.tables.read().unwrap(); - tables.values().map(|t| t.snapshot()).collect() + tables.values().map(|t| t.snapshot(None)).collect() } pub fn stop(&self) {