diff --git a/src/locustdb.rs b/src/locustdb.rs
index 2683acad..aa8eae10 100644
--- a/src/locustdb.rs
+++ b/src/locustdb.rs
@@ -133,6 +133,10 @@ impl LocustDB {
         }
     }
 
+    pub fn search_column_names(&self, table: &str, query: &str) -> Vec<String> {
+        self.inner_locustdb.search_column_names(table, query)
+    }
+
     pub async fn bulk_load(&self) -> Result<Vec<MemTreeTable>, oneshot::Canceled> {
         for table in self.inner_locustdb.full_snapshot() {
             self.inner_locustdb
diff --git a/src/mem_store/table.rs b/src/mem_store/table.rs
index b5cd0da6..9b7db812 100644
--- a/src/mem_store/table.rs
+++ b/src/mem_store/table.rs
@@ -7,8 +7,8 @@ use std::sync::{Mutex, RwLock};
 
 use itertools::Itertools;
 
-use crate::disk_store::*;
 use crate::disk_store::storage::{Storage, WALSegment};
+use crate::disk_store::*;
 use crate::ingest::buffer::Buffer;
 use crate::ingest::input_column::InputColumn;
 use crate::ingest::raw_val::RawVal;
@@ -23,6 +23,9 @@ pub struct Table {
     buffer: Mutex<Buffer>,
     /// LRU that keeps track of when each (table, partition, column) segment was last accessed.
     lru: Lru,
+
+    /// Set of every column name that is present in any partition.
+    column_names: RwLock<HashSet<String>>,
 }
 
 impl Table {
@@ -33,6 +36,7 @@
             next_partition_id: AtomicU64::new(0),
             buffer: Mutex::new(Buffer::default()),
             lru,
+            column_names: RwLock::default(),
         }
     }
 
@@ -87,11 +91,19 @@
             .into_iter()
             .map(|(k, v)| {
                 let col = match v.data {
-                    ColumnData::Dense(data) => if (data.len() as u64) < rows {
-                        InputColumn::NullableFloat(rows, data.into_iter().enumerate().map(|(i, v)| (i as u64, v)).collect())
-                    } else {
-                        InputColumn::Float(data)
-                    },
+                    ColumnData::Dense(data) => {
+                        if (data.len() as u64) < rows {
+                            InputColumn::NullableFloat(
+                                rows,
+                                data.into_iter()
+                                    .enumerate()
+                                    .map(|(i, v)| (i as u64, v))
+                                    .collect(),
+                            )
+                        } else {
+                            InputColumn::Float(data)
+                        }
+                    }
                     ColumnData::Sparse(data) => InputColumn::NullableFloat(rows, data),
                 };
                 (k, col)
@@ -125,23 +137,50 @@
             self.lru.clone(),
         ));
         let mut partitions = self.partitions.write().unwrap();
+        let mut column_names = self.column_names.write().unwrap();
         partitions.insert(md.id, partition);
-        self.next_partition_id.fetch_max(md.id + 1, std::sync::atomic::Ordering::SeqCst);
+        for sp in &md.subpartitions {
+            for col in &sp.column_names {
+                if !column_names.contains(col) {
+                    column_names.insert(col.clone());
+                }
+            }
+        }
+        self.next_partition_id
+            .fetch_max(md.id + 1, std::sync::atomic::Ordering::SeqCst);
     }
 
     pub fn ingest(&self, row: Vec<(String, RawVal)>) {
         log::debug!("Ingesting row: {:?}", row);
         let mut buffer = self.buffer.lock().unwrap();
+        let mut column_names = self.column_names.write().unwrap();
+        for (col, _) in &row {
+            if !column_names.contains(col) {
+                column_names.insert(col.clone());
+            }
+        }
         buffer.push_row(row);
     }
 
     pub fn ingest_homogeneous(&self, columns: HashMap<String, InputColumn>) {
         let mut buffer = self.buffer.lock().unwrap();
+        let mut column_names = self.column_names.write().unwrap();
+        for col in columns.keys() {
+            if !column_names.contains(col) {
+                column_names.insert(col.clone());
+            }
+        }
         buffer.push_typed_cols(columns);
     }
 
     pub fn ingest_heterogeneous(&self, columns: HashMap<String, Vec<RawVal>>) {
         let mut buffer = self.buffer.lock().unwrap();
+        let mut column_names = self.column_names.write().unwrap();
+        for col in columns.keys() {
+            if !column_names.contains(col) {
+                column_names.insert(col.clone());
+            }
+        }
         buffer.push_untyped_cols(columns);
     }
 
@@ -194,7 +233,12 @@
         None
     }
 
-    pub fn compact(&self, id: PartitionID, columns: Vec<Arc<Column>>, old_partitions: &[PartitionID]) {
+    pub fn compact(
+        &self,
+        id: PartitionID,
+        columns: Vec<Arc<Column>>,
+        old_partitions: &[PartitionID],
+    ) {
         let (partition, keys) = Partition::new(self.name(), id, columns, self.lru.clone());
         {
             let mut partitions = self.partitions.write().unwrap();
@@ -286,8 +330,25 @@
         columns.into_iter().sorted()
     }
 
+    pub fn search_column_names(&self, pattern: &str) -> Vec<String> {
+        let column_names = self.column_names.read().unwrap();
+        match regex::Regex::new(pattern) {
+            Ok(re) => column_names
+                .iter()
+                .filter(|col| re.is_match(col))
+                .cloned()
+                .sorted(),
+            Err(_) => column_names
+                .iter()
+                .filter(|col| col.contains(pattern))
+                .cloned()
+                .sorted(),
+        }
+        .collect()
+    }
+
     pub fn next_partition_id(&self) -> u64 {
-        self.next_partition_id.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+        self.next_partition_id
+            .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
     }
 }
diff --git a/src/scheduler/inner_locustdb.rs b/src/scheduler/inner_locustdb.rs
index 24cb9b17..15bf28df 100644
--- a/src/scheduler/inner_locustdb.rs
+++ b/src/scheduler/inner_locustdb.rs
@@ -438,6 +438,11 @@ impl InnerLocustDB {
         }
         bytes_evicted
     }
+
+    pub fn search_column_names(&self, table: &str, column: &str) -> Vec<String> {
+        let tables = self.tables.read().unwrap();
+        tables.get(table).map_or(vec![], |t| t.search_column_names(column))
+    }
 }
 
 impl Drop for InnerLocustDB {
diff --git a/src/server/mod.rs b/src/server/mod.rs
index 1010c483..8233ffc4 100644
--- a/src/server/mod.rs
+++ b/src/server/mod.rs
@@ -1,4 +1,4 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::fmt::Write;
 use std::sync::Arc;
 
@@ -6,6 +6,7 @@ use actix_cors::Cors;
 use actix_web::web::{Bytes, Data};
 use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder};
 use futures::channel::oneshot::Canceled;
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 use serde_json::json;
 use tera::{Context, Tera};
@@ -44,6 +45,14 @@ struct QueryRequest {
     query: String,
 }
 
+#[derive(Serialize, Deserialize, Debug)]
+struct ColumnNameRequest {
+    tables: Vec<String>,
+    pattern: Option<String>,
+    offset: Option<usize>,
+    limit: Option<usize>,
+}
+
 #[derive(Serialize, Deserialize, Debug)]
 struct MultiQueryRequest {
     queries: Vec<String>,
 }
@@ -214,6 +223,21 @@
     HttpResponse::Ok().json(json_results)
 }
 
+#[post("/columns")]
+async fn columns(
+    data: web::Data<AppState>,
+    req_body: web::Json<ColumnNameRequest>,
+) -> impl Responder {
+    let mut cols = HashSet::new();
+    let pattern = req_body.pattern.clone().unwrap_or("".to_string());
+    for table in &req_body.tables {
+        cols.extend(data.db.search_column_names(table, &pattern));
+    }
+    HttpResponse::Ok().json(json!({
+        "columns": cols.iter().cloned().sorted().collect_vec(),
+    }))
+}
+
 fn flatmap_err_response(
     err: Result<Result<HttpResponse, HttpResponse>, Canceled>,
 ) -> Result<HttpResponse, HttpResponse> {
@@ -367,6 +391,7 @@ pub async fn run(db: Arc<LocustDB>, cors_allow_all: bool, cors_allow_origin: Vec<String>)
             .service(query_data)
             .service(query_cols)
             .service(multi_query_cols)
+            .service(columns)
             .service(plot)
             .route("/hey", web::get().to(manual_hello))
     })
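
The core matching rule introduced by this patch lives in `Table::search_column_names`: the pattern is first compiled as a regex, and if compilation fails it degrades to a plain substring match, so callers get regex power without hard errors on malformed patterns, and results are sorted for deterministic output. Below is a minimal standalone sketch of that rule; the free function, sample column names, and `main` are illustrative only (not part of the patch), and it assumes the `regex` and `itertools` crates are available:

```rust
use itertools::Itertools;

/// Sketch of the lookup rule from `Table::search_column_names`: try the
/// pattern as a regex; if it fails to compile, fall back to substring search.
/// Both arms yield the same iterator type, so one trailing collect suffices.
fn search_column_names(column_names: &[String], pattern: &str) -> Vec<String> {
    match regex::Regex::new(pattern) {
        // Valid regex: keep every column name the regex matches.
        Ok(re) => column_names
            .iter()
            .filter(|col| re.is_match(col))
            .cloned()
            .sorted(),
        // Invalid regex: treat the pattern as a literal substring instead.
        Err(_) => column_names
            .iter()
            .filter(|col| col.contains(pattern))
            .cloned()
            .sorted(),
    }
    .collect()
}

fn main() {
    // Hypothetical column names; not taken from the patch.
    let cols: Vec<String> = ["cpu.user", "cpu.system", "latency[ms]"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    // "^cpu" compiles as a regex and is applied as one.
    assert_eq!(
        search_column_names(&cols, "^cpu"),
        vec!["cpu.system", "cpu.user"]
    );
    // "latency[" is not a valid regex (unclosed character class),
    // so it falls back to literal substring matching.
    assert_eq!(search_column_names(&cols, "latency["), vec!["latency[ms]"]);
}
```

Over HTTP, the same behavior is reachable through the new `POST /columns` endpoint, which takes a JSON body such as `{"tables": ["my_table"], "pattern": "^cpu"}` (table name hypothetical) and returns the sorted union of matches across the requested tables; note that the `offset` and `limit` fields of `ColumnNameRequest` are accepted but not yet used by the handler.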