From e411dfe10aa36c6a76cf8fd70777c0b72324f1b7 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Sat, 28 Sep 2024 22:02:04 +0800 Subject: [PATCH 01/14] feat: support prepare statement --- src/hooks/executor.rs | 74 +----------------- src/hooks/mod.rs | 3 +- src/hooks/query.rs | 73 +++++++++++++++++- src/hooks/utility.rs | 94 ++++++++++++----------- src/hooks/utility/explain.rs | 61 +++++++++++++++ src/hooks/utility/prepare.rs | 144 +++++++++++++++++++++++++++++++++++ 6 files changed, 332 insertions(+), 117 deletions(-) create mode 100644 src/hooks/utility/explain.rs create mode 100644 src/hooks/utility/prepare.rs diff --git a/src/hooks/executor.rs b/src/hooks/executor.rs index 99c73d0e..d28ded06 100644 --- a/src/hooks/executor.rs +++ b/src/hooks/executor.rs @@ -15,25 +15,17 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use anyhow::{anyhow, Result}; -use duckdb::arrow::array::RecordBatch; +use anyhow::Result; use pgrx::*; use std::ffi::CStr; use crate::duckdb::connection; -use crate::schema::cell::*; use super::query::*; #[cfg(debug_assertions)] use crate::DEBUG_GUCS; -macro_rules! fallback_warning { - ($msg:expr) => { - warning!("This query was not fully pushed down to DuckDB because DuckDB returned an error. Query times may be impacted. If you would like to see this query pushed down, please submit a request to https://github.com/paradedb/paradedb/issues with the following context:\n{}", $msg); - }; -} - #[allow(deprecated)] pub async fn executor_run( query_desc: PgBox, @@ -66,6 +58,7 @@ pub async fn executor_run( // Tech Debt: Find a less hacky way to let COPY/CREATE go through || query.to_lowercase().starts_with("copy") || query.to_lowercase().starts_with("create") + || query.to_lowercase().starts_with("prepare") { prev_hook(query_desc, direction, count, execute_once); return Ok(()); @@ -102,66 +95,3 @@ pub async fn executor_run( connection::clear_arrow(); Ok(()) } - -#[inline] -fn write_batches_to_slots( - query_desc: PgBox, - mut batches: Vec, -) -> Result<()> { - // Convert the DataFusion batches to Postgres tuples and send them to the destination - unsafe { - let tuple_desc = PgTupleDesc::from_pg(query_desc.tupDesc); - let estate = query_desc.estate; - (*estate).es_processed = 0; - - let dest = query_desc.dest; - let startup = (*dest) - .rStartup - .ok_or_else(|| anyhow!("rStartup not found"))?; - startup(dest, query_desc.operation as i32, query_desc.tupDesc); - - let receive = (*dest) - .receiveSlot - .ok_or_else(|| anyhow!("receiveSlot not found"))?; - - for batch in batches.iter_mut() { - for row_index in 0..batch.num_rows() { - let tuple_table_slot = - pg_sys::MakeTupleTableSlot(query_desc.tupDesc, &pg_sys::TTSOpsVirtual); - - pg_sys::ExecStoreVirtualTuple(tuple_table_slot); - - for (col_index, _) in tuple_desc.iter().enumerate() { - let attribute = tuple_desc - .get(col_index) - .ok_or_else(|| anyhow!("attribute at {col_index} not found in tupdesc"))?; - let column = batch.column(col_index); - let tts_value = (*tuple_table_slot).tts_values.add(col_index); - let tts_isnull = (*tuple_table_slot).tts_isnull.add(col_index); - - match column.get_cell(row_index, attribute.atttypid, attribute.name())? { - Some(cell) => { - if let Some(datum) = cell.into_datum() { - *tts_value = datum; - } - } - None => { - *tts_isnull = true; - } - }; - } - - receive(tuple_table_slot, dest); - (*estate).es_processed += 1; - pg_sys::ExecDropSingleTupleTableSlot(tuple_table_slot); - } - } - - let shutdown = (*dest) - .rShutdown - .ok_or_else(|| anyhow!("rShutdown not found"))?; - shutdown(dest); - } - - Ok(()) -} diff --git a/src/hooks/mod.rs b/src/hooks/mod.rs index 73a87ff0..c0065299 100644 --- a/src/hooks/mod.rs +++ b/src/hooks/mod.rs @@ -15,8 +15,9 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -mod executor; +#[macro_use] mod query; +mod executor; mod utility; use async_std::task::block_on; diff --git a/src/hooks/query.rs b/src/hooks/query.rs index 92e84a12..c4e93a35 100644 --- a/src/hooks/query.rs +++ b/src/hooks/query.rs @@ -15,13 +15,21 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use anyhow::Result; +use anyhow::{anyhow, Result}; +use duckdb::arrow::array::RecordBatch; use pgrx::*; use std::ffi::CStr; use std::str::Utf8Error; use crate::duckdb::connection; use crate::fdw::handler::FdwHandler; +use crate::schema::cell::*; + +macro_rules! fallback_warning { + ($msg:expr) => { + warning!("This query was not fully pushed down to DuckDB because DuckDB returned an error. Query times may be impacted. If you would like to see this query pushed down, please submit a request to https://github.com/paradedb/paradedb/issues with the following context:\n{}", $msg); + }; +} pub fn get_current_query( planned_stmt: *mut pg_sys::PlannedStmt, @@ -121,3 +129,66 @@ pub fn is_duckdb_query(relations: &[PgRelation]) -> bool { } }) } + +#[inline] +pub fn write_batches_to_slots( + query_desc: PgBox, + mut batches: Vec, +) -> Result<()> { + // Convert the DataFusion batches to Postgres tuples and send them to the destination + unsafe { + let tuple_desc = PgTupleDesc::from_pg(query_desc.tupDesc); + let estate = query_desc.estate; + (*estate).es_processed = 0; + + let dest = query_desc.dest; + let startup = (*dest) + .rStartup + .ok_or_else(|| anyhow!("rStartup not found"))?; + startup(dest, query_desc.operation as i32, query_desc.tupDesc); + + let receive = (*dest) + .receiveSlot + .ok_or_else(|| anyhow!("receiveSlot not found"))?; + + for batch in batches.iter_mut() { + for row_index in 0..batch.num_rows() { + let tuple_table_slot = + pg_sys::MakeTupleTableSlot(query_desc.tupDesc, &pg_sys::TTSOpsVirtual); + + pg_sys::ExecStoreVirtualTuple(tuple_table_slot); + + for (col_index, _) in tuple_desc.iter().enumerate() { + let attribute = tuple_desc + .get(col_index) + .ok_or_else(|| anyhow!("attribute at {col_index} not found in tupdesc"))?; + let column = batch.column(col_index); + let tts_value = (*tuple_table_slot).tts_values.add(col_index); + let tts_isnull = (*tuple_table_slot).tts_isnull.add(col_index); + + match column.get_cell(row_index, attribute.atttypid, attribute.name())? { + Some(cell) => { + if let Some(datum) = cell.into_datum() { + *tts_value = datum; + } + } + None => { + *tts_isnull = true; + } + }; + } + + receive(tuple_table_slot, dest); + (*estate).es_processed += 1; + pg_sys::ExecDropSingleTupleTableSlot(tuple_table_slot); + } + } + + let shutdown = (*dest) + .rShutdown + .ok_or_else(|| anyhow!("rShutdown not found"))?; + shutdown(dest); + } + + Ok(()) +} diff --git a/src/hooks/utility.rs b/src/hooks/utility.rs index 70362e04..d53dbd41 100644 --- a/src/hooks/utility.rs +++ b/src/hooks/utility.rs @@ -16,17 +16,19 @@ // along with this program. If not, see . #![allow(clippy::too_many_arguments)] +#![allow(deprecated)] +mod explain; +mod prepare; -use std::ffi::CString; +use std::ptr::null_mut; use anyhow::{bail, Result}; -use pg_sys::NodeTag; -use pgrx::*; +use pgrx::{pg_sys, AllocatedByRust, HookResult, PgBox}; use sqlparser::{ast::Statement, dialect::PostgreSqlDialect, parser::Parser}; -use super::query::*; +use explain::explain_query; +use prepare::*; -#[allow(deprecated)] type ProcessUtilityHook = fn( pstmt: PgBox, query_string: &core::ffi::CStr, @@ -66,7 +68,47 @@ pub async fn process_utility_hook( return Ok(()); } + let parse_state = unsafe { + let state = pg_sys::make_parsestate(null_mut()); + (*state).p_sourcetext = query_string.as_ptr(); + (*state).p_queryEnv = query_env.as_ptr(); + state + }; + let need_exec_prev_hook = match stmt_type { + pg_sys::NodeTag::T_PrepareStmt => prepare_query( + parse_state, + pstmt.utilityStmt as *mut pg_sys::PrepareStmt, + pstmt.stmt_location, + pstmt.stmt_len, + )?, + + pg_sys::NodeTag::T_ExecuteStmt => { + let mut query_desc = unsafe { + PgBox::::from_rust(pg_sys::CreateQueryDesc( + pstmt.as_ptr(), + query_string.as_ptr(), + null_mut(), + null_mut(), + dest.as_ptr(), + null_mut(), + query_env.as_ptr(), + 0, + )) + }; + query_desc.estate = unsafe { pg_sys::CreateExecutorState() }; + + execute_query( + parse_state, + pstmt.utilityStmt as *mut pg_sys::ExecuteStmt, + query_desc, + )? + } + + pg_sys::NodeTag::T_DeallocateStmt => { + deallocate_query(pstmt.utilityStmt as *mut pg_sys::DeallocateStmt)? + } + pg_sys::NodeTag::T_ExplainStmt => explain_query( query_string, pstmt.utilityStmt as *mut pg_sys::ExplainStmt, @@ -91,45 +133,11 @@ pub async fn process_utility_hook( Ok(()) } -fn is_support_utility(stmt_type: NodeTag) -> bool { +fn is_support_utility(stmt_type: pg_sys::NodeTag) -> bool { stmt_type == pg_sys::NodeTag::T_ExplainStmt -} - -fn explain_query( - query_string: &core::ffi::CStr, - stmt: *mut pg_sys::ExplainStmt, - dest: *mut pg_sys::DestReceiver, -) -> Result { - let query = unsafe { (*stmt).query as *mut pg_sys::Query }; - - let query_relations = get_query_relations(unsafe { (*query).rtable }); - if unsafe { (*query).commandType } != pg_sys::CmdType::CMD_SELECT - || !is_duckdb_query(&query_relations) - { - return Ok(true); - } - - if unsafe { !(*stmt).options.is_null() } { - error!("the EXPLAIN options provided are not supported for DuckDB pushdown queries."); - } - - unsafe { - let tstate = pg_sys::begin_tup_output_tupdesc( - dest, - pg_sys::ExplainResultDesc(stmt), - &pg_sys::TTSOpsVirtual, - ); - let query = format!( - "DuckDB Scan: {}", - parse_query_from_utility_stmt(query_string)? - ); - let query_c_str = CString::new(query)?; - - pg_sys::do_text_output_multiline(tstate, query_c_str.as_ptr()); - pg_sys::end_tup_output(tstate); - } - - Ok(false) + || stmt_type == pg_sys::NodeTag::T_PrepareStmt + || stmt_type == pg_sys::NodeTag::T_DeallocateStmt + || stmt_type == pg_sys::NodeTag::T_ExecuteStmt } fn parse_query_from_utility_stmt(query_string: &core::ffi::CStr) -> Result { diff --git a/src/hooks/utility/explain.rs b/src/hooks/utility/explain.rs new file mode 100644 index 00000000..bdfca82b --- /dev/null +++ b/src/hooks/utility/explain.rs @@ -0,0 +1,61 @@ +// Copyright (c) 2023-2024 Retake, Inc. +// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::ffi::CString; + +use anyhow::Result; +use pgrx::{error, pg_sys}; + +use super::parse_query_from_utility_stmt; +use crate::hooks::query::{get_query_relations, is_duckdb_query}; + +pub fn explain_query( + query_string: &core::ffi::CStr, + stmt: *mut pg_sys::ExplainStmt, + dest: *mut pg_sys::DestReceiver, +) -> Result { + let query = unsafe { (*stmt).query as *mut pg_sys::Query }; + + let query_relations = get_query_relations(unsafe { (*query).rtable }); + if unsafe { (*query).commandType } != pg_sys::CmdType::CMD_SELECT + || !is_duckdb_query(&query_relations) + { + return Ok(true); + } + + if unsafe { !(*stmt).options.is_null() } { + error!("the EXPLAIN options provided are not supported for DuckDB pushdown queries."); + } + + unsafe { + let tstate = pg_sys::begin_tup_output_tupdesc( + dest, + pg_sys::ExplainResultDesc(stmt), + &pg_sys::TTSOpsVirtual, + ); + let query = format!( + "DuckDB Scan: {}", + parse_query_from_utility_stmt(query_string)? + ); + let query_c_str = CString::new(query)?; + + pg_sys::do_text_output_multiline(tstate, query_c_str.as_ptr()); + pg_sys::end_tup_output(tstate); + } + + Ok(false) +} diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs new file mode 100644 index 00000000..f125107b --- /dev/null +++ b/src/hooks/utility/prepare.rs @@ -0,0 +1,144 @@ +// Copyright (c) 2023-2024 Retake, Inc. +// +// This file is part of ParadeDB - Postgres for Search and Analytics +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::ffi::CStr; +use std::ptr::null_mut; + +use anyhow::Result; +use pgrx::{pg_sys, pgbox, warning, PgBox}; + +use crate::duckdb::connection; +use crate::hooks::query::*; + +pub fn prepare_query( + pstate: *mut pg_sys::ParseState, + stmt: *mut pg_sys::PrepareStmt, + stmt_location: i32, + stmt_len: i32, +) -> Result { + if unsafe { (*stmt).name }.is_null() || unsafe { *(*stmt).name } == '\0' as std::os::raw::c_char + { + return Ok(true); + } + + // Perform parsing and analysis to get the Query + let query = unsafe { + let mut raw_stmt = pg_sys::RawStmt { + type_: pg_sys::NodeTag::T_RawStmt, + stmt: (*stmt).query, + stmt_location, + stmt_len, + }; + + let arg_types = (*stmt).argtypes; + let mut nargs = if arg_types.is_null() { + 0 + } else { + (*arg_types).length + }; + + // Transform list of TypeNames to array of type OIDs + let mut types_oid: *mut pg_sys::Oid = if nargs > 0 { + let oid_ptr = pg_sys::palloc((nargs as usize) * std::mem::size_of::()) + as *mut pg_sys::Oid; + let type_elements = (*arg_types).elements; + for i in 0..(*arg_types).length { + let type_name = + (*type_elements.offset(i as isize)).ptr_value as *const pg_sys::TypeName; + *oid_ptr.offset(i as isize) = pg_sys::typenameTypeId(pstate, type_name) + } + oid_ptr + } else { + null_mut() + }; + + pg_sys::parse_analyze_varparams( + &mut raw_stmt, + (*pstate).p_sourcetext, + &mut types_oid, + &mut nargs, + null_mut(), + ) + }; + + let query_relations = get_query_relations(unsafe { (*query).rtable }); + if unsafe { (*query).commandType } != pg_sys::CmdType::CMD_SELECT + || !is_duckdb_query(&query_relations) + { + return Ok(true); + } + + // set search path according to postgres + set_search_path_by_pg()?; + + let query_sql: &CStr = unsafe { CStr::from_ptr((*pstate).p_sourcetext) }; + if let Err(e) = connection::execute(query_sql.to_str()?, []) { + fallback_warning!(e.to_string()); + return Ok(true); + } + + // It's always necessary to execute the previous hook to store a prepared statement in PostgreSQL + Ok(true) +} + +pub fn execute_query( + _psate: *mut pg_sys::ParseState, + stmt: *mut pg_sys::ExecuteStmt, + query_desc: PgBox, +) -> Result { + unsafe { + let prepared_stmt = pg_sys::FetchPreparedStatement((*stmt).name, true); + + (*query_desc.as_ptr()).tupDesc = (*(*prepared_stmt).plansource).resultDesc + } + + let query = unsafe { CStr::from_ptr((*query_desc.as_ptr()).sourceText) }; + match connection::create_arrow(query.to_str()?) { + Err(err) => { + connection::clear_arrow(); + fallback_warning!(err.to_string()); + return Ok(true); + } + Ok(false) => { + connection::clear_arrow(); + return Ok(false); + } + _ => {} + } + + match connection::get_batches() { + Ok(batches) => write_batches_to_slots(query_desc, batches)?, + Err(err) => { + connection::clear_arrow(); + fallback_warning!(err.to_string()); + return Ok(true); + } + } + + connection::clear_arrow(); + Ok(false) +} + +pub fn deallocate_query(stmt: *mut pg_sys::DeallocateStmt) -> Result { + if !unsafe { (*stmt).name }.is_null() { + let name = unsafe { CStr::from_ptr((*stmt).name) }; + // we don't care the result + let _ = connection::execute(&format!(r#"DEALLOCATE "{}""#, name.to_str()?), []); + } + + Ok(true) +} From be075f1c2b1cb24f07d0a155723be34eeb0798e4 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Sat, 28 Sep 2024 22:15:10 +0800 Subject: [PATCH 02/14] test: add prepare test --- tests/scan.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/scan.rs b/tests/scan.rs index 053c4b04..f2e79de8 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -557,3 +557,37 @@ async fn test_executor_hook_search_path(mut conn: PgConnection, tempdir: TempDir Ok(()) } + +#[rstest] +async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection) -> Result<()> { + NycTripsTable::setup().execute(&mut conn); + let rows: Vec = "SELECT * FROM nyc_trips".fetch(&mut conn); + s3.client + .create_bucket() + .bucket(S3_TRIPS_BUCKET) + .send() + .await?; + s3.create_bucket(S3_TRIPS_BUCKET).await?; + s3.put_rows(S3_TRIPS_BUCKET, S3_TRIPS_KEY, &rows).await?; + + NycTripsTable::setup_s3_listing_fdw( + &s3.url.clone(), + &format!("s3://{S3_TRIPS_BUCKET}/{S3_TRIPS_KEY}"), + ) + .execute(&mut conn); + + r#"PREPARE test_query(int) AS SELECT count(*) FROM trips WHERE "VendorID" = $1;"# + .execute(&mut conn); + + let count: (i64,) = "EXECUTE test_query(1)".fetch_one(&mut conn); + assert_eq!(count.0, 39); + + let count: (i64,) = "EXECUTE test_query(3)".fetch_one(&mut conn); + assert_eq!(count.0, 0); + + "DEALLOCATE test_query".execute(&mut conn); + + assert!("EXECUTE test_query(3)".execute_result(&mut conn).is_err()); + + Ok(()) +} From 77046a0715e366521413f7fee9697965a249a9e3 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Sat, 28 Sep 2024 23:06:52 +0800 Subject: [PATCH 03/14] feat: add Query plan check --- src/hooks/utility/prepare.rs | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index f125107b..0eea2bf0 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -102,8 +102,19 @@ pub fn execute_query( ) -> Result { unsafe { let prepared_stmt = pg_sys::FetchPreparedStatement((*stmt).name, true); + let plan_source = *(*prepared_stmt).plansource; + + if plan_source.query_list.is_null() { + return Ok(true); + } + let query = (*((*plan_source.query_list).elements).offset(0)).ptr_value as *mut pg_sys::Query; + let query_relations = get_query_relations((*query).rtable); + if !is_duckdb_query(&query_relations) { + return Ok(true); + } + + (*query_desc.as_ptr()).tupDesc = (plan_source).resultDesc - (*query_desc.as_ptr()).tupDesc = (*(*prepared_stmt).plansource).resultDesc } let query = unsafe { CStr::from_ptr((*query_desc.as_ptr()).sourceText) }; From 840511a8cd8a9d869a1fdffc1d78b2bab2360bfb Mon Sep 17 00:00:00 2001 From: kysshsy Date: Wed, 2 Oct 2024 21:35:21 +0800 Subject: [PATCH 04/14] fix: run fmt --- src/hooks/utility/prepare.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 0eea2bf0..ce90b782 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -107,14 +107,14 @@ pub fn execute_query( if plan_source.query_list.is_null() { return Ok(true); } - let query = (*((*plan_source.query_list).elements).offset(0)).ptr_value as *mut pg_sys::Query; + let query = + (*((*plan_source.query_list).elements).offset(0)).ptr_value as *mut pg_sys::Query; let query_relations = get_query_relations((*query).rtable); if !is_duckdb_query(&query_relations) { return Ok(true); } (*query_desc.as_ptr()).tupDesc = (plan_source).resultDesc - } let query = unsafe { CStr::from_ptr((*query_desc.as_ptr()).sourceText) }; From 1bae8802ea7aa2a4789295dedf2ffb0d067af18d Mon Sep 17 00:00:00 2001 From: kysshsy Date: Wed, 2 Oct 2024 23:07:16 +0800 Subject: [PATCH 05/14] feat: change the way to get cached plan --- src/hooks/utility/prepare.rs | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index ce90b782..383d6a4e 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -102,22 +102,29 @@ pub fn execute_query( ) -> Result { unsafe { let prepared_stmt = pg_sys::FetchPreparedStatement((*stmt).name, true); - let plan_source = *(*prepared_stmt).plansource; + let plan_source = (*prepared_stmt).plansource; - if plan_source.query_list.is_null() { + if (*plan_source).query_list.is_null() || !(*plan_source).fixed_result { return Ok(true); } - let query = - (*((*plan_source.query_list).elements).offset(0)).ptr_value as *mut pg_sys::Query; - let query_relations = get_query_relations((*query).rtable); - if !is_duckdb_query(&query_relations) { + + let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), null_mut(), null_mut()); + if (*cached_plan).stmt_list.is_null() { + return Ok(true); + } + + let planned_stmt = (*(*(*cached_plan).stmt_list).elements.offset(0)).ptr_value as *mut pg_sys::PlannedStmt; + let query_relations = get_query_relations((*planned_stmt).rtable); + if (*planned_stmt).commandType != pg_sys::CmdType::CMD_SELECT || !is_duckdb_query(&query_relations) { return Ok(true); } - (*query_desc.as_ptr()).tupDesc = (plan_source).resultDesc + (*query_desc.as_ptr()).tupDesc = (*plan_source).resultDesc } let query = unsafe { CStr::from_ptr((*query_desc.as_ptr()).sourceText) }; + + set_search_path_by_pg()?; match connection::create_arrow(query.to_str()?) { Err(err) => { connection::clear_arrow(); From 1084aee80f698f2fbc9790ea0ffad3c8d26dc346 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Wed, 2 Oct 2024 23:16:29 +0800 Subject: [PATCH 06/14] fix: run fmt --- src/hooks/utility/prepare.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 383d6a4e..e801f673 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -104,7 +104,7 @@ pub fn execute_query( let prepared_stmt = pg_sys::FetchPreparedStatement((*stmt).name, true); let plan_source = (*prepared_stmt).plansource; - if (*plan_source).query_list.is_null() || !(*plan_source).fixed_result { + if (*plan_source).query_list.is_null() || !(*plan_source).fixed_result { return Ok(true); } @@ -113,9 +113,12 @@ pub fn execute_query( return Ok(true); } - let planned_stmt = (*(*(*cached_plan).stmt_list).elements.offset(0)).ptr_value as *mut pg_sys::PlannedStmt; + let planned_stmt = + (*(*(*cached_plan).stmt_list).elements.offset(0)).ptr_value as *mut pg_sys::PlannedStmt; let query_relations = get_query_relations((*planned_stmt).rtable); - if (*planned_stmt).commandType != pg_sys::CmdType::CMD_SELECT || !is_duckdb_query(&query_relations) { + if (*planned_stmt).commandType != pg_sys::CmdType::CMD_SELECT + || !is_duckdb_query(&query_relations) + { return Ok(true); } From b8d00f55a02f8c5d9c0e7203e1082eff89280509 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Wed, 2 Oct 2024 23:57:55 +0800 Subject: [PATCH 07/14] fix: pg13 compile --- src/hooks/utility/prepare.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index e801f673..838cc049 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -66,13 +66,25 @@ pub fn prepare_query( null_mut() }; - pg_sys::parse_analyze_varparams( - &mut raw_stmt, - (*pstate).p_sourcetext, - &mut types_oid, - &mut nargs, - null_mut(), - ) + #[cfg(not(feature = "pg13"))] + { + pg_sys::parse_analyze_varparams( + &mut raw_stmt, + (*pstate).p_sourcetext, + &mut types_oid, + &mut nargs, + null_mut(), + ) + } + #[cfg(feature = "pg13")] + { + pg_sys::parse_analyze_varparams( + &mut raw_stmt, + (*pstate).p_sourcetext, + &mut types_oid, + &mut nargs, + ) + } }; let query_relations = get_query_relations(unsafe { (*query).rtable }); From be7b0296fd2de51abcea6cef8fe4154f382ab98b Mon Sep 17 00:00:00 2001 From: kysshsy Date: Thu, 3 Oct 2024 00:20:35 +0800 Subject: [PATCH 08/14] fix: fix compile --- src/hooks/utility/prepare.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 838cc049..22d8d1f3 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -66,7 +66,7 @@ pub fn prepare_query( null_mut() }; - #[cfg(not(feature = "pg13"))] + #[cfg(any(feature = "pg15", feature = "pg16", feature = "pg17"))] { pg_sys::parse_analyze_varparams( &mut raw_stmt, @@ -76,7 +76,7 @@ pub fn prepare_query( null_mut(), ) } - #[cfg(feature = "pg13")] + #[cfg(any(feature = "pg13", feature = "pg14"))] { pg_sys::parse_analyze_varparams( &mut raw_stmt, From 2f5a6a0b9719310f06bdadbe92afd8d9d5f5e60a Mon Sep 17 00:00:00 2001 From: kysshsy Date: Fri, 4 Oct 2024 18:25:09 +0800 Subject: [PATCH 09/14] fix: replan in Duckdb when search_path changed --- src/hooks/utility/prepare.rs | 35 ++++++++++++++++---- tests/fixtures/arrow.rs | 62 +++++++++++++++++++++++++++++++++--- tests/scan.rs | 50 +++++++++++++++++++++++++++-- 3 files changed, 134 insertions(+), 13 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 22d8d1f3..3da1aaa7 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -19,7 +19,7 @@ use std::ffi::CStr; use std::ptr::null_mut; use anyhow::Result; -use pgrx::{pg_sys, pgbox, warning, PgBox}; +use pgrx::{error, pg_sys, pgbox, warning, PgBox}; use crate::duckdb::connection; use crate::hooks::query::*; @@ -35,6 +35,15 @@ pub fn prepare_query( return Ok(true); } + let prepared_stmt = unsafe { pg_sys::FetchPreparedStatement((*stmt).name, false) }; + if !prepared_stmt.is_null() { + let stmt_name = unsafe { CStr::from_ptr((*stmt).name) }; + error!( + "prepared statement \"{}\" already exists", + stmt_name.to_str()? + ); + } + // Perform parsing and analysis to get the Query let query = unsafe { let mut raw_stmt = pg_sys::RawStmt { @@ -116,12 +125,15 @@ pub fn execute_query( let prepared_stmt = pg_sys::FetchPreparedStatement((*stmt).name, true); let plan_source = (*prepared_stmt).plansource; - if (*plan_source).query_list.is_null() || !(*plan_source).fixed_result { + if plan_source.is_null() || !(*plan_source).fixed_result { return Ok(true); } + // we need to make Duckdb replan the `PREPARE` statement when search path changed. + let need_replan = !pg_sys::OverrideSearchPathMatchesCurrent((*plan_source).search_path); + let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), null_mut(), null_mut()); - if (*cached_plan).stmt_list.is_null() { + if cached_plan.is_null() || (*cached_plan).stmt_list.is_null() { return Ok(true); } @@ -134,12 +146,22 @@ pub fn execute_query( return Ok(true); } - (*query_desc.as_ptr()).tupDesc = (*plan_source).resultDesc + (*query_desc.as_ptr()).tupDesc = (*plan_source).resultDesc; + + // Note that DuckDB does not replan prepared statements when the search path changes. + // We enforce this by executing the PREPARE statement again. + set_search_path_by_pg()?; + + if need_replan { + let prepare_stmt = CStr::from_ptr((*plan_source).query_string); + if let Err(e) = connection::execute(prepare_stmt.to_str()?, []) { + error!("execute prepare replan error: {}", e.to_string()); + } + } } let query = unsafe { CStr::from_ptr((*query_desc.as_ptr()).sourceText) }; - set_search_path_by_pg()?; match connection::create_arrow(query.to_str()?) { Err(err) => { connection::clear_arrow(); @@ -169,7 +191,8 @@ pub fn execute_query( pub fn deallocate_query(stmt: *mut pg_sys::DeallocateStmt) -> Result { if !unsafe { (*stmt).name }.is_null() { let name = unsafe { CStr::from_ptr((*stmt).name) }; - // we don't care the result + // We don't care the result + // Next prepare statement will override this one. let _ = connection::execute(&format!(r#"DEALLOCATE "{}""#, name.to_str()?), []); } diff --git a/tests/fixtures/arrow.rs b/tests/fixtures/arrow.rs index 4e98dd99..fc88e50b 100644 --- a/tests/fixtures/arrow.rs +++ b/tests/fixtures/arrow.rs @@ -43,6 +43,14 @@ fn array_data() -> ArrayData { .unwrap() } +fn single_array_data() -> ArrayData { + ArrayData::builder(DataType::Binary) + .len(1) + .add_buffer(Buffer::from("hello")) + .build() + .unwrap() +} + // Fixed size binary is not supported yet, but this will be useful for test data when we do support. fn fixed_size_array_data() -> ArrayData { let values: [u8; 15] = *b"hellotherearrow"; // Ensure length is consistent @@ -66,6 +74,14 @@ fn binary_array_data() -> ArrayData { .unwrap() } +fn single_binary_array_data() -> ArrayData { + ArrayData::builder(DataType::Binary) + .len(1) + .add_buffer(Buffer::from("hello")) + .build() + .unwrap() +} + /// A separate version of the primitive_record_batch fixture, /// narrowed to only the types that Delta Lake supports. pub fn delta_primitive_record_batch() -> Result { @@ -119,10 +135,8 @@ pub fn record_batch_with_casing() -> Result { Ok(batch) } -// Blows up deltalake, so comment out for now. -pub fn primitive_record_batch() -> Result { - // Define the fields for each datatype - let fields = vec![ +pub fn primitive_fields() -> Vec { + vec![ Field::new("boolean_col", DataType::Boolean, true), Field::new("int8_col", DataType::Int8, false), Field::new("int16_col", DataType::Int16, false), @@ -140,7 +154,13 @@ pub fn primitive_record_batch() -> Result { Field::new("large_binary_col", DataType::LargeBinary, false), Field::new("utf8_col", DataType::Utf8, false), Field::new("large_utf8_col", DataType::LargeUtf8, false), - ]; + ] +} + +// Blows up deltalake, so comment out for now. +pub fn primitive_record_batch() -> Result { + // Define the fields for each datatype + let fields = primitive_fields(); // Create a schema from the fields let schema = Arc::new(Schema::new(fields)); @@ -186,6 +206,38 @@ pub fn primitive_record_batch() -> Result { )?) } +pub fn primitive_record_batch_single() -> Result { + // Define the fields for each datatype + let fields = primitive_fields(); + + // Create a schema from the fields + let schema = Arc::new(Schema::new(fields)); + + // Create a RecordBatch + Ok(RecordBatch::try_new( + schema, + vec![ + Arc::new(BooleanArray::from(vec![Some(true)])), + Arc::new(Int8Array::from(vec![1])), + Arc::new(Int16Array::from(vec![1])), + Arc::new(Int32Array::from(vec![1])), + Arc::new(Int64Array::from(vec![1])), + Arc::new(UInt8Array::from(vec![1])), + Arc::new(UInt16Array::from(vec![1])), + Arc::new(UInt32Array::from(vec![1])), + Arc::new(UInt64Array::from(vec![1])), + Arc::new(Float32Array::from(vec![1.0])), + Arc::new(Float64Array::from(vec![1.0])), + Arc::new(Date32Array::from(vec![18262])), + Arc::new(Date64Array::from(vec![1609459200000])), + Arc::new(BinaryArray::from(single_array_data())), + Arc::new(LargeBinaryArray::from(single_binary_array_data())), + Arc::new(StringArray::from(vec![Some("Hello")])), + Arc::new(LargeStringArray::from(vec![Some("Hello")])), + ], + )?) +} + pub fn primitive_create_foreign_data_wrapper( wrapper: &str, handler: &str, diff --git a/tests/scan.rs b/tests/scan.rs index f2e79de8..5a090fc2 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -20,8 +20,9 @@ mod fixtures; use crate::fixtures::arrow::{ delta_primitive_record_batch, primitive_create_foreign_data_wrapper, primitive_create_server, primitive_create_table, primitive_create_user_mapping_options, primitive_record_batch, - primitive_setup_fdw_local_file_delta, primitive_setup_fdw_local_file_listing, - primitive_setup_fdw_s3_delta, primitive_setup_fdw_s3_listing, + primitive_record_batch_single, primitive_setup_fdw_local_file_delta, + primitive_setup_fdw_local_file_listing, primitive_setup_fdw_s3_delta, + primitive_setup_fdw_s3_listing, }; use crate::fixtures::db::Query; use crate::fixtures::{conn, duckdb_conn, s3, tempdir, S3}; @@ -591,3 +592,48 @@ async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection Ok(()) } + +#[rstest] +async fn test_prepare_search_path(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { + let stored_batch = primitive_record_batch()?; + let parquet_path = tempdir.path().join("test_arrow_types.parquet"); + let parquet_file = File::create(&parquet_path)?; + + let mut writer = ArrowWriter::try_new(parquet_file, stored_batch.schema(), None).unwrap(); + writer.write(&stored_batch)?; + writer.close()?; + + let stored_batch_less = primitive_record_batch_single()?; + let less_parquet_path = tempdir.path().join("test_arrow_types_less.parquet"); + let less_parquet_file = File::create(&less_parquet_path)?; + + let mut writer = + ArrowWriter::try_new(less_parquet_file, stored_batch_less.schema(), None).unwrap(); + writer.write(&stored_batch_less)?; + writer.close()?; + + "CREATE SCHEMA tpch1".execute(&mut conn); + "CREATE SCHEMA tpch2".execute(&mut conn); + + let file_path = parquet_path.as_path().to_str().unwrap(); + let file_less_path = less_parquet_path.as_path().to_str().unwrap(); + + let create_table_t1 = primitive_create_table("parquet_server", "tpch1.t1"); + (&format!("{create_table_t1} OPTIONS (files '{file_path}');")).execute(&mut conn); + + let create_table_less_t1 = primitive_create_table("parquet_server", "tpch2.t1"); + (&format!("{create_table_less_t1} OPTIONS (files '{file_less_path}');")).execute(&mut conn); + + "SET search_path TO tpch1".execute(&mut conn); + + "PREPARE q1 AS SELECT * FROM t1 where boolean_col = $1".execute(&mut conn); + + let result: Vec<(bool,)> = "EXECUTE q1(true)".fetch_collect(&mut conn); + assert_eq!(result.len(), 2); + + "SET search_path TO tpch2".execute(&mut conn); + let result: Vec<(bool,)> = "EXECUTE q1(true)".fetch_collect(&mut conn); + assert_eq!(result.len(), 1); + + Ok(()) +} From e11520aace0de1c17134192fe9b84735be977dc5 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Fri, 4 Oct 2024 18:49:13 +0800 Subject: [PATCH 10/14] fix: pg17 function name change --- src/hooks/utility/prepare.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 3da1aaa7..36b385ff 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -130,8 +130,12 @@ pub fn execute_query( } // we need to make Duckdb replan the `PREPARE` statement when search path changed. + #[cfg(not(feature = "pg17"))] let need_replan = !pg_sys::OverrideSearchPathMatchesCurrent((*plan_source).search_path); + #[cfg(feature = "pg17")] + let need_replan = !pg_sys::SearchPathMatchesCurrentEnvironment((*plan_source).search_path); + let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), null_mut(), null_mut()); if cached_plan.is_null() || (*cached_plan).stmt_list.is_null() { return Ok(true); From 9ab6aa15e68d541a6d147129d55e9ab69ec63ee5 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Fri, 4 Oct 2024 19:28:44 +0800 Subject: [PATCH 11/14] test: add search path test comment: add test comment --- tests/fixtures/arrow.rs | 26 +++++++++++++++++++++++--- tests/scan.rs | 13 +++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/tests/fixtures/arrow.rs b/tests/fixtures/arrow.rs index fc88e50b..56578502 100644 --- a/tests/fixtures/arrow.rs +++ b/tests/fixtures/arrow.rs @@ -44,9 +44,12 @@ fn array_data() -> ArrayData { } fn single_array_data() -> ArrayData { + let values: [u8; 5] = *b"hello"; + let offsets: [i32; 2] = [0, 5]; ArrayData::builder(DataType::Binary) .len(1) - .add_buffer(Buffer::from("hello")) + .add_buffer(Buffer::from_slice_ref(&offsets[..])) + .add_buffer(Buffer::from_slice_ref(&values[..])) .build() .unwrap() } @@ -75,9 +78,12 @@ fn binary_array_data() -> ArrayData { } fn single_binary_array_data() -> ArrayData { - ArrayData::builder(DataType::Binary) + let values: [u8; 5] = *b"hello"; + let offsets: [i64; 2] = [0, 5]; + ArrayData::builder(DataType::LargeBinary) .len(1) - .add_buffer(Buffer::from("hello")) + .add_buffer(Buffer::from_slice_ref(&offsets[..])) + .add_buffer(Buffer::from_slice_ref(&values[..])) .build() .unwrap() } @@ -457,6 +463,20 @@ pub fn setup_local_file_listing_with_casing(local_file_path: &str, table: &str) ) } +pub fn setup_parquet_wrapper_and_server() -> String { + let create_foreign_data_wrapper = primitive_create_foreign_data_wrapper( + "parquet_wrapper", + "parquet_fdw_handler", + "parquet_fdw_validator", + ); + let create_server = primitive_create_server("parquet_server", "parquet_wrapper"); + format!( + "{create_foreign_data_wrapper}; + {create_server}; + " + ) +} + fn valid(data_type: &DataType, oid: u32) -> bool { let oid = match PgBuiltInOids::from_u32(oid) { Ok(oid) => oid, diff --git a/tests/scan.rs b/tests/scan.rs index 5a090fc2..bbe70e7e 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -22,7 +22,7 @@ use crate::fixtures::arrow::{ primitive_create_table, primitive_create_user_mapping_options, primitive_record_batch, primitive_record_batch_single, primitive_setup_fdw_local_file_delta, primitive_setup_fdw_local_file_listing, primitive_setup_fdw_s3_delta, - primitive_setup_fdw_s3_listing, + primitive_setup_fdw_s3_listing, setup_parquet_wrapper_and_server, }; use crate::fixtures::db::Query; use crate::fixtures::{conn, duckdb_conn, s3, tempdir, S3}; @@ -593,6 +593,11 @@ async fn test_prepare_stmt_execute(#[future(awt)] s3: S3, mut conn: PgConnection Ok(()) } +// Note: PostgreSQL will replan the query when certain catalog changes occur, +// such as changes to the search path or when a table is deleted. +// In contrast, DuckDB does not replan when the search path is changed. +// If there are two foreign tables in different schemas and the prepared statements do not specify the schemas, +// it may lead to ambiguity or errors when referencing the tables. #[rstest] async fn test_prepare_search_path(mut conn: PgConnection, tempdir: TempDir) -> Result<()> { let stored_batch = primitive_record_batch()?; @@ -612,9 +617,13 @@ async fn test_prepare_search_path(mut conn: PgConnection, tempdir: TempDir) -> R writer.write(&stored_batch_less)?; writer.close()?; + // In this example, we create two tables with identical structures and names, but in different schemas. + // We expect that when the search path is changed, the correct table (the one in the current schema) will be referenced in DuckDB. "CREATE SCHEMA tpch1".execute(&mut conn); "CREATE SCHEMA tpch2".execute(&mut conn); + setup_parquet_wrapper_and_server().execute(&mut conn); + let file_path = parquet_path.as_path().to_str().unwrap(); let file_less_path = less_parquet_path.as_path().to_str().unwrap(); @@ -626,7 +635,7 @@ async fn test_prepare_search_path(mut conn: PgConnection, tempdir: TempDir) -> R "SET search_path TO tpch1".execute(&mut conn); - "PREPARE q1 AS SELECT * FROM t1 where boolean_col = $1".execute(&mut conn); + "PREPARE q1 AS SELECT * FROM t1 WHERE boolean_col = $1".execute(&mut conn); let result: Vec<(bool,)> = "EXECUTE q1(true)".fetch_collect(&mut conn); assert_eq!(result.len(), 2); From 10804dbb32bd840a73fef6025222e0b62b70a6da Mon Sep 17 00:00:00 2001 From: kysshsy Date: Wed, 9 Oct 2024 20:54:58 +0800 Subject: [PATCH 12/14] fix: change warning message url --- src/hooks/query.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/hooks/query.rs b/src/hooks/query.rs index c4e93a35..863a2f97 100644 --- a/src/hooks/query.rs +++ b/src/hooks/query.rs @@ -27,7 +27,7 @@ use crate::schema::cell::*; macro_rules! fallback_warning { ($msg:expr) => { - warning!("This query was not fully pushed down to DuckDB because DuckDB returned an error. Query times may be impacted. If you would like to see this query pushed down, please submit a request to https://github.com/paradedb/paradedb/issues with the following context:\n{}", $msg); + warning!("This query was not fully pushed down to DuckDB because DuckDB returned an error. Query times may be impacted. If you would like to see this query pushed down, please submit a request to https://github.com/paradedb/pg_analytics/issues with the following context:\n{}", $msg); }; } From 92a77861aa4c475ddd9b15abbc39fce702d8a559 Mon Sep 17 00:00:00 2001 From: kysshsy Date: Wed, 9 Oct 2024 21:16:22 +0800 Subject: [PATCH 13/14] fix: fix pg13 compile --- src/hooks/utility/prepare.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 36b385ff..0e969eb9 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -136,6 +136,10 @@ pub fn execute_query( #[cfg(feature = "pg17")] let need_replan = !pg_sys::SearchPathMatchesCurrentEnvironment((*plan_source).search_path); + #[cfg(feature = "pg13")] + let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), false, null_mut()); + + #[cfg(not(feature = "pg13"))] let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), null_mut(), null_mut()); if cached_plan.is_null() || (*cached_plan).stmt_list.is_null() { return Ok(true); From 76003c08d0b590218be77f2ba04a18b5c953912c Mon Sep 17 00:00:00 2001 From: kysshsy Date: Sun, 13 Oct 2024 19:40:35 +0800 Subject: [PATCH 14/14] update base on comment --- src/hooks/utility/prepare.rs | 9 ++++++--- tests/scan.rs | 3 +++ 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/hooks/utility/prepare.rs b/src/hooks/utility/prepare.rs index 0e969eb9..55de8c0e 100644 --- a/src/hooks/utility/prepare.rs +++ b/src/hooks/utility/prepare.rs @@ -129,18 +129,21 @@ pub fn execute_query( return Ok(true); } - // we need to make Duckdb replan the `PREPARE` statement when search path changed. + // We need to ensure that DuckDB replans the PREPARE statement when the search path changes, + // in order to match PostgreSQL’s default behavior. + #[cfg(not(feature = "pg17"))] let need_replan = !pg_sys::OverrideSearchPathMatchesCurrent((*plan_source).search_path); - #[cfg(feature = "pg17")] let need_replan = !pg_sys::SearchPathMatchesCurrentEnvironment((*plan_source).search_path); + // For PostgreSQL 13 #[cfg(feature = "pg13")] let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), false, null_mut()); - + // For PostgreSQL 14 and above #[cfg(not(feature = "pg13"))] let cached_plan = pg_sys::GetCachedPlan(plan_source, null_mut(), null_mut(), null_mut()); + if cached_plan.is_null() || (*cached_plan).stmt_list.is_null() { return Ok(true); } diff --git a/tests/scan.rs b/tests/scan.rs index bbe70e7e..425ced84 100644 --- a/tests/scan.rs +++ b/tests/scan.rs @@ -644,5 +644,8 @@ async fn test_prepare_search_path(mut conn: PgConnection, tempdir: TempDir) -> R let result: Vec<(bool,)> = "EXECUTE q1(true)".fetch_collect(&mut conn); assert_eq!(result.len(), 1); + "DEALLOCATE q1".execute(&mut conn); + assert!("EXECUTE q1(true)".execute_result(&mut conn).is_err()); + Ok(()) }