From 1df184d499eedc3f5e3855e3257b8ae5be5d8d58 Mon Sep 17 00:00:00 2001 From: Yves Date: Fri, 20 Dec 2024 16:03:09 +0100 Subject: [PATCH] Use guard for PostgresTableReaderCleanup --- include/pgduckdb/pgduckdb_utils.hpp | 26 +++++++++++++++++++ .../pgduckdb/scan/postgres_table_reader.hpp | 1 + src/scan/postgres_table_reader.cpp | 20 ++++++++------ 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/include/pgduckdb/pgduckdb_utils.hpp b/include/pgduckdb/pgduckdb_utils.hpp index dfe74c1a..9b3ec3c3 100644 --- a/include/pgduckdb/pgduckdb_utils.hpp +++ b/include/pgduckdb/pgduckdb_utils.hpp @@ -104,6 +104,32 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) { #define PostgresFunctionGuard(FUNC, ...) \ pgduckdb::__PostgresFunctionGuard__(__func__, ##__VA_ARGS__) + +template +ReturnType __PostgresMemberGuard__(ReturnType (T::*func)(), T* instance, const char *func_name) { + MemoryContext ctx = CurrentMemoryContext; + ErrorData *edata = nullptr; + { // Scope for PG_END_TRY + PgExceptionGuard g; + sigjmp_buf _local_sigjmp_buf; + if (sigsetjmp(_local_sigjmp_buf, 0) == 0) { + PG_exception_stack = &_local_sigjmp_buf; + return (instance->*func)(); + } else { + g.RestoreStacks(); + CurrentMemoryContext = ctx; + edata = CopyErrorData(); + FlushErrorState(); + } + } // PG_END_TRY(); + + auto message = duckdb::StringUtil::Format("(PGDuckDB/%s) %s", func_name, pg::GetErrorDataMessage(edata)); + throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, message); +} + +#define PostgresMemberGuard(FUNC, ...) \ + pgduckdb::__PostgresMemberGuard__(&FUNC, this, __func__) + duckdb::unique_ptr DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query); duckdb::unique_ptr DuckDBQueryOrThrow(duckdb::Connection &connection, const std::string &query); diff --git a/include/pgduckdb/scan/postgres_table_reader.hpp b/include/pgduckdb/scan/postgres_table_reader.hpp index b5e634d9..a0cc5227 100644 --- a/include/pgduckdb/scan/postgres_table_reader.hpp +++ b/include/pgduckdb/scan/postgres_table_reader.hpp @@ -15,6 +15,7 @@ class PostgresTableReader { private: MinimalTuple GetNextWorkerTuple(); + void PostgresTableReaderCleanupUnsafe(); int ParallelWorkerNumber(Cardinality cardinality); const char * ExplainScanPlan(QueryDesc *query_desc); bool CanTableScanRunInParallel(Plan *plan); diff --git a/src/scan/postgres_table_reader.cpp b/src/scan/postgres_table_reader.cpp index 3ab309e4..5a236505 100644 --- a/src/scan/postgres_table_reader.cpp +++ b/src/scan/postgres_table_reader.cpp @@ -128,27 +128,31 @@ PostgresTableReader::~PostgresTableReader() { void PostgresTableReader::PostgresTableReaderCleanup() { std::lock_guard lock(GlobalProcessLock::GetLock()); - PostgresScopedStackReset scoped_stack_reset; - PostgresFunctionGuard(ExecEndNode, table_scan_planstate); + PostgresMemberGuard(PostgresTableReader::PostgresTableReaderCleanupUnsafe); +} + +void +PostgresTableReader::PostgresTableReaderCleanupUnsafe() { + ExecEndNode(table_scan_planstate); if (parallel_executor_info != NULL) { - PostgresFunctionGuard(ExecParallelFinish, parallel_executor_info); - PostgresFunctionGuard(ExecParallelCleanup, parallel_executor_info); + ExecParallelFinish(parallel_executor_info); + ExecParallelCleanup(parallel_executor_info); } parallel_executor_info = nullptr; if (parallel_worker_readers) { - PostgresFunctionGuard(pfree, parallel_worker_readers); + pfree(parallel_worker_readers); } parallel_worker_readers = nullptr; - PostgresFunctionGuard(ExecutorFinish, table_scan_query_desc); - PostgresFunctionGuard(ExecutorEnd, table_scan_query_desc); - PostgresFunctionGuard(FreeQueryDesc, table_scan_query_desc); + ExecutorFinish(table_scan_query_desc); + ExecutorEnd(table_scan_query_desc); + FreeQueryDesc(table_scan_query_desc); if (entered_parallel_mode) { ExitParallelMode();