From 9c94b3751ae0c9decc337e33de2750a954a88d6f Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Tue, 11 Nov 2025 22:17:47 +0900 Subject: [PATCH v7 6/6] Reuse partition pruning results in parallel workers Pass the leader's initial partition pruning results and unpruned relids to parallel workers and reuse them via ExecutorPrep(). This avoids repeating pruning logic in workers, which is not only redundant but also risks divergence due to nondeterminism in pruning steps or parameter evaluation timing. Introduce CheckInitialPruningResultsInWorker() (debug-builds only) to verify that the results match what the worker would compute. This check helps catch inconsistencies across leader and worker pruning logic. --- src/backend/executor/execParallel.c | 108 +++++++++++++++++++++++++++- 1 file changed, 107 insertions(+), 1 deletion(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 024780d3516..d337bf8c081 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -24,6 +24,7 @@ #include "postgres.h" #include "executor/execParallel.h" +#include "executor/execPartition.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeAppend.h" @@ -67,6 +68,8 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PARTITION_PRUNE_RESULTS UINT64CONST(0xE00000000000000B) +#define PARALLEL_KEY_UNPRUNED_RELIDS UINT64CONST(0xE00000000000000C) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -141,6 +144,8 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +static void CheckInitialPruningResultsInWorker(EState *estate); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -620,12 +625,18 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, FixedParallelExecutorState *fpes; char *pstmt_data; char *pstmt_space; + char *part_prune_results_data; + char *part_prune_results_space; + char *unpruned_relids_data; + char *unpruned_relids_space; char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; + int part_prune_results_len; + int unpruned_relids_len; int paramlistinfo_len; int instrumentation_len = 0; int jit_instrumentation_len = 0; @@ -654,6 +665,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* Fix up and serialize plan to be sent to workers. */ pstmt_data = ExecSerializePlan(planstate->plan, estate); + part_prune_results_data = nodeToString(estate->es_part_prune_results); + unpruned_relids_data = nodeToString(estate->es_unpruned_relids); /* Create a parallel context. */ pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers); @@ -680,6 +693,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, pstmt_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized part_prune_results. */ + part_prune_results_len = strlen(part_prune_results_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, part_prune_results_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate space for serialized unpruned_relids. */ + unpruned_relids_len = strlen(unpruned_relids_data) + 1; + shm_toc_estimate_chunk(&pcxt->estimator, unpruned_relids_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized ParamListInfo. */ paramlistinfo_len = EstimateParamListSpace(estate->es_param_list_info); shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); @@ -781,6 +804,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, memcpy(pstmt_space, pstmt_data, pstmt_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PLANNEDSTMT, pstmt_space); + /* Store serialized part_prune_results */ + part_prune_results_space = shm_toc_allocate(pcxt->toc, part_prune_results_len); + memcpy(part_prune_results_space, part_prune_results_data, part_prune_results_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, part_prune_results_space); + + /* Store serialized unpruned_relids */ + unpruned_relids_space = shm_toc_allocate(pcxt->toc, unpruned_relids_len); + memcpy(unpruned_relids_space, unpruned_relids_data, unpruned_relids_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNPRUNED_RELIDS, unpruned_relids_space); + /* Store serialized ParamListInfo. */ paramlistinfo_space = shm_toc_allocate(pcxt->toc, paramlistinfo_len); shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); @@ -1280,10 +1313,15 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, int instrument_options) { char *pstmtspace; + char *part_prune_results_space; + char *unpruned_relids_space; char *paramspace; PlannedStmt *pstmt; + List *part_prune_results; + Bitmapset *unpruned_relids; ParamListInfo paramLI; char *queryString; + EState *prep_estate = NULL; /* Get the query string from shared memory */ queryString = shm_toc_lookup(toc, PARALLEL_KEY_QUERY_TEXT, false); @@ -1296,12 +1334,80 @@ ExecParallelGetQueryDesc(shm_toc *toc, DestReceiver *receiver, paramspace = shm_toc_lookup(toc, PARALLEL_KEY_PARAMLISTINFO, false); paramLI = RestoreParamList(¶mspace); + /* Reconstruct leader-supplied part_prune_results and unpruned_relids. */ + part_prune_results_space = + shm_toc_lookup(toc, PARALLEL_KEY_PARTITION_PRUNE_RESULTS, false); + part_prune_results = (List *) stringToNode(part_prune_results_space); + unpruned_relids_space = + shm_toc_lookup(toc, PARALLEL_KEY_UNPRUNED_RELIDS, false); + unpruned_relids = (Bitmapset *) stringToNode(unpruned_relids_space); + + /* + * If pruning was done in the leader, build a prep estate in the worker + * and inject the leader's pruning results into it for reuse. + */ + if (pstmt->partPruneInfos) + { + prep_estate = ExecutorPrep(pstmt, paramLI, CurrentResourceOwner, false, 0); + Assert(prep_estate); + + prep_estate->es_part_prune_results = part_prune_results; + prep_estate->es_unpruned_relids = + bms_add_members(prep_estate->es_unpruned_relids, + unpruned_relids); + + /* + * A debug-build-only check that the pruning results passed from the + * leader match what the worker would independently compute. + */ + CheckInitialPruningResultsInWorker(prep_estate); + } + /* Create a QueryDesc for the query. */ return CreateQueryDesc(pstmt, queryString, GetActiveSnapshot(), InvalidSnapshot, receiver, paramLI, NULL, instrument_options, - NULL); + prep_estate); +} + +/* + * CheckInitialPruningResultsInWorker + * Verify partition pruning results passed from the leader process. + * + * This is intended to be called during parallel worker query setup. + * It recomputes initial pruning results locally and compares them with + * those received from the leader. Any mismatch may indicate a divergence + * between leader and worker logic or environment. + * + * Only performed in debug builds. + */ +static void +CheckInitialPruningResultsInWorker(EState *estate) +{ +#ifdef USE_ASSERT_CHECKING + ListCell *lc; + int i; + + Assert(estate->es_part_prune_results != NULL); + i = 0; + foreach(lc, estate->es_part_prune_states) + { + PartitionPruneState *prunestate = (PartitionPruneState *) lfirst(lc); + Bitmapset *reuse_validsubplans = + list_nth_node(Bitmapset, estate->es_part_prune_results, i++); + Bitmapset *validsubplans = NULL; + Bitmapset *validsubplan_rtis = NULL; + + if (prunestate->do_initial_prune) + validsubplans = ExecFindMatchingSubPlans(prunestate, true, + &validsubplan_rtis); + if (!bms_equal(validsubplans, reuse_validsubplans)) + elog(ERROR, "different validsubplans in parallel worker"); + if (bms_nonempty_difference(validsubplan_rtis, estate->es_unpruned_relids)) + elog(ERROR, "different unprunable_relids in parallel worker"); + } +#endif } /* -- 2.47.3