Skip to content

Commit

Permalink
Use guard for PostgresTableReaderCleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-- committed Dec 20, 2024
1 parent 669ce73 commit 1df184d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 8 deletions.
26 changes: 26 additions & 0 deletions include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,32 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) {
#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, ##__VA_ARGS__)


template <typename T, typename ReturnType>
ReturnType __PostgresMemberGuard__(ReturnType (T::*func)(), T* instance, const char *func_name) {
MemoryContext ctx = CurrentMemoryContext;
ErrorData *edata = nullptr;
{ // Scope for PG_END_TRY
PgExceptionGuard g;
sigjmp_buf _local_sigjmp_buf;
if (sigsetjmp(_local_sigjmp_buf, 0) == 0) {
PG_exception_stack = &_local_sigjmp_buf;
return (instance->*func)();
} else {
g.RestoreStacks();
CurrentMemoryContext = ctx;
edata = CopyErrorData();
FlushErrorState();
}
} // PG_END_TRY();

auto message = duckdb::StringUtil::Format("(PGDuckDB/%s) %s", func_name, pg::GetErrorDataMessage(edata));
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, message);
}

#define PostgresMemberGuard(FUNC, ...) \
pgduckdb::__PostgresMemberGuard__(&FUNC, this, __func__)

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query);

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::Connection &connection, const std::string &query);
Expand Down
1 change: 1 addition & 0 deletions include/pgduckdb/scan/postgres_table_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class PostgresTableReader {

private:
MinimalTuple GetNextWorkerTuple();
void PostgresTableReaderCleanupUnsafe();
int ParallelWorkerNumber(Cardinality cardinality);
const char * ExplainScanPlan(QueryDesc *query_desc);
bool CanTableScanRunInParallel(Plan *plan);
Expand Down
20 changes: 12 additions & 8 deletions src/scan/postgres_table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,27 +128,31 @@ PostgresTableReader::~PostgresTableReader() {
void
PostgresTableReader::PostgresTableReaderCleanup() {
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());

PostgresScopedStackReset scoped_stack_reset;

PostgresFunctionGuard(ExecEndNode, table_scan_planstate);
PostgresMemberGuard(PostgresTableReader::PostgresTableReaderCleanupUnsafe);
}

void
PostgresTableReader::PostgresTableReaderCleanupUnsafe() {
ExecEndNode(table_scan_planstate);

if (parallel_executor_info != NULL) {
PostgresFunctionGuard(ExecParallelFinish, parallel_executor_info);
PostgresFunctionGuard(ExecParallelCleanup, parallel_executor_info);
ExecParallelFinish(parallel_executor_info);
ExecParallelCleanup(parallel_executor_info);
}

parallel_executor_info = nullptr;

if (parallel_worker_readers) {
PostgresFunctionGuard(pfree, parallel_worker_readers);
pfree(parallel_worker_readers);
}

parallel_worker_readers = nullptr;

PostgresFunctionGuard(ExecutorFinish, table_scan_query_desc);
PostgresFunctionGuard(ExecutorEnd, table_scan_query_desc);
PostgresFunctionGuard(FreeQueryDesc, table_scan_query_desc);
ExecutorFinish(table_scan_query_desc);
ExecutorEnd(table_scan_query_desc);
FreeQueryDesc(table_scan_query_desc);

if (entered_parallel_mode) {
ExitParallelMode();
Expand Down

0 comments on commit 1df184d

Please sign in to comment.