From 9eea71db3c7bb137e676ad0a27f6256d9c6971f0 Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Tue, 9 Sep 2025 21:43:29 +0900 Subject: [PATCH v3 7/9] WIP: Add EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT The new EEOP runs a plain aggregate transition over a TupleBatch with a single fmgr call. Batch vectors are passed to the transfn via AggBulkArgs stored in fcinfo->flinfo->fn_extra, avoiding per-row fmgr overhead. Gate selection with AggTransfnSupportsBulk(), an allowlist of built-in transfns updated to accept AggBulkArgs. Some integer transfns are taught to read AggBulkArgs when present, else fall back. Rowloop batching remains available; unsupported aggregates keep the row path. --- src/backend/executor/execExpr.c | 28 ++++++++++++++++- src/backend/executor/execExprInterp.c | 43 ++++++++++++++++++++++++++ src/backend/executor/nodeAgg.c | 1 - src/backend/jit/llvm/llvmjit_expr.c | 1 + src/backend/utils/adt/int.c | 32 +++++++++++++++++++ src/backend/utils/adt/int8.c | 44 +++++++++++++++++++++++++++ src/backend/utils/adt/numeric.c | 17 +++++++++++ src/include/executor/execExpr.h | 1 + src/include/executor/executor.h | 20 ++++++++++++ 9 files changed, 185 insertions(+), 2 deletions(-) diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index af5ed8b6368..27a5780f557 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -47,6 +47,7 @@ #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/jsonfuncs.h" #include "utils/jsonpath.h" #include "utils/lsyscache.h" @@ -3692,6 +3693,28 @@ AggTransCanUseBatch(AggState *as, AggStatePerTrans pt) return true; } +/* Return true if this transfn OID is known to accept AggBulkArgs. */ +static bool +AggTransfnSupportsBulk(Oid fn_oid) +{ + /* Phase 1: hard-coded allowlist of built-ins you updated. */ + static const Oid ok[] = + { + F_INT8INC_ANY, /* COUNT(*) transfn */ + F_INT8INC, /* COUNT(arg) transfn */ + F_INT4_SUM, /* SUM(int) transfn */ + F_INT4SMALLER, /* MIN(int) transfn */ + F_INT4LARGER, /* MAX(int) transfn */ + /* add others you make bulk-aware */ + InvalidOid + }; + + for (int i = 0; OidIsValid(ok[i]); i++) + if (ok[i] == fn_oid) + return true; + return false; +} + /* * Build transition/combine function invocations for all aggregate transition * / combination function invocations in a grouping sets phase. This has to @@ -4150,7 +4173,10 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate, { if (bv) bvs = BatchVectorSliceFromExprArgs(pertrans->aggref->args, bv); - scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP; + if (!AggTransfnSupportsBulk(pertrans->transfn_oid)) + scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP; + else + scratch->opcode = EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT; } else if (pertrans->transtypeByVal) { diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 3176679b346..41ad9b4838d 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -607,6 +607,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_BUILD_OUTER_BATCH_VECTOR, &&CASE_EEOP_BUILD_SCAN_BATCH_VECTOR, &&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP, + &&CASE_EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT, &&CASE_EEOP_LAST }; @@ -2345,6 +2346,14 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT) + { + /* too complex for an inline implementation */ + ExecAggPlainTransBatch(state, op, econtext); + + EEO_NEXT(); + } + EEO_CASE(EEOP_LAST) { /* unreachable */ @@ -6138,6 +6147,40 @@ ExecAggPlainTransBatch(ExprState *state, ExprEvalStep *op, ExprContext *econtext pergroup->transValueIsNull = fcinfo->isnull; } break; + + case EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT: + { + void *save = fcinfo->flinfo->fn_extra; + AggBulkArgs ba = {batch_nrows, start_row}; + + if (bvs) + { + const BatchVector *bv = bvs->bv; + + Assert(bv); + ba.nargs = bvs->nargs; + ba.argoffs = bvs->argoffs; + ba.args = bv->cols; + ba.isnull = bv->nulls; + ba.hasnull = bv->hasnull; + } + fcinfo->flinfo->fn_extra = &ba; + fcinfo->args[0].value = pergroup->transValue; + fcinfo->args[0].isnull = pergroup->transValueIsNull; + fcinfo->isnull = false; /* just in case transfn doesn't set it */ + newVal = FunctionCallInvoke(fcinfo); /* one call for the entire slice */ + if (!pertrans->transtypeByVal && + DatumGetPointer(newVal) != DatumGetPointer(pergroup->transValue)) + newVal = ExecAggCopyTransValue(aggstate, pertrans, + newVal, fcinfo->isnull, + pergroup->transValue, + pergroup->transValueIsNull); + pergroup->transValue = newVal; + pergroup->transValueIsNull = fcinfo->isnull; + fcinfo->flinfo->fn_extra = save; + } + break; + default: elog(ERROR, "invalid ExprEvalOp in ExecAggPlainTransBatch()"); } diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 662d8bef43b..a2286ef5e54 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -2687,7 +2687,6 @@ agg_retrieve_direct_batch(AggState *aggstate) initialize_aggregates(aggstate, aggstate->pergroups, Max(aggstate->phase->numsets, 1)); - if (aggstate->grp_firstTuple) { ExecForceStoreHeapTuple(aggstate->grp_firstTuple, firstSlot, true); diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index efb3ee639fc..45346124bd7 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -3026,6 +3026,7 @@ llvm_compile_expr(ExprState *state) LLVMBuildBr(b, opblocks[opno + 1]); break; + case EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT: case EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP: build_EvalXFunc(b, mod, "ExecAggPlainTransBatch", v_state, op, v_econtext); diff --git a/src/backend/utils/adt/int.c b/src/backend/utils/adt/int.c index b5781989a64..eb1780b5590 100644 --- a/src/backend/utils/adt/int.c +++ b/src/backend/utils/adt/int.c @@ -1363,18 +1363,50 @@ int2smaller(PG_FUNCTION_ARGS) Datum int4larger(PG_FUNCTION_ARGS) { + AggBulkArgs *ba = AggGetBulkArgs(fcinfo); int32 arg1 = PG_GETARG_INT32(0); int32 arg2 = PG_GETARG_INT32(1); + if (unlikely(ba)) + { + int32 result = arg1; + + for (int i = ba->start_row; i < ba->nrows; i++) + { + if (!ba->isnull[ba->argoffs[0]][i]) + { + arg2 = (int32) ba->args[ba->argoffs[0]][i]; + if (arg2 > result) + result = arg2; + } + } + PG_RETURN_INT32(result); + } PG_RETURN_INT32((arg1 > arg2) ? arg1 : arg2); } Datum int4smaller(PG_FUNCTION_ARGS) { + AggBulkArgs *ba = AggGetBulkArgs(fcinfo); int32 arg1 = PG_GETARG_INT32(0); int32 arg2 = PG_GETARG_INT32(1); + if (unlikely(ba)) + { + int32 result = arg1; + + for (int i = ba->start_row; i < ba->nrows; i++) + { + if (!ba->isnull[ba->argoffs[0]][i]) + { + arg2 = ba->args[ba->argoffs[0]][i]; + if (arg2 < result) + result = arg2; + } + } + PG_RETURN_INT32(result); + } PG_RETURN_INT32((arg1 < arg2) ? arg1 : arg2); } diff --git a/src/backend/utils/adt/int8.c b/src/backend/utils/adt/int8.c index bdea490202a..bbabf4e0785 100644 --- a/src/backend/utils/adt/int8.c +++ b/src/backend/utils/adt/int8.c @@ -461,10 +461,28 @@ int8up(PG_FUNCTION_ARGS) Datum int8pl(PG_FUNCTION_ARGS) { + AggBulkArgs *ba = AggGetBulkArgs(fcinfo); int64 arg1 = PG_GETARG_INT64(0); int64 arg2 = PG_GETARG_INT64(1); int64 result; + if (unlikely(ba)) + { + result = arg1; + for (int i = ba->start_row; i < ba->nrows; i++) + { + if (!ba->isnull[ba->argoffs[0]][i]) + { + arg2 = ba->args[ba->argoffs[0]][i]; + if (unlikely(pg_add_s64_overflow(arg1, arg2, &result))) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("bigint out of range"))); + arg1 = result; + } + } + PG_RETURN_INT64(result); + } if (unlikely(pg_add_s64_overflow(arg1, arg2, &result))) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), @@ -718,9 +736,35 @@ int8lcm(PG_FUNCTION_ARGS) Datum int8inc(PG_FUNCTION_ARGS) { + AggBulkArgs *ba = AggGetBulkArgs(fcinfo); int64 arg = PG_GETARG_INT64(0); int64 result; + if (unlikely(ba)) + { + result = arg; + if (!ba->hasnull || ba->nargs == 0) + { + if (unlikely(pg_add_s64_overflow(arg, ba->nrows, &result))) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("bigint out of range"))); + PG_RETURN_INT64(result); + } + for (int i = ba->start_row; i < ba->nrows; i++) + { + if (!ba->isnull[ba->argoffs[0]][i]) + { + if (unlikely(pg_add_s64_overflow(arg, 1, &result))) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("bigint out of range"))); + arg = result; + } + } + PG_RETURN_INT64(result); + } + if (unlikely(pg_add_s64_overflow(arg, 1, &result))) ereport(ERROR, (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c index 2501007d981..907c4fddba0 100644 --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -6310,6 +6310,23 @@ int4_sum(PG_FUNCTION_ARGS) { int64 oldsum; int64 newval; + AggBulkArgs *ba = AggGetBulkArgs(fcinfo); + + if (unlikely(ba)) + { + int64 result = (!PG_ARGISNULL(0) ? PG_GETARG_INT64(0) : 0); + + for (int i = ba->start_row; i < ba->nrows; i++) + { + if (!ba->isnull[ba->argoffs[0]][i]) + { + int32 arg2 = ba->args[ba->argoffs[0]][i]; + + result = result + arg2; + } + } + PG_RETURN_INT64(result); + } if (PG_ARGISNULL(0)) { diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 1d33e084b69..f24782ecf58 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -304,6 +304,7 @@ typedef enum ExprEvalOp /* Batched aggregate trans evaluation */ EEOP_AGG_PLAIN_TRANS_BATCH_ROWLOOP, /* per-row fmgr calls */ + EEOP_AGG_PLAIN_TRANS_BATCH_DIRECT, /* call transfn once with AggBulkArgs */ /* non-existent operation, used e.g. to check array lengths */ EEOP_LAST diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 5ba9a523970..c72bd755b79 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -561,6 +561,26 @@ ExecQualAndReset(ExprState *state, ExprContext *econtext) } #endif +#ifndef FRONTEND +/* Per-call bulk argument vectors for batched aggregate trans functions. */ +typedef struct AggBulkArgs +{ + int nrows; /* number of rows in this batch */ + int start_row; + int16 *argoffs; + int nargs; /* number of argument vectors */ + Datum **args; /* args[j][i] = j-th arg at row i */ + bool **isnull; /* isnull[j][i] */ + bool hasnull; /* is any datum in args NULL? */ +} AggBulkArgs; + +static inline AggBulkArgs * +AggGetBulkArgs(FunctionCallInfo fcinfo) +{ + return (AggBulkArgs *) (fcinfo->flinfo ? fcinfo->flinfo->fn_extra : NULL); +} +#endif + extern bool ExecCheck(ExprState *state, ExprContext *econtext); /* -- 2.47.3