From 6989c6dd964eda4edf9ddb66377e01e29d52cf72 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Thu, 8 Aug 2024 19:23:43 +0800 Subject: [PATCH] feat: pushdown quals to duckdb in fdw --- Cargo.toml | 2 +- src/fdw/base.rs | 39 ++++++++++- tests/scan.rs | 169 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7b72910f..a38f65da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ serde_json = "1.0.120" signal-hook = "0.3.17" signal-hook-async-std = "0.2.2" shared = { git = "https://github.com/paradedb/paradedb.git", rev = "4854652" } -supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "6c58451" } +supabase-wrappers = { git = "https://github.com/paradedb/wrappers.git", default-features = false, rev = "27af09b" } thiserror = "1.0.59" uuid = "1.9.1" diff --git a/src/fdw/base.rs b/src/fdw/base.rs index 1dcdf3d9..b4232f2d 100644 --- a/src/fdw/base.rs +++ b/src/fdw/base.rs @@ -46,8 +46,7 @@ pub trait BaseFdw { async fn begin_scan_impl( &mut self, - // TODO: Push down quals - _quals: &[Qual], + quals: &[Qual], columns: &[Column], sorts: &[Sort], limit: &Option, @@ -94,6 +93,16 @@ pub trait BaseFdw { let mut sql = format!("SELECT {targets} FROM {schema_name}.{table_name}"); + if !quals.is_empty() { + let mut formatter = DuckDbFormatter::new(); + let where_clauses = quals + .iter() + .map(|x| x.deparse_with_fmt(&mut formatter)) + .collect::>() + .join(" and "); + sql.push_str(&format!(" WHERE {}", where_clauses)); + } + if !sorts.is_empty() { let order_by = sorts .iter() @@ -246,3 +255,29 @@ pub enum BaseFdwError { #[error(transparent)] Options(#[from] OptionsError), } + +struct DuckDbFormatter {} + +impl CellFormatter for DuckDbFormatter { + fn fmt_cell(&mut self, cell: &Cell) -> String { + match cell { + Cell::Bytea(v) => { + let byte_u8 = unsafe { varlena_to_byte_slice(*v) }; + let hex = byte_u8 + .iter() + .map(|b| format!(r#"\x{:02X}"#, b)) + .collect::>() + .join(""); + format!("'{}'", hex) + } + + cell => format!("{}", cell), + } + } +} + +impl DuckDbFormatter { + fn new() -> Self { + Self {} + } +} \ No newline at end of file diff --git a/tests/scan.rs b/tests/scan.rs index dbec5c16..8d28f2d3 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -286,3 +286,172 @@ async fn test_create_heap_from_parquet(mut conn: PgConnection, tempdir: TempDir) Ok(()) } + +#[rstest] +async fn test_quals_pushdown(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { + let stored_batch = primitive_record_batch()?; + let parquet_path = tempdir.path().join("test_arrow_types.parquet"); + let parquet_file = File::create(&parquet_path)?; + + let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap(); + writer.write(&stored_batch)?; + writer.close()?; + + primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "primitive") + .execute(&mut conn); + + "CREATE TABLE t1 (a int);".execute(&mut conn); + + let test_case: Vec<(&str, &str, &str, i32)> = vec![ + ("boolean_col", "false", "false", 0), + ("int8_col", "-1", "-1", -1), + ("int16_col", "0", "0", 0), + ("int32_col", "1", "1", 1), + ("int64_col", "-1", "-1", -1), + ("uint8_col", "0", "0", 0), + ("uint16_col", "1", "1", 1), + ("uint32_col", "2", "2", -1), + ("uint64_col", "0", "0", 0), + ("float32_col", "1.0", "1", 1), + ("float64_col", "-1.0", "-1", -1), + ("date32_col", r#"'2020-01-01'"#, r#"'2020-01-01'"#, 1), + ("date64_col", r#"'2021-01-02'"#, r#"'2021-01-02'"#, -1), + ( + "binary_col", + r#"decode(encode('hello', 'hex'),'hex')"#, + r#"'\x68\x65\x6C\x6C\x6F'"#, + 1, + ), + ("binary_col", r#"E''"#, r#"''"#, -1), + ( + "large_binary_col", + r#"'\x68656C6C6F'"#, + r#"'\x68\x65\x6C\x6C\x6F'"#, + 1, + ), + ( + "large_binary_col", + r#"'\x70617271756574'"#, + r#"'\x70\x61\x72\x71\x75\x65\x74'"#, + 0, + ), + ("utf8_col", "'Hello'", "'Hello'", 1), + ("utf8_col", "'There'", "'There'", -1), + ("large_utf8_col", "'Hello'", "'Hello'", 1), + ("large_utf8_col", "'World'", "'World'", 0), + ]; + + for (col_name, val, plan_val, res) in test_case { + let where_clause = format!("{col_name} = {val}"); + // The condition in the clause may undergo simplification + let plan_clause = format!("{col_name} = {plan_val}"); + + // prevent executor push down, make sure it goes FDW (by using LEFT JOIN with normal postgres table) + let query = + format!("SELECT int32_col from primitive LEFT JOIN t1 on true WHERE {where_clause}"); + let explain: Vec<(String,)> = format!("EXPLAIN {query}").fetch(&mut conn); + + assert!( + explain[3].0.contains(&plan_clause), + "explain plan error: explain: {}\nplan_clause: {}\n", + explain[3].0, + plan_clause, + ); + // make sure the result is correct + let rows: Vec<(i32,)> = query.clone().fetch(&mut conn); + assert!( + rows.len() == 1, + "result error: rows length: {}\nquery: {}\n", + rows.len(), + query + ); + assert_eq!( + res, rows[0].0, + "result error: expect: {}, result: {} \n query: {}", + res, rows[0].0, query + ); + } + Ok(()) +} + +#[rstest] +async fn test_complex_quals_pushdown(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { + let stored_batch = primitive_record_batch()?; + let parquet_path = tempdir.path().join("test_arrow_types.parquet"); + let parquet_file = File::create(&parquet_path)?; + + let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap(); + writer.write(&stored_batch)?; + writer.close()?; + + primitive_setup_fdw_local_file_listing(parquet_path.as_path().to_str().unwrap(), "primitive") + .execute(&mut conn); + + "CREATE TABLE t1 (a int);".execute(&mut conn); + + let query = r#"SELECT int64_col + FROM primitive LEFT JOIN t1 ON true + WHERE ( + boolean_col = TRUE + AND int8_col = 1 + AND int16_col = 1 + AND int32_col = 1 + AND int64_col = 1 + AND uint8_col = 1 + AND uint16_col = 1 + AND uint32_col = 1 + AND uint64_col = 1 + AND float32_col = 1.0 + AND float64_col = 1.0 + AND date32_col = DATE '2020-01-01' + AND date64_col = TIMESTAMP '2021-01-01' + AND binary_col = E'\\x68656c6c6f' + AND large_binary_col = E'\\x68656c6c6f' + AND utf8_col = 'Hello' + AND large_utf8_col = 'Hello' + ) + OR ( + boolean_col = FALSE + AND int8_col = 0 + AND int16_col = 0 + AND int32_col = 0 + AND int64_col = 0 + AND uint8_col = 0 + AND uint16_col = 0 + AND uint32_col = 0 + AND uint64_col = 0 + AND float32_col = 0.0 + AND float64_col = 0.0 + AND date32_col = DATE '2020-01-03' + AND date64_col = TIMESTAMP '2021-01-03' + AND binary_col = E'\\x70617271756574' + AND large_binary_col = E'\\x70617271756574' + AND utf8_col = 'World' + AND large_utf8_col = 'World' + );"#; + + // make sure the result is correct with complex clauses. + let rows: Vec<(i64,)> = query.fetch(&mut conn); + + // TODO: check the plan. Wrappers not parse quals correctly. So there is not qual pushdown + assert!( + rows.len() == 2, + "result error: rows length: {}\nquery: {}\n", + rows.len(), + query + ); + + assert_eq!( + 1, rows[0].0, + "result error: expect: {}, result: {} \n query: {}", + 1, rows[0].0, query + ); + + assert_eq!( + 0, rows[1].0, + "result error: expect: {}, result: {} \n query: {}", + 0, rows[1].0, query + ); + + Ok(()) +} \ No newline at end of file