When loading joint partition, populate cache for all columns

cswinter committed Feb 25, 2024
1 parent 18e605f commit 988859e
Showing 4 changed files with 129 additions and 71 deletions.
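In outline: several columns of a partition are packed into a single subpartition file, but `DiskReadScheduler::get_or_load` previously decoded the whole file and kept only the requested column. `Partition` now stores its handles keyed by column name and partition ID, so after one disk read every decoded column can be cached on its handle. A minimal sketch of that populate-all idea, using hypothetical simplified `Column`/`Handle` stand-ins rather than the real types:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the real Column and ColumnHandle types.
struct Column {
    name: String,
}
struct Handle {
    cached: Option<Arc<Column>>,
}

// One subpartition file holds several columns; after a single read decodes
// all of them, populate every column's handle instead of only the requested one.
fn populate_all(
    decoded: Vec<Column>,
    handles: &mut HashMap<String, Handle>,
    requested: &str,
) -> Arc<Column> {
    let mut result = None;
    for col in decoded {
        let col = Arc::new(col);
        if col.name == requested {
            result = Some(Arc::clone(&col));
        }
        if let Some(handle) = handles.get_mut(&col.name) {
            handle.cached = Some(col);
        }
    }
    result.expect("requested column is stored in the subpartition that was read")
}
```

The db_bench output below shows the effect: compare the earlier run's 454 files opened / 235MiB read for the query workload with the new run's 8 files opened / 34.1MiB read.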
50 changes: 48 additions & 2 deletions src/bin/db_bench.rs
@@ -332,7 +332,7 @@ fn small_table_names(load_factor: u64) -> Vec<String> {
// Returned 10 columns with 2048 rows in 610.756901ms (10 files opened, 62.1MiB)
// Querying 10 random columns in large table
// Returned 10 columns with 2048 rows in 605.867601ms (10 files opened, 68.4MiB)

//
// elapsed: 27.505325227s
// total uncompressed data: 256MiB
// total size on disk: 278MiB (SmallRng output is incompressible)
@@ -354,4 +354,50 @@ fn small_table_names(load_factor: u64) -> Vec<String> {
// ingestion bytes: 280MiB
// query
// files opened: 454
// disk read: 235MiB
// RELEASE
// Querying 100 related columns in small table
// Returned 100 columns with 256 rows in 23.5521ms (100 files opened, 40.1MiB)
// Querying full small table
// Returned 257 columns with 256 rows in 61.5311ms (257 files opened, 103MiB)
// Querying 100 random columns in small table
// Returned 100 columns with 256 rows in 21.6757ms (85 files opened, 34.1MiB)
// Querying 10 related columns in large table
// Returned 10 columns with 2048 rows in 39.8148ms (10 files opened, 69.0MiB)
// Querying 10 random columns in large table
// Returned 10 columns with 2048 rows in 35.0397ms (10 files opened, 72.9MiB)

// $ RUST_BACKTRACE=1 cargo run --bin db_bench -- --load-factor=8
// Querying 100 related columns in small table
// Returned 100 columns with 256 rows in 8.0145ms (1 files opened, 250KiB)
// Querying full small table
// Returned 257 columns with 256 rows in 12.959ms (1 files opened, 250KiB)
// Querying 100 random columns in small table
// Returned 100 columns with 256 rows in 6.8562ms (1 files opened, 250KiB)
// Querying 10 related columns in large table
// Returned 10 columns with 2048 rows in 154.896201ms (3 files opened, 17.2MiB)
// Querying 10 random columns in large table
// Returned 10 columns with 2048 rows in 165.483502ms (2 files opened, 16.1MiB)

// elapsed: 32.362289132s
// total uncompressed data: 256MiB
// total size on disk: 277MiB (SmallRng output is incompressible)
// total files: 791
// total events: 73728
// disk writes
// total: 565MiB
// wal: 282MiB
// partition: 274MiB
// compaction: 0.000B
// meta store: 8.76MiB
// files created
// total: 809
// wal: 15
// partition: 790
// meta: 4
// network
// ingestion requests: 15
// ingestion bytes: 282MiB
// query
// files opened: 8
// disk read: 34.1MiB
109 changes: 58 additions & 51 deletions src/mem_store/partition.rs
@@ -20,7 +20,8 @@ pub struct ColumnLocator {
pub struct Partition {
pub id: PartitionID,
len: usize,
pub(crate) cols: Vec<ColumnHandle>,
// Column name -> PartitionID -> ColumnHandle
pub(crate) cols: HashMap<String, HashMap<PartitionID, ColumnHandle>>,
lru: Lru,
}

@@ -32,19 +33,23 @@ impl Partition {
lru: Lru,
) -> (Partition, Vec<(u64, String)>) {
let mut keys = Vec::with_capacity(cols.len());
let mut column_handles = HashMap::default();
let len = cols[0].len();
for col in cols {
let name = col.name().to_string();
// Can't put into lru directly, because then memory limit enforcer might try to evict unreachable column.
if !column_handles.contains_key(&name) {
column_handles.insert(name.clone(), HashMap::default());
}
column_handles.get_mut(&name).unwrap().insert(id, ColumnHandle::resident(table, id, col));
keys.push((id, name));
}

(
Partition {
id,
len: cols[0].len(),
cols: cols
.into_iter()
.map(|c| {
let key = (id, c.name().to_string());
// Can't put into lru directly, because then memory limit enforcer might try to evict unreachable column.
keys.push(key);
ColumnHandle::resident(table, id, c)
})
.collect(),
len,
cols: column_handles,
lru,
},
keys,
@@ -55,27 +60,27 @@
table: &str,
id: PartitionID,
len: usize,
cols: &[SubpartitionMeatadata],
spm: &[SubpartitionMeatadata],
lru: Lru,
) -> Partition {
Partition {
id,
len,
cols: cols
.iter()
.flat_map(|subpartition| {
subpartition.column_names.iter().map(|c| {
ColumnHandle::non_resident(
table,
id,
&subpartition.subpartition_key,
c.to_string(),
)
})
})
.collect(),
lru,
let mut cols = HashMap::default();
for subpartition in spm {
for name in &subpartition.column_names {
if !cols.contains_key(name) {
cols.insert(name.clone(), HashMap::default());
}
cols.get_mut(name).unwrap().insert(
id,
ColumnHandle::non_resident(
table,
id,
&subpartition.subpartition_key,
name.clone(),
),
);
}
}
Partition { id, len, cols, lru }
}

pub fn from_buffer(
@@ -103,28 +108,28 @@
perf_counter: &QueryPerfCounter,
) -> HashMap<String, Arc<dyn DataSource>> {
let mut columns = HashMap::<String, Arc<dyn DataSource>>::new();
for handle in &self.cols {
if referenced_cols.contains(handle.name()) {
let column = drs.get_or_load(handle, perf_counter);
columns.insert(handle.name().to_string(), Arc::new(column));
for colname in referenced_cols {
if let Some(handles) = self.cols.get(colname) {
for handle in handles.values() {
let column = drs.get_or_load(handle, &self.cols, perf_counter);
columns.insert(handle.name().to_string(), Arc::new(column));
}
}
}
columns
}

pub fn col_names(&self) -> Vec<&str> {
let mut names = Vec::new();
for handle in &self.cols {
names.push(handle.name());
}
names
pub fn col_names(&self) -> impl Iterator<Item = &String> {
self.cols.keys()
}

pub fn non_residents(&self, cols: &HashSet<String>) -> HashSet<String> {
let mut non_residents = HashSet::new();
for handle in &self.cols {
if !handle.is_resident() && cols.contains(handle.name()) {
non_residents.insert(handle.name().to_string());
for handles in self.cols.values() {
for handle in handles.values() {
if !handle.is_resident() && cols.contains(handle.name()) {
non_residents.insert(handle.name().to_string());
}
}
}
non_residents
@@ -135,7 +140,7 @@
nonresidents: &HashSet<String>,
eligible: &HashSet<String>,
) -> bool {
for handle in &self.cols {
for handle in self.col_handles() {
if handle.is_resident() {
if nonresidents.contains(handle.name()) {
return false;
@@ -149,7 +154,7 @@

pub fn promise_load(&self, cols: &HashSet<String>) -> usize {
let mut total_size = 0;
for handle in &self.cols {
for handle in self.col_handles() {
if cols.contains(handle.name()) {
handle.load_scheduled.store(true, Ordering::SeqCst);
total_size += handle.size_bytes();
@@ -159,7 +164,7 @@
}

pub fn restore(&self, col: &Arc<Column>) {
for handle in &self.cols {
for handle in self.col_handles() {
if handle.name() == col.name() {
let mut maybe_column = handle.col.lock().unwrap();
if maybe_column.is_none() {
@@ -173,7 +178,7 @@
}

pub fn evict(&self, col: &str) -> usize {
for handle in &self.cols {
for handle in self.col_handles() {
if handle.name() == col {
let mut maybe_column = handle.col.lock().unwrap();
let mem_size = handle.heap_size_of_children();
@@ -194,7 +199,7 @@
if depth == 0 {
return;
}
for handle in &self.cols {
for handle in self.col_handles() {
let col = handle.col.lock().unwrap();
let coltree = coltrees
.entry(handle.name().to_string())
@@ -214,8 +219,7 @@
}

pub fn heap_size_per_column(&self) -> Vec<(String, usize)> {
self.cols
.iter()
self.col_handles()
.map(|handle| {
let c = handle.col.lock().unwrap();
(
@@ -230,8 +234,7 @@
}

pub fn heap_size_of_children(&self) -> usize {
self.cols
.iter()
self.col_handles()
.map(|handle| {
let c = handle.col.lock().unwrap();
match *c {
@@ -241,6 +244,10 @@
})
.sum()
}

pub fn col_handles(&self) -> impl Iterator<Item = &ColumnHandle> {
self.cols.values().flat_map(|x| x.values())
}
}

pub struct ColumnHandle {
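As an aside on the new layout, fetching a specific handle from the two-level `cols` map is a nested lookup; a small sketch with a hypothetical `find_handle` helper and simplified types (the commit itself inlines this pattern):

```rust
use std::collections::HashMap;

type PartitionID = u64;
struct ColumnHandle;

// Column name -> PartitionID -> ColumnHandle, mirroring the new `cols` field.
fn find_handle<'a>(
    cols: &'a HashMap<String, HashMap<PartitionID, ColumnHandle>>,
    name: &str,
    id: PartitionID,
) -> Option<&'a ColumnHandle> {
    cols.get(name)?.get(&id)
}
```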
38 changes: 22 additions & 16 deletions src/scheduler/disk_read_scheduler.rs
@@ -106,7 +106,7 @@ impl DiskReadScheduler {
run.bytes > chunk_size
};
if reached_chunk_size {
task_queue.push_back(runs.remove(col).unwrap());
task_queue.push_back(runs.remove(&col.as_str()).unwrap());
}
}
}
@@ -116,7 +116,7 @@
debug!("Scheduled sequential reads. Queue: {:#?}", &*task_queue);
}

pub fn get_or_load(&self, handle: &ColumnHandle, perf_counter: &QueryPerfCounter) -> Arc<Column> {
pub fn get_or_load(&self, handle: &ColumnHandle, cols: &HashMap<String, HashMap<PartitionID, ColumnHandle>>, perf_counter: &QueryPerfCounter) -> Arc<Column> {
loop {
if handle.is_resident() {
let mut maybe_column = handle.try_get();
@@ -150,23 +150,29 @@
let _token = self.reader_semaphore.access();
self.disk_store.load_column(&handle.key().table, handle.id(), handle.name(), perf_counter)
};
// Need to hold lock when we put new value into lru
// TODO: also populate columns that were colocated in same subpartition
let mut result = None;
#[allow(unused_mut)]
let mut column = columns.into_iter().find(|c| c.name() == handle.name()).unwrap();
let mut maybe_column = handle.try_get();
self.lru.put(handle.key().clone());
#[cfg(feature = "enable_lz4")]
{
if self.lz4_decode {
column.lz4_decode();
handle.update_size_bytes(column.heap_size_of_children());
for mut column in columns {
let _handle = cols.get(column.name()).unwrap().get(&handle.id()).unwrap();
// Need to hold lock when we put new value into lru
let mut maybe_column = _handle.try_get();
// TODO: if not main handle, put it at back of lru
self.lru.put(_handle.key().clone());
#[cfg(feature = "enable_lz4")]
{
if self.lz4_decode {
column.lz4_decode();
_handle.update_size_bytes(column.heap_size_of_children());
}
}
let column = Arc::new(column);
*maybe_column = Some(column.clone());
_handle.set_resident();
if column.name() == handle.name() {
result = Some(column);
}
}
let column = Arc::new(column);
*maybe_column = Some(column.clone());
handle.set_resident();
return column;
return result.unwrap()
}
}
}
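The caching loop above locks each handle's column slot via `try_get` before putting its key into the LRU, and stores the column before releasing the lock; otherwise the memory limit enforcer could pull the key off the LRU and try to evict a column that isn't reachable yet (the concern noted in the constructor comment in partition.rs). A schematic of that ordering with hypothetical minimal `Lru`/`Handle` types:

```rust
use std::sync::{Arc, Mutex};

// Hypothetical minimal stand-ins for the real Lru and ColumnHandle types.
struct Lru {
    keys: Mutex<Vec<String>>,
}
impl Lru {
    fn put(&self, key: String) {
        self.keys.lock().unwrap().push(key);
    }
}
struct Handle {
    col: Mutex<Option<Arc<Vec<u8>>>>,
}

// Schematic of the ordering in get_or_load: announce the key to the LRU only
// while the handle's column slot is locked, and store the column before the
// lock is released, so an evictor racing on the LRU blocks until the column
// is actually resident.
fn cache(handle: &Handle, lru: &Lru, key: String, column: Arc<Vec<u8>>) {
    let mut slot = handle.col.lock().unwrap(); // lock the slot first
    lru.put(key); // safe: an evictor must take the same lock to evict
    *slot = Some(column); // column resident before the lock is released
}
```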
3 changes: 1 addition & 2 deletions src/scheduler/inner_locustdb.rs
@@ -223,8 +223,7 @@ impl InnerLocustDB {
for table in tables.values() {
if let Some(partition) = table.batch() {
let columns: Vec<_> = partition
.cols
.iter()
.col_handles()
.map(|c| c.try_get().as_ref().unwrap().clone())
.sorted_by(|a, b| a.name().cmp(b.name()));
