Commit 8277536: columns endpoint

cswinter committed Mar 18, 2024
1 parent 7f72802 commit 8277536
Showing 4 changed files with 105 additions and 10 deletions.
4 changes: 4 additions & 0 deletions src/locustdb.rs
@@ -133,6 +133,10 @@ impl LocustDB {
}
}

pub fn search_column_names(&self, table: &str, query: &str) -> Vec<String> {
self.inner_locustdb.search_column_names(table, query)
}

pub async fn bulk_load(&self) -> Result<Vec<MemTreeTable>, oneshot::Canceled> {
for table in self.inner_locustdb.full_snapshot() {
self.inner_locustdb
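
As a usage sketch, the new public method can be called from embedding code along these lines (the table name and pattern are illustrative, not part of this commit):

// Hypothetical caller: list every column of table "events" whose name
// matches the regex "^user_". Construction of `db` is elided.
fn print_user_columns(db: &locustdb::LocustDB) {
    for col in db.search_column_names("events", "^user_") {
        println!("{col}");
    }
}
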
79 changes: 70 additions & 9 deletions src/mem_store/table.rs
@@ -7,8 +7,8 @@ use std::sync::{Mutex, RwLock};

use itertools::Itertools;

use crate::disk_store::storage::{Storage, WALSegment};
use crate::disk_store::*;
use crate::ingest::buffer::Buffer;
use crate::ingest::input_column::InputColumn;
use crate::ingest::raw_val::RawVal;
@@ -23,6 +23,9 @@ pub struct Table {
buffer: Mutex<Buffer>,
/// LRU that keeps track of when each (table, partition, column) segment was last accessed.
lru: Lru,

/// Set of every column name present in any partition.
column_names: RwLock<HashSet<String>>,
}

impl Table {
@@ -33,6 +36,7 @@
next_partition_id: AtomicU64::new(0),
buffer: Mutex::new(Buffer::default()),
lru,
column_names: RwLock::default(),
}
}

@@ -87,11 +91,19 @@ impl Table {
.into_iter()
.map(|(k, v)| {
let col = match v.data {
ColumnData::Dense(data) => {
if (data.len() as u64) < rows {
InputColumn::NullableFloat(
rows,
data.into_iter()
.enumerate()
.map(|(i, v)| (i as u64, v))
.collect(),
)
} else {
InputColumn::Float(data)
}
}
ColumnData::Sparse(data) => InputColumn::NullableFloat(rows, data),
};
(k, col)
@@ -125,23 +137,50 @@ impl Table {
self.lru.clone(),
));
let mut partitions = self.partitions.write().unwrap();
let mut column_names = self.column_names.write().unwrap();
partitions.insert(md.id, partition);
for sp in &md.subpartitions {
for col in &sp.column_names {
if !column_names.contains(col) {
column_names.insert(col.clone());
}
}
}
self.next_partition_id
.fetch_max(md.id + 1, std::sync::atomic::Ordering::SeqCst);
}

pub fn ingest(&self, row: Vec<(String, RawVal)>) {
log::debug!("Ingesting row: {:?}", row);
let mut buffer = self.buffer.lock().unwrap();
let mut column_names = self.column_names.write().unwrap();
for (col, _) in &row {
if !column_names.contains(col) {
column_names.insert(col.clone());
}
}
buffer.push_row(row);
}

pub fn ingest_homogeneous(&self, columns: HashMap<String, InputColumn>) {
let mut buffer = self.buffer.lock().unwrap();
let mut column_names = self.column_names.write().unwrap();
for col in columns.keys() {
if !column_names.contains(col) {
column_names.insert(col.clone());
}
}
buffer.push_typed_cols(columns);
}

pub fn ingest_heterogeneous(&self, columns: HashMap<String, Vec<RawVal>>) {
let mut buffer = self.buffer.lock().unwrap();
let mut column_names = self.column_names.write().unwrap();
for col in columns.keys() {
if !column_names.contains(col) {
column_names.insert(col.clone());
}
}
buffer.push_untyped_cols(columns);
}

@@ -194,7 +233,12 @@ impl Table {
None
}

pub fn compact(
&self,
id: PartitionID,
columns: Vec<Arc<Column>>,
old_partitions: &[PartitionID],
) {
let (partition, keys) = Partition::new(self.name(), id, columns, self.lru.clone());
{
let mut partitions = self.partitions.write().unwrap();
@@ -286,8 +330,25 @@ impl Table {
columns.into_iter().sorted()
}

pub fn search_column_names(&self, pattern: &str) -> Vec<String> {
    let column_names = self.column_names.read().unwrap();
    match regex::Regex::new(pattern) {
        // Valid pattern: filter by regex match.
        Ok(re) => column_names
            .iter()
            .filter(|col| re.is_match(col))
            .cloned()
            .sorted()
            .collect(),
        // Invalid pattern: fall back to substring matching.
        Err(_) => column_names
            .iter()
            .filter(|col| col.contains(pattern))
            .cloned()
            .sorted()
            .collect(),
    }
}
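
Note the fallback: a pattern that fails to compile as a regex is not an error, it degrades to a plain substring match. A sketch of the resulting behavior, assuming a populated Table (names illustrative):

// "^user_" compiles and is applied as a regex; "user_(" does not
// (unbalanced parenthesis), so it matches any column name that
// contains the literal text "user_(".
fn search_examples(table: &Table) {
    let by_regex = table.search_column_names("^user_");
    let by_substring = table.search_column_names("user_(");
    assert!(by_regex.iter().all(|c| c.starts_with("user_")));
    let _ = by_substring;
}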

pub fn next_partition_id(&self) -> u64 {
self.next_partition_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
}
}

5 changes: 5 additions & 0 deletions src/scheduler/inner_locustdb.rs
@@ -438,6 +438,11 @@ impl InnerLocustDB {
}
bytes_evicted
}

pub fn search_column_names(&self, table: &str, column: &str) -> Vec<String> {
let tables = self.tables.read().unwrap();
tables.get(table).map_or(vec![], |t| t.search_column_names(column))
}
}
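
Worth noting: the lookup uses map_or with an empty Vec, so a search against a table that does not exist quietly returns no results instead of failing.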

impl Drop for InnerLocustDB {
27 changes: 26 additions & 1 deletion src/server/mod.rs
@@ -1,11 +1,12 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Write;
use std::sync::Arc;

use actix_cors::Cors;
use actix_web::web::{Bytes, Data};
use actix_web::{get, post, web, App, HttpResponse, HttpServer, Responder};
use futures::channel::oneshot::Canceled;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tera::{Context, Tera};
@@ -44,6 +45,14 @@ struct QueryRequest {
query: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct ColumnNameRequest {
tables: Vec<String>,
pattern: Option<String>,
offset: Option<usize>,
limit: Option<usize>,
}
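
For illustration, the request body the endpoint expects might look like this (offset and limit are deserialized but not yet applied by the handler added in this commit):

// Hypothetical request body for POST /columns.
let body = serde_json::json!({
    "tables": ["events", "metrics"],
    "pattern": "^user_",
});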

#[derive(Serialize, Deserialize, Debug)]
struct MultiQueryRequest {
queries: Vec<String>,
@@ -214,6 +223,21 @@ async fn multi_query_cols(
HttpResponse::Ok().json(json_results)
}

#[post("/columns")]
async fn columns(
data: web::Data<AppState>,
req_body: web::Json<ColumnNameRequest>,
) -> impl Responder {
let mut cols = HashSet::new();
let pattern = req_body.pattern.clone().unwrap_or("".to_string());
for table in &req_body.tables {
cols.extend(data.db.search_column_names(table, &pattern));
}
HttpResponse::Ok().json(json!({
"columns": cols.iter().cloned().sorted(),
}))
}
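
Posting, say, {"tables": ["events"], "pattern": "^user_"} unions the matching names across the listed tables, collapses duplicates through the HashSet, and answers with a sorted list such as {"columns": ["user_id", "user_name"]} (names illustrative).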

fn flatmap_err_response(
err: Result<Result<QueryOutput, QueryError>, Canceled>,
) -> Result<QueryOutput, HttpResponse> {
@@ -367,6 +391,7 @@ pub async fn run(db: Arc<LocustDB>, cors_allow_all: bool, cors_allow_origin: Vec
.service(query_data)
.service(query_cols)
.service(multi_query_cols)
.service(columns)
.service(plot)
.route("/hey", web::get().to(manual_hello))
})
