public inbox for [email protected]
help / color / mirror / Atom feed
From: Amit Langote <[email protected]>
To: Tomas Vondra <[email protected]>
Cc: PostgreSQL-development <[email protected]>
Subject: Re: Batching in executor
Date: Tue, 30 Sep 2025 11:11:30 +0900
Message-ID: <CA+HiwqF-31mmZ2hhnBLuWpu1UYTVPXoEzKBW6wrf96KpY=AU7A@mail.gmail.com> (raw)
In-Reply-To: <[email protected]>
References: <CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com>
<[email protected]>
Hi Tomas,
Thanks a lot for your comments and benchmarking.
I plan to reply to your detailed comments and benchmark results, but I
just realized I had forgotten to attach patch 0008 (oops!) in my last
email. That patch adds batched qual evaluation.
I also noticed that the batched path was unnecessarily doing early
“batch-materialization” in cases like SELECT count(*) FROM bar. I’ve
fixed that as well. It was originally designed to avoid such
materialization, but I must have broken it while refactoring.
Attachments:
[application/octet-stream] v2-0008-WIP-Add-ExecQualBatch-and-EEOPs-for-batched-quals.patch (22.8K, 2-v2-0008-WIP-Add-ExecQualBatch-and-EEOPs-for-batched-quals.patch)
download | inline diff:
From 0ac98eedfef945403822d23e3efc9f7248602895 Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Mon, 22 Sep 2025 16:19:26 +0900
Subject: [PATCH v2 8/8] WIP: Add ExecQualBatch() and EEOPs for batched quals
Introduce ExecInitQualBatch()/ExecQualBatch() to evaluate scan quals
over a TupleBatch. The batched qual interpreter produces a boolean
mask aligned with the batch, marking which rows satisfy the qual.
The scan node later uses this mask to copy only passing rows into
its output slots. If batching is not possible, fall back to the
existing per-tuple engine.
Add EEOP_QUAL_BATCH_INITMASK and EEOP_QUAL_BATCH_TERM, and wire them
after EEOP_SCAN_FETCHSOME_BATCH and EEOP_BUILD_SCAN_BATCH_VECTOR.
Batching is limited to quals that are a top-level AND of simple
clauses: either NullTest(var) or strict binary OpExpr with var/const
or var/var arguments. A walker validates the tree, collects the
referenced attnos, and builds a BatchVector; terms are compiled from
the leaves and evaluated to update the mask.
ExprState gains batch_private to hold BatchQualRuntime (mask, words)
which are used by the parent node to populate output slots in
TupleBatch.
---
src/backend/executor/execExpr.c | 324 ++++++++++++++++++++++++++
src/backend/executor/execExprInterp.c | 202 ++++++++++++++++
src/backend/executor/nodeSeqscan.c | 2 +
src/backend/jit/llvm/llvmjit_expr.c | 11 +
src/backend/jit/llvm/llvmjit_types.c | 2 +
src/include/executor/execExpr.h | 60 +++++
src/include/executor/execScan.h | 35 +--
src/include/executor/executor.h | 3 +
src/include/nodes/execnodes.h | 4 +
9 files changed, 630 insertions(+), 13 deletions(-)
diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index 27a5780f557..63df560d5f1 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -111,6 +111,19 @@ static BatchVector *BatchVectorCreate(Bitmapset *attnos, AttrNumber last_var);
static bool ExprListAllSimpleVars(const List *args, Bitmapset **allattnos);
static BatchVectorSlice *BatchVectorSliceFromExprArgs(const List *args,
const BatchVector *bv);
+static int16 BatchVectorFindAttColno(const BatchVector *bv, AttrNumber attno);
+static int16 BatchVectorOffsetForVarExpr(Expr *expr, const BatchVector *bv);
+
+/* private state for qual_batchable_walker(): what was accepted so far and whether the qual is still batchable */
+typedef struct QualBatchContext
+{
+ List *leaves; /* accepted NullTest/OpExpr leaf nodes, in walk order */
+ Bitmapset *attnos; /* varattnos of Vars referenced by accepted leaves */
+ bool ok; /* cleared as soon as a non-batchable node is seen */
+ AttrNumber last_scan; /* highest varattno recorded among the leaves */
+} QualBatchContext;
+
+static bool qual_batchable_walker(Node *node, void *context);
/*
* ExecInitExpr: prepare an expression tree for execution
@@ -5221,6 +5234,209 @@ ExprListAllSimpleVars(const List *args, Bitmapset **allattnos)
return true;
}
+/*
+ * strip_to_var
+ *		Return the Var underlying 'n', looking through at most one
+ *		RelabelType; return NULL if 'n' is not such an expression.
+ *
+ * Only user columns (varattno > 0) qualify.  Whole-row references
+ * (varattno == 0) and system columns (varattno < 0) are rejected, which
+ * matches expr_is_simple_var(), the routine later used to map the Var to
+ * a BatchVector column.
+ */
+static Var *
+strip_to_var(Node *n)
+{
+	Var		   *var;
+
+	if (n == NULL)
+		return NULL;
+
+	/* binary-compatible relabeling does not change the column read */
+	if (IsA(n, RelabelType))
+		n = (Node *) ((RelabelType *) n)->arg;
+
+	if (n == NULL || !IsA(n, Var))
+		return NULL;
+
+	var = (Var *) n;
+	if (var->varattno <= 0)
+		return NULL;
+
+	return var;
+}
+
+/*
+ * qual_batch_add_var
+ *		Note that an accepted leaf reads column 'v': add its attno to the
+ *		set and advance last_scan if this is the highest attno so far.
+ */
+static void
+qual_batch_add_var(QualBatchContext *cxt, const Var *v)
+{
+	cxt->attnos = bms_add_member(cxt->attnos, v->varattno);
+	if (v->varattno > cxt->last_scan)
+		cxt->last_scan = v->varattno;
+}
+
+/*
+ * qual_batchable_walker
+ *		Decide whether a qual tree can be evaluated in batch mode and
+ *		collect its leaves.
+ *
+ * Accepted shapes are Lists and AND BoolExprs whose leaves are either
+ * a NullTest on a Var or a strict binary OpExpr with a Var on the left
+ * and a Var or Const on the right (Vars may be wrapped in one RelabelType,
+ * see strip_to_var()).  Anything else clears cxt->ok.
+ *
+ * Returns true to abort the traversal early, false to continue.
+ */
+static bool
+qual_batchable_walker(Node *node, void *context)
+{
+	QualBatchContext *cxt = (QualBatchContext *) context;
+
+	if (node == NULL || !cxt->ok)
+		return false;
+
+	switch (nodeTag(node))
+	{
+		case T_List:
+			return expression_tree_walker(node, qual_batchable_walker, cxt);
+
+		case T_BoolExpr:
+			/* Only AND trees are allowed */
+			if (((BoolExpr *) node)->boolop != AND_EXPR)
+			{
+				cxt->ok = false;
+				return true;	/* abort */
+			}
+			/* Recurse normally over children */
+			return expression_tree_walker(node, qual_batchable_walker, cxt);
+
+		case T_NullTest:
+			{
+				NullTest   *nt = (NullTest *) node;
+				Var		   *v;
+
+				/*
+				 * A row-type null test checks the fields of the row, which
+				 * the batched terms cannot do: they only look at the
+				 * column's null flag.  Leave those to the scalar path.
+				 */
+				if (nt->argisrow)
+				{
+					cxt->ok = false;
+					return true;
+				}
+
+				v = strip_to_var((Node *) nt->arg);
+				if (v == NULL)
+				{
+					cxt->ok = false;
+					return true;
+				}
+
+				qual_batch_add_var(cxt, v);
+				cxt->leaves = lappend(cxt->leaves, node);
+
+				/* Do NOT recurse into leaf */
+				return false;
+			}
+
+		case T_OpExpr:
+			{
+				OpExpr	   *op = (OpExpr *) node;
+				Node	   *rarg;
+				Var		   *lvar;
+				Var		   *rvar = NULL;
+
+				/* binary, strict (NULL -> false semantics) operators only */
+				if (list_length(op->args) != 2 ||
+					!func_strict(op->opfuncid))
+				{
+					cxt->ok = false;
+					return true;
+				}
+
+				/* left operand must be a Var */
+				lvar = strip_to_var((Node *) linitial(op->args));
+				if (lvar == NULL)
+				{
+					cxt->ok = false;
+					return true;
+				}
+
+				/* right operand must be a Const or another Var */
+				rarg = (Node *) lsecond(op->args);
+				if (!IsA(rarg, Const))
+				{
+					rvar = strip_to_var(rarg);
+					if (rvar == NULL)
+					{
+						cxt->ok = false;
+						return true;
+					}
+				}
+
+				qual_batch_add_var(cxt, lvar);
+				if (rvar != NULL)
+					qual_batch_add_var(cxt, rvar);
+				cxt->leaves = lappend(cxt->leaves, node);
+
+				/* Leaf handled; do NOT recurse into args */
+				return false;
+			}
+
+			/* Whitelist ends here; anything else in the tree rejects */
+		default:
+			cxt->ok = false;
+			break;
+	}
+
+	return true;
+}
+
+/*
+ * build_term_from_leaf
+ *		Compile one leaf accepted by qual_batchable_walker() into a
+ *		BatchQualTerm that refers to columns of 'bv'.
+ *
+ * Returns NULL if the leaf is neither a NullTest nor an OpExpr, or if one
+ * of its Var operands has no column in 'bv'.
+ */
+static BatchQualTerm *
+build_term_from_leaf(Node *n, BatchVector *bv)
+{
+	BatchQualTerm *result;
+	BatchQualTermKind termkind;
+	bool		is_strict = false;
+	int16		left_col;
+	int16		right_col = -1;
+	Datum		constval = (Datum) 0;
+	bool		constnull = false;
+	FmgrInfo   *flinfo = NULL;
+	Oid			inputcoll = InvalidOid;
+
+	switch (nodeTag(n))
+	{
+		case T_NullTest:
+			{
+				NullTest   *ntest = (NullTest *) n;
+
+				if (ntest->nulltesttype == IS_NULL)
+					termkind = BQTK_IS_NULL;
+				else
+					termkind = BQTK_IS_NOT_NULL;
+
+				left_col = BatchVectorOffsetForVarExpr(ntest->arg, bv);
+				if (left_col < 0)
+					return NULL;
+				break;
+			}
+
+		case T_OpExpr:
+			{
+				OpExpr	   *opexpr = (OpExpr *) n;
+				Expr	   *larg = linitial(opexpr->args);
+				Expr	   *rarg = lsecond(opexpr->args);
+
+				left_col = BatchVectorOffsetForVarExpr(larg, bv);
+				if (left_col < 0)
+					return NULL;
+
+				if (IsA(rarg, Const))
+				{
+					Const	   *con = (Const *) rarg;
+
+					termkind = BQTK_VAR_CONST;
+					constval = con->constvalue;
+					constnull = con->constisnull;
+				}
+				else
+				{
+					/* right side must be a column present in the vector */
+					right_col = BatchVectorOffsetForVarExpr(rarg, bv);
+					if (right_col < 0)
+						return NULL;
+					termkind = BQTK_VAR_VAR;
+				}
+
+				is_strict = func_strict(opexpr->opfuncid);
+				inputcoll = exprInputCollation((Node *) opexpr);
+				flinfo = palloc(sizeof(FmgrInfo));
+				fmgr_info(opexpr->opfuncid, flinfo);
+				break;
+			}
+
+		default:
+			return NULL;
+	}
+
+	result = palloc(sizeof(BatchQualTerm));
+	result->kind = termkind;
+	result->strict = is_strict;
+	result->l_off = left_col;
+	result->r_off = right_col;
+	result->r_const = constval;
+	result->r_isnull = constnull;
+	result->finfo = flinfo;
+	result->collation = inputcoll;
+
+	return result;
+}
+
/* ---------- BatchVector stuff ------------- */
static BatchVector *
@@ -5298,3 +5514,111 @@ BatchVectorSliceFromExprArgs(const List *args, const BatchVector *bv)
return bvs;
}
+
+/*
+ * BatchVectorOffsetForVarExpr
+ *		Translate an expression accepted by expr_is_simple_var() into the
+ *		column index BatchVectorFindAttColno() reports for its attno.
+ *
+ * Returns -1 when 'expr' is not such an expression.
+ */
+static int16
+BatchVectorOffsetForVarExpr(Expr *expr, const BatchVector *bv)
+{
+	AttrNumber	varattno;
+
+	if (expr_is_simple_var(expr, &varattno))
+		return (int16) BatchVectorFindAttColno(bv, varattno);
+
+	return -1;
+}
+
+/*
+ * ExecInitQualBatch
+ * Build a batched-qual EEOP program (AND-only) for ps->plan->qual.
+ *
+ * Returns NULL when there is no qual, when qual_batchable_walker() does
+ * not accept it, or when a leaf cannot be compiled into a BatchQualTerm.
+ * Otherwise returns an ExprState whose batch_private points to a
+ * BatchQualRuntime holding the result mask.
+ *
+ * Caller should also keep scalar ps->qual for runtime fallback.
+ */
+ExprState *
+ExecInitQualBatch(PlanState *ps)
+{
+ Node *qual = (Node *) ps->plan->qual;
+ QualBatchContext cxt = {NIL, NULL, true, 0};
+ BatchQualRuntime *rt;
+ ExprState *state;
+ BatchVector *bv;
+ uint64 *mask;
+ int mask_words;
+ ListCell *lc;
+ ExprEvalStep scratch = {0};
+
+ if (qual == NULL)
+ return NULL;
+
+ /* validate + collect leaves/attnos with walker */
+ (void) qual_batchable_walker(qual, &cxt);
+ if (!cxt.ok || cxt.leaves == NIL || bms_is_empty(cxt.attnos))
+ return NULL;
+
+ bv = BatchVectorCreate(cxt.attnos, cxt.last_scan);
+
+ mask_words = (bv->maxrows + 63) >> 6;
+ mask = (uint64 *) palloc0(sizeof(uint64) * mask_words);
+
+ /* Runtime carrier handed to ExecQualBatch() through batch_private */
+ rt = palloc0(sizeof(BatchQualRuntime));
+ rt->mask = mask;
+ rt->mask_words = mask_words;
+
+ /* dedicated ExprState for batched program */
+
+ state = makeNode(ExprState);
+ state->expr = (Expr *) qual;
+ state->parent = ps;
+ state->ext_params = NULL;
+
+ /* flag as a qual expression; ExecQualBatch() asserts this flag */
+ state->flags = EEO_FLAG_IS_QUAL;
+
+ /* Only valid as batch qual if this is set. */
+ state->batch_private = (void *) rt;
+
+ scratch.opcode = EEOP_SCAN_FETCHSOME_BATCH;
+ scratch.d.fetch_batch.last_var = cxt.last_scan;
+ ExprEvalPushStep(state, &scratch);
+
+ scratch.opcode = EEOP_BUILD_SCAN_BATCH_VECTOR;
+ scratch.d.batch_vector.bv = bv;
+ ExprEvalPushStep(state, &scratch);
+
+ scratch.opcode = EEOP_QUAL_BATCH_INITMASK;
+ scratch.d.qualbatch_init.bv = bv;
+ scratch.d.qualbatch_init.mask = mask;
+ scratch.d.qualbatch_init.mask_words = mask_words;
+ ExprEvalPushStep(state, &scratch);
+
+ /* one TERM step per leaf; all of them share the same bv and mask */
+ foreach(lc, cxt.leaves)
+ {
+ BatchQualTerm *term = build_term_from_leaf((Node *) lfirst(lc), bv);
+
+ if (term == NULL)
+ return NULL;
+
+ scratch.opcode = EEOP_QUAL_BATCH_TERM;
+ scratch.d.qualbatch_term.bv = bv;
+ scratch.d.qualbatch_term.mask = mask;
+ scratch.d.qualbatch_term.mask_words = mask_words;
+ scratch.d.qualbatch_term.term = term; /* pointer to the term built above */
+ ExprEvalPushStep(state, &scratch);
+ }
+
+ /*
+ * The program itself returns nothing. The per-row results are left in
+ * the shared mask, which ExecQualBatch() reads through batch_private
+ * after running the steps.
+ */
+ scratch.opcode = EEOP_DONE_NO_RETURN;
+ ExprEvalPushStep(state, &scratch);
+
+ ExecReadyExpr(state);
+
+ return state;
+}
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 41ad9b4838d..5c2baa0e19d 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -608,6 +608,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
&&CASE_EEOP_BUILD_SCAN_BATCH_VECTOR,
&&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP,
&&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT,
+ &&CASE_EEOP_QUAL_BATCH_INITMASK,
+ &&CASE_EEOP_QUAL_BATCH_TERM,
&&CASE_EEOP_LAST
};
@@ -2350,7 +2352,19 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
{
/* too complex for an inline implementation */
ExecAggPlainTransBatch(state, op, econtext);
+ EEO_NEXT();
+ }
+
+ EEO_CASE(EEOP_QUAL_BATCH_INITMASK)
+ {
+ ExecQualBatchInitMask(state, op, econtext);
+ EEO_NEXT();
+ }
+
+ EEO_CASE(EEOP_QUAL_BATCH_TERM)
+ {
+ ExecQualBatchTerm(state, op, econtext);
EEO_NEXT();
}
@@ -6185,3 +6199,191 @@ ExecAggPlainTransBatch(ExprState *state, ExprEvalStep *op, ExprContext *econtext
elog(ERROR, "invalid ExprEvalOp in ExecAggPlainTransBatch()");
}
}
+
+/*
+ * mask_init_all_ones
+ *		Set mask bits [0..nvalid_bits) to 1 and every other bit of the
+ *		nwords-long array to 0.
+ *
+ * The batch may hold fewer rows than the mask has room for, so whole
+ * trailing words can be padding, not just the tail of the last word.
+ * Clearing all padding keeps mask_is_empty() meaningful.
+ */
+static inline void
+mask_init_all_ones(uint64 *a, int nwords, int nvalid_bits)
+{
+	int			nfull;
+	int			rem;
+
+	if (nvalid_bits < 0)
+		nvalid_bits = 0;
+
+	nfull = nvalid_bits >> 6;	/* words that are entirely valid */
+	rem = nvalid_bits & 63;		/* valid bits in the following word */
+
+	for (int i = 0; i < nwords; i++)
+	{
+		if (i < nfull)
+			a[i] = ~UINT64CONST(0);
+		else if (i == nfull && rem != 0)
+			a[i] = (~UINT64CONST(0)) >> (64 - rem);
+		else
+			a[i] = 0;
+	}
+}
+
+/* clear bit 'i' of the mask, i.e. mark row 'i' as not passing */
+static inline void
+mask_clear_bit(uint64 *a, int i)
+{
+	const int	word = i >> 6;
+	const uint64 bit = UINT64CONST(1) << (i & 63);
+
+	a[word] &= ~bit;
+}
+
+/*
+ * ExecQualBatchInitMask
+ *		EEOP_QUAL_BATCH_INITMASK: mark every row of the current batch
+ *		(bv->nrows of them) as passing before any term is applied.
+ */
+void
+ExecQualBatchInitMask(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
+{
+	mask_init_all_ones(op->d.qualbatch_init.mask,
+					   op->d.qualbatch_init.mask_words,
+					   op->d.qualbatch_init.bv->nrows);
+}
+
+/*
+ * ExecQualBatchTerm
+ *		EEOP_QUAL_BATCH_TERM: apply one compiled leaf term to every row of
+ *		the current batch, clearing the mask bit of each row that fails.
+ *
+ * Bits are only ever cleared here, so running the terms one after another
+ * leaves set exactly the rows that satisfy all of them.  For a strict
+ * operator a NULL input makes the row fail without calling the function,
+ * since WHERE treats a NULL result as false.
+ */
+void
+ExecQualBatchTerm(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
+{
+	BatchVector *bv = op->d.qualbatch_term.bv;
+	uint64	   *mask = op->d.qualbatch_term.mask;
+	BatchQualTerm *t = op->d.qualbatch_term.term;
+	const int	nrows = bv->nrows;
+
+	switch (t->kind)
+	{
+		case BQTK_IS_NULL:
+			{
+				const bool *lnulls = bv->nulls[t->l_off];
+
+				/* a row survives only if the column IS NULL */
+				for (int i = 0; i < nrows; i++)
+				{
+					if (!lnulls[i])
+						mask_clear_bit(mask, i);
+				}
+				break;
+			}
+
+		case BQTK_IS_NOT_NULL:
+			{
+				const bool *lnulls = bv->nulls[t->l_off];
+
+				/* a row survives only if the column IS NOT NULL */
+				for (int i = 0; i < nrows; i++)
+				{
+					if (lnulls[i])
+						mask_clear_bit(mask, i);
+				}
+				break;
+			}
+
+		case BQTK_VAR_CONST:
+			{
+				const bool *lnulls = bv->nulls[t->l_off];
+				const Datum *lvals = bv->cols[t->l_off];
+				const bool	strict = t->strict;
+				const bool	r_isnull = t->r_isnull;
+				const Datum r_const = t->r_const;
+				const Oid	coll = t->collation;
+				FmgrInfo   *finfo = t->finfo;
+
+				for (int i = 0; i < nrows; i++)
+				{
+					/* strict ops short-circuit to "fail" on NULL input */
+					if (strict && (lnulls[i] || r_isnull))
+					{
+						mask_clear_bit(mask, i);
+						continue;
+					}
+
+					/* fast-paths could go here based on t->fastclass */
+
+					if (!DatumGetBool(FunctionCall2Coll(finfo, coll,
+														 lvals[i], r_const)))
+						mask_clear_bit(mask, i);
+				}
+				break;
+			}
+
+		case BQTK_VAR_VAR:
+			{
+				const bool *lnulls = bv->nulls[t->l_off];
+				const bool *rnulls = bv->nulls[t->r_off];
+				const Datum *lvals = bv->cols[t->l_off];
+				const Datum *rvals = bv->cols[t->r_off];
+				const bool	strict = t->strict;
+				const Oid	coll = t->collation;
+				FmgrInfo   *finfo = t->finfo;
+
+				for (int i = 0; i < nrows; i++)
+				{
+					if (strict && (lnulls[i] || rnulls[i]))
+					{
+						mask_clear_bit(mask, i);
+						continue;
+					}
+
+					/* fast-paths could go here based on t->fastclass */
+
+					if (!DatumGetBool(FunctionCall2Coll(finfo, coll,
+														 lvals[i], rvals[i])))
+						mask_clear_bit(mask, i);
+				}
+				break;
+			}
+
+		default:
+
+			/*
+			 * Ignoring an unknown kind would let every row pass this term,
+			 * silently returning wrong results; fail loudly instead.
+			 */
+			elog(ERROR, "unrecognized BatchQualTermKind: %d", (int) t->kind);
+			break;
+	}
+}
+
+/* true if none of the first 'nwords' words of the mask has a bit set */
+static inline bool
+mask_is_empty(const uint64 *mask, int nwords)
+{
+	int			w = 0;
+
+	/* skip over leading all-zero words */
+	while (w < nwords && mask[w] == 0)
+		w++;
+
+	return w >= nwords;
+}
+
+/*
+ * ExecQualBatch
+ *		Evaluate a batched qual program, as built by ExecInitQualBatch(),
+ *		over the rows of 'b'.
+ *
+ * The program is run once for the whole batch and leaves its per-row
+ * verdicts in the mask reachable through state->batch_private.  Rows whose
+ * mask bit is set are then stored, in order, into the batch's output
+ * slots via TupleBatchStoreInOut().
+ *
+ * Returns the number of rows that passed the qual.
+ */
+int
+ExecQualBatch(ExprState *state, ExprContext *econtext, TupleBatch *b)
+{
+	BatchQualRuntime *rt = ExecGetBatchQualRuntime(state);
+	uint64	   *mask;
+	int			kept = 0;
+	int			i = 0;
+
+	/* verify that expression was compiled using ExecInitQualBatch() */
+	Assert(state->flags & EEO_FLAG_IS_QUAL);
+	Assert(rt && rt->mask && rt->mask_words);
+
+	/* run the batched EEOP program once */
+	econtext->scan_batch = b;
+	ExecEvalExprNoReturn(state, econtext);
+
+	/* nothing to copy if no row survived */
+	mask = rt->mask;
+	if (mask_is_empty(mask, rt->mask_words))
+		return 0;
+
+	/* Add survivors into outslots */
+	TupleBatchRewind(b);
+	while (TupleBatchHasMore(b))
+	{
+		TupleTableSlot *slot = TupleBatchGetNextSlot(b);
+
+		/* the batch must not hold more rows than the mask has bits for */
+		Assert((i >> 6) < rt->mask_words);
+
+		/* mask bit set => row survives */
+		if (mask[i >> 6] & (UINT64CONST(1) << (i & 63)))
+			TupleBatchStoreInOut(b, kept++, slot);
+		i++;
+	}
+
+	return kept;
+}
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index a4cf1e51af0..e5ca619731f 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -401,6 +401,8 @@ SeqScanInitBatching(SeqScanState *scanstate, int eflags)
scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject;
}
}
+
+ scanstate->ss.ps.qual_batch = ExecInitQualBatch((PlanState *) scanstate);
}
/* ----------------------------------------------------------------
diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c
index 45346124bd7..b97d5faebde 100644
--- a/src/backend/jit/llvm/llvmjit_expr.c
+++ b/src/backend/jit/llvm/llvmjit_expr.c
@@ -3033,6 +3033,17 @@ llvm_compile_expr(ExprState *state)
LLVMBuildBr(b, opblocks[opno + 1]);
break;
+ case EEOP_QUAL_BATCH_INITMASK:
+ build_EvalXFunc(b, mod, "ExecQualBatchInitMask",
+ v_state, op, v_econtext);
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+ case EEOP_QUAL_BATCH_TERM:
+ build_EvalXFunc(b, mod, "ExecQualBatchTerm",
+ v_state, op, v_econtext);
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+
case EEOP_LAST:
Assert(false);
break;
diff --git a/src/backend/jit/llvm/llvmjit_types.c b/src/backend/jit/llvm/llvmjit_types.c
index 1b5e06f60cc..f4f756e7cb5 100644
--- a/src/backend/jit/llvm/llvmjit_types.c
+++ b/src/backend/jit/llvm/llvmjit_types.c
@@ -187,4 +187,6 @@ void *referenced_functions[] =
ExecBuildOuterBatchVector,
ExecBuildScanBatchVector,
ExecAggPlainTransBatch,
+ ExecQualBatchInitMask,
+ ExecQualBatchTerm,
};
diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h
index f24782ecf58..f50936acaaa 100644
--- a/src/include/executor/execExpr.h
+++ b/src/include/executor/execExpr.h
@@ -306,6 +306,10 @@ typedef enum ExprEvalOp
EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP, /* per-row fmgr calls */
EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT, /* call transfn once with AggBulkArgs */
+ /* Batched qual evaluation */
+ EEOP_QUAL_BATCH_INITMASK,
+ EEOP_QUAL_BATCH_TERM,
+
/* non-existent operation, used e.g. to check array lengths */
EEOP_LAST
} ExprEvalOp;
@@ -796,6 +800,21 @@ typedef struct ExprEvalStep
{
struct BatchVector *bv;
} batch_vector;
+
+ struct
+ {
+ struct BatchVector *bv; /* filled earlier by BUILD_BATCH_VECTOR */
+ uint64 *mask; /* shared mask buffer for this program */
+ int mask_words; /* ceil(es_max_batch/64) */
+ } qualbatch_init; /* EEOP_QUAL_BATCH_INITMASK */
+
+ struct
+ {
+ struct BatchVector *bv; /* same bv as init */
+ uint64 *mask; /* same mask buffer */
+ int mask_words; /* same word count */
+ struct BatchQualTerm *term; /* compiled leaf */
+ } qualbatch_term; /* EEOP_QUAL_BATCH_TERM */
} d;
} ExprEvalStep;
@@ -975,4 +994,45 @@ extern void ExecBuildOuterBatchVector(ExprState *state, ExprEvalStep *op, ExprCo
extern void ExecBuildScanBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
extern void ExecAggPlainTransBatch(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+
+/* Kinds of leaf terms evaluated by ExecQualBatchTerm(). */
+typedef enum BatchQualTermKind
+{
+ BQTK_VAR_CONST, /* binary operator: Var op Const */
+ BQTK_VAR_VAR, /* binary operator: Var op Var */
+ BQTK_IS_NULL, /* Var IS NULL */
+ BQTK_IS_NOT_NULL, /* Var IS NOT NULL */
+} BatchQualTermKind;
+
+typedef struct BatchQualTerm
+{
+ BatchQualTermKind kind; /* selects the branch taken in ExecQualBatchTerm() */
+ bool strict; /* operator is strict: a NULL input fails the row; false for null tests */
+ int16 l_off; /* left VAR column (index into BatchVector) */
+ int16 r_off; /* right VAR column, or -1 if there is none (Const operand or null test) */
+ Datum r_const; /* constant value, for VAR_CONST */
+ bool r_isnull; /* constant is NULL, for VAR_CONST */
+ FmgrInfo *finfo; /* operator function lookup info; NULL for null tests */
+ Oid collation; /* operator input collation; InvalidOid for null tests */
+} BatchQualTerm;
+
+/*
+ * Runtime view for batched qual programs.
+ * Reached through ExprState.batch_private; allocated by ExecInitQualBatch()
+ * together with the ExprState.
+ */
+typedef struct BatchQualRuntime
+{
+ uint64 *mask; /* one bit per batch row; set means the row passed */
+ int mask_words; /* number of uint64 words in mask */
+} BatchQualRuntime;
+
+/* Fetch the BatchQualRuntime stashed in a batched qual's batch_private. */
+static inline BatchQualRuntime *
+ExecGetBatchQualRuntime(ExprState *batch_qual)
+{
+	void	   *priv = batch_qual->batch_private;
+
+	return (BatchQualRuntime *) priv;
+}
+
+extern void ExecQualBatchInitMask(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+extern void ExecQualBatchTerm(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+
#endif /* EXEC_EXPR_H */
diff --git a/src/include/executor/execScan.h b/src/include/executor/execScan.h
index fb4b57a831c..568a7a33b7d 100644
--- a/src/include/executor/execScan.h
+++ b/src/include/executor/execScan.h
@@ -304,7 +304,8 @@ ExecScanExtendedBatch(ScanState *node,
{
ExprContext *econtext = node->ps.ps_ExprContext;
TupleBatch *b = node->ps.ps_Batch;
- int qualified;
+ ExprState *qual_batch = node->ps.qual_batch;
+ int qualified = 0;
/* Batch path does not support EPQ */
Assert(node->ps.state->es_epq_active == NULL);
@@ -320,23 +321,31 @@ ExecScanExtendedBatch(ScanState *node,
if (qual != NULL)
{
- qualified = 0;
- while (TupleBatchHasMore(b))
+ ResetExprContext(econtext);
+ if (qual_batch)
{
- TupleTableSlot *in = TupleBatchGetNextSlot(b);
-
- Assert(in);
- ResetExprContext(econtext);
- econtext->ecxt_scantuple = in;
+ qualified = ExecQualBatch(qual_batch, econtext, b);
+ }
+ else
+ {
+ int i = 0;
- if (ExecQual(qual, econtext))
+ while (TupleBatchHasMore(b))
{
- TupleBatchStoreInOut(b, qualified, in);
- qualified++;
+ TupleTableSlot *slot = TupleBatchGetNextSlot(b);
+
+ Assert(slot);
+ econtext->ecxt_scantuple = slot;
+ if (ExecQual(qual, econtext))
+ {
+ TupleBatchStoreInOut(b, qualified, slot);
+ qualified++;
+ }
+ i++;
}
- else
- InstrCountFiltered1(node, 1);
}
+ InstrCountFiltered1(node, b->nvalid - qualified);
+ /* Update count and start using b->outslots. */
TupleBatchUseOutput(b, qualified);
}
else
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index c72bd755b79..dd0f2c74ae5 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -333,6 +333,7 @@ ExecProcNodeBatch(PlanState *node)
extern ExprState *ExecInitExpr(Expr *node, PlanState *parent);
extern ExprState *ExecInitExprWithParams(Expr *node, ParamListInfo ext_params);
extern ExprState *ExecInitQual(List *qual, PlanState *parent);
+extern ExprState *ExecInitQualBatch(PlanState *ps);
extern ExprState *ExecInitCheck(List *qual, PlanState *parent);
extern List *ExecInitExprList(List *nodes, PlanState *parent);
extern ExprState *ExecBuildAggTrans(AggState *aggstate, struct AggStatePerPhaseData *phase,
@@ -581,6 +582,8 @@ AggGetBulkArgs(FunctionCallInfo fcinfo)
}
#endif
+extern int ExecQualBatch(ExprState *state, ExprContext *econtext, TupleBatch *b);
+
extern bool ExecCheck(ExprState *state, ExprContext *econtext);
/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index fdfe8b4ddaf..78c5abbb23a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -146,6 +146,9 @@ typedef struct ExprState
* ExecInitExprRec().
*/
ErrorSaveContext *escontext;
+
+ /* batched-program runtime (e.g., BatchQualRuntime) */
+ void *batch_private;
} ExprState;
@@ -1196,6 +1199,7 @@ typedef struct PlanState
* subPlan list, which does not exist in the plan tree).
*/
ExprState *qual; /* boolean qual condition */
+ ExprState *qual_batch; /* boolean qual condition evaluated on batches */
PlanState *lefttree; /* input plan tree(s) */
PlanState *righttree;
--
2.43.0
[application/octet-stream] v2-0006-WIP-Add-EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP.patch (21.5K, 3-v2-0006-WIP-Add-EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP.patch)
download | inline diff:
From c0797084b54d1e5d9ffe1af49c76c9396126ea1c Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Tue, 2 Sep 2025 23:46:34 +0900
Subject: [PATCH v2 6/8] WIP: Add EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP
Introduce a batch EEOP that runs plain aggregate transitions by
looping over rows of a TupleBatch. This keeps transition logic in
the interpreter while amortizing per-row costs.
Gate with AggTransCanUseBatch(): plain, non-hashed, single-set
aggregates with no DISTINCT/ORDER/FILTER, and simple Var args.
Extend ExecBuildAggTrans() to prepare batch fetch/build steps and
to return whether a batch path is used.
---
src/backend/executor/execExpr.c | 228 ++++++++++++++++++++++++--
src/backend/executor/execExprInterp.c | 103 ++++++++++++
src/backend/executor/nodeAgg.c | 17 +-
src/backend/jit/llvm/llvmjit_expr.c | 6 +
src/backend/jit/llvm/llvmjit_types.c | 1 +
src/include/executor/execBatch.h | 6 +
src/include/executor/execExpr.h | 14 ++
src/include/executor/executor.h | 3 +-
src/include/executor/nodeAgg.h | 2 +
9 files changed, 363 insertions(+), 17 deletions(-)
diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index f1569879b52..af5ed8b6368 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -95,7 +95,9 @@ static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
ExprEvalStep *scratch,
FunctionCallInfo fcinfo, AggStatePerTrans pertrans,
int transno, int setno, int setoff, bool ishash,
- bool nullcheck);
+ bool nullcheck, bool batch,
+ BatchVector *bv);
+
static void ExecInitJsonExpr(JsonExpr *jsexpr, ExprState *state,
Datum *resv, bool *resnull,
ExprEvalStep *scratch);
@@ -104,6 +106,10 @@ static void ExecInitJsonCoercion(ExprState *state, JsonReturning *returning,
bool exists_coerce,
Datum *resv, bool *resnull);
+static BatchVector *BatchVectorCreate(Bitmapset *attnos, AttrNumber last_var);
+static bool ExprListAllSimpleVars(const List *args, Bitmapset **allattnos);
+static BatchVectorSlice *BatchVectorSliceFromExprArgs(const List *args,
+ const BatchVector *bv);
/*
* ExecInitExpr: prepare an expression tree for execution
@@ -3659,6 +3665,33 @@ ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest,
}
}
+/*
+ * AggTransCanUseBatch
+ *		Can the transition for 'pt' be run through the batched path?
+ *
+ * Requires that AggCanUsePlainBatch() allows it, that the node is neither
+ * hashed nor using grouping sets, and that the aggregate needs no sorting
+ * and has no DISTINCT, ORDER BY or FILTER.
+ */
+static inline bool
+AggTransCanUseBatch(AggState *as, AggStatePerTrans pt)
+{
+	Agg		   *agg = (Agg *) as->ss.ps.plan;
+	Aggref	   *aggref;
+
+	/* node-level restrictions */
+	if (!AggCanUsePlainBatch(as) ||
+		as->aggstrategy == AGG_HASHED ||
+		agg->groupingSets != NIL ||
+		as->phase == NULL ||
+		as->phase->numsets > 0)
+		return false;
+
+	/* per-aggregate complications */
+	if (pt->aggsortrequired)
+		return false;
+
+	aggref = pt->aggref;
+	if (aggref == NULL)
+		return true;
+
+	return aggref->aggdistinct == NIL &&
+		aggref->aggorder == NIL &&
+		aggref->aggfilter == NULL;
+}
+
/*
* Build transition/combine function invocations for all aggregate transition
* / combination function invocations in a grouping sets phase. This has to
@@ -3675,13 +3708,17 @@ ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest,
*/
ExprState *
ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
- bool doSort, bool doHash, bool nullcheck)
+ bool doSort, bool doHash, bool nullcheck,
+ bool *batch_trans)
{
ExprState *state = makeNode(ExprState);
PlanState *parent = &aggstate->ss.ps;
ExprEvalStep scratch = {0};
bool isCombine = DO_AGGSPLIT_COMBINE(aggstate->aggsplit);
ExprSetupInfo deform = {0, 0, 0, 0, 0, NIL};
+ bool batch = AggCanUsePlainBatch(aggstate);
+ Bitmapset *allattnos = NULL;
+ BatchVector *bv = NULL;
state->expr = (Expr *) aggstate;
state->parent = parent;
@@ -3707,8 +3744,36 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
&deform);
expr_setup_walker((Node *) pertrans->aggref->aggfilter,
&deform);
+
+ if (!AggTransCanUseBatch(aggstate, pertrans) ||
+ !ExprListAllSimpleVars(pertrans->aggref->args, &allattnos))
+ batch = false;
}
- ExecPushExprSetupSteps(state, &deform);
+
+ if (batch)
+ {
+ if (deform.last_outer > 0)
+ {
+ Assert(!bms_is_empty(allattnos));
+ bv = BatchVectorCreate(allattnos, deform.last_outer);
+
+ /*
+ * Deform all tuples upto last_outer in batch
+ */
+ scratch.opcode = EEOP_OUTER_FETCHSOME_BATCH;
+ scratch.d.fetch_batch.last_var = deform.last_outer;
+ ExprEvalPushStep(state, &scratch);
+
+ /*
+ * Put all arg Vars into vectors once per batch slice
+ */
+ scratch.opcode = EEOP_BUILD_OUTER_BATCH_VECTOR;
+ scratch.d.batch_vector.bv = bv;
+ ExprEvalPushStep(state, &scratch);
+ }
+ }
+ else
+ ExecPushExprSetupSteps(state, &deform);
/*
* Emit instructions for each transition value / grouping set combination.
@@ -3746,7 +3811,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
* Evaluate arguments to aggregate/combine function.
*/
argno = 0;
- if (isCombine)
+ if (isCombine && !batch)
{
/*
* Combining two aggregate transition values. Instead of directly
@@ -3816,7 +3881,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
Assert(pertrans->numInputs == argno);
}
- else if (!pertrans->aggsortrequired)
+ else if (!pertrans->aggsortrequired && !batch)
{
ListCell *arg;
@@ -3849,7 +3914,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
}
Assert(pertrans->numTransInputs == argno);
}
- else if (pertrans->numInputs == 1)
+ else if (pertrans->numInputs == 1 && !batch)
{
/*
* Non-presorted DISTINCT and/or ORDER BY case, with a single
@@ -3868,7 +3933,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
Assert(pertrans->numInputs == argno);
}
- else
+ else if (!batch)
{
/*
* Non-presorted DISTINCT and/or ORDER BY case, with multiple
@@ -3896,7 +3961,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
* just keep the prior transValue. This is true for both plain and
* sorted/distinct aggregates.
*/
- if (trans_fcinfo->flinfo->fn_strict && pertrans->numTransInputs > 0)
+ if (trans_fcinfo->flinfo->fn_strict && pertrans->numTransInputs > 0 && !batch)
{
if (strictnulls)
scratch.opcode = EEOP_AGG_STRICT_INPUT_CHECK_NULLS;
@@ -3914,7 +3979,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
}
/* Handle DISTINCT aggregates which have pre-sorted input */
- if (pertrans->numDistinctCols > 0 && !pertrans->aggsortrequired)
+ if (pertrans->numDistinctCols > 0 && !pertrans->aggsortrequired && !batch)
{
if (pertrans->numDistinctCols > 1)
scratch.opcode = EEOP_AGG_PRESORTED_DISTINCT_MULTI;
@@ -3942,7 +4007,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
{
ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo,
pertrans, transno, setno, setoff, false,
- nullcheck);
+ nullcheck, batch, bv);
setoff++;
}
}
@@ -3962,7 +4027,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
{
ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo,
pertrans, transno, setno, setoff, true,
- nullcheck);
+ nullcheck, false, NULL);
setoff++;
}
}
@@ -4007,6 +4072,9 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
ExecReadyExpr(state);
+ if (batch_trans)
+ *batch_trans = batch;
+
return state;
}
@@ -4020,10 +4088,11 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
ExprEvalStep *scratch,
FunctionCallInfo fcinfo, AggStatePerTrans pertrans,
int transno, int setno, int setoff, bool ishash,
- bool nullcheck)
+ bool nullcheck, bool batch, BatchVector *bv)
{
ExprContext *aggcontext;
int adjust_jumpnull = -1;
+ BatchVectorSlice *bvs = NULL;
if (ishash)
aggcontext = aggstate->hashcontext;
@@ -4077,7 +4146,13 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
*/
if (!pertrans->aggsortrequired)
{
- if (pertrans->transtypeByVal)
+ if (batch)
+ {
+ if (bv)
+ bvs = BatchVectorSliceFromExprArgs(pertrans->aggref->args, bv);
+ scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP;
+ }
+ else if (pertrans->transtypeByVal)
{
if (fcinfo->flinfo->fn_strict &&
pertrans->initValueIsNull)
@@ -4108,6 +4183,7 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
scratch->d.agg_trans.setoff = setoff;
scratch->d.agg_trans.transno = transno;
scratch->d.agg_trans.aggcontext = aggcontext;
+ scratch->d.agg_trans.bvs = bvs;
ExprEvalPushStep(state, scratch);
/* fix up jumpnull */
@@ -5070,3 +5146,129 @@ ExecInitJsonCoercion(ExprState *state, JsonReturning *returning,
DomainHasConstraints(returning->typid);
ExprEvalPushStep(state, &scratch);
}
+
+/*
+ * expr_is_simple_var
+ *		Report whether expr boils down to a Var with a positive attribute
+ *		number (i.e. a non-system attribute).
+ *
+ * Any TargetEntry or RelabelType wrappers are peeled off first.  On success
+ * the Var's attribute number is stored in *out_attno and true is returned;
+ * otherwise false is returned and *out_attno is left untouched.
+ */
+static bool
+expr_is_simple_var(Expr *expr, AttrNumber *out_attno)
+{
+	/* Strip wrapper nodes until something else (or NULL) is reached. */
+	for (;;)
+	{
+		if (expr == NULL)
+			return false;
+
+		if (IsA(expr, TargetEntry))
+			expr = (Expr *) ((TargetEntry *) expr)->expr;
+		else if (IsA(expr, RelabelType))
+			expr = (Expr *) ((RelabelType *) expr)->arg;
+		else
+			break;
+	}
+
+	/* Anything but a Var, or a Var with non-positive varattno, is rejected. */
+	if (!IsA(expr, Var) || ((Var *) expr)->varattno <= 0)
+		return false;
+
+	*out_attno = ((Var *) expr)->varattno;
+	return true;
+}
+
+/*
+ * ExprListAllSimpleVars
+ *		Check that every TargetEntry in args wraps a bare Var with a positive
+ *		attribute number, collecting those attribute numbers in *allattnos.
+ *
+ * Returns false at the first entry that does not qualify; attribute numbers
+ * collected before that point remain in *allattnos.  Note that a RelabelType
+ * on top of a Var is rejected here, because the TargetEntry's expression
+ * itself must be a Var.
+ */
+static bool
+ExprListAllSimpleVars(const List *args, Bitmapset **allattnos)
+{
+	ListCell   *cell;
+
+	foreach(cell, args)
+	{
+		Expr	   *expr = lfirst_node(TargetEntry, cell)->expr;
+		AttrNumber	varattno;
+
+		/* expr_is_simple_var() must run first: it copes with a NULL expr. */
+		if (!expr_is_simple_var(expr, &varattno) || !IsA(expr, Var))
+			return false;
+
+		Assert(varattno > 0);
+		*allattnos = bms_add_member(*allattnos, varattno);
+	}
+
+	return true;
+}
+
+/* ---------- BatchVector stuff ------------- */
+
+/*
+ * BatchVectorCreate
+ *		Allocate a BatchVector with one value/null column per member of attnos.
+ *
+ * Every column has room for EXEC_BATCH_ROWS entries.  last_var is recorded
+ * in the result as-is.  The vector starts out empty (nrows = 0, hasnull =
+ * false).
+ */
+static BatchVector *
+BatchVectorCreate(Bitmapset *attnos, AttrNumber last_var)
+{
+	const int	capacity = EXEC_BATCH_ROWS;
+	BatchVector *bv = palloc(sizeof(BatchVector));
+	AttrNumber	member = -1;
+	int			colno = 0;
+
+	bv->ncols = bms_num_members(attnos);
+	bv->maxrows = capacity;
+	bv->last_var = last_var;
+	bv->nrows = 0;
+	bv->hasnull = false;
+
+	/* Column k of the vector holds attribute bv->attnos[k]. */
+	bv->attnos = palloc(sizeof(AttrNumber) * bv->ncols);
+	while ((member = bms_next_member(attnos, member)) > 0)
+		bv->attnos[colno++] = member;
+
+	/* Per-column arrays of values and null flags. */
+	bv->cols = palloc(sizeof(Datum *) * bv->ncols);
+	bv->nulls = palloc(sizeof(bool *) * bv->ncols);
+	for (colno = 0; colno < bv->ncols; colno++)
+	{
+		bv->cols[colno] = palloc(sizeof(Datum) * capacity);
+		bv->nulls[colno] = palloc(sizeof(bool) * capacity);
+	}
+
+	return bv;
+}
+
+/*
+ * BatchVectorFindAttColno
+ *		Return the index of the column of bv that holds attribute attno, or -1
+ *		if bv has no column for it.
+ */
+static int16
+BatchVectorFindAttColno(const BatchVector *bv, AttrNumber attno)
+{
+	int			colno = 0;
+
+	while (colno < bv->ncols)
+	{
+		if (bv->attnos[colno] == attno)
+			return colno;
+		colno++;
+	}
+
+	return -1;
+}
+
+/*
+ * BatchVectorSliceFromExprArgs
+ *		Map each entry of an argument list onto a column of bv.
+ *
+ * The result has one argoffs[] slot per list entry, in list order.  An entry
+ * that expr_is_simple_var() accepts gets whatever BatchVectorFindAttColno()
+ * returns for its attribute (-1 when bv has no such column); every other
+ * entry gets -1, leaving it to the caller to deal with Consts and the like.
+ */
+static BatchVectorSlice *
+BatchVectorSliceFromExprArgs(const List *args, const BatchVector *bv)
+{
+	BatchVectorSlice *slice = palloc(sizeof(BatchVectorSlice));
+	ListCell   *cell;
+	int			argno = 0;
+
+	Assert(bv);
+
+	slice->bv = bv;
+	slice->nargs = list_length(args);
+	slice->argoffs = (int16 *) palloc(sizeof(int16) * slice->nargs);
+
+	foreach(cell, args)
+	{
+		Expr	   *arg = (Expr *) lfirst(cell);
+		AttrNumber	attno;
+		int16		colno = -1; /* stays -1 for a non-Var entry */
+
+		if (expr_is_simple_var(arg, &attno))
+			colno = BatchVectorFindAttColno(bv, attno);
+
+		slice->argoffs[argno++] = colno;
+	}
+
+	return slice;
+}
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 68629ad7991..3176679b346 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -606,6 +606,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
&&CASE_EEOP_BUILD_INNER_BATCH_VECTOR,
&&CASE_EEOP_BUILD_OUTER_BATCH_VECTOR,
&&CASE_EEOP_BUILD_SCAN_BATCH_VECTOR,
+ &&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP,
&&CASE_EEOP_LAST
};
@@ -2336,6 +2337,14 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_NEXT();
}
+ EEO_CASE(EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP)
+ {
+ /* too complex for an inline implementation */
+ ExecAggPlainTransBatch(state, op, econtext);
+
+ EEO_NEXT();
+ }
+
EEO_CASE(EEOP_LAST)
{
/* unreachable */
@@ -6039,3 +6048,97 @@ ExecBuildBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext,
}
bv->nrows = i;
}
+
+/*
+ * ExecAggPlainTransBatch
+ *		Advance one plain aggregate's transition state over a whole batch.
+ *
+ * Implements EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP: the transition function is
+ * invoked once per row, taking its arguments from the column vectors that
+ * op->d.agg_trans.bvs points at.  When there is no slice (bvs == NULL) no
+ * argument is loaded and the function is simply called once for each of the
+ * econtext->outer_batch->nvalid rows.
+ *
+ * For a strict transition function:
+ *	- rows having any NULL argument are skipped;
+ *	- if the group has no transition value yet, the first row without NULLs
+ *	  seeds it through ExecAggInitGroup() and is not itself passed to the
+ *	  transition function; processing resumes with the row after it;
+ *	- once the transition value is NULL the function is not called anymore.
+ */
+void
+ExecAggPlainTransBatch(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
+{
+	AggState   *aggstate = castNode(AggState, state->parent);
+	AggStatePerTrans pertrans = op->d.agg_trans.pertrans;
+	AggStatePerGroup pergroup =
+		&aggstate->all_pergroups[op->d.agg_trans.setoff][op->d.agg_trans.transno];
+	BatchVectorSlice *bvs = op->d.agg_trans.bvs;
+	FunctionCallInfo fcinfo = pertrans->transfn_fcinfo;
+	FmgrInfo   *finfo = fcinfo->flinfo;
+	Datum		newVal;
+	TupleBatch *batch = econtext->outer_batch;
+	int			batch_nrows = bvs ? bvs->bv->nrows : batch->nvalid;
+	int			start_row = 0;
+
+	/*
+	 * Same bookkeeping the per-row transition helpers do before calling the
+	 * transition function: ExecAggCopyTransValue() and AggCheckCallContext()
+	 * depend on it.
+	 */
+	aggstate->curaggcontext = op->d.agg_trans.aggcontext;
+	aggstate->current_set = op->d.agg_trans.setno;
+	aggstate->curpertrans = pertrans;
+
+	if (finfo->fn_strict)
+	{
+		if (pergroup->noTransValue && bvs)
+		{
+			const BatchVector *bv = bvs->bv;
+			int			seed_row = -1;
+
+			Assert(bv);
+			Assert(bvs->nargs > 0);
+
+			/* Find the first row none of whose arguments is NULL. */
+			for (int i = 0; i < batch_nrows && seed_row < 0; i++)
+			{
+				bool		rownull = false;
+
+				for (int j = 0; j < bvs->nargs; j++)
+				{
+					Assert(bvs->argoffs[j] >= 0);
+					if (bv->nulls[bvs->argoffs[j]][i])
+					{
+						rownull = true;
+						break;
+					}
+				}
+				if (!rownull)
+					seed_row = i;
+			}
+
+			/*
+			 * Every row has a NULL input, so a strict transition function
+			 * has nothing to do for this batch; the group stays unseeded.
+			 */
+			if (seed_row < 0)
+				return;
+
+			/*
+			 * If transValue has not yet been initialized, do so now.  The
+			 * seed is handed over in fcinfo->args[1], i.e. the first
+			 * aggregate argument of the seed row.
+			 */
+			fcinfo->args[1].value = bv->cols[bvs->argoffs[0]][seed_row];
+			fcinfo->args[1].isnull = false;
+			ExecAggInitGroup(aggstate, pertrans, pergroup,
+							 op->d.agg_trans.aggcontext);
+
+			/* Rows up to and including the seed row are dealt with. */
+			start_row = seed_row + 1;
+		}
+		else if (pergroup->transValueIsNull)
+			return;
+	}
+
+	switch (ExecEvalStepOp(state, op))
+	{
+		case EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP:
+			/* Loop rows, call the original transfn per element using vector cols. */
+			for (int i = start_row; i < batch_nrows; i++)
+			{
+				bool		hasnull = false;
+
+				/* Set up fcinfo args 1..m from column vectors at row i. */
+				if (bvs)
+				{
+					const BatchVector *bv = bvs->bv;
+
+					for (int j = 0; j < bvs->nargs; j++)
+					{
+						int16		argoff = bvs->argoffs[j];
+
+						/* -1 marks a non-Var argument, which has no column */
+						Assert(argoff >= 0);
+						fcinfo->args[j + 1].value = bv->cols[argoff][i];
+						fcinfo->args[j + 1].isnull = bv->nulls[argoff][i];
+						if (bv->nulls[argoff][i])
+							hasnull = true;
+					}
+				}
+
+				if (finfo->fn_strict)
+				{
+					/*
+					 * A strict function must never see a NULL state; once
+					 * the state is NULL it stays NULL, so stop here.
+					 */
+					if (pergroup->transValueIsNull)
+						break;
+					/* ... nor a NULL input: just skip such rows. */
+					if (hasnull)
+						continue;
+				}
+
+				/* fcinfo->args[0] is the existing transition state */
+				fcinfo->args[0].value = pergroup->transValue;
+				fcinfo->args[0].isnull = pergroup->transValueIsNull;
+				fcinfo->isnull = false; /* just in case transfn doesn't set it */
+				newVal = FunctionCallInvoke(fcinfo);
+
+				/*
+				 * For a by-reference state that is not returned in place,
+				 * copy the new value and let the old one be released.
+				 */
+				if (!pertrans->transtypeByVal &&
+					DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue))
+					newVal = ExecAggCopyTransValue(aggstate, pertrans,
+												   newVal, fcinfo->isnull,
+												   pergroup->transValue,
+												   pergroup->transValueIsNull);
+				pergroup->transValue = newVal;
+				pergroup->transValueIsNull = fcinfo->isnull;
+			}
+			break;
+		default:
+			elog(ERROR, "invalid ExprEvalOp in ExecAggPlainTransBatch()");
+	}
+}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 3ace6363509..662d8bef43b 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -825,6 +825,16 @@ advance_aggregates_batch(AggState *aggstate, TupleBatch *b)
{
ExprContext *tmpcontext = aggstate->tmpcontext;
ExprState *evaltrans = aggstate->phase->evaltrans;
+ bool batch_trans = aggstate->phase->batch_trans;
+
+ if (batch_trans)
+ {
+ tmpcontext->ecxt_outertuple = TupleBatchGetSlot(b, 0);
+ tmpcontext->outer_batch = b;
+ ExecEvalExprNoReturnSwitchContext(evaltrans, tmpcontext);
+ TupleBatchConsumeAll(b);
+ return;
+ }
while (TupleBatchHasMore(b))
{
@@ -1800,7 +1810,8 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck)
phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase,
dosort, dohash,
- nullcheck);
+ nullcheck,
+ NULL);
/* change back */
aggstate->ss.ps.outerops = outerops;
@@ -3367,7 +3378,7 @@ hashagg_reset_spill_state(AggState *aggstate)
}
}
-static bool
+bool
AggCanUsePlainBatch(AggState *aggstate)
{
const Agg *aggnode = (const Agg *) aggstate->ss.ps.plan;
@@ -4233,7 +4244,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
Assert(false);
phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
- false);
+ false, &phase->batch_trans);
/* cache compiled expression for outer slot without NULL check */
phase->evaltrans_cache[0][0] = phase->evaltrans;
diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c
index 848f0b52d6f..efb3ee639fc 100644
--- a/src/backend/jit/llvm/llvmjit_expr.c
+++ b/src/backend/jit/llvm/llvmjit_expr.c
@@ -3026,6 +3026,12 @@ llvm_compile_expr(ExprState *state)
LLVMBuildBr(b, opblocks[opno + 1]);
break;
+ case EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP:
+ build_EvalXFunc(b, mod, "ExecAggPlainTransBatch",
+ v_state, op, v_econtext);
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+
case EEOP_LAST:
Assert(false);
break;
diff --git a/src/backend/jit/llvm/llvmjit_types.c b/src/backend/jit/llvm/llvmjit_types.c
index 6bb527c3f6f..1b5e06f60cc 100644
--- a/src/backend/jit/llvm/llvmjit_types.c
+++ b/src/backend/jit/llvm/llvmjit_types.c
@@ -186,4 +186,5 @@ void *referenced_functions[] =
ExecBuildInnerBatchVector,
ExecBuildOuterBatchVector,
ExecBuildScanBatchVector,
+ ExecAggPlainTransBatch,
};
diff --git a/src/include/executor/execBatch.h b/src/include/executor/execBatch.h
index 6f1a38d14bd..b50961fc0c9 100644
--- a/src/include/executor/execBatch.h
+++ b/src/include/executor/execBatch.h
@@ -99,4 +99,10 @@ TupleBatchMaterializeAll(TupleBatch *b)
TupleBatchUseInput(b, b->ntuples);
}
+static inline void
+TupleBatchConsumeAll(TupleBatch *b)
+{
+ b->next = b->nvalid; /* move the batch's "next" position to nvalid in
+ * one step; presumably this makes the batch look
+ * fully consumed to TupleBatchHasMore() -- TODO
+ * confirm against its definition */
+}
+
#endif /* EXECBATCH_H */
diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h
index 99c86bac702..1d33e084b69 100644
--- a/src/include/executor/execExpr.h
+++ b/src/include/executor/execExpr.h
@@ -302,6 +302,9 @@ typedef enum ExprEvalOp
EEOP_BUILD_OUTER_BATCH_VECTOR,
EEOP_BUILD_SCAN_BATCH_VECTOR,
+ /* Batched aggregate trans evaluation */
+ EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP, /* per-row fmgr calls */
+
/* non-existent operation, used e.g. to check array lengths */
EEOP_LAST
} ExprEvalOp;
@@ -750,6 +753,7 @@ typedef struct ExprEvalStep
/* for EEOP_AGG_PLAIN_TRANS_[INIT_][STRICT_]{BYVAL,BYREF} */
/* for EEOP_AGG_ORDERED_TRANS_{DATUM,TUPLE} */
+ /* for EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP */
struct
{
AggStatePerTrans pertrans;
@@ -757,6 +761,7 @@ typedef struct ExprEvalStep
int setno;
int transno;
int setoff;
+ struct BatchVectorSlice *bvs;
} agg_trans;
/* for EEOP_IS_JSON */
@@ -956,8 +961,17 @@ typedef struct BatchVector
int nrows; /* #rows loaded into cols/nulls */
} BatchVector;
+/*
+ * A slice of BatchVector that maps caller args to BatchVector columns.
+ *
+ * argoffs[k] is the index into bv's column arrays for the k-th argument.
+ * BatchVectorSliceFromExprArgs() stores -1 for an argument that is not a
+ * plain Var, and whatever BatchVectorFindAttColno() returns otherwise.
+ */
+typedef struct BatchVectorSlice
+{
+ const BatchVector *bv; /* vector the offsets refer to; never written
+ * through this pointer */
+ int nargs; /* number of args covered */
+ int16 *argoffs; /* length nargs, -1 for non-Var entries */
+} BatchVectorSlice;
+
extern void ExecBuildInnerBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
extern void ExecBuildOuterBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
extern void ExecBuildScanBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+extern void ExecAggPlainTransBatch(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
#endif /* EXEC_EXPR_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index cf5b0c7e05c..5ba9a523970 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -336,7 +336,8 @@ extern ExprState *ExecInitQual(List *qual, PlanState *parent);
extern ExprState *ExecInitCheck(List *qual, PlanState *parent);
extern List *ExecInitExprList(List *nodes, PlanState *parent);
extern ExprState *ExecBuildAggTrans(AggState *aggstate, struct AggStatePerPhaseData *phase,
- bool doSort, bool doHash, bool nullcheck);
+ bool doSort, bool doHash, bool nullcheck,
+ bool *batch_trans);
extern ExprState *ExecBuildHash32FromAttrs(TupleDesc desc,
const TupleTableSlotOps *ops,
FmgrInfo *hashfunctions,
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 6c4891bbaeb..5c5ebfc73f2 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -289,6 +289,7 @@ typedef struct AggStatePerPhaseData
Sort *sortnode; /* Sort node for input ordering for phase */
ExprState *evaltrans; /* evaluation of transition functions */
+ bool batch_trans; /* true if evaltrans contains batch EEOPs */
/*----------
* Cached variants of the compiled expression.
@@ -338,4 +339,5 @@ extern void ExecAggInitializeDSM(AggState *node, ParallelContext *pcxt);
extern void ExecAggInitializeWorker(AggState *node, ParallelWorkerContext *pwcxt);
extern void ExecAggRetrieveInstrumentation(AggState *node);
+extern bool AggCanUsePlainBatch(AggState *aggstate);
#endif /* NODEAGG_H */
--
2.43.0
[application/octet-stream] v2-0007-WIP-Add-EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT.patch (11.2K, 4-v2-0007-WIP-Add-EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT.patch)
download | inline diff:
From c88299a33c376aa8a5a1a5359217e9c8e67b60e8 Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Tue, 9 Sep 2025 21:43:29 +0900
Subject: [PATCH v2 7/8] WIP: Add EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT
The new EEOP runs a plain aggregate transition over a TupleBatch with
a single fmgr call. Batch vectors are passed to the transfn via
AggBulkArgs stored in fcinfo->flinfo->fn_extra, avoiding per-row fmgr
overhead.
Gate selection with AggTransfnSupportsBulk(), an allowlist of
built-in transfns updated to accept AggBulkArgs. Some integer
transfns are taught to read AggBulkArgs when present, else fall
back. Rowloop batching remains available; unsupported aggregates keep
the row path.
---
src/backend/executor/execExpr.c | 28 ++++++++++++++++-
src/backend/executor/execExprInterp.c | 43 ++++++++++++++++++++++++++
src/backend/executor/nodeAgg.c | 1 -
src/backend/jit/llvm/llvmjit_expr.c | 1 +
src/backend/utils/adt/int.c | 32 +++++++++++++++++++
src/backend/utils/adt/int8.c | 44 +++++++++++++++++++++++++++
src/backend/utils/adt/numeric.c | 17 +++++++++++
src/include/executor/execExpr.h | 1 +
src/include/executor/executor.h | 20 ++++++++++++
9 files changed, 185 insertions(+), 2 deletions(-)
diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index af5ed8b6368..27a5780f557 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -47,6 +47,7 @@
#include "utils/acl.h"
#include "utils/array.h"
#include "utils/builtins.h"
+#include "utils/fmgroids.h"
#include "utils/jsonfuncs.h"
#include "utils/jsonpath.h"
#include "utils/lsyscache.h"
@@ -3692,6 +3693,28 @@ AggTransCanUseBatch(AggState *as, AggStatePerTrans pt)
return true;
}
+/*
+ * AggTransfnSupportsBulk
+ *		Is fn_oid one of the transition functions known to accept AggBulkArgs?
+ *
+ * For now this is a hard-coded allowlist of built-in functions; extend it
+ * whenever another transition function is made bulk-aware.
+ */
+static bool
+AggTransfnSupportsBulk(Oid fn_oid)
+{
+	static const Oid bulk_aware[] =
+	{
+		F_INT8INC_ANY,			/* COUNT(arg) transfn */
+		F_INT8INC,				/* COUNT(*) transfn */
+		F_INT4_SUM,				/* SUM(int) transfn */
+		F_INT4SMALLER,			/* MIN(int) transfn */
+		F_INT4LARGER			/* MAX(int) transfn */
+	};
+
+	for (int i = 0; i < (int) lengthof(bulk_aware); i++)
+	{
+		if (bulk_aware[i] == fn_oid)
+			return true;
+	}
+
+	return false;
+}
+
/*
* Build transition/combine function invocations for all aggregate transition
* / combination function invocations in a grouping sets phase. This has to
@@ -4150,7 +4173,10 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
{
if (bv)
bvs = BatchVectorSliceFromExprArgs(pertrans->aggref->args, bv);
- scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP;
+ if (!AggTransfnSupportsBulk(pertrans->transfn_oid))
+ scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP;
+ else
+ scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT;
}
else if (pertrans->transtypeByVal)
{
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 3176679b346..41ad9b4838d 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -607,6 +607,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
&&CASE_EEOP_BUILD_OUTER_BATCH_VECTOR,
&&CASE_EEOP_BUILD_SCAN_BATCH_VECTOR,
&&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP,
+ &&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT,
&&CASE_EEOP_LAST
};
@@ -2345,6 +2346,14 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_NEXT();
}
+ EEO_CASE(EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT)
+ {
+ /* too complex for an inline implementation */
+ ExecAggPlainTransBatch(state, op, econtext);
+
+ EEO_NEXT();
+ }
+
EEO_CASE(EEOP_LAST)
{
/* unreachable */
@@ -6138,6 +6147,40 @@ ExecAggPlainTransBatch(ExprState *state, ExprEvalStep *op, ExprContext *econtext
pergroup->transValueIsNull = fcinfo->isnull;
}
break;
+
+ case EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT:
+ {
+ void *save = fcinfo->flinfo->fn_extra;
+ AggBulkArgs ba = {batch_nrows, start_row};
+
+ if (bvs)
+ {
+ const BatchVector *bv = bvs->bv;
+
+ Assert(bv);
+ ba.nargs = bvs->nargs;
+ ba.argoffs = bvs->argoffs;
+ ba.args = bv->cols;
+ ba.isnull = bv->nulls;
+ ba.hasnull = bv->hasnull;
+ }
+ fcinfo->flinfo->fn_extra = &ba;
+ fcinfo->args[0].value = pergroup->transValue;
+ fcinfo->args[0].isnull = pergroup->transValueIsNull;
+ fcinfo->isnull = false; /* just in case transfn doesn't set it */
+ newVal = FunctionCallInvoke(fcinfo); /* one call for the entire slice */
+ if (!pertrans->transtypeByVal &&
+ DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue))
+ newVal = ExecAggCopyTransValue(aggstate, pertrans,
+ newVal, fcinfo->isnull,
+ pergroup->transValue,
+ pergroup->transValueIsNull);
+ pergroup->transValue = newVal;
+ pergroup->transValueIsNull = fcinfo->isnull;
+ fcinfo->flinfo->fn_extra = save;
+ }
+ break;
+
default:
elog(ERROR, "invalid ExprEvalOp in ExecAggPlainTransBatch()");
}
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index 662d8bef43b..a2286ef5e54 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -2687,7 +2687,6 @@ agg_retrieve_direct_batch(AggState *aggstate)
initialize_aggregates(aggstate, aggstate->pergroups,
Max(aggstate->phase->numsets, 1));
-
if (aggstate->grp_firstTuple)
{
ExecForceStoreHeapTuple(aggstate->grp_firstTuple, firstSlot, true);
diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c
index efb3ee639fc..45346124bd7 100644
--- a/src/backend/jit/llvm/llvmjit_expr.c
+++ b/src/backend/jit/llvm/llvmjit_expr.c
@@ -3026,6 +3026,7 @@ llvm_compile_expr(ExprState *state)
LLVMBuildBr(b, opblocks[opno + 1]);
break;
+ case EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT:
case EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP:
build_EvalXFunc(b, mod, "ExecAggPlainTransBatch",
v_state, op, v_econtext);
diff --git a/src/backend/utils/adt/int.c b/src/backend/utils/adt/int.c
index b5781989a64..eb1780b5590 100644
--- a/src/backend/utils/adt/int.c
+++ b/src/backend/utils/adt/int.c
@@ -1363,18 +1363,50 @@ int2smaller(PG_FUNCTION_ARGS)
Datum
int4larger(PG_FUNCTION_ARGS)
{
+	AggBulkArgs *ba = AggGetBulkArgs(fcinfo);
int32 arg1 = PG_GETARG_INT32(0);
int32 arg2 = PG_GETARG_INT32(1);
+	/*
+	 * Bulk mode: fold every non-NULL entry of the first argument's column,
+	 * for rows start_row .. nrows - 1, into the running maximum that starts
+	 * out as arg1, and return the result in one go.
+	 */
+	if (unlikely(ba))
+	{
+		int32		best = arg1;
+
+		for (int row = ba->start_row; row < ba->nrows; row++)
+		{
+			int16		col = ba->argoffs[0];
+
+			if (ba->isnull[col][row])
+				continue;
+
+			arg2 = DatumGetInt32(ba->args[col][row]);
+			if (arg2 > best)
+				best = arg2;
+		}
+		PG_RETURN_INT32(best);
+	}
PG_RETURN_INT32((arg1 > arg2) ? arg1 : arg2);
}
Datum
int4smaller(PG_FUNCTION_ARGS)
{
+	AggBulkArgs *ba = AggGetBulkArgs(fcinfo);
int32 arg1 = PG_GETARG_INT32(0);
int32 arg2 = PG_GETARG_INT32(1);
+	/*
+	 * Bulk mode: fold every non-NULL entry of the first argument's column,
+	 * for rows start_row .. nrows - 1, into the running minimum that starts
+	 * out as arg1, and return the result in one go.
+	 */
+	if (unlikely(ba))
+	{
+		int32		best = arg1;
+
+		for (int row = ba->start_row; row < ba->nrows; row++)
+		{
+			int16		col = ba->argoffs[0];
+
+			if (ba->isnull[col][row])
+				continue;
+
+			arg2 = DatumGetInt32(ba->args[col][row]);
+			if (arg2 < best)
+				best = arg2;
+		}
+		PG_RETURN_INT32(best);
+	}
PG_RETURN_INT32((arg1 < arg2) ? arg1 : arg2);
}
diff --git a/src/backend/utils/adt/int8.c b/src/backend/utils/adt/int8.c
index bdea490202a..bbabf4e0785 100644
--- a/src/backend/utils/adt/int8.c
+++ b/src/backend/utils/adt/int8.c
@@ -461,10 +461,28 @@ int8up(PG_FUNCTION_ARGS)
Datum
int8pl(PG_FUNCTION_ARGS)
{
+ AggBulkArgs *ba = AggGetBulkArgs(fcinfo);
int64 arg1 = PG_GETARG_INT64(0);
int64 arg2 = PG_GETARG_INT64(1);
int64 result;
+ if (unlikely(ba))
+ {
+ result = arg1;
+ for (int i = ba->start_row; i < ba->nrows; i++)
+ {
+ if (!ba->isnull[ba->argoffs[0]][i])
+ {
+ arg2 = ba->args[ba->argoffs[0]][i];
+ if (unlikely(pg_add_s64_overflow(arg1, arg2, &result)))
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("bigint out of range")));
+ arg1 = result;
+ }
+ }
+ PG_RETURN_INT64(result);
+ }
if (unlikely(pg_add_s64_overflow(arg1, arg2, &result)))
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
@@ -718,9 +736,35 @@ int8lcm(PG_FUNCTION_ARGS)
Datum
int8inc(PG_FUNCTION_ARGS)
{
+ AggBulkArgs *ba = AggGetBulkArgs(fcinfo);
int64 arg = PG_GETARG_INT64(0);
int64 result;
+ if (unlikely(ba))
+ {
+ result = arg;
+ if (!ba->hasnull || ba->nargs == 0)
+ {
+ if (unlikely(pg_add_s64_overflow(arg, ba->nrows, &result)))
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("bigint out of range")));
+ PG_RETURN_INT64(result);
+ }
+ for (int i = ba->start_row; i < ba->nrows; i++)
+ {
+ if (!ba->isnull[ba->argoffs[0]][i])
+ {
+ if (unlikely(pg_add_s64_overflow(arg, 1, &result)))
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("bigint out of range")));
+ arg = result;
+ }
+ }
+ PG_RETURN_INT64(result);
+ }
+
if (unlikely(pg_add_s64_overflow(arg, 1, &result)))
ereport(ERROR,
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c
index 76269918593..b02664c97f5 100644
--- a/src/backend/utils/adt/numeric.c
+++ b/src/backend/utils/adt/numeric.c
@@ -6310,6 +6310,23 @@ int4_sum(PG_FUNCTION_ARGS)
{
int64 oldsum;
int64 newval;
+ AggBulkArgs *ba = AggGetBulkArgs(fcinfo);
+
+ if (unlikely(ba))
+ {
+ int64 result = (!PG_ARGISNULL(0) ? PG_GETARG_INT64(0) : 0);
+
+ for (int i = ba->start_row; i < ba->nrows; i++)
+ {
+ if (!ba->isnull[ba->argoffs[0]][i])
+ {
+ int32 arg2 = ba->args[ba->argoffs[0]][i];
+
+ result = result + arg2;
+ }
+ }
+ PG_RETURN_INT64(result);
+ }
if (PG_ARGISNULL(0))
{
diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h
index 1d33e084b69..f24782ecf58 100644
--- a/src/include/executor/execExpr.h
+++ b/src/include/executor/execExpr.h
@@ -304,6 +304,7 @@ typedef enum ExprEvalOp
/* Batched aggregate trans evaluation */
EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP, /* per-row fmgr calls */
+ EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT, /* call transfn once with AggBulkArgs */
/* non-existent operation, used e.g. to check array lengths */
EEOP_LAST
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 5ba9a523970..c72bd755b79 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -561,6 +561,26 @@ ExecQualAndReset(ExprState *state, ExprContext *econtext)
}
#endif
+#ifndef FRONTEND
+/*
+ * Per-call bulk argument vectors for batched aggregate trans functions.
+ *
+ * The executor hands this to a transition function through
+ * fcinfo->flinfo->fn_extra (see AggGetBulkArgs()).  The bulk-aware functions
+ * loop over rows start_row .. nrows - 1 and read their k-th argument for row
+ * i as args[argoffs[k]][i], skipping it when isnull[argoffs[k]][i] is set.
+ */
+typedef struct AggBulkArgs
+{
+ int nrows; /* number of rows in this batch; rows are processed
+ * up to, but not including, this index */
+ int start_row; /* index of the first row to process */
+ int16 *argoffs; /* per-argument index into args[] / isnull[];
+ * length nargs */
+ int nargs; /* number of aggregate arguments covered by
+ * argoffs (0 when no column vectors are passed) */
+ Datum **args; /* column vectors: args[c][i] is the value of
+ * column c at row i, where c = argoffs[k] for the
+ * k-th argument */
+ bool **isnull; /* isnull[c][i], indexed exactly like args */
+ bool hasnull; /* is any datum in args NULL? */
+} AggBulkArgs;
+
+/*
+ * AggGetBulkArgs
+ *		Fetch the AggBulkArgs attached to this call, if any.
+ *
+ * Returns fcinfo->flinfo->fn_extra reinterpreted as AggBulkArgs, or NULL
+ * when fcinfo has no flinfo.  NOTE(review): any non-NULL fn_extra is taken
+ * to be an AggBulkArgs, which presumes the calling function never uses
+ * fn_extra for anything else -- confirm for every function that calls this.
+ */
+static inline AggBulkArgs *
+AggGetBulkArgs(FunctionCallInfo fcinfo)
+{
+	if (fcinfo->flinfo == NULL)
+		return NULL;
+
+	return (AggBulkArgs *) fcinfo->flinfo->fn_extra;
+}
+#endif
+
extern bool ExecCheck(ExprState *state, ExprContext *econtext);
/*
--
2.43.0
[application/octet-stream] v2-0005-WIP-Add-EEOPs-and-helpers-for-TupleBatch-processi.patch (16.9K, 5-v2-0005-WIP-Add-EEOPs-and-helpers-for-TupleBatch-processi.patch)
download | inline diff:
From 3cf02cab36bc9b2420f98ff08c17dea082a84f59 Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Mon, 22 Sep 2025 17:01:29 +0900
Subject: [PATCH v2 5/8] WIP: Add EEOPs and helpers for TupleBatch processing
Introduce new EEOP cases to fetch attributes into TupleBatch
vectors:
- EEOP_{INNER,OUTER,SCAN}_FETCHSOME_BATCH
- EEOP_BUILD_{INNER,OUTER,SCAN}_BATCH_VECTOR
Add ExecBuild{Inner,Outer,Scan}BatchVector() helpers to populate
column vectors (values, nulls, nrows, hasnull) from a TupleBatch.
Extend ExprContext with inner_batch, outer_batch, and scan_batch
fields so expression programs can access active batches directly.
Add slot_getsomeattrs_batch() to prefetch attributes across all
slots in a TupleBatch, similar to slot_getsomeattrs() for one slot.
---
src/backend/executor/execExprInterp.c | 127 +++++++++++++++++++++++++-
src/backend/executor/execTuples.c | 32 +++++++
src/backend/jit/llvm/llvmjit_expr.c | 86 +++++++++++++++++
src/backend/jit/llvm/llvmjit_types.c | 4 +
src/include/executor/execExpr.h | 45 ++++++++-
src/include/executor/tuptable.h | 2 +
src/include/nodes/execnodes.h | 24 +++--
7 files changed, 310 insertions(+), 10 deletions(-)
diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c
index 0e1a74976f7..68629ad7991 100644
--- a/src/backend/executor/execExprInterp.c
+++ b/src/backend/executor/execExprInterp.c
@@ -59,6 +59,7 @@
#include "access/heaptoast.h"
#include "catalog/pg_type.h"
#include "commands/sequence.h"
+#include "executor/execBatch.h"
#include "executor/execExpr.h"
#include "executor/nodeSubplan.h"
#include "funcapi.h"
@@ -188,6 +189,11 @@ static pg_attribute_always_inline void ExecAggPlainTransByRef(AggState *aggstate
int setno);
static char *ExecGetJsonValueItemString(JsonbValue *item, bool *resnull);
+static pg_attribute_always_inline void ExecBuildBatchVector(ExprState *state,
+ ExprEvalStep *op,
+ ExprContext *econtext,
+ TupleBatch *b);
+
/*
* ScalarArrayOpExprHashEntry
* Hash table entry type used during EEOP_HASHED_SCALARARRAYOP
@@ -446,7 +452,6 @@ ExecReadyInterpretedExpr(ExprState *state)
state->evalfunc_private = ExecInterpExpr;
}
-
/*
* Evaluate expression identified by "state" in the execution context
* given by "econtext". *isnull is set to the is-null flag for the result,
@@ -466,6 +471,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
TupleTableSlot *scanslot;
TupleTableSlot *oldslot;
TupleTableSlot *newslot;
+ TupleBatch *innerbatch;
+ TupleBatch *outerbatch;
+ TupleBatch *scanbatch;
/*
* This array has to be in the same order as enum ExprEvalOp.
@@ -479,6 +487,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
&&CASE_EEOP_SCAN_FETCHSOME,
&&CASE_EEOP_OLD_FETCHSOME,
&&CASE_EEOP_NEW_FETCHSOME,
+ &&CASE_EEOP_INNER_FETCHSOME_BATCH,
+ &&CASE_EEOP_OUTER_FETCHSOME_BATCH,
+ &&CASE_EEOP_SCAN_FETCHSOME_BATCH,
&&CASE_EEOP_INNER_VAR,
&&CASE_EEOP_OUTER_VAR,
&&CASE_EEOP_SCAN_VAR,
@@ -592,6 +603,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
&&CASE_EEOP_AGG_PRESORTED_DISTINCT_MULTI,
&&CASE_EEOP_AGG_ORDERED_TRANS_DATUM,
&&CASE_EEOP_AGG_ORDERED_TRANS_TUPLE,
+ &&CASE_EEOP_BUILD_INNER_BATCH_VECTOR,
+ &&CASE_EEOP_BUILD_OUTER_BATCH_VECTOR,
+ &&CASE_EEOP_BUILD_SCAN_BATCH_VECTOR,
&&CASE_EEOP_LAST
};
@@ -612,6 +626,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
scanslot = econtext->ecxt_scantuple;
oldslot = econtext->ecxt_oldtuple;
newslot = econtext->ecxt_newtuple;
+ innerbatch = econtext->inner_batch;
+ outerbatch = econtext->outer_batch;
+ scanbatch = econtext->scan_batch;
#if defined(EEO_USE_COMPUTED_GOTO)
EEO_DISPATCH();
@@ -658,6 +675,36 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_NEXT();
}
+ EEO_CASE(EEOP_INNER_FETCHSOME_BATCH)
+ {
+ CheckOpSlotCompatibility(op, innerslot);
+
+ Assert(innerbatch);
+ slot_getsomeattrs_batch(innerbatch, op->d.fetch_batch.last_var);
+
+ EEO_NEXT();
+ }
+
+ EEO_CASE(EEOP_OUTER_FETCHSOME_BATCH)
+ {
+ CheckOpSlotCompatibility(op, outerslot);
+
+ Assert(outerbatch);
+ slot_getsomeattrs_batch(outerbatch, op->d.fetch_batch.last_var);
+
+ EEO_NEXT();
+ }
+
+ EEO_CASE(EEOP_SCAN_FETCHSOME_BATCH)
+ {
+ CheckOpSlotCompatibility(op, scanslot);
+
+ Assert(scanbatch);
+ slot_getsomeattrs_batch(scanbatch, op->d.fetch_batch.last_var);
+
+ EEO_NEXT();
+ }
+
EEO_CASE(EEOP_OLD_FETCHSOME)
{
CheckOpSlotCompatibility(op, oldslot);
@@ -2265,6 +2312,30 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull)
EEO_NEXT();
}
+ EEO_CASE(EEOP_BUILD_INNER_BATCH_VECTOR)
+ {
+ /* too complex for an inline implementation */
+ ExecBuildInnerBatchVector(state, op, econtext);
+
+ EEO_NEXT();
+ }
+
+ EEO_CASE(EEOP_BUILD_OUTER_BATCH_VECTOR)
+ {
+ /* too complex for an inline implementation */
+ ExecBuildOuterBatchVector(state, op, econtext);
+
+ EEO_NEXT();
+ }
+
+ EEO_CASE(EEOP_BUILD_SCAN_BATCH_VECTOR)
+ {
+ /* too complex for an inline implementation */
+ ExecBuildScanBatchVector(state, op, econtext);
+
+ EEO_NEXT();
+ }
+
EEO_CASE(EEOP_LAST)
{
/* unreachable */
@@ -5914,3 +5985,57 @@ ExecAggPlainTransByRef(AggState *aggstate, AggStatePerTrans pertrans,
MemoryContextSwitchTo(oldContext);
}
+
+/*
+ * Out-of-line handler for EEOP_BUILD_INNER_BATCH_VECTOR: fill the step's
+ * BatchVector from econtext->inner_batch, which must be set.
+ */
+void
+ExecBuildInnerBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
+{
+	TupleBatch *inner = econtext->inner_batch;
+
+	Assert(inner);
+	ExecBuildBatchVector(state, op, econtext, inner);
+}
+
+/*
+ * Out-of-line handler for EEOP_BUILD_OUTER_BATCH_VECTOR: fill the step's
+ * BatchVector from econtext->outer_batch, which must be set.
+ */
+void
+ExecBuildOuterBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
+{
+	TupleBatch *outer = econtext->outer_batch;
+
+	Assert(outer);
+	ExecBuildBatchVector(state, op, econtext, outer);
+}
+
+/*
+ * Out-of-line handler for EEOP_BUILD_SCAN_BATCH_VECTOR: fill the step's
+ * BatchVector from econtext->scan_batch, which must be set.
+ */
+void
+ExecBuildScanBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext)
+{
+	TupleBatch *scan = econtext->scan_batch;
+
+	Assert(scan);
+	ExecBuildBatchVector(state, op, econtext, scan);
+}
+
+/*
+ * ExecBuildBatchVector
+ *		Copy the attributes listed in the step's BatchVector out of every slot
+ *		of batch b into the vector's per-column value/null arrays.
+ *
+ * The batch is rewound first and iterated to its end.  The requested
+ * attributes must already be available in each slot's tts_values/tts_isnull
+ * (this is only asserted).  On return bv->nrows is the number of rows copied
+ * and bv->hasnull tells whether any datum copied by this call is NULL.
+ * Nothing is done, and bv is left untouched, when the vector has no columns.
+ */
+static pg_attribute_always_inline void
+ExecBuildBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext,
+					 TupleBatch *b)
+{
+	struct BatchVector *bv = op->d.batch_vector.bv;
+	int			i = 0;
+
+	if (bv->ncols == 0)
+		return;
+
+	/*
+	 * The vector hangs off the expression step and is refilled on every
+	 * call, so forget what the previous batch contained; otherwise hasnull
+	 * would stay set forever once a single NULL has been seen.
+	 */
+	bv->hasnull = false;
+
+	/* Fetch each requested attribute into column vectors. */
+	TupleBatchRewind(b);
+	while (TupleBatchHasMore(b))
+	{
+		TupleTableSlot *slot = TupleBatchGetNextSlot(b);
+
+		/* the column arrays were sized for maxrows entries */
+		Assert(i < bv->maxrows);
+
+		for (int j = 0; j < bv->ncols; j++)
+		{
+			AttrNumber	attno = bv->attnos[j];
+			Datum	   *cols = bv->cols[j];
+			bool	   *nulls = bv->nulls[j];
+
+			Assert(attno <= slot->tts_nvalid);
+			cols[i] = slot->tts_values[attno - 1];
+			nulls[i] = slot->tts_isnull[attno - 1];
+			if (nulls[i])
+				bv->hasnull = true;
+		}
+		i++;
+	}
+	bv->nrows = i;
+}
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index 8e02d68824f..86d5dea8f8b 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -2111,6 +2111,38 @@ slot_getsomeattrs_int(TupleTableSlot *slot, int attnum)
}
}
+/*
+ * slot_getsomeattrs_batch
+ *		Batch counterpart of slot_getsomeattrs(): make sure the first attnum
+ *		attributes are available in tts_values/tts_isnull of every slot that
+ *		is still to be iterated in batch b.
+ *
+ * Raises an error if attnum exceeds the number of attributes in a slot's
+ * tuple descriptor.
+ *
+ * NOTE(review): iteration starts at the batch's current position and leaves
+ * it at the end; the batch is not rewound here.  ExecBuildBatchVector()
+ * rewinds before reading -- confirm that every other consumer does too.
+ */
+void
+slot_getsomeattrs_batch(struct TupleBatch *b, int attnum)
+{
+	while (TupleBatchHasMore(b))
+	{
+		TupleTableSlot *slot = TupleBatchGetNextSlot(b);
+
+		/* Check for caller errors */
+		Assert(attnum > 0);
+
+		if (unlikely(attnum > slot->tts_tupleDescriptor->natts))
+			elog(ERROR, "invalid attribute number %d", attnum);
+
+		/*
+		 * Nothing to do if the slot already has attnum valid attributes.
+		 * As in slot_getsomeattrs(), that includes tts_nvalid == attnum:
+		 * the getsomeattrs callback must only be invoked when attributes
+		 * are actually missing.
+		 *
+		 * XXX - there should perhaps also be a batch-level att_nvalid
+		 */
+		if (attnum <= slot->tts_nvalid)
+			continue;
+
+		/* Fetch as many attributes as possible from the underlying tuple. */
+		slot->tts_ops->getsomeattrs(slot, attnum);
+
+		/*
+		 * If the underlying tuple doesn't have enough attributes, tuple
+		 * descriptor must have the missing attributes.
+		 */
+		if (unlikely(slot->tts_nvalid < attnum))
+		{
+			slot_getmissingattrs(slot, slot->tts_nvalid, attnum);
+			slot->tts_nvalid = attnum;
+		}
+	}
+}
+
/* ----------------------------------------------------------------
* ExecTypeFromTL
*
diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c
index 712b35df7e5..848f0b52d6f 100644
--- a/src/backend/jit/llvm/llvmjit_expr.c
+++ b/src/backend/jit/llvm/llvmjit_expr.c
@@ -109,6 +109,11 @@ llvm_compile_expr(ExprState *state)
LLVMValueRef v_newslot;
LLVMValueRef v_resultslot;
+ /* batches */
+ LLVMValueRef v_innerbatch;
+ LLVMValueRef v_outerbatch;
+ LLVMValueRef v_scanbatch;
+
/* nulls/values of slots */
LLVMValueRef v_innervalues;
LLVMValueRef v_innernulls;
@@ -221,6 +226,21 @@ llvm_compile_expr(ExprState *state)
v_state,
FIELDNO_EXPRSTATE_RESULTSLOT,
"v_resultslot");
+ v_innerbatch = l_load_struct_gep(b,
+ StructExprContext,
+ v_econtext,
+ FIELDNO_EXPRCONTEXT_INNERBATCH,
+ "v_innerbatch");
+ v_outerbatch = l_load_struct_gep(b,
+ StructExprContext,
+ v_econtext,
+ FIELDNO_EXPRCONTEXT_OUTERBATCH,
+ "v_outerbatch");
+ v_scanbatch = l_load_struct_gep(b,
+ StructExprContext,
+ v_econtext,
+ FIELDNO_EXPRCONTEXT_SCANBATCH,
+ "v_scanbatch");
/* build global values/isnull pointers */
v_scanvalues = l_load_struct_gep(b,
@@ -439,6 +459,54 @@ llvm_compile_expr(ExprState *state)
break;
}
+ case EEOP_INNER_FETCHSOME_BATCH:
+ {
+ LLVMValueRef params[2];
+
+ params[0] = v_innerbatch;
+ params[1] = l_int32_const(lc, op->d.fetch_batch.last_var);
+
+ l_call(b,
+ llvm_pg_var_func_type("slot_getsomeattrs_batch"),
+ llvm_pg_func(mod, "slot_getsomeattrs_batch"),
+ params, lengthof(params), "");
+
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+ }
+
+ case EEOP_OUTER_FETCHSOME_BATCH:
+ {
+ LLVMValueRef params[2];
+
+ params[0] = v_outerbatch;
+ params[1] = l_int32_const(lc, op->d.fetch_batch.last_var);
+
+ l_call(b,
+ llvm_pg_var_func_type("slot_getsomeattrs_batch"),
+ llvm_pg_func(mod, "slot_getsomeattrs_batch"),
+ params, lengthof(params), "");
+
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+ }
+
+ case EEOP_SCAN_FETCHSOME_BATCH:
+ {
+ LLVMValueRef params[2];
+
+ params[0] = v_scanbatch;
+ params[1] = l_int32_const(lc, op->d.fetch_batch.last_var);
+
+ l_call(b,
+ llvm_pg_var_func_type("slot_getsomeattrs_batch"),
+ llvm_pg_func(mod, "slot_getsomeattrs_batch"),
+ params, lengthof(params), "");
+
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+ }
+
case EEOP_INNER_VAR:
case EEOP_OUTER_VAR:
case EEOP_SCAN_VAR:
@@ -2940,6 +3008,24 @@ llvm_compile_expr(ExprState *state)
LLVMBuildBr(b, opblocks[opno + 1]);
break;
+ case EEOP_BUILD_INNER_BATCH_VECTOR:
+ build_EvalXFunc(b, mod, "ExecBuildInnerBatchVector",
+ v_state, op, v_econtext);
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+
+ case EEOP_BUILD_OUTER_BATCH_VECTOR:
+ build_EvalXFunc(b, mod, "ExecBuildOuterBatchVector",
+ v_state, op, v_econtext);
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+
+ case EEOP_BUILD_SCAN_BATCH_VECTOR:
+ build_EvalXFunc(b, mod, "ExecBuildScanBatchVector",
+ v_state, op, v_econtext);
+ LLVMBuildBr(b, opblocks[opno + 1]);
+ break;
+
case EEOP_LAST:
Assert(false);
break;
diff --git a/src/backend/jit/llvm/llvmjit_types.c b/src/backend/jit/llvm/llvmjit_types.c
index 167cd554b9c..6bb527c3f6f 100644
--- a/src/backend/jit/llvm/llvmjit_types.c
+++ b/src/backend/jit/llvm/llvmjit_types.c
@@ -179,7 +179,11 @@ void *referenced_functions[] =
MakeExpandedObjectReadOnlyInternal,
slot_getmissingattrs,
slot_getsomeattrs_int,
+ slot_getsomeattrs_batch,
strlen,
varsize_any,
ExecInterpExprStillValid,
+ ExecBuildInnerBatchVector,
+ ExecBuildOuterBatchVector,
+ ExecBuildScanBatchVector,
};
diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h
index 75366203706..99c86bac702 100644
--- a/src/include/executor/execExpr.h
+++ b/src/include/executor/execExpr.h
@@ -78,6 +78,11 @@ typedef enum ExprEvalOp
EEOP_OLD_FETCHSOME,
EEOP_NEW_FETCHSOME,
+ /* apply slot_getsomeattrs_batch() to corresponding batch */
+ EEOP_INNER_FETCHSOME_BATCH,
+ EEOP_OUTER_FETCHSOME_BATCH,
+ EEOP_SCAN_FETCHSOME_BATCH,
+
/* compute non-system Var value */
EEOP_INNER_VAR,
EEOP_OUTER_VAR,
@@ -292,11 +297,15 @@ typedef enum ExprEvalOp
EEOP_AGG_ORDERED_TRANS_DATUM,
EEOP_AGG_ORDERED_TRANS_TUPLE,
+ /* ExprContext.*_batch -> BatchVector */
+ EEOP_BUILD_INNER_BATCH_VECTOR,
+ EEOP_BUILD_OUTER_BATCH_VECTOR,
+ EEOP_BUILD_SCAN_BATCH_VECTOR,
+
/* non-existent operation, used e.g. to check array lengths */
EEOP_LAST
} ExprEvalOp;
-
typedef struct ExprEvalStep
{
/*
@@ -331,6 +340,12 @@ typedef struct ExprEvalStep
const TupleTableSlotOps *kind;
} fetch;
+ struct
+ {
+ /* attribute number up to which to fetch (inclusive) */
+ int last_var;
+ } fetch_batch;
+
/* for EEOP_INNER/OUTER/SCAN/OLD/NEW_[SYS]VAR */
struct
{
@@ -769,6 +784,12 @@ typedef struct ExprEvalStep
void *json_coercion_cache;
ErrorSaveContext *escontext;
} jsonexpr_coercion;
+
+ /* for batch vector construction */
+ struct
+ {
+ struct BatchVector *bv;
+ } batch_vector;
} d;
} ExprEvalStep;
@@ -917,4 +938,26 @@ extern void ExecEvalAggOrderedTransDatum(ExprState *state, ExprEvalStep *op,
extern void ExecEvalAggOrderedTransTuple(ExprState *state, ExprEvalStep *op,
ExprContext *econtext);
+/* ---------- BatchVector stuff ------------- */
+
+/*
+ * Vector fetch spec for a list of simple Vars.
+ *
+ * The batch-vector build step copies, for each row i of a batch and each
+ * column j, slot->tts_values[attnos[j] - 1] into cols[j][i] and the matching
+ * null flag into nulls[j][i]; attnos[] is therefore 1-based.
+ */
+typedef struct BatchVector
+{
+ /* immutable after BatchVectorCreate */
+ AttrNumber *attnos; /* [ncols] */
+ int ncols;
+ int maxrows; /* presumably the row capacity of cols/nulls; TODO confirm */
+ int last_var; /* presumably the highest attno to fetch; TODO confirm */
+
+ /* per batch state */
+ Datum **cols; /* [ncols][maxbatch]; NOTE(review): "maxbatch" is likely maxrows */
+ bool **nulls; /* [ncols][maxbatch]; indexed the same way as cols */
+ bool hasnull; /* is any datum in cols NULL? */
+ int nrows; /* #rows loaded into cols/nulls */
+} BatchVector;
+
+extern void ExecBuildInnerBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+extern void ExecBuildOuterBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+extern void ExecBuildScanBatchVector(ExprState *state, ExprEvalStep *op, ExprContext *econtext);
+
#endif /* EXEC_EXPR_H */
diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h
index 095e4cc82e3..2e2192fb3cf 100644
--- a/src/include/executor/tuptable.h
+++ b/src/include/executor/tuptable.h
@@ -347,6 +347,8 @@ extern Datum ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot);
extern void slot_getmissingattrs(TupleTableSlot *slot, int startAttNum,
int lastAttNum);
extern void slot_getsomeattrs_int(TupleTableSlot *slot, int attnum);
+struct TupleBatch;
+extern void slot_getsomeattrs_batch(struct TupleBatch *b, int attnum);
#ifndef FRONTEND
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 9b81b842161..fdfe8b4ddaf 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -277,6 +277,14 @@ typedef struct ExprContext
#define FIELDNO_EXPRCONTEXT_OUTERTUPLE 3
TupleTableSlot *ecxt_outertuple;
+ /* For batched evaluation using batch-aware EEOPs */
+#define FIELDNO_EXPRCONTEXT_INNERBATCH 4
+ TupleBatch *inner_batch;
+#define FIELDNO_EXPRCONTEXT_OUTERBATCH 5
+ TupleBatch *outer_batch;
+#define FIELDNO_EXPRCONTEXT_SCANBATCH 6
+ TupleBatch *scan_batch;
+
/* Memory contexts for expression evaluation --- see notes above */
MemoryContext ecxt_per_query_memory;
MemoryContext ecxt_per_tuple_memory;
@@ -289,27 +297,27 @@ typedef struct ExprContext
* Values to substitute for Aggref nodes in the expressions of an Agg
* node, or for WindowFunc nodes within a WindowAgg node.
*/
-#define FIELDNO_EXPRCONTEXT_AGGVALUES 8
+#define FIELDNO_EXPRCONTEXT_AGGVALUES 11
Datum *ecxt_aggvalues; /* precomputed values for aggs/windowfuncs */
-#define FIELDNO_EXPRCONTEXT_AGGNULLS 9
+#define FIELDNO_EXPRCONTEXT_AGGNULLS 12
bool *ecxt_aggnulls; /* null flags for aggs/windowfuncs */
/* Value to substitute for CaseTestExpr nodes in expression */
-#define FIELDNO_EXPRCONTEXT_CASEDATUM 10
+#define FIELDNO_EXPRCONTEXT_CASEDATUM 13
Datum caseValue_datum;
-#define FIELDNO_EXPRCONTEXT_CASENULL 11
+#define FIELDNO_EXPRCONTEXT_CASENULL 14
bool caseValue_isNull;
/* Value to substitute for CoerceToDomainValue nodes in expression */
-#define FIELDNO_EXPRCONTEXT_DOMAINDATUM 12
+#define FIELDNO_EXPRCONTEXT_DOMAINDATUM 15
Datum domainValue_datum;
-#define FIELDNO_EXPRCONTEXT_DOMAINNULL 13
+#define FIELDNO_EXPRCONTEXT_DOMAINNULL 16
bool domainValue_isNull;
/* Tuples that OLD/NEW Var nodes in RETURNING may refer to */
-#define FIELDNO_EXPRCONTEXT_OLDTUPLE 14
+#define FIELDNO_EXPRCONTEXT_OLDTUPLE 17
TupleTableSlot *ecxt_oldtuple;
-#define FIELDNO_EXPRCONTEXT_NEWTUPLE 15
+#define FIELDNO_EXPRCONTEXT_NEWTUPLE 18
TupleTableSlot *ecxt_newtuple;
/* Link to containing EState (NULL if a standalone ExprContext) */
--
2.43.0
[application/octet-stream] v2-0004-WIP-Add-agg_retrieve_direct_batch-for-plain-aggre.patch (6.3K, 6-v2-0004-WIP-Add-agg_retrieve_direct_batch-for-plain-aggre.patch)
download | inline diff:
From abb8b1ded7cf192d286662dd320ad93802ce05d2 Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Thu, 4 Sep 2025 22:55:25 +0900
Subject: [PATCH v2 4/8] WIP: Add agg_retrieve_direct_batch() for plain
aggregates
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Teach Agg to consume child tuples in batches for AGG_PLAIN. A new
agg_retrieve_direct_batch() pulls TupleBatch from the child via
ExecProcNodeBatch(), materializes as needed, and advances per-agg
transition state over the batch. A first tuple is copied to match
the direct path’s behavior before batch processing.
Add AggCanUsePlainBatch() and select retrieve_plain at init:
batch path when no grouping sets, strategy is AGG_PLAIN, and the
child exposes ExecProcNodeBatch(); otherwise keep the row path.
Plan shape and EXPLAIN remain unchanged. Semantics are identical
to the non-batch direct path; this only reduces per-tuple overhead.
---
src/backend/executor/nodeAgg.c | 123 +++++++++++++++++++++++++++++++++
src/include/nodes/execnodes.h | 5 ++
2 files changed, 128 insertions(+)
diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c
index a4f3d30f307..3ace6363509 100644
--- a/src/backend/executor/nodeAgg.c
+++ b/src/backend/executor/nodeAgg.c
@@ -820,6 +820,20 @@ advance_aggregates(AggState *aggstate)
aggstate->tmpcontext);
}
+/*
+ * advance_aggregates_batch
+ * Evaluate the current phase's transition expression once for every slot
+ * still pending in batch 'b', resetting the temporary expression context
+ * after each row.
+ */
+static void
+advance_aggregates_batch(AggState *aggstate, TupleBatch *b)
+{
+ ExprState *transexpr = aggstate->phase->evaltrans;
+ ExprContext *rowcxt = aggstate->tmpcontext;
+
+ /* the per-row context reset runs as the loop's increment step */
+ for (; TupleBatchHasMore(b); ResetExprContext(rowcxt))
+ {
+ rowcxt->ecxt_outertuple = TupleBatchGetNextSlot(b);
+ ExecEvalExprNoReturnSwitchContext(transexpr, rowcxt);
+ }
+}
+
/*
* Run the transition function for a DISTINCT or ORDER BY aggregate
* with only one input. This is called after we have completed
@@ -2260,6 +2274,9 @@ ExecAgg(PlanState *pstate)
result = agg_retrieve_hash_table(node);
break;
case AGG_PLAIN:
+ /* init-time choice */
+ result = node->retrieve_plain(node);
+ break;
case AGG_SORTED:
result = agg_retrieve_direct(node);
break;
@@ -2618,6 +2635,91 @@ agg_retrieve_direct(AggState *aggstate)
return NULL;
}
+/*
+ * agg_retrieve_direct_batch
+ * Batched counterpart of agg_retrieve_direct(): pulls TupleBatches from
+ * the child via ExecProcNodeBatch(), feeds every row to the transition
+ * expression, then finalizes and projects a single result.
+ *
+ * Batches with nvalid == 0 are skipped. A NULL batch marks end of input and
+ * sets agg_done (or input_done when the phase has grouping sets).
+ */
+static TupleTableSlot *
+agg_retrieve_direct_batch(AggState *aggstate)
+{
+ PlanState *child = outerPlanState(aggstate);
+ ExprContext *econtext = aggstate->ss.ps.ps_ExprContext;
+ ExprContext *tmpcontext = aggstate->tmpcontext;
+ const bool hasGroupingSets = aggstate->phase->numsets > 0;
+ TupleTableSlot *firstSlot = aggstate->ss.ss_ScanTupleSlot;
+ TupleBatch *b = NULL;
+
+ Assert(child->ExecProcNodeBatch);
+
+ /*
+ * mimic the first-tuple copy from agg_retrieve_direct(): find the first
+ * non-empty batch and copy its row 0 into grp_firstTuple
+ */
+ for (;;)
+ {
+ b = ExecProcNodeBatch(child);
+ if (b == NULL)
+ {
+ /* no input at all; 'b' stays NULL for the code below */
+ if (hasGroupingSets)
+ {
+ aggstate->input_done = true;
+ break;
+ }
+ aggstate->agg_done = true;
+ break;
+ }
+ if (b->nvalid == 0)
+ continue;
+
+ TupleBatchMaterializeAll(b);
+ aggstate->grp_firstTuple = ExecCopySlotHeapTuple(TupleBatchGetSlot(b, 0));
+ break;
+ }
+
+ /* initialize_aggregates etc. as in the direct path */
+ ReScanExprContext(econtext);
+ for (int i = 0; i < Max(aggstate->phase->numsets, 1); i++)
+ ReScanExprContext(aggstate->aggcontexts[i]);
+
+ initialize_aggregates(aggstate, aggstate->pergroups,
+ Max(aggstate->phase->numsets, 1));
+
+ if (aggstate->grp_firstTuple)
+ {
+ /* keep the copied first tuple in firstSlot; it is used for projection */
+ ExecForceStoreHeapTuple(aggstate->grp_firstTuple, firstSlot, true);
+ aggstate->grp_firstTuple = NULL;
+ tmpcontext->ecxt_outertuple = firstSlot;
+
+ /*
+ * NOTE(review): advance_aggregates_batch() overwrites ecxt_outertuple
+ * for each row, so the assignment above looks redundant -- confirm.
+ */
+ advance_aggregates_batch(aggstate, b);
+ ResetExprContext(tmpcontext);
+ }
+
+ /* consume remaining rows in current and subsequent batches */
+ if (b)
+ {
+ /*
+ * NOTE(review): presumably a no-op when the call above already drained
+ * 'b'; verify against TupleBatchHasMore() semantics.
+ */
+ if (TupleBatchHasMore(b))
+ advance_aggregates_batch(aggstate, b);
+ for (;;)
+ {
+ b = ExecProcNodeBatch(child);
+ if (b == NULL)
+ {
+ if (hasGroupingSets)
+ aggstate->input_done = true;
+ else
+ aggstate->agg_done = true;
+ break;
+ }
+ if (b->nvalid == 0)
+ continue;
+
+ TupleBatchMaterializeAll(b);
+ advance_aggregates_batch(aggstate, b);
+ }
+ }
+
+ /* finalize and project like the direct path */
+ econtext->ecxt_outertuple = firstSlot;
+ prepare_projection_slot(aggstate, econtext->ecxt_outertuple, 0);
+ select_current_set(aggstate, 0, false);
+ finalize_aggregates(aggstate, aggstate->peragg, aggstate->pergroups[0]);
+
+ return project_aggregates(aggstate);
+}
+
/*
* ExecAgg for hashed case: read input and build hash table
*/
@@ -3265,6 +3367,22 @@ hashagg_reset_spill_state(AggState *aggstate)
}
}
+/*
+ * AggCanUsePlainBatch
+ * Report whether this Agg node may read its child in batches: the plan
+ * must have no grouping sets, the current phase must use AGG_PLAIN, and
+ * the child must provide an ExecProcNodeBatch callback.
+ */
+static bool
+AggCanUsePlainBatch(AggState *aggstate)
+{
+ const Agg *plan = (const Agg *) aggstate->ss.ps.plan;
+ PlanState *child = outerPlanState(aggstate);
+
+ Assert(child);
+
+ /* grouping sets or a non-plain strategy -> stay on the row path */
+ if (plan->groupingSets != NIL ||
+ aggstate->phase->aggstrategy != AGG_PLAIN)
+ return false;
+
+ return child->ExecProcNodeBatch != NULL;
+}
/* -----------------
* ExecInitAgg
@@ -4060,6 +4178,11 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
(errcode(ERRCODE_GROUPING_ERROR),
errmsg("aggregate function calls cannot be nested")));
+ if (AggCanUsePlainBatch(aggstate))
+ aggstate->retrieve_plain = agg_retrieve_direct_batch;
+ else
+ aggstate->retrieve_plain = agg_retrieve_direct;
+
/*
* Build expressions doing all the transition work at once. We build a
* different one for each phase, as the number of transition function
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a104591ac20..9b81b842161 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2535,6 +2535,9 @@ typedef struct AggStatePerGroupData *AggStatePerGroup;
typedef struct AggStatePerPhaseData *AggStatePerPhase;
typedef struct AggStatePerHashData *AggStatePerHash;
+struct AggState;
+typedef TupleTableSlot *(*AggRetrievePlainFn)(struct AggState *);
+
typedef struct AggState
{
ScanState ss; /* its first field is NodeTag */
@@ -2610,6 +2613,8 @@ typedef struct AggState
AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than
* ->hash_pergroup */
SharedAggInfo *shared_info; /* one entry per worker */
+
+ AggRetrievePlainFn retrieve_plain; /* init-time choice */
} AggState;
/* ----------------
--
2.43.0
[application/octet-stream] v2-0001-Add-batch-table-AM-API-and-heapam-implementation.patch (13.7K, 7-v2-0001-Add-batch-table-AM-API-and-heapam-implementation.patch)
download | inline diff:
From 3318650e720a01cbd5948349b9fbcdbb8ddda7cf Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Mon, 1 Sep 2025 21:56:17 +0900
Subject: [PATCH v2 1/8] Add batch table AM API and heapam implementation
Introduce new table AM callbacks to fetch multiple tuples per call.
This reduces per-tuple call overhead by letting executor nodes work
in batches.
Define a HeapBatch structure and supporting code in tableam.h.
Batches are limited to tuples from a single page and at most
EXEC_BATCH_ROWS (currently 64) entries.
Provide initial heapam support with heapgettup_pagemode_batch().
No executor node is switched over yet; a later commit will adapt
SeqScan to use this API. Other nodes may adopt it in the future.
Also add pgstat_count_heap_getnext_batch() to record batched fetches
in pgstat.
---
src/backend/access/heap/heapam.c | 212 ++++++++++++++++++++++-
src/backend/access/heap/heapam_handler.c | 4 +
src/include/access/heapam.h | 21 +++
src/include/access/tableam.h | 58 +++++++
src/include/pgstat.h | 5 +
5 files changed, 299 insertions(+), 1 deletion(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ed0c0c2dc9f..f62f7edbf5e 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1008,7 +1008,7 @@ heapgettup_pagemode(HeapScanDesc scan,
int nkeys,
ScanKey key)
{
- HeapTuple tuple = &(scan->rs_ctup);
+ HeapTuple tuple = &scan->rs_ctup;
Page page;
uint32 lineindex;
uint32 linesleft;
@@ -1089,6 +1089,121 @@ continue_page:
scan->rs_inited = false;
}
+/*
+ * heapgettup_pagemode_batch
+ * Collect up to 'maxitems' visible tuples from a single page in page mode.
+ *
+ * This function returns a *batch* of tuples from one heap page. If the
+ * current page (as tracked by the scan desc) has no more tuples left,
+ * it will advance to the next page and prepare it (via heap_prepare_pagescan).
+ * It will not cross a page boundary while filling the batch.
+ *
+ * When scan keys are given, tuples failing HeapKeyTest() are skipped. A page
+ * on which no remaining tuple passes the keys contributes nothing and the
+ * search moves on to the next page, so that a zero result is returned only
+ * at end-of-scan.
+ *
+ * Return value:
+ * number of tuples written into 'tdata' (0 at end-of-scan).
+ *
+ * Side effects:
+ * - Ensures rs_cbuf pins the page from which tuples were produced.
+ * - Sets rs_cblock, rs_cindex, rs_ntuples consistently; rs_cindex is left
+ * at the last line examined on the page, whether or not that tuple
+ * passed the scan keys.
+ * - Does *not* change buffer pin counts except through normal page
+ * transitions performed by heap_fetch_next_buffer().
+ */
+static int
+heapgettup_pagemode_batch(HeapScanDesc scan,
+ ScanDirection dir,
+ int nkeys, ScanKey key,
+ HeapTupleData *tdata,
+ int maxitems)
+{
+ Page page;
+ uint32 lineindex;
+ uint32 linesleft;
+ int nout = 0;
+
+ Assert(ScanDirectionIsForward(dir));
+ Assert(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE);
+ Assert(maxitems > 0);
+
+ /*
+ * Repeat until some page yields at least one tuple. Without scan keys
+ * the first page found always does; with keys a page may be rejected
+ * entirely, in which case we must try the next one instead of returning
+ * 0, which callers interpret as end-of-scan.
+ */
+ do
+ {
+ /*
+ * If we have no current page (or the current page is exhausted),
+ * advance to the next page that has any visible tuples and prepare it.
+ * This mirrors the outer loop of heapgettup_pagemode(), but we stop
+ * as soon as we have a prepared page; we never produce from two pages.
+ */
+ for (;;)
+ {
+ if (BufferIsValid(scan->rs_cbuf))
+ {
+ /* Are there more visible tuples left on this page? */
+ lineindex = scan->rs_cindex + dir;
+ if (ScanDirectionIsForward(dir))
+ linesleft = (lineindex <= (uint32) scan->rs_ntuples) ?
+ (scan->rs_ntuples - lineindex) : 0;
+ else
+ linesleft = scan->rs_cindex;
+ if (linesleft > 0)
+ break; /* continue on this page */
+ }
+
+ /* Move to next page and prepare its visible tuple list. */
+ heap_fetch_next_buffer(scan, dir);
+
+ if (!BufferIsValid(scan->rs_cbuf))
+ {
+ /* end of scan; keep rs_cbuf invalid like heapgettup_pagemode */
+ scan->rs_cblock = InvalidBlockNumber;
+ scan->rs_prefetch_block = InvalidBlockNumber;
+ scan->rs_inited = false;
+ return 0;
+ }
+
+ Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock);
+ heap_prepare_pagescan((TableScanDesc) scan);
+
+ /* After prepare, either rs_ntuples > 0 or we'll loop again. */
+ if (scan->rs_ntuples > 0)
+ {
+ lineindex = ScanDirectionIsForward(dir) ? 0 : scan->rs_ntuples - 1;
+ linesleft = scan->rs_ntuples;
+ break;
+ }
+ /* else: page had no visible tuples; continue to next page */
+ }
+
+ /* From here on, we must only read tuples from this single page. */
+ page = BufferGetPage(scan->rs_cbuf);
+
+ /*
+ * Walk rs_vistuples[] from 'lineindex', copying headers into tdata[]
+ * until either the page is exhausted or the batch capacity is reached.
+ */
+ for (; linesleft > 0 && nout < maxitems; linesleft--, lineindex += dir)
+ {
+ OffsetNumber lineoff;
+ ItemId lpp;
+ HeapTupleData *dst = &tdata[nout];
+
+ Assert(lineindex <= (uint32) scan->rs_ntuples);
+ lineoff = scan->rs_vistuples[lineindex];
+ lpp = PageGetItemId(page, lineoff);
+ Assert(ItemIdIsNormal(lpp));
+
+ dst->t_data = (HeapTupleHeader) PageGetItem(page, lpp);
+ dst->t_len = ItemIdGetLength(lpp);
+ dst->t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
+ ItemPointerSet(&(dst->t_self), scan->rs_cblock, lineoff);
+
+ /*
+ * Record progress for every line examined, not only for accepted
+ * tuples; otherwise rejected tuples at the end of the page would be
+ * revisited by the next call.
+ */
+ scan->rs_cindex = lineindex;
+
+ /* skip tuples that don't match the scan key; the slot is reused */
+ if (key != NULL &&
+ !HeapKeyTest(dst, RelationGetDescr(scan->rs_base.rs_rd),
+ nkeys, key))
+ continue;
+
+ nout++;
+ }
+ } while (nout == 0);
+
+ return nout;
+}
/* ----------------------------------------------------------------
* heap access method interface
@@ -1136,6 +1251,8 @@ heap_beginscan(Relation relation, Snapshot snapshot,
scan->rs_base.rs_parallel = parallel_scan;
scan->rs_strategy = NULL; /* set in initscan */
scan->rs_cbuf = InvalidBuffer;
+ scan->rs_batch_ctup = NULL;
+ scan->rs_batch_cbuf = InvalidBuffer;
/*
* Disable page-at-a-time mode if it's not a MVCC-safe snapshot.
@@ -1315,6 +1432,8 @@ heap_endscan(TableScanDesc sscan)
*/
if (BufferIsValid(scan->rs_cbuf))
ReleaseBuffer(scan->rs_cbuf);
+ if (BufferIsValid(scan->rs_batch_cbuf))
+ ReleaseBuffer(scan->rs_batch_cbuf);
/*
* Must free the read stream before freeing the BufferAccessStrategy.
@@ -1421,6 +1540,97 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s
return true;
}
+/*---------- Batching support -----------*/
+
+/*
+ * heap_begin_batch
+ *
+ * Build a HeapBatch able to hold 'maxitems' tuple headers and return it as
+ * an opaque pointer. No buffer pin is taken here (buf starts out as
+ * InvalidBuffer). Both the struct and the header array are palloc'd, i.e.
+ * they live in whatever memory context is current at the time of the call.
+ */
+void *
+heap_begin_batch(TableScanDesc sscan, int maxitems)
+{
+ HeapBatch *batch;
+ Oid tableoid = RelationGetRelid(sscan->rs_rd);
+
+ Assert(maxitems > 0);
+
+ batch = palloc(sizeof(HeapBatch));
+ batch->buf = InvalidBuffer;
+ batch->nitems = 0;
+ batch->maxitems = maxitems;
+ batch->tupdata = palloc(sizeof(HeapTupleData) * maxitems);
+
+ /* Pre-fill the per-header table OID; tuple bodies stay on the page. */
+ for (int i = 0; i < maxitems; i++)
+ batch->tupdata[i].t_tableOid = tableoid;
+
+ return batch;
+}
+
+/*
+ * heap_end_batch
+ *
+ * Drop the batch's buffer pin, if it holds one, and free both the header
+ * array and the HeapBatch itself. 'am_batch' must not be used afterwards.
+ */
+void
+heap_end_batch(TableScanDesc sscan, void *am_batch)
+{
+ HeapBatch *batch = (HeapBatch *) am_batch;
+ Buffer pinned = batch->buf;
+
+ if (BufferIsValid(pinned))
+ ReleaseBuffer(pinned);
+
+ pfree(batch->tupdata);
+ pfree(batch);
+}
+
+/*
+ * heap_getnextbatch
+ *
+ * Fill 'am_batch' (a HeapBatch) with the next group of tuples produced by
+ * heapgettup_pagemode_batch() and return how many were stored; 0 is
+ * reported to the caller as end of scan. Only forward, page-mode scans are
+ * accepted (asserted below).
+ *
+ * The pin held for the previous batch is released first; on success an
+ * additional pin is taken on the scan's current buffer and remembered in
+ * hb->buf.
+ */
+int
+heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir)
+{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+ HeapBatch *hb = (HeapBatch *) am_batch;
+ Buffer curbuf;
+ int n;
+
+ Assert(ScanDirectionIsForward(dir));
+ Assert(sscan->rs_flags & SO_ALLOW_PAGEMODE);
+ Assert(hb->maxitems > 0);
+
+ /* Drop prior batch pin, if any. */
+ if (BufferIsValid(hb->buf))
+ {
+ ReleaseBuffer(hb->buf);
+ hb->buf = InvalidBuffer;
+ }
+
+ hb->nitems = 0;
+
+ /* One call per batch, never crosses a page. */
+ n = heapgettup_pagemode_batch(scan, dir,
+ sscan->rs_nkeys, sscan->rs_key,
+ hb->tupdata, hb->maxitems);
+
+ /*
+ * NOTE(review): this relies on heapgettup_pagemode_batch() returning 0
+ * only at end of scan -- confirm that holds when scan keys are in use.
+ */
+ if (n == 0)
+ return 0; /* end of scan */
+
+ /*
+ * Take an extra pin on the scan's current buffer and keep it in the batch,
+ * so t_data pointers stay valid for the batch lifetime.
+ */
+ curbuf = scan->rs_cbuf;
+ IncrBufferRefCount(curbuf);
+ hb->buf = curbuf;
+
+ /* Per-tuple stats (can be collapsed into a future _multi() call). */
+ pgstat_count_heap_getnext_batch(sscan->rs_rd, n);
+
+ hb->nitems = n;
+ return n;
+}
+
+/*----- End of batching support -----*/
+
void
heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
ItemPointer maxtid)
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index bcbac844bb6..ec4eeccf19c 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -2623,6 +2623,10 @@ static const TableAmRoutine heapam_methods = {
.scan_rescan = heap_rescan,
.scan_getnextslot = heap_getnextslot,
+ .scan_begin_batch = heap_begin_batch,
+ .scan_getnextbatch = heap_getnextbatch,
+ .scan_end_batch = heap_end_batch,
+
.scan_set_tidrange = heap_set_tidrange,
.scan_getnextslot_tidrange = heap_getnextslot_tidrange,
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index e60d34dad25..02f7793fba0 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -74,6 +74,9 @@ typedef struct HeapScanDescData
HeapTupleData rs_ctup; /* current tuple in scan, if any */
+ HeapTupleData *rs_batch_ctup; /* NULL when not using batched mode */
+ Buffer rs_batch_cbuf; /* buffer feeding the batch */
+
/* For scans that stream reads */
ReadStream *rs_read_stream;
@@ -101,6 +104,19 @@ typedef struct HeapScanDescData
} HeapScanDescData;
typedef struct HeapScanDescData *HeapScanDesc;
+/*
+ * HeapBatch -- per-batch buffer handed between the heap AM and its caller.
+ * A batch pins one page and exposes up to maxitems HeapTupleData headers
+ * whose t_data point into that page. It is created by heap_begin_batch(),
+ * refilled by heap_getnextbatch() and destroyed by heap_end_batch().
+ */
+typedef struct HeapBatch
+{
+ HeapTupleData *tupdata; /* len = maxitems; headers only */
+ int nitems; /* tuples produced in last getnextbatch() */
+ int maxitems; /* fixed capacity set at begin_batch() */
+ Buffer buf; /* single pinned buffer for this batch, or InvalidBuffer */
+} HeapBatch;
+
typedef struct BitmapHeapScanDescData
{
HeapScanDescData rs_heap_base;
@@ -294,6 +310,11 @@ extern void heap_endscan(TableScanDesc sscan);
extern HeapTuple heap_getnext(TableScanDesc sscan, ScanDirection direction);
extern bool heap_getnextslot(TableScanDesc sscan,
ScanDirection direction, TupleTableSlot *slot);
+
+extern void *heap_begin_batch(TableScanDesc sscan, int maxitems);
+extern void heap_end_batch(TableScanDesc sscan, void *am_batch);
+extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir);
+
extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
ItemPointer maxtid);
extern bool heap_getnextslot_tidrange(TableScanDesc sscan,
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index e16bf025692..953207eac50 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -351,6 +351,16 @@ typedef struct TableAmRoutine
ScanDirection direction,
TupleTableSlot *slot);
+ /* ------------------------------------------------------------------------
+ * Batched scan support
+ * ------------------------------------------------------------------------
+ */
+
+ void *(*scan_begin_batch)(TableScanDesc sscan, int maxitems);
+ int (*scan_getnextbatch)(TableScanDesc sscan, void *am_batch,
+ ScanDirection dir);
+ void (*scan_end_batch)(TableScanDesc sscan, void *am_batch);
+
/*-----------
* Optional functions to provide scanning for ranges of ItemPointers.
* Implementations must either provide both of these functions, or neither
@@ -1036,6 +1046,54 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS
return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}
+/*
+ * table_scan_begin_batch
+ * Ask the relation's table AM to allocate its batch payload, sized for
+ * 'maxitems' entries, and return it as an opaque pointer. The AM must
+ * provide scan_begin_batch.
+ */
+static inline void *
+table_scan_begin_batch(TableScanDesc sscan, int maxitems)
+{
+ const TableAmRoutine *routine = sscan->rs_rd->rd_tableam;
+
+ Assert(routine->scan_begin_batch != NULL);
+ return routine->scan_begin_batch(sscan, maxitems);
+}
+
+/*
+ * table_scan_getnextbatch
+ * Have the table AM fill 'am_batch' with the next batch of tuples.
+ * Returns the number of tuples; 0 means end of scan. In v1, batches are
+ * single-page and only ForwardScanDirection is accepted.
+ */
+static inline int
+table_scan_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir)
+{
+ const TableAmRoutine *routine = sscan->rs_rd->rd_tableam;
+
+ /* Batched mode handles forward scans only. */
+ Assert(dir == ForwardScanDirection);
+ Assert(routine->scan_getnextbatch != NULL);
+
+ return routine->scan_getnextbatch(sscan, am_batch, dir);
+}
+
+/*
+ * table_scan_end_batch
+ * Hand the batch payload back to the table AM for cleanup. A NULL
+ * 'am_batch' is accepted and ignored.
+ */
+static inline void
+table_scan_end_batch(TableScanDesc sscan, void *am_batch)
+{
+ const TableAmRoutine *routine = sscan->rs_rd->rd_tableam;
+
+ if (am_batch != NULL)
+ {
+ Assert(routine->scan_end_batch != NULL);
+ routine->scan_end_batch(sscan, am_batch);
+ }
+}
+
/* ----------------------------------------------------------------------------
* TID Range scanning related functions.
* ----------------------------------------------------------------------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index e4a59a30b8c..aaea9520b1d 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -687,6 +687,11 @@ extern void pgstat_report_analyze(Relation rel,
if (pgstat_should_count_relation(rel)) \
(rel)->pgstat_info->counts.tuples_returned++; \
} while (0)
+/* Add 'n' to the relation's tuples_returned counter; 'n' is parenthesized so any expression is safe. */
+#define pgstat_count_heap_getnext_batch(rel, n) \
+ do { \
+ if (pgstat_should_count_relation(rel)) \
+ (rel)->pgstat_info->counts.tuples_returned += (n); \
+ } while (0)
#define pgstat_count_heap_fetch(rel) \
do { \
if (pgstat_should_count_relation(rel)) \
--
2.43.0
[application/octet-stream] v2-0003-Executor-add-ExecProcNodeBatch-and-integrate-SeqS.patch (9.0K, 8-v2-0003-Executor-add-ExecProcNodeBatch-and-integrate-SeqS.patch)
download | inline diff:
From 10d0df2676462f1931b2ef5072eed7129d936328 Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Mon, 1 Sep 2025 22:18:30 +0900
Subject: [PATCH v2 3/8] Executor: add ExecProcNodeBatch() and integrate
SeqScan with batch API
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Introduce a batch-capable executor interface alongside the existing
slot-at-a-time path:
* ExecProcNodeBatch() is added to return a TupleBatch instead of a
TupleTableSlot. PlanState gains ExecProcNodeBatch as a function
pointer.
Integrate SeqScan with this interface:
* Add ExecSeqScanBatch* routines that drive heap via the batch table
AM API and return a TupleBatch.
* At init, set ps.ExecProcNodeBatch to these routines when
ScanCanUseBatching() allows.
* Retain ExecSeqScanBatchSlot* variants for slot-at-a-time consumers.
This builds on 0002, which introduced TupleBatch and made SeqScan
consume the AM’s batch API internally but still surface slots. With this
patch, SeqScan can surface batches directly to batch-aware upper nodes.
Plan shape and EXPLAIN output remain unchanged; only internal tuple flow
differs when batching is enabled and allowed.
---
src/backend/executor/execProcnode.c | 52 +++++++++++++++++++++++++++++
src/backend/executor/nodeSeqscan.c | 35 +++++++++++++++++++
src/include/executor/execScan.h | 51 ++++++++++++++++++++++++++++
src/include/executor/executor.h | 10 ++++++
src/include/nodes/execnodes.h | 5 +++
5 files changed, 153 insertions(+)
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index f5f9cfbeead..a8c0315e874 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -121,6 +121,8 @@
static TupleTableSlot *ExecProcNodeFirst(PlanState *node);
static TupleTableSlot *ExecProcNodeInstr(PlanState *node);
+static TupleBatch *ExecProcNodeBatchFirst(PlanState *node);
+static TupleBatch *ExecProcNodeBatchInstr(PlanState *node);
static bool ExecShutdownNode_walker(PlanState *node, void *context);
@@ -389,6 +391,8 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
}
ExecSetExecProcNode(result, result->ExecProcNode);
+ if (result->ExecProcNodeBatch)
+ ExecSetExecProcNodeBatch(result, result->ExecProcNodeBatch);
/*
* Initialize any initPlans present in this node. The planner put them in
@@ -489,6 +493,54 @@ ExecProcNodeInstr(PlanState *node)
return result;
}
+/*
+ * ExecSetExecProcNodeBatch
+ * Register 'function' as the node's real batch routine and route the
+ * first call through ExecProcNodeBatchFirst, as the row path does.
+ */
+void
+ExecSetExecProcNodeBatch(PlanState *node, ExecProcNodeBatchMtd function)
+{
+ /* the wrapper goes in the public pointer; the real routine is kept aside */
+ node->ExecProcNodeBatch = ExecProcNodeBatchFirst;
+ node->ExecProcNodeBatchReal = function;
+}
+
+/*
+ * ExecProcNodeBatchFirst
+ * First-call wrapper: checks stack depth once, then replaces itself with
+ * either the instrumentation wrapper or the real batch routine and
+ * forwards the call.
+ */
+static TupleBatch *
+ExecProcNodeBatchFirst(PlanState *node)
+{
+ check_stack_depth();
+
+ /* later calls skip this wrapper entirely */
+ node->ExecProcNodeBatch = node->instrument ?
+ ExecProcNodeBatchInstr : node->ExecProcNodeBatchReal;
+
+ return node->ExecProcNodeBatch(node);
+}
+
+/*
+ * ExecProcNodeBatchInstr
+ * Instrumented batch call: brackets the real routine with
+ * InstrStartNode()/InstrStopNode().
+ *
+ * The batch's nvalid is reported as the tuple count for this call (0 when no
+ * batch is returned), which keeps the Instr API unchanged.
+ */
+static TupleBatch *
+ExecProcNodeBatchInstr(PlanState *node)
+{
+ TupleBatch *batch;
+ double ntuples;
+
+ InstrStartNode(node->instrument);
+
+ batch = node->ExecProcNodeBatchReal(node);
+ ntuples = (batch != NULL) ? (double) batch->nvalid : 0.0;
+
+ InstrStopNode(node->instrument, ntuples);
+
+ return batch;
+}
/* ----------------------------------------------------------------
* MultiExecProcNode
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 2552d420f1c..a4cf1e51af0 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -334,6 +334,37 @@ ExecSeqScanBatchSlotWithQualProject(PlanState *pstate)
pstate->qual, pstate->ps_ProjInfo);
}
+/*
+ * ExecSeqScanBatch
+ *		Batch-returning SeqScan variant for a node that has neither a qual
+ *		nor a projection.
+ *
+ * Batches are fetched with SeqNextBatch and passed through
+ * ExecScanExtendedBatch() with NULL qual and projection arguments.
+ */
+static TupleBatch *
+ExecSeqScanBatch(PlanState *pstate)
+{
+	SeqScanState *scan = castNode(SeqScanState, pstate);
+
+	Assert(pstate->ps_ProjInfo == NULL);
+	Assert(pstate->qual == NULL);
+	Assert(pstate->state->es_epq_active == NULL);
+
+	return ExecScanExtendedBatch(&scan->ss,
+								 (ExecScanAccessBatchMtd) SeqNextBatch,
+								 NULL, NULL);
+}
+
+/*
+ * ExecSeqScanBatchWithQual
+ *		Batch-returning SeqScan variant for a node that has a qual but no
+ *		projection.
+ *
+ * Batches are fetched with SeqNextBatchMaterialize, so the tuples are bound
+ * to slots before ExecScanExtendedBatch() evaluates the qual on them.
+ */
+static TupleBatch *
+ExecSeqScanBatchWithQual(PlanState *pstate)
+{
+	SeqScanState *scan = castNode(SeqScanState, pstate);
+
+	Assert(pstate->state->es_epq_active == NULL);
+	/* pg_assume() tells the compiler the qual pointer is non-NULL here */
+	pg_assume(pstate->qual != NULL);
+	Assert(pstate->ps_ProjInfo == NULL);
+
+	return ExecScanExtendedBatch(&scan->ss,
+								 (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+								 pstate->qual, NULL);
+}
+
/* Batch SeqScan enablement and dispatch */
static void
SeqScanInitBatching(SeqScanState *scanstate, int eflags)
@@ -348,10 +379,12 @@ SeqScanInitBatching(SeqScanState *scanstate, int eflags)
{
if (scanstate->ss.ps.ps_ProjInfo == NULL)
{
+ scanstate->ss.ps.ExecProcNodeBatch = ExecSeqScanBatch;
scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot;
}
else
{
+ scanstate->ss.ps.ExecProcNodeBatch = NULL;
scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject;
}
}
@@ -359,10 +392,12 @@ SeqScanInitBatching(SeqScanState *scanstate, int eflags)
{
if (scanstate->ss.ps.ps_ProjInfo == NULL)
{
+ scanstate->ss.ps.ExecProcNodeBatch = ExecSeqScanBatchWithQual;
scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual;
}
else
{
+ scanstate->ss.ps.ExecProcNodeBatch = NULL;
scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject;
}
}
diff --git a/src/include/executor/execScan.h b/src/include/executor/execScan.h
index fec606471c8..fb4b57a831c 100644
--- a/src/include/executor/execScan.h
+++ b/src/include/executor/execScan.h
@@ -297,4 +297,55 @@ ExecScanExtendedBatchSlot(ScanState *node,
}
}
+/*
+ * ExecScanExtendedBatch
+ * Return the node's TupleBatch once it holds at least one row to return,
+ * or NULL when accessBatchMtd reports that no further batch is available.
+ *
+ * With a non-NULL qual, each slot handed out by TupleBatchGetNextSlot() is
+ * tested with ExecQual(); passing slots are stored at consecutive positions
+ * of the batch's output array and the batch is then switched to that array
+ * with TupleBatchUseOutput(). Rows failing the qual are counted with
+ * InstrCountFiltered1(). With a NULL qual the fetched batch is returned
+ * as is.
+ *
+ * NOTE(review): projInfo is not referenced anywhere in this function body.
+ */
+static inline TupleBatch *
+ExecScanExtendedBatch(ScanState *node,
+ ExecScanAccessBatchMtd accessBatchMtd,
+ ExprState *qual, ProjectionInfo *projInfo)
+{
+ ExprContext *econtext = node->ps.ps_ExprContext;
+ TupleBatch *b = node->ps.ps_Batch;
+ int qualified; /* rows of the current batch to return */
+
+ /* Batch path does not support EPQ */
+ Assert(node->ps.state->es_epq_active == NULL);
+ Assert(TupleBatchIsValid(b));
+
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Get next batch from the AM */
+ if (!accessBatchMtd(node))
+ return NULL;
+
+ if (qual != NULL)
+ {
+ /*
+ * Walk the batch's active slots; TupleBatchHasMore() is false when
+ * activeslots is NULL, so this presumably relies on accessBatchMtd
+ * having materialized the batch -- TODO confirm.
+ */
+ qualified = 0;
+ while (TupleBatchHasMore(b))
+ {
+ TupleTableSlot *in = TupleBatchGetNextSlot(b);
+
+ Assert(in);
+ ResetExprContext(econtext);
+ econtext->ecxt_scantuple = in;
+
+ if (ExecQual(qual, econtext))
+ {
+ /* keep the passing slot at the next free output position */
+ TupleBatchStoreInOut(b, qualified, in);
+ qualified++;
+ }
+ else
+ InstrCountFiltered1(node, 1);
+ }
+ /* expose only the rows that passed the qual */
+ TupleBatchUseOutput(b, qualified);
+ }
+ else
+ qualified = b->nvalid;
+
+ if (qualified > 0)
+ return b;
+ /* else get the next batch from the AM */
+ }
+}
+
#endif /* EXECSCAN_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 17258f7ae2d..cf5b0c7e05c 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -294,6 +294,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate);
*/
extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function);
+extern void ExecSetExecProcNodeBatch(PlanState *node, ExecProcNodeBatchMtd function);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern void ExecShutdownNode(PlanState *node);
@@ -315,6 +316,15 @@ ExecProcNode(PlanState *node)
return node->ExecProcNode(node);
}
+
+/*
+ * ExecProcNodeBatch
+ *		Fetch the next batch from 'node' through its ExecProcNodeBatch
+ *		pointer.
+ *
+ * If the node's chgParam is set, ExecReScan() is called on it first.
+ */
+static inline TupleBatch *
+ExecProcNodeBatch(PlanState *node)
+{
+	/* changed parameters are handled by rescanning the node */
+	if (node->chgParam)
+		ExecReScan(node);
+
+	return node->ExecProcNodeBatch(node);
+}
#endif
/*
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index f4bb8f7dd7f..a104591ac20 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1147,6 +1147,7 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (PlanState *pstate);
/* Return a batch; may reuse caller-provided envelope. NULL => end of scan. */
struct TupleBatch;
typedef struct TupleBatch TupleBatch;
+typedef TupleBatch *(*ExecProcNodeBatchMtd)(struct PlanState *ps);
/* ----------------
* PlanState node
@@ -1171,6 +1172,10 @@ typedef struct PlanState
ExecProcNodeMtd ExecProcNodeReal; /* actual function, if above is a
* wrapper */
+ /* Optional batch-producing entry point (NULL => no batching). */
+ ExecProcNodeBatchMtd ExecProcNodeBatch;
+ ExecProcNodeBatchMtd ExecProcNodeBatchReal;
+
Instrumentation *instrument; /* Optional runtime stats for this node */
WorkerInstrumentation *worker_instrument; /* per-worker instrumentation */
--
2.43.0
[application/octet-stream] v2-0002-SeqScan-add-batch-driven-variants-returning-slots.patch (27.2K, 9-v2-0002-SeqScan-add-batch-driven-variants-returning-slots.patch)
download | inline diff:
From 6a43a40037e4b656739743b3c0abdfb73a8f9b92 Mon Sep 17 00:00:00 2001
From: Amit Langote <[email protected]>
Date: Mon, 1 Sep 2025 21:59:56 +0900
Subject: [PATCH v2 2/8] SeqScan: add batch-driven variants returning slots
Teach SeqScan to drive the table AM via the new batch API added in
the previous commit, while still returning one TupleTableSlot at a
time to callers. This reduces per-tuple AM crossings without
changing the node interface seen by parents.
Add TupleBatch and supporting code in execBatch.c/h to hold executor
side batching state. PlanState gains ps_Batch to carry the active
TupleBatch when a node supports batching.
Wire up runtime selection in ExecInitSeqScan using
ScanCanUseBatching(). When executor_batching is enabled, EPQ is
inactive, the scan is not backward, and the relation supports
batching, ps.ExecProcNode is set to a batch-driven variant. Otherwise
the non-batch path is used.
Plan shape and EXPLAIN output remain unchanged; only the internal
tuple flow differs when batching is enabled and allowed.
Notes / current limits:
- Batching uses EXEC_BATCH_ROWS (currently 64) as the target capacity.
- With the current heapam, batches are composed from a single page, so
the batch may not always be full. Future work may let SeqScan and/or
AMs top up batches across pages when safe to do so.
---
src/backend/access/heap/heapam.c | 29 ++++
src/backend/access/heap/heapam_handler.c | 15 ++
src/backend/access/table/tableam.c | 11 ++
src/backend/executor/Makefile | 1 +
src/backend/executor/execBatch.c | 117 ++++++++++++++
src/backend/executor/execScan.c | 31 ++++
src/backend/executor/meson.build | 1 +
src/backend/executor/nodeSeqscan.c | 176 +++++++++++++++++++++-
src/backend/utils/init/globals.c | 3 +
src/backend/utils/misc/guc_parameters.dat | 7 +
src/include/access/heapam.h | 1 +
src/include/access/tableam.h | 27 ++++
src/include/executor/execBatch.h | 102 +++++++++++++
src/include/executor/execScan.h | 54 +++++++
src/include/executor/executor.h | 4 +
src/include/miscadmin.h | 1 +
src/include/nodes/execnodes.h | 8 +
17 files changed, 587 insertions(+), 1 deletion(-)
create mode 100644 src/backend/executor/execBatch.c
create mode 100644 src/include/executor/execBatch.h
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index f62f7edbf5e..9fd7948482d 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1570,6 +1570,35 @@ heap_begin_batch(TableScanDesc sscan, int maxitems)
return hb;
}
+/*
+ * heap_materialize_batch_all
+ *
+ * Bind the first 'n' tuples of the AM batch 'am_batch' into 'slots'.  We bind
+ * the HeapTupleData header that points into the pinned page.  No per-row
+ * copy.
+ *
+ * 'slots' must contain at least 'n' HeapTupleTableSlots, and 'n' must not
+ * exceed the number of items held by the batch.
+ */
+void
+heap_materialize_batch_all(void *am_batch, TupleTableSlot **slots, int n)
+{
+	HeapBatch  *hb = (HeapBatch *) am_batch;
+
+	Assert(n <= hb->nitems);
+
+	for (int i = 0; i < n; i++)
+	{
+		HeapTupleData *tuple = &hb->tupdata[i];
+		HeapTupleTableSlot *slot = (HeapTupleTableSlot *) slots[i];
+
+		/* Inline of ExecStoreHeapTuple(tuple, slot, false) */
+		slot->tuple = tuple;
+		slot->off = 0;
+		slot->base.tts_nvalid = 0;
+		/* slot now holds a tuple that it does not own */
+		slot->base.tts_flags &= ~(TTS_FLAG_EMPTY | TTS_FLAG_SHOULDFREE);
+		slot->base.tts_tid = tuple->t_self;
+		slot->base.tts_tableOid = tuple->t_tableOid;
+	}
+}
+
/*
* heap_scan_end_batch
*
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index ec4eeccf19c..8e88cc9e8f1 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -72,6 +72,20 @@ heapam_slot_callbacks(Relation relation)
return &TTSOpsBufferHeapTuple;
}
+/* ------------------------------------------------------------------------
+ * TupleBatch related callbacks for heap AM
+ * ------------------------------------------------------------------------
+ */
+
+/* TupleBatch callbacks of the heap AM; only materialize_all is provided. */
+static const TupleBatchOps TupleBatchHeapOps = {
+ .materialize_all = heap_materialize_batch_all
+};
+
+/*
+ * heapam_batch_callbacks
+ * Return the heap AM's TupleBatch callback table. The same table is
+ * returned for every relation; 'relation' is not examined.
+ */
+static const TupleBatchOps *
+heapam_batch_callbacks(Relation relation)
+{
+ return &TupleBatchHeapOps;
+}
/* ------------------------------------------------------------------------
* Index Scan Callbacks for heap AM
@@ -2617,6 +2631,7 @@ static const TableAmRoutine heapam_methods = {
.type = T_TableAmRoutine,
.slot_callbacks = heapam_slot_callbacks,
+ .batch_callbacks = heapam_batch_callbacks,
.scan_begin = heap_beginscan,
.scan_end = heap_endscan,
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index 5e41404937e..5a8ebb8b97c 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -103,6 +103,17 @@ table_slot_create(Relation relation, List **reglist)
return slot;
}
+/* ----------------------------------------------------------------------------
+ * TupleBatch support routines
+ * ----------------------------------------------------------------------------
+ */
+/*
+ * table_batch_callbacks
+ *		Return the TupleBatch callbacks of the relation's table AM.
+ *
+ * Raises an error if the relation has no table AM, or if its AM does not
+ * provide the batch_callbacks routine.
+ */
+const TupleBatchOps *
+table_batch_callbacks(Relation relation)
+{
+	const TableAmRoutine *tam = relation->rd_tableam;
+
+	/* don't call through a NULL batch_callbacks pointer */
+	if (tam != NULL && tam->batch_callbacks != NULL)
+		return tam->batch_callbacks(relation);
+
+	elog(ERROR, "relation does not support TupleBatch operations");
+}
/* ----------------------------------------------------------------------------
* Table scan functions.
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index 11118d0ce02..3e72f3fe03c 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
execAsync.o \
+ execBatch.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/execBatch.c b/src/backend/executor/execBatch.c
new file mode 100644
index 00000000000..007ae535687
--- /dev/null
+++ b/src/backend/executor/execBatch.c
@@ -0,0 +1,117 @@
+/*-------------------------------------------------------------------------
+ *
+ * execBatch.c
+ * Helpers for TupleBatch
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execBatch.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "executor/execBatch.h"
+
+/*
+ * TupleBatchCreate
+ *		Allocate and initialize a new, empty TupleBatch envelope.
+ *
+ * 'capacity' input slots are created for 'scandesc' using TTSOpsHeapTuple.
+ * The output slot array is allocated but its entries are left unset; they
+ * are filled in by TupleBatchStoreInOut().  All other fields start out
+ * empty: no AM payload, no AM callbacks, no active slot array, and not
+ * materialized.
+ */
+TupleBatch *
+TupleBatchCreate(TupleDesc scandesc, int capacity)
+{
+	TupleBatch *b;
+	TupleTableSlot **inslots,
+			  **outslots;
+
+	/* TupleBatchIsValid() requires a positive capacity */
+	Assert(capacity > 0);
+
+	inslots = palloc(sizeof(TupleTableSlot *) * capacity);
+	outslots = palloc(sizeof(TupleTableSlot *) * capacity);
+	for (int i = 0; i < capacity; i++)
+		inslots[i] = MakeSingleTupleTableSlot(scandesc, &TTSOpsHeapTuple);
+
+	b = (TupleBatch *) palloc(sizeof(TupleBatch));
+
+	/*
+	 * Initial state: empty envelope.  palloc() does not zero memory, so
+	 * every field is set explicitly, including 'ops' and 'materialized'.
+	 */
+	b->am_payload = NULL;
+	b->ops = NULL;
+	b->ntuples = 0;
+	b->materialized = false;
+	b->inslots = inslots;
+	b->outslots = outslots;
+	b->activeslots = NULL;
+	b->maxslots = capacity;
+
+	b->nvalid = 0;
+	b->next = 0;
+
+	return b;
+}
+
+/*
+ * TupleBatchReset
+ *		Return an existing TupleBatch envelope to the empty state.
+ *
+ * Every input slot is cleared.  With 'drop_slots' set, the input slots are
+ * dropped too and both slot arrays are freed and set to NULL.  The counters
+ * and the active-array pointer are reset; am_payload, ops and the
+ * materialized flag are not touched here.  A NULL batch is ignored.
+ */
+void
+TupleBatchReset(TupleBatch *b, bool drop_slots)
+{
+	int			nslots;
+
+	if (!b)
+		return;
+
+	nslots = b->maxslots;
+	for (int slotno = 0; slotno < nslots; slotno++)
+	{
+		TupleTableSlot *slot = b->inslots[slotno];
+
+		ExecClearTuple(slot);
+		if (drop_slots)
+			ExecDropSingleTupleTableSlot(slot);
+	}
+
+	if (drop_slots)
+	{
+		pfree(b->inslots);
+		pfree(b->outslots);
+		b->inslots = NULL;
+		b->outslots = NULL;
+	}
+
+	b->activeslots = NULL;
+	b->next = 0;
+	b->nvalid = 0;
+	b->ntuples = 0;
+}
+
+/*
+ * TupleBatchUseInput
+ *		Make the input slot array the batch's active array.
+ *
+ * Marks the batch as materialized, sets the number of returnable tuples to
+ * 'nvalid' and restarts iteration at the first slot.
+ */
+void
+TupleBatchUseInput(TupleBatch *b, int nvalid)
+{
+	b->activeslots = b->inslots;
+	b->materialized = true;
+	b->next = 0;
+	b->nvalid = nvalid;
+}
+
+/*
+ * TupleBatchUseOutput
+ *		Make the output slot array the batch's active array.
+ *
+ * Marks the batch as materialized, sets the number of returnable tuples to
+ * 'nvalid' and restarts iteration at the first slot.
+ */
+void
+TupleBatchUseOutput(TupleBatch *b, int nvalid)
+{
+	b->activeslots = b->outslots;
+	b->materialized = true;
+	b->next = 0;
+	b->nvalid = nvalid;
+}
+
+/*
+ * TupleBatchIsValid
+ *		Is 'b' a usable envelope?  That is: non-NULL, with a positive slot
+ *		capacity and with both slot arrays allocated.
+ */
+bool
+TupleBatchIsValid(TupleBatch *b)
+{
+	if (b == NULL || b->maxslots <= 0)
+		return false;
+
+	return b->inslots != NULL && b->outslots != NULL;
+}
+
+/*
+ * TupleBatchRewind
+ * Restart iteration over the batch: the next slot handed out by
+ * TupleBatchGetNextSlot() is the one at index 0.
+ */
+void
+TupleBatchRewind(TupleBatch *b)
+{
+ b->next = 0;
+}
+
+/*
+ * TupleBatchGetNumValid
+ * Return the batch's nvalid count.
+ */
+int
+TupleBatchGetNumValid(TupleBatch *b)
+{
+ return b->nvalid;
+}
diff --git a/src/backend/executor/execScan.c b/src/backend/executor/execScan.c
index 90726949a87..f24c5d73ae1 100644
--- a/src/backend/executor/execScan.c
+++ b/src/backend/executor/execScan.c
@@ -18,6 +18,7 @@
*/
#include "postgres.h"
+#include "access/tableam.h"
#include "executor/executor.h"
#include "executor/execScan.h"
#include "miscadmin.h"
@@ -154,3 +155,33 @@ ExecScanReScan(ScanState *node)
}
}
}
+
+/*
+ * ScanCanUseBatching
+ *		Decide whether a scan node may use the batch-driven code path.
+ *
+ * Batching requires all of: the executor_batching setting being on, no
+ * active EPQ, no EXEC_FLAG_BACKWARD in 'eflags', and a scan relation whose
+ * table AM supports batching.
+ */
+bool
+ScanCanUseBatching(ScanState *scanstate, int eflags)
+{
+	Relation	rel = scanstate->ss_currentRelation;
+
+	if (!executor_batching)
+		return false;
+	if (scanstate->ps.state->es_epq_active != NULL)
+		return false;
+	if ((eflags & EXEC_FLAG_BACKWARD) != 0)
+		return false;
+	if (rel == NULL)
+		return false;
+
+	return table_supports_batching(rel);
+}
+
+/*
+ * ScanResetBatching
+ *		Reset the scan node's TupleBatch, if it has one.
+ *
+ * The batch's slots are cleared and any AM batch payload is released with
+ * table_scan_end_batch().  With 'drop' set, the slots, the slot arrays and
+ * the TupleBatch itself are freed as well, and ps_Batch is set to NULL so
+ * that the node is not left pointing at freed memory.
+ */
+void
+ScanResetBatching(ScanState *scanstate, bool drop)
+{
+	TupleBatch *b = scanstate->ps.ps_Batch;
+
+	if (b == NULL)
+		return;
+
+	/* Slots are reset before the AM payload is released. */
+	TupleBatchReset(b, drop);
+
+	if (b->am_payload)
+	{
+		table_scan_end_batch(scanstate->ss_currentScanDesc, b->am_payload);
+		b->am_payload = NULL;
+	}
+
+	if (drop)
+	{
+		pfree(b);
+		scanstate->ps.ps_Batch = NULL;
+	}
+}
diff --git a/src/backend/executor/meson.build b/src/backend/executor/meson.build
index 2cea41f8771..40ffc28f3cb 100644
--- a/src/backend/executor/meson.build
+++ b/src/backend/executor/meson.build
@@ -3,6 +3,7 @@
backend_sources += files(
'execAmi.c',
'execAsync.c',
+ 'execBatch.c',
'execCurrent.c',
'execExpr.c',
'execExprInterp.c',
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index 94047d29430..2552d420f1c 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -203,6 +203,171 @@ ExecSeqScanEPQ(PlanState *pstate)
(ExecScanRecheckMtd) SeqRecheck);
}
+/* ----------------------------------------------------------------
+ * Batch Support
+ * ----------------------------------------------------------------
+ */
+/*
+ * SeqNextBatch
+ *		Fetch the next batch of tuples from the table AM into the node's
+ *		TupleBatch.
+ *
+ * The scan descriptor and the AM batch payload are created on first use.
+ * The fetched tuples are not bound to slots here; the batch is left marked
+ * as not materialized.  Returns true if the AM returned at least one tuple.
+ */
+static inline bool
+SeqNextBatch(SeqScanState *node)
+{
+	TupleBatch *batch = node->ss.ps.ps_Batch;
+	EState	   *estate = node->ss.ps.state;
+	ScanDirection direction = estate->es_direction;
+	TableScanDesc scandesc = node->ss.ss_currentScanDesc;
+
+	Assert(batch != NULL);
+	Assert(direction == ForwardScanDirection);
+
+	if (scandesc == NULL)
+	{
+		/*
+		 * No scan descriptor yet: the scan is not parallel, or a scan that
+		 * was planned to be parallel is being executed serially.
+		 */
+		scandesc = table_beginscan(node->ss.ss_currentRelation,
+								   estate->es_snapshot,
+								   0, NULL);
+		node->ss.ss_currentScanDesc = scandesc;
+	}
+
+	/* Set up the AM batch payload and callbacks the first time through. */
+	if (batch->am_payload == NULL)
+	{
+		const TableAmRoutine *tam PG_USED_FOR_ASSERTS_ONLY = scandesc->rs_rd->rd_tableam;
+
+		Assert(tam && tam->scan_begin_batch);
+		batch->am_payload = table_scan_begin_batch(scandesc, batch->maxslots);
+		batch->ops = table_batch_callbacks(node->ss.ss_currentRelation);
+	}
+
+	batch->ntuples = table_scan_getnextbatch(scandesc, batch->am_payload,
+											 direction);
+	batch->nvalid = batch->ntuples;
+	batch->materialized = false;
+
+	return batch->ntuples > 0;
+}
+
+/*
+ * SeqNextBatchMaterialize
+ *		Like SeqNextBatch(), but also bind the fetched tuples into the
+ *		batch's slots via TupleBatchMaterializeAll().
+ *
+ * Returns false, without materializing anything, when SeqNextBatch() reports
+ * no tuples.
+ */
+static inline bool
+SeqNextBatchMaterialize(SeqScanState *node)
+{
+	if (!SeqNextBatch(node))
+		return false;
+
+	TupleBatchMaterializeAll(node->ss.ps.ps_Batch);
+	return true;
+}
+
+/*
+ * ExecSeqScanBatchSlot
+ * Batch-driven SeqScan variant that returns one slot per call, for a
+ * node with neither a qual nor a projection.
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlot(PlanState *pstate)
+{
+ SeqScanState *node = castNode(SeqScanState, pstate);
+
+ Assert(pstate->state->es_epq_active == NULL);
+ Assert(pstate->qual == NULL);
+ Assert(pstate->ps_ProjInfo == NULL);
+
+ return ExecScanExtendedBatchSlot(&node->ss,
+ (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+ NULL, NULL);
+}
+
+/*
+ * Variant of ExecSeqScanBatchSlot() but when qual evaluation is required.
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithQual(PlanState *pstate)
+{
+ SeqScanState *node = castNode(SeqScanState, pstate);
+
+ /*
+ * Use pg_assume() for != NULL tests to make the compiler realize no
+ * runtime check for the field is needed in ExecScanExtendedBatchSlot().
+ */
+ Assert(pstate->state->es_epq_active == NULL);
+ pg_assume(pstate->qual != NULL);
+ Assert(pstate->ps_ProjInfo == NULL);
+
+ return ExecScanExtendedBatchSlot(&node->ss,
+ (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+ pstate->qual, NULL);
+}
+
+/*
+ * Variant of ExecSeqScanBatchSlot() but when projection is required.
+ *
+ * The node must have a projection and no qual; the slot returned is the
+ * result of ExecProject() in ExecScanExtendedBatchSlot().
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithProject(PlanState *pstate)
+{
+ SeqScanState *node = castNode(SeqScanState, pstate);
+
+ Assert(pstate->state->es_epq_active == NULL);
+ Assert(pstate->qual == NULL);
+ pg_assume(pstate->ps_ProjInfo != NULL);
+
+ return ExecScanExtendedBatchSlot(&node->ss,
+ (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+ NULL, pstate->ps_ProjInfo);
+}
+
+/*
+ * Variant of ExecSeqScanBatchSlot() but when qual evaluation and projection
+ * are required.
+ *
+ * Both the qual and the projection are passed on to
+ * ExecScanExtendedBatchSlot().
+ */
+static TupleTableSlot *
+ExecSeqScanBatchSlotWithQualProject(PlanState *pstate)
+{
+ SeqScanState *node = castNode(SeqScanState, pstate);
+
+ Assert(pstate->state->es_epq_active == NULL);
+ pg_assume(pstate->qual != NULL);
+ pg_assume(pstate->ps_ProjInfo != NULL);
+
+ return ExecScanExtendedBatchSlot(&node->ss,
+ (ExecScanAccessBatchMtd) SeqNextBatchMaterialize,
+ pstate->qual, pstate->ps_ProjInfo);
+}
+
+/*
+ * SeqScanInitBatching
+ * Create the node's TupleBatch (EXEC_BATCH_ROWS slots built from the
+ * scan relation's descriptor) and install the batch-driven
+ * ExecProcNode variant matching the node's qual/projection combination.
+ *
+ * NOTE(review): 'eflags' is not used in this function body.
+ */
+static void
+SeqScanInitBatching(SeqScanState *scanstate, int eflags)
+{
+ const int cap = EXEC_BATCH_ROWS;
+ TupleDesc scandesc = RelationGetDescr(scanstate->ss.ss_currentRelation);
+
+ scanstate->ss.ps.ps_Batch = TupleBatchCreate(scandesc, cap);
+
+ /*
+ * Choose the batch variant by the presence of a qual and of a
+ * projection, giving four specialized variants in total.
+ */
+ if (scanstate->ss.ps.qual == NULL)
+ {
+ if (scanstate->ss.ps.ps_ProjInfo == NULL)
+ {
+ scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot;
+ }
+ else
+ {
+ scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject;
+ }
+ }
+ else
+ {
+ if (scanstate->ss.ps.ps_ProjInfo == NULL)
+ {
+ scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual;
+ }
+ else
+ {
+ scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject;
+ }
+ }
+}
+
/* ----------------------------------------------------------------
* ExecInitSeqScan
* ----------------------------------------------------------------
@@ -211,6 +376,7 @@ SeqScanState *
ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
{
SeqScanState *scanstate;
+ bool use_batching;
/*
* Once upon a time it was possible to have an outerPlan of a SeqScan, but
@@ -241,9 +407,12 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
node->scan.scanrelid,
eflags);
+ use_batching = ScanCanUseBatching(&scanstate->ss, eflags);
+
/* and create slot with the appropriate rowtype */
ExecInitScanTupleSlot(estate, &scanstate->ss,
RelationGetDescr(scanstate->ss.ss_currentRelation),
+ use_batching ? &TTSOpsHeapTuple :
table_slot_callbacks(scanstate->ss.ss_currentRelation));
/*
@@ -280,6 +449,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
scanstate->ss.ps.ExecProcNode = ExecSeqScanWithQualProject;
}
+ if (use_batching)
+ SeqScanInitBatching(scanstate, eflags);
+
return scanstate;
}
@@ -299,6 +471,8 @@ ExecEndSeqScan(SeqScanState *node)
*/
scanDesc = node->ss.ss_currentScanDesc;
+ ScanResetBatching(&node->ss, true);
+
/*
* close heap scan
*/
@@ -327,7 +501,7 @@ ExecReScanSeqScan(SeqScanState *node)
if (scan != NULL)
table_rescan(scan, /* scan desc */
NULL); /* new scan keys */
-
+ ScanResetBatching(&node->ss, false);
ExecScanReScan((ScanState *) node);
}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index d31cb45a058..b4a0996a717 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -165,3 +165,6 @@ int notify_buffers = 16;
int serializable_buffers = 32;
int subtransaction_buffers = 0;
int transaction_buffers = 0;
+
+/* executor batching; initial value matches the GUC's boot_val ('true') */
+bool executor_batching = true;
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 6bc6be13d2a..c9fbb7ffef9 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -880,6 +880,13 @@
boot_val => 'true',
},
+{ name => 'executor_batching', type => 'bool', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS',
+ short_desc => 'Use tuple batching during execution.',
+ flags => 'GUC_NOT_IN_SAMPLE',
+ variable => 'executor_batching',
+ boot_val => 'true',
+},
+
{ name => 'data_sync_retry', type => 'bool', context => 'PGC_POSTMASTER', group => 'ERROR_HANDLING_OPTIONS',
short_desc => 'Whether to continue running after a failure to sync data files.',
variable => 'data_sync_retry',
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 02f7793fba0..13ce6166ec3 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -314,6 +314,7 @@ extern bool heap_getnextslot(TableScanDesc sscan,
extern void *heap_begin_batch(TableScanDesc sscan, int maxitems);
extern void heap_end_batch(TableScanDesc sscan, void *am_batch);
extern int heap_getnextbatch(TableScanDesc sscan, void *am_batch, ScanDirection dir);
+extern void heap_materialize_batch_all(void *am_batch, TupleTableSlot **slots, int n);
extern void heap_set_tidrange(TableScanDesc sscan, ItemPointer mintid,
ItemPointer maxtid);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 953207eac50..05f828b9762 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -21,6 +21,7 @@
#include "access/sdir.h"
#include "access/xact.h"
#include "commands/vacuum.h"
+#include "executor/execBatch.h"
#include "executor/tuptable.h"
#include "storage/read_stream.h"
#include "utils/rel.h"
@@ -39,6 +40,7 @@ typedef struct BulkInsertStateData BulkInsertStateData;
typedef struct IndexInfo IndexInfo;
typedef struct SampleScanState SampleScanState;
typedef struct ValidateIndexState ValidateIndexState;
+typedef struct TupleBatchOps TupleBatchOps;
/*
* Bitmask values for the flags argument to the scan_begin callback.
@@ -301,6 +303,7 @@ typedef struct TableAmRoutine
* Return slot implementation suitable for storing a tuple of this AM.
*/
const TupleTableSlotOps *(*slot_callbacks) (Relation rel);
+ const TupleBatchOps *(*batch_callbacks)(Relation rel);
/* ------------------------------------------------------------------------
@@ -361,6 +364,7 @@ typedef struct TableAmRoutine
ScanDirection dir);
void (*scan_end_batch)(TableScanDesc sscan, void *am_batch);
+
/*-----------
* Optional functions to provide scanning for ranges of ItemPointers.
* Implementations must either provide both of these functions, or neither
@@ -872,6 +876,16 @@ extern const TupleTableSlotOps *table_slot_callbacks(Relation relation);
*/
extern TupleTableSlot *table_slot_create(Relation relation, List **reglist);
+/* ----------------------------------------------------------------------------
+ * TupleBatch functions.
+ * ----------------------------------------------------------------------------
+ */
+
+/*
+ * Returns callbacks for manipulating TupleBatch for tuples of the given
+ * relation.
+ */
+extern const TupleBatchOps *table_batch_callbacks(Relation relation);
/* ----------------------------------------------------------------------------
* Table scan functions.
@@ -1046,6 +1060,18 @@ table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableS
return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}
+/*
+ * table_supports_batching
+ *		Does the relation's AM support batching?
+ *
+ * Besides scan_getnextbatch, the batch scan path also calls
+ * table_batch_callbacks(), so the AM must provide batch_callbacks as well.
+ * Checking both here lets callers fall back to the non-batch path instead
+ * of failing once the scan is under way.
+ */
+static inline bool
+table_supports_batching(Relation relation)
+{
+	const TableAmRoutine *tam = relation->rd_tableam;
+
+	return tam->scan_getnextbatch != NULL &&
+		tam->batch_callbacks != NULL;
+}
+
/*
* table_scan_begin_batch
* Allocate AM-owned batch payload with capacity 'maxitems'.
@@ -2116,5 +2142,6 @@ extern const TableAmRoutine *GetTableAmRoutine(Oid amhandler);
*/
extern const TableAmRoutine *GetHeapamTableAmRoutine(void);
+extern struct TupleBatchOps *GetHeapamTupleBatchOps(void);
#endif /* TABLEAM_H */
diff --git a/src/include/executor/execBatch.h b/src/include/executor/execBatch.h
new file mode 100644
index 00000000000..6f1a38d14bd
--- /dev/null
+++ b/src/include/executor/execBatch.h
@@ -0,0 +1,102 @@
+/*-------------------------------------------------------------------------
+ *
+ * execBatch.h
+ * Executor batch envelope for passing tuple batch state upward
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execBatch.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef EXECBATCH_H
+#define EXECBATCH_H
+
+#include "executor/tuptable.h"
+
+/* XXX fixed 64 for PoC */
+#define EXEC_BATCH_ROWS 64
+
+/*
+ * TupleBatchOps -- AM-specific helpers for lazy materialization.
+ *
+ * materialize_all binds the tuples held in the AM payload into the 'dst'
+ * slot array.
+ *
+ * NOTE(review): the third argument is named 'maxslots' here, but
+ * TupleBatchMaterializeAll() passes the batch's ntuples for it.
+ */
+typedef struct TupleBatchOps
+{
+ void (*materialize_all)(void *am_payload,
+ TupleTableSlot **dst,
+ int maxslots);
+} TupleBatchOps;
+
+/*
+ * TupleBatch
+ *
+ * Envelope for a batch of tuples produced by a plan node (e.g., SeqScan) per
+ * call to a batch variant of ExecSeqScan().
+ *
+ * Tuples first live only in the AM payload; TupleBatchMaterializeAll() binds
+ * them into 'inslots'. 'activeslots' points at whichever of 'inslots' or
+ * 'outslots' is currently being iterated over.
+ */
+typedef struct TupleBatch
+{
+ void *am_payload; /* AM batch payload; NULL until first fetch */
+ const TupleBatchOps *ops; /* AM callbacks used for materialization */
+ int ntuples; /* number of tuples in am_payload */
+ bool materialized; /* tuples in slots valid? */
+ struct TupleTableSlot **inslots; /* slots for tuples read "into" batch */
+ struct TupleTableSlot **outslots; /* slots for tuples going "out of"
+ * batch */
+ struct TupleTableSlot **activeslots; /* inslots or outslots; NULL when
+ * the batch is empty or reset */
+ int maxslots; /* capacity of inslots and outslots */
+
+ int nvalid; /* number of returnable tuples in the batch */
+ int next; /* 0-based index of next tuple to be returned */
+} TupleBatch;
+
+
+/* Helpers */
+extern TupleBatch *TupleBatchCreate(TupleDesc scandesc, int capacity);
+extern void TupleBatchReset(TupleBatch *b, bool drop_slots);
+extern void TupleBatchUseInput(TupleBatch *b, int nvalid);
+extern void TupleBatchUseOutput(TupleBatch *b, int nvalid);
+extern bool TupleBatchIsValid(TupleBatch *b);
+extern void TupleBatchRewind(TupleBatch *b);
+extern int TupleBatchGetNumValid(TupleBatch *b);
+
+/*
+ * TupleBatchGetNextSlot
+ *		Return the next not-yet-returned slot of the active array and
+ *		advance the iteration position, or return NULL when all nvalid
+ *		slots have been handed out.
+ */
+static inline TupleTableSlot *
+TupleBatchGetNextSlot(TupleBatch *b)
+{
+	if (b->next >= b->nvalid)
+		return NULL;
+
+	return b->activeslots[b->next++];
+}
+
+/*
+ * TupleBatchGetSlot
+ *		Return the slot at position 'index' of the active slot array.
+ *
+ * 'index' must lie in [0, nvalid) and the batch must have an active slot
+ * array; both are checked by assertion only.
+ */
+static inline TupleTableSlot *
+TupleBatchGetSlot(TupleBatch *b, int index)
+{
+	Assert(b->activeslots != NULL);
+	Assert(index >= 0 && index < b->nvalid);
+	return b->activeslots[index];
+}
+
+/*
+ * TupleBatchStoreInOut
+ *		Record 'out' at position 'index' of the batch's output slot array.
+ *
+ * Only the pointer is stored.  'index' must lie in [0, maxslots), which is
+ * checked by assertion only.
+ */
+static inline void
+TupleBatchStoreInOut(TupleBatch *b, int index, TupleTableSlot *out)
+{
+	Assert(TupleBatchIsValid(b));
+	Assert(index >= 0 && index < b->maxslots);
+	b->outslots[index] = out;
+}
+
+/*
+ * TupleBatchHasMore
+ *		Are there slots left to return from the active array?  Always false
+ *		when the batch has no active slot array.
+ */
+static inline bool
+TupleBatchHasMore(TupleBatch *b)
+{
+	if (b->activeslots == NULL)
+		return false;
+
+	return b->next < b->nvalid;
+}
+
+/*
+ * TupleBatchMaterializeAll
+ *		Bind the tuples held in the AM payload into the batch's input slots,
+ *		unless the batch is already materialized.
+ *
+ * Uses the AM's materialize_all callback and then makes the input slots the
+ * active array with ntuples returnable tuples.  Raises an error if the batch
+ * has no callbacks or no materialize_all callback.
+ */
+static inline void
+TupleBatchMaterializeAll(TupleBatch *b)
+{
+	const TupleBatchOps *ops;
+
+	if (b->materialized)
+		return;
+
+	ops = b->ops;
+	if (ops == NULL || ops->materialize_all == NULL)
+		elog(ERROR, "TupleBatch has no slots and no materialize_all op");
+
+	ops->materialize_all(b->am_payload, b->inslots, b->ntuples);
+	TupleBatchUseInput(b, b->ntuples);
+}
+
+#endif /* EXECBATCH_H */
diff --git a/src/include/executor/execScan.h b/src/include/executor/execScan.h
index 837ea7785bb..fec606471c8 100644
--- a/src/include/executor/execScan.h
+++ b/src/include/executor/execScan.h
@@ -243,4 +243,58 @@ ExecScanExtended(ScanState *node,
}
}
+/*
+ * ExecScanExtendedBatchSlot
+ * Return the next qualifying tuple, one slot per call, drawing input
+ * slots from the node's TupleBatch and refilling the batch through
+ * accessBatchMtd when it runs out.
+ *
+ * Returns NULL when accessBatchMtd reports that no further batch is
+ * available. With a projection the result of ExecProject() is returned,
+ * otherwise the input slot itself. Rows failing the qual are counted with
+ * InstrCountFiltered1().
+ */
+static inline TupleTableSlot *
+ExecScanExtendedBatchSlot(ScanState *node,
+ ExecScanAccessBatchMtd accessBatchMtd,
+ ExprState *qual, ProjectionInfo *projInfo)
+{
+ ExprContext *econtext = node->ps.ps_ExprContext;
+ TupleBatch *b = node->ps.ps_Batch;
+
+ /* Batch path does not support EPQ */
+ Assert(node->ps.state->es_epq_active == NULL);
+ Assert(TupleBatchIsValid(b));
+
+ for (;;)
+ {
+ TupleTableSlot *in;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* Get next input slot from current batch, or refill */
+ if (!TupleBatchHasMore(b))
+ {
+ if (!accessBatchMtd(node))
+ return NULL;
+ }
+
+ /*
+ * The Assert below expects a slot after a successful refill, i.e. it
+ * presumably relies on accessBatchMtd materializing the batch --
+ * TODO confirm.
+ */
+ in = TupleBatchGetNextSlot(b);
+ Assert(in);
+
+ /* No qual, no projection: direct return */
+ if (qual == NULL && projInfo == NULL)
+ return in;
+
+ ResetExprContext(econtext);
+ econtext->ecxt_scantuple = in;
+
+ /* Qual only */
+ if (projInfo == NULL)
+ {
+ /* qual cannot be NULL here; that case returned above */
+ if (qual == NULL || ExecQual(qual, econtext))
+ return in;
+ else
+ InstrCountFiltered1(node, 1);
+ continue;
+ }
+
+ /* Projection (with or without qual) */
+ if (qual == NULL || ExecQual(qual, econtext))
+ return ExecProject(projInfo);
+ else
+ InstrCountFiltered1(node, 1);
+ /* else try next tuple */
+ }
+}
+
#endif /* EXECSCAN_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 3248e78cd28..17258f7ae2d 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -575,12 +575,16 @@ extern Datum ExecMakeFunctionResultSet(SetExprState *fcache,
*/
typedef TupleTableSlot *(*ExecScanAccessMtd) (ScanState *node);
typedef bool (*ExecScanRecheckMtd) (ScanState *node, TupleTableSlot *slot);
+typedef bool (*ExecScanAccessBatchMtd)(ScanState *node);
extern TupleTableSlot *ExecScan(ScanState *node, ExecScanAccessMtd accessMtd,
ExecScanRecheckMtd recheckMtd);
+
extern void ExecAssignScanProjectionInfo(ScanState *node);
extern void ExecAssignScanProjectionInfoWithVarno(ScanState *node, int varno);
extern void ExecScanReScan(ScanState *node);
+extern bool ScanCanUseBatching(ScanState *scanstate, int eflags);
+extern void ScanResetBatching(ScanState *scanstate, bool drop);
/*
* prototypes from functions in execTuples.c
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 1bef98471c3..b8e7afda57c 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -287,6 +287,7 @@ extern PGDLLIMPORT double VacuumCostDelay;
extern PGDLLIMPORT int VacuumCostBalance;
extern PGDLLIMPORT bool VacuumCostActive;
+extern PGDLLIMPORT bool executor_batching;
/* in utils/misc/stack_depth.c */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index a36653c37f9..f4bb8f7dd7f 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -30,6 +30,7 @@
#define EXECNODES_H
#include "access/tupconvert.h"
+#include "executor/execBatch.h"
#include "executor/instrument.h"
#include "fmgr.h"
#include "lib/ilist.h"
@@ -1143,6 +1144,10 @@ typedef struct JsonExprState
*/
typedef TupleTableSlot *(*ExecProcNodeMtd) (PlanState *pstate);
+/* Return a batch; may reuse caller-provided envelope. NULL => end of scan. */
+struct TupleBatch;
+typedef struct TupleBatch TupleBatch;
+
/* ----------------
* PlanState node
*
@@ -1198,6 +1203,9 @@ typedef struct PlanState
ExprContext *ps_ExprContext; /* node's expression-evaluation context */
ProjectionInfo *ps_ProjInfo; /* info for doing tuple projection */
+ /* Batching state if node supports it. */
+ TupleBatch *ps_Batch;
+
bool async_capable; /* true if node is async-capable */
/*
--
2.43.0
view thread (22+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected]
Subject: Re: Batching in executor
In-Reply-To: <CA+HiwqF-31mmZ2hhnBLuWpu1UYTVPXoEzKBW6wrf96KpY=AU7A@mail.gmail.com>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox