From d5ff8e14add86233afd3c82935d4f72a31859a57 Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Thu, 4 Sep 2025 22:55:25 +0900 Subject: [PATCH v1 4/8] WIP: Add agg_retrieve_direct_batch() for plain aggregates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Teach Agg to consume child tuples in batches for AGG_PLAIN. A new agg_retrieve_direct_batch() pulls a TupleBatch from the child via ExecProcNodeBatch(), materializes as needed, and advances per-agg transition state over the batch. The first tuple of the first non-empty batch is copied before batch processing, to match the direct path’s behavior. Add AggCanUsePlainBatch() and select retrieve_plain at init: the batch path is used when there are no grouping sets, the strategy is AGG_PLAIN, and the child exposes ExecProcNodeBatch(); otherwise the row path is kept. Plan shape and EXPLAIN remain unchanged. Semantics are identical to the non-batch direct path; this only reduces per-tuple overhead. --- src/backend/executor/nodeAgg.c | 123 +++++++++++++++++++++++++++++++++ src/include/nodes/execnodes.h | 5 ++ 2 files changed, 128 insertions(+) diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index a4f3d30f307..3ace6363509 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -820,6 +820,20 @@ advance_aggregates(AggState *aggstate) aggstate->tmpcontext); } +static void +advance_aggregates_batch(AggState *aggstate, TupleBatch *b) +{ + ExprContext *tmpcontext = aggstate->tmpcontext; + ExprState *evaltrans = aggstate->phase->evaltrans; + + while (TupleBatchHasMore(b)) + { + tmpcontext->ecxt_outertuple = TupleBatchGetNextSlot(b); + ExecEvalExprNoReturnSwitchContext(evaltrans, tmpcontext); + ResetExprContext(tmpcontext); + } +} + /* * Run the transition function for a DISTINCT or ORDER BY aggregate * with only one input. 
This is called after we have completed @@ -2260,6 +2274,9 @@ ExecAgg(PlanState *pstate) result = agg_retrieve_hash_table(node); break; case AGG_PLAIN: + /* init-time choice */ + result = node->retrieve_plain(node); + break; case AGG_SORTED: result = agg_retrieve_direct(node); break; @@ -2618,6 +2635,91 @@ agg_retrieve_direct(AggState *aggstate) return NULL; } +static TupleTableSlot * +agg_retrieve_direct_batch(AggState *aggstate) +{ + PlanState *child = outerPlanState(aggstate); + ExprContext *econtext = aggstate->ss.ps.ps_ExprContext; + ExprContext *tmpcontext = aggstate->tmpcontext; + const bool hasGroupingSets = aggstate->phase->numsets > 0; + TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot; + TupleBatch *b = NULL; + + Assert(child->ExecProcNodeBatch); + + /* mimic the first-tuple copy from agg_retrieve_direct() */ + for (;;) + { + b = ExecProcNodeBatch(child); + if (b == NULL) + { + if (hasGroupingSets) + { + aggstate->input_done = true; + break; + } + aggstate->agg_done = true; + break; + } + if (b->nvalid == 0) + continue; + + TupleBatchMaterializeAll(b); + aggstate->grp_firstTuple = ExecCopySlotHeapTuple(TupleBatchGetSlot(b, 0)); + break; + } + + /* initialize_aggregates etc. 
as in the direct path */ + ReScanExprContext(econtext); + for (int i = 0; i < Max(aggstate->phase->numsets, 1); i++) + ReScanExprContext(aggstate->aggcontexts[i]); + + initialize_aggregates(aggstate, aggstate->pergroups, + Max(aggstate->phase->numsets, 1)); + + if (aggstate->grp_firstTuple) + { + ExecForceStoreHeapTuple(aggstate->grp_firstTuple, firstSlot, true); + aggstate->grp_firstTuple = NULL; + tmpcontext->ecxt_outertuple = firstSlot; + + advance_aggregates_batch(aggstate, b); + ResetExprContext(tmpcontext); + } + + /* consume remaining rows in current and subsequent batches */ + if (b) + { + if (TupleBatchHasMore(b)) + advance_aggregates_batch(aggstate, b); + for (;;) + { + b = ExecProcNodeBatch(child); + if (b == NULL) + { + if (hasGroupingSets) + aggstate->input_done = true; + else + aggstate->agg_done = true; + break; + } + if (b->nvalid == 0) + continue; + + TupleBatchMaterializeAll(b); + advance_aggregates_batch(aggstate, b); + } + } + + /* finalize and project like the direct path */ + econtext->ecxt_outertuple = firstSlot; + prepare_projection_slot(aggstate, econtext->ecxt_outertuple, 0); + select_current_set(aggstate, 0, false); + finalize_aggregates(aggstate, aggstate->peragg, aggstate->pergroups[0]); + + return project_aggregates(aggstate); +} + /* * ExecAgg for hashed case: read input and build hash table */ @@ -3265,6 +3367,22 @@ hashagg_reset_spill_state(AggState *aggstate) } } +static bool +AggCanUsePlainBatch(AggState *aggstate) +{ + const Agg *aggnode = (const Agg *) aggstate->ss.ps.plan; + + Assert(outerPlanState(aggstate)); + + /* grouping sets present -> bail */ + if (aggnode->groupingSets != NIL) + return false; + + if (aggstate->phase->aggstrategy != AGG_PLAIN) + return false; + + return outerPlanState(aggstate)->ExecProcNodeBatch; +} /* ----------------- * ExecInitAgg @@ -4060,6 +4178,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); + if 
(AggCanUsePlainBatch(aggstate)) + aggstate->retrieve_plain = agg_retrieve_direct_batch; + else + aggstate->retrieve_plain = agg_retrieve_direct; + /* * Build expressions doing all the transition work at once. We build a * different one for each phase, as the number of transition function diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index a104591ac20..9b81b842161 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2535,6 +2535,9 @@ typedef struct AggStatePerGroupData *AggStatePerGroup; typedef struct AggStatePerPhaseData *AggStatePerPhase; typedef struct AggStatePerHashData *AggStatePerHash; +struct AggState; +typedef TupleTableSlot *(*AggRetrievePlainFn)(struct AggState *); + typedef struct AggState { ScanState ss; /* its first field is NodeTag */ @@ -2610,6 +2613,8 @@ typedef struct AggState AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than * ->hash_pergroup */ SharedAggInfo *shared_info; /* one entry per worker */ + + AggRetrievePlainFn retrieve_plain; /* init-time choice */ } AggState; /* ---------------- -- 2.43.0