Skip to content

Commit

Permalink
switch to indexmap
Browse files Browse the repository at this point in the history
  • Loading branch information
rebasedming committed Aug 5, 2024
1 parent cf09c2a commit 1784935
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 35 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652" }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "6c58451" }
thiserror = "1.0.59"
uuid = "1.9.1"
heapless = "0.8.0"
heapless = "0.7.16"

[dev-dependencies]
aws-config = "1.5.1"
Expand Down
1 change: 0 additions & 1 deletion src/api/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use anyhow::Result;
use duckdb::types::Value;
use pgrx::*;

use crate::duckdb::connection;
use crate::duckdb::utils;
use crate::env::get_global_connection;

Expand Down
1 change: 0 additions & 1 deletion src/api/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
use anyhow::Result;
use pgrx::*;

use crate::duckdb::connection;
use crate::duckdb::utils;
use crate::env::get_global_connection;

Expand Down
45 changes: 19 additions & 26 deletions src/duckdb/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

use anyhow::{anyhow, Result};
use duckdb::arrow::array::RecordBatch;
use duckdb::{Connection, Params, Statement};
use duckdb::{Params, Statement};
use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
use std::cell::UnsafeCell;
use std::collections::HashMap;
use std::sync::Once;
use std::thread;

use crate::env::{get_global_connection, postgres_data_dir_path, postgres_database_oid};
use crate::env::get_global_connection;

use super::csv;
use super::delta;
Expand All @@ -34,7 +34,6 @@ use super::parquet;
use super::secret;

// Global mutable static variables
static mut GLOBAL_CONNECTION: Option<UnsafeCell<Connection>> = None;
static mut GLOBAL_STATEMENT: Option<UnsafeCell<Option<Statement<'static>>>> = None;
static mut GLOBAL_ARROW: Option<UnsafeCell<Option<duckdb::Arrow<'static>>>> = None;
static INIT: Once = Once::new();
Expand All @@ -57,14 +56,12 @@ fn init_globals() {
}

fn iceberg_loaded() -> Result<bool> {
unsafe {
let conn = get_global_connection();
let conn = conn.lock().unwrap();
let mut statement = conn.prepare("SELECT * FROM duckdb_extensions() WHERE extension_name = 'iceberg' AND installed = true AND loaded = true")?;
match statement.query([])?.next() {
Ok(Some(_)) => Ok(true),
_ => Ok(false),
}
let conn = get_global_connection();
let conn = conn.lock().unwrap();
let mut statement = conn.prepare("SELECT * FROM duckdb_extensions() WHERE extension_name = 'iceberg' AND installed = true AND loaded = true")?;
match statement.query([])?.next() {
Ok(Some(_)) => Ok(true),
_ => Ok(false),
}
}

Expand Down Expand Up @@ -182,24 +179,20 @@ pub fn get_batches() -> Result<Vec<RecordBatch>> {
}

pub fn execute<P: Params>(sql: &str, params: P) -> Result<usize> {
unsafe {
let conn = get_global_connection();
let conn = conn.lock().unwrap();
conn.execute(sql, params).map_err(|err| anyhow!("{err}"))
}
let conn = get_global_connection();
let conn = conn.lock().unwrap();
conn.execute(sql, params).map_err(|err| anyhow!("{err}"))
}

pub fn drop_relation(table_name: &str, schema_name: &str) -> Result<()> {
unsafe {
let conn = get_global_connection();
let conn = conn.lock().unwrap();
let mut statement = conn.prepare(format!("SELECT table_type from information_schema.tables WHERE table_schema = '{schema_name}' AND table_name = '{table_name}' LIMIT 1").as_str())?;
if let Ok(Some(row)) = statement.query([])?.next() {
let table_type: String = row.get(0)?;
let table_type = table_type.replace("BASE", "").trim().to_string();
let statement = format!("DROP {table_type} {schema_name}.{table_name}");
execute(statement.as_str(), [])?;
}
let conn = get_global_connection();
let conn = conn.lock().unwrap();
let mut statement = conn.prepare(format!("SELECT table_type from information_schema.tables WHERE table_schema = '{schema_name}' AND table_name = '{table_name}' LIMIT 1").as_str())?;
if let Ok(Some(row)) = statement.query([])?.next() {
let table_type: String = row.get(0)?;
let table_type = table_type.replace("BASE", "").trim().to_string();
let statement = format!("DROP {table_type} {schema_name}.{table_name}");
execute(statement.as_str(), [])?;
}

Ok(())
Expand Down
21 changes: 15 additions & 6 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ use std::sync::{Arc, Mutex};
pub struct DuckdbConnection(pub Arc<Mutex<Connection>>);
unsafe impl PGRXSharedMemory for DuckdbConnection {}

// One connection per database, so 128 databases can have a DuckDB connection
const MAX_CONNECTIONS: usize = 128;
pub static DUCKDB_CONNECTION: PgLwLock<
heapless::FnvIndexMap<u32, DuckdbConnection, MAX_CONNECTIONS>,
> = PgLwLock::new();

impl Default for DuckdbConnection {
fn default() -> Self {
let mut duckdb_path = postgres_data_dir_path();
Expand All @@ -21,17 +27,20 @@ impl Default for DuckdbConnection {
duckdb_path.push(postgres_database_oid().to_string());
duckdb_path.set_extension("db3");

let conn =
Connection::open_with_flags(duckdb_path, duckdb::Config::default().threads(1).unwrap())
.expect("failed to open duckdb connection");
let conn = Connection::open(duckdb_path).expect("failed to open duckdb connection");
DuckdbConnection(Arc::new(Mutex::new(conn)))
}
}

pub static DUCKDB_CONNECTION: PgLwLock<DuckdbConnection> = PgLwLock::new();

pub fn get_global_connection() -> Arc<Mutex<Connection>> {
(*DUCKDB_CONNECTION.share()).0.clone()
match DUCKDB_CONNECTION.exclusive().entry(postgres_database_oid()) {
heapless::Entry::Occupied(entry) => entry.get().0.clone(),
heapless::Entry::Vacant(entry) => {
let conn = DuckdbConnection::default();
let _ = entry.insert(conn.clone());
conn.0.clone()
}
}
}

pub fn postgres_data_dir_path() -> PathBuf {
Expand Down

0 comments on commit 1784935

Please sign in to comment.