Skip to content

Commit

Permalink
[wip] Guard PG table reader initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
Y-- committed Jan 6, 2025
1 parent 411b514 commit 3a5e38e
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 27 deletions.
7 changes: 3 additions & 4 deletions include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) {
// Invokes the function FUNC through pgduckdb::__PostgresFunctionGuard__.
// FUNC is supplied twice as template arguments (its pointer type and the
// pointer itself). The name of the calling function (__func__) is passed
// as `func_name`, and any further macro arguments are forwarded as the call
// arguments. `##__VA_ARGS__` is the GNU comma-elision form, so the macro can
// also be used with no extra arguments.
#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, ##__VA_ARGS__)


template <typename T, typename ReturnType>
ReturnType __PostgresMemberGuard__(ReturnType (T::*func)(), T* instance, const char *func_name) {
ReturnType
__PostgresMemberGuard__(ReturnType (T::*func)(), T *instance, const char *func_name) {
MemoryContext ctx = CurrentMemoryContext;
ErrorData *edata = nullptr;
{ // Scope for PG_END_TRY
Expand All @@ -127,8 +127,7 @@ ReturnType __PostgresMemberGuard__(ReturnType (T::*func)(), T* instance, const c
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, message);
}

// Invokes the member function FUNC on the current object through
// pgduckdb::__PostgresMemberGuard__, passing `&FUNC`, `this` and the name of
// the calling function (__func__).
// - The expansion uses `this`, so it is only valid inside a non-static member
//   function.
// - The guard's signature takes `ReturnType (T::*func)()`, i.e. a member
//   function without parameters. The macro accepts variadic arguments but
//   does not forward them.
#define PostgresMemberGuard(FUNC, ...) \
pgduckdb::__PostgresMemberGuard__(&FUNC, this, __func__)
#define PostgresMemberGuard(FUNC, ...) pgduckdb::__PostgresMemberGuard__(&FUNC, this, __func__)

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query);

Expand Down
2 changes: 1 addition & 1 deletion include/pgduckdb/scan/postgres_table_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PostgresTableReader {
MinimalTuple GetNextWorkerTuple();
void PostgresTableReaderCleanupUnsafe();
int ParallelWorkerNumber(Cardinality cardinality);
const char * ExplainScanPlan(QueryDesc *query_desc);
const char *ExplainScanPlan(QueryDesc *query_desc);
bool CanTableScanRunInParallel(Plan *plan);
bool MarkPlanParallelAware(Plan *plan);

Expand Down
65 changes: 43 additions & 22 deletions src/scan/postgres_table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,45 @@ extern "C" {

namespace pgduckdb {

// Free-function form of the reader setup. The PostgresTableReader constructor
// calls it through PostgresFunctionGuard, handing over its members by
// reference (via std::ref) so the function can fill them in: parse / analyze /
// plan `table_scan_query`, start the executor, optionally launch parallel
// workers, and create the result slot.
// The definition returns 0 at its end; the value carries no information.
// NOTE(review): the int return presumably exists only so the call fits the
// guard template - confirm.
int SetupPostgresTableReader(ParallelExecutorInfo *&parallel_executor_info, void **&parallel_worker_readers,
int &nreaders, int &nworkers_launched, bool &entered_parallel_mode,
QueryDesc *&table_scan_query_desc, PlanState *&table_scan_planstate, TupleTableSlot *&slot,
const char *table_scan_query, bool count_tuples_only);

// Decides, by dispatching on nodeTag(plan), whether the given plan may be
// scanned with parallel workers. During setup its result is AND-ed into the
// "run with parallel workers" decision.
bool CanTableScanRunInParallel(Plan *plan);

// Dispatches on nodeTag(plan).
// NOTE(review): presumably flags the supported plan nodes as parallel-aware
// and reports whether that succeeded - confirm against the definition.
bool MarkPlanParallelAware(Plan *plan);

// Derives a worker count from the estimated cardinality; the visible part of
// the definition works from log2(cardinality) divided by a base of 8.
int ParallelWorkerNumber(Cardinality cardinality);

// Produces the textual plan description that setup logs at DEBUG1 (it is
// consumed as a `%s` argument of elog).
// NOTE(review): the `_Unsafe` suffix presumably means the function must be
// called from code that is already running under a Postgres guard - confirm.
const char *ExplainScanPlan_Unsafe(QueryDesc *query_desc);

// Constructs a reader for `table_scan_query`.
//
// The parallel-execution members are first reset to their "nothing launched"
// values. The whole PostgreSQL-side initialization is then delegated to
// SetupPostgresTableReader, which runs under a single PostgresFunctionGuard
// call and receives the members by reference (std::ref) so it can fill them in.
//
// @param table_scan_query   SQL text of the table scan to execute.
// @param count_tuples_only  forwarded unchanged to SetupPostgresTableReader.
//
// NOTE(review): the guard presumably converts a PostgreSQL error raised during
// setup into a duckdb exception - confirm against __PostgresFunctionGuard__.
PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool count_tuples_only)
    : parallel_executor_info(nullptr), parallel_worker_readers(nullptr), nreaders(0), next_parallel_reader(0),
      entered_parallel_mode(false) {

	// Hold the global process lock for the whole setup.
	std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());
	PostgresScopedStackReset scoped_stack_reset;

	// The query is parsed inside SetupPostgresTableReader; parsing it here as
	// well only produced an unused parse tree, so that call is gone.
	PostgresFunctionGuard(SetupPostgresTableReader, std::ref(parallel_executor_info),
	                      std::ref(parallel_worker_readers), std::ref(nreaders), std::ref(nworkers_launched),
	                      std::ref(entered_parallel_mode), std::ref(table_scan_query_desc),
	                      std::ref(table_scan_planstate), std::ref(slot), table_scan_query, count_tuples_only);
}

int
SetupPostgresTableReader(ParallelExecutorInfo *&parallel_executor_info, void **&parallel_worker_readers, int &nreaders,
int &nworkers_launched, bool &entered_parallel_mode, QueryDesc *&table_scan_query_desc,
PlanState *&table_scan_planstate, TupleTableSlot *&slot, const char *table_scan_query,
bool count_tuples_only) {
List *raw_parsetree_list = pg_parse_query(table_scan_query);
Assert(list_length(raw_parsetree_list) == 1);
RawStmt *raw_parsetree = linitial_node(RawStmt, raw_parsetree_list);

#if PG_VERSION_NUM >= 150000
List *query_list =
PostgresFunctionGuard(pg_analyze_and_rewrite_fixedparams, raw_parsetree, table_scan_query, nullptr, 0, nullptr);
List *query_list = pg_analyze_and_rewrite_fixedparams(raw_parsetree, table_scan_query, nullptr, 0, nullptr);
#else
List *query_list =
PostgresFunctionGuard(pg_analyze_and_rewrite, raw_parsetree, table_scan_query, nullptr, 0, nullptr);
List *query_list = pg_analyze_and_rewrite(raw_parsetree, table_scan_query, nullptr, 0, nullptr);
#endif

Assert(list_length(query_list) == 1);
Expand All @@ -54,15 +76,14 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool coun

char persistence = get_rel_persistence(rte->relid);

PlannedStmt *planned_stmt = PostgresFunctionGuard(standard_planner, query, table_scan_query, 0, nullptr);
PlannedStmt *planned_stmt = standard_planner(query, table_scan_query, 0, nullptr);

table_scan_query_desc = PostgresFunctionGuard(CreateQueryDesc, planned_stmt, table_scan_query, GetActiveSnapshot(),
InvalidSnapshot, None_Receiver, nullptr, nullptr, 0);
table_scan_query_desc = CreateQueryDesc(planned_stmt, table_scan_query, GetActiveSnapshot(), InvalidSnapshot,
None_Receiver, nullptr, nullptr, 0);

PostgresFunctionGuard(ExecutorStart, table_scan_query_desc, 0);
ExecutorStart(table_scan_query_desc, 0);

table_scan_planstate =
PostgresFunctionGuard(ExecInitNode, planned_stmt->planTree, table_scan_query_desc->estate, 0);
table_scan_planstate = ExecInitNode(planned_stmt->planTree, table_scan_query_desc->estate, 0);

bool run_scan_with_parallel_workers = persistence != RELPERSISTENCE_TEMP;
run_scan_with_parallel_workers &= CanTableScanRunInParallel(table_scan_query_desc->planstate->plan);
Expand Down Expand Up @@ -93,14 +114,14 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool coun
}

ParallelContext *pcxt;
parallel_executor_info = PostgresFunctionGuard(ExecInitParallelPlan, table_scan_planstate,
table_scan_query_desc->estate, nullptr, parallel_workers, -1);
parallel_executor_info =
ExecInitParallelPlan(table_scan_planstate, table_scan_query_desc->estate, nullptr, parallel_workers, -1);
pcxt = parallel_executor_info->pcxt;
PostgresFunctionGuard(LaunchParallelWorkers, pcxt);
LaunchParallelWorkers(pcxt);
nworkers_launched = pcxt->nworkers_launched;

if (pcxt->nworkers_launched > 0) {
PostgresFunctionGuard(ExecParallelCreateReaders, parallel_executor_info);
ExecParallelCreateReaders(parallel_executor_info);
nreaders = pcxt->nworkers_launched;
parallel_worker_readers = (void **)palloc(nreaders * sizeof(TupleQueueReader *));
memcpy(parallel_worker_readers, parallel_executor_info->reader, nreaders * sizeof(TupleQueueReader *));
Expand All @@ -113,10 +134,11 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool coun

elog(DEBUG1, "(PGDuckdDB/PostgresTableReader)\n\nQUERY: %s\nRUNNING: %s.\nEXECUTING: \n%s", table_scan_query,
!nreaders ? "IN PROCESS THREAD" : psprintf("ON %d PARALLEL WORKER(S)", nreaders),
ExplainScanPlan(table_scan_query_desc));
ExplainScanPlan_Unsafe(table_scan_query_desc));

slot = PostgresFunctionGuard(ExecInitExtraTupleSlot, table_scan_query_desc->estate,
table_scan_planstate->ps_ResultTupleDesc, &TTSOpsMinimalTuple);
slot = ExecInitExtraTupleSlot(table_scan_query_desc->estate, table_scan_planstate->ps_ResultTupleDesc,
&TTSOpsMinimalTuple);
return 0;
}

PostgresTableReader::~PostgresTableReader() {
Expand All @@ -142,7 +164,6 @@ PostgresTableReader::PostgresTableReaderCleanupUnsafe() {
parallel_executor_info = nullptr;
}


if (parallel_worker_readers) {
pfree(parallel_worker_readers);
parallel_worker_readers = nullptr;
Expand All @@ -160,7 +181,7 @@ PostgresTableReader::PostgresTableReaderCleanupUnsafe() {
}

int
PostgresTableReader::ParallelWorkerNumber(Cardinality cardinality) {
ParallelWorkerNumber(Cardinality cardinality) {
static const int base_log = 8;
int cardinality_log = std::log2(cardinality);
int base = cardinality_log / base_log;
Expand All @@ -182,7 +203,7 @@ PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) {
}

bool
PostgresTableReader::CanTableScanRunInParallel(Plan *plan) {
CanTableScanRunInParallel(Plan *plan) {
switch (nodeTag(plan)) {
case T_SeqScan:
case T_IndexScan:
Expand All @@ -207,7 +228,7 @@ PostgresTableReader::CanTableScanRunInParallel(Plan *plan) {
}

bool
PostgresTableReader::MarkPlanParallelAware(Plan *plan) {
MarkPlanParallelAware(Plan *plan) {
switch (nodeTag(plan)) {
case T_SeqScan:
case T_IndexScan:
Expand Down

0 comments on commit 3a5e38e

Please sign in to comment.