Skip to content

Commit

Permalink
feat: pushdown quals to duckdb in fdw
Browse files Browse the repository at this point in the history
  • Loading branch information
kysshsy committed Aug 8, 2024
1 parent 7545b28 commit 6989c6d
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ serde_json = "1.0.120"
signal-hook = "0.3.17"
signal-hook-async-std = "0.2.2"
shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652" }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "6c58451" }
supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "27af09b" }
thiserror = "1.0.59"
uuid = "1.9.1"

Expand Down
39 changes: 37 additions & 2 deletions src/fdw/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ pub trait BaseFdw {

async fn begin_scan_impl(
&mut self,
// TODO: Push down quals
_quals: &[Qual],
quals: &[Qual],
columns: &[Column],
sorts: &[Sort],
limit: &Option<Limit>,
Expand Down Expand Up @@ -94,6 +93,16 @@ pub trait BaseFdw {

let mut sql = format!("SELECT {targets} FROM {schema_name}.{table_name}");

if !quals.is_empty() {
let mut formatter = DuckDbFormatter::new();
let where_clauses = quals
.iter()
.map(|x| x.deparse_with_fmt(&mut formatter))
.collect::<Vec<String>>()
.join(" and ");
sql.push_str(&format!(" WHERE {}", where_clauses));
}

if !sorts.is_empty() {
let order_by = sorts
.iter()
Expand Down Expand Up @@ -246,3 +255,29 @@ pub enum BaseFdwError {
#[error(transparent)]
Options(#[from] OptionsError),
}

struct DuckDbFormatter {}

impl CellFormatter for DuckDbFormatter {
fn fmt_cell(&mut self, cell: &Cell) -> String {
match cell {
Cell::Bytea(v) => {
let byte_u8 = unsafe { varlena_to_byte_slice(*v) };
let hex = byte_u8
.iter()
.map(|b| format!(r#"\x{:02X}"#, b))
.collect::<Vec<String>>()
.join("");
format!("'{}'", hex)
}

cell => format!("{}", cell),
}
}
}

impl DuckDbFormatter {
fn new() -> Self {
Self {}
}
}
169 changes: 169 additions & 0 deletions tests/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,172 @@ async fn test_create_heap_from_parquet(mut conn: PgConnection, tempdir: TempDir)

Ok(())
}

#[rstest]
async fn test_quals_pushdown(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
let stored_batch = primitive_record_batch()?;
let parquet_path = tempdir.path().join("test_arrow_types.parquet");
let parquet_file = File::create(&parquet_path)?;

let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
writer.write(&stored_batch)?;
writer.close()?;

primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "primitive")
.execute(&mut conn);

"CREATE TABLE t1 (a int);".execute(&mut conn);

let test_case: Vec<(&str, &str, &str, i32)> = vec![
("boolean_col", "false", "false", 0),
("int8_col", "-1", "-1", -1),
("int16_col", "0", "0", 0),
("int32_col", "1", "1", 1),
("int64_col", "-1", "-1", -1),
("uint8_col", "0", "0", 0),
("uint16_col", "1", "1", 1),
("uint32_col", "2", "2", -1),
("uint64_col", "0", "0", 0),
("float32_col", "1.0", "1", 1),
("float64_col", "-1.0", "-1", -1),
("date32_col", r#"'2020-01-01'"#, r#"'2020-01-01'"#, 1),
("date64_col", r#"'2021-01-02'"#, r#"'2021-01-02'"#, -1),
(
"binary_col",
r#"decode(encode('hello', 'hex'),'hex')"#,
r#"'\x68\x65\x6C\x6C\x6F'"#,
1,
),
("binary_col", r#"E''"#, r#"''"#, -1),
(
"large_binary_col",
r#"'\x68656C6C6F'"#,
r#"'\x68\x65\x6C\x6C\x6F'"#,
1,
),
(
"large_binary_col",
r#"'\x70617271756574'"#,
r#"'\x70\x61\x72\x71\x75\x65\x74'"#,
0,
),
("utf8_col", "'Hello'", "'Hello'", 1),
("utf8_col", "'There'", "'There'", -1),
("large_utf8_col", "'Hello'", "'Hello'", 1),
("large_utf8_col", "'World'", "'World'", 0),
];

for (col_name, val, plan_val, res) in test_case {
let where_clause = format!("{col_name} = {val}");
// The condition in the clause may undergo simplification
let plan_clause = format!("{col_name} = {plan_val}");

// prevent executor push down, make sure it goes FDW (by using LEFT JOIN with normal postgres table)
let query =
format!("SELECT int32_col from primitive LEFT JOIN t1 on true WHERE {where_clause}");
let explain: Vec<(String,)> = format!("EXPLAIN {query}").fetch(&mut conn);

assert!(
explain[3].0.contains(&plan_clause),
"explain plan error: explain: {}\nplan_clause: {}\n",
explain[3].0,
plan_clause,
);
// make sure the result is correct
let rows: Vec<(i32,)> = query.clone().fetch(&mut conn);
assert!(
rows.len() == 1,
"result error: rows length: {}\nquery: {}\n",
rows.len(),
query
);
assert_eq!(
res, rows[0].0,
"result error: expect: {}, result: {} \n query: {}",
res, rows[0].0, query
);
}
Ok(())
}

#[rstest]
async fn test_complex_quals_pushdown(mut conn: PgConnection, tempdir: TempDir) -> Result<()> {
let stored_batch = primitive_record_batch()?;
let parquet_path = tempdir.path().join("test_arrow_types.parquet");
let parquet_file = File::create(&parquet_path)?;

let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap();
writer.write(&stored_batch)?;
writer.close()?;

primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "primitive")
.execute(&mut conn);

"CREATE TABLE t1 (a int);".execute(&mut conn);

let query = r#"SELECT int64_col
FROM primitive LEFT JOIN t1 ON true
WHERE (
boolean_col = TRUE
AND int8_col = 1
AND int16_col = 1
AND int32_col = 1
AND int64_col = 1
AND uint8_col = 1
AND uint16_col = 1
AND uint32_col = 1
AND uint64_col = 1
AND float32_col = 1.0
AND float64_col = 1.0
AND date32_col = DATE '2020-01-01'
AND date64_col = TIMESTAMP '2021-01-01'
AND binary_col = E'\\x68656c6c6f'
AND large_binary_col = E'\\x68656c6c6f'
AND utf8_col = 'Hello'
AND large_utf8_col = 'Hello'
)
OR (
boolean_col = FALSE
AND int8_col = 0
AND int16_col = 0
AND int32_col = 0
AND int64_col = 0
AND uint8_col = 0
AND uint16_col = 0
AND uint32_col = 0
AND uint64_col = 0
AND float32_col = 0.0
AND float64_col = 0.0
AND date32_col = DATE '2020-01-03'
AND date64_col = TIMESTAMP '2021-01-03'
AND binary_col = E'\\x70617271756574'
AND large_binary_col = E'\\x70617271756574'
AND utf8_col = 'World'
AND large_utf8_col = 'World'
);"#;

// make sure the result is correct with complex clauses.
let rows: Vec<(i64,)> = query.fetch(&mut conn);

// TODO: check the plan. Wrappers not parse quals correctly. So there is not qual pushdown
assert!(
rows.len() == 2,
"result error: rows length: {}\nquery: {}\n",
rows.len(),
query
);

assert_eq!(
1, rows[0].0,
"result error: expect: {}, result: {} \n query: {}",
1, rows[0].0, query
);

assert_eq!(
0, rows[1].0,
"result error: expect: {}, result: {} \n query: {}",
0, rows[1].0, query
);

Ok(())
}

0 comments on commit 6989c6d

Please sign in to comment.