From e76a49df42dbf22a3169eb2e1d880d9282c1f02f Mon Sep 17 00:00:00 2001 From: Amit Langote Date: Thu, 5 Mar 2026 11:28:16 +0900 Subject: [PATCH v7 4/5] SeqScan: add batch-driven variants returning slots Teach SeqScan to drive the table AM via the new batch API added in the previous commit, while still returning one TupleTableSlot at a time to callers. This reduces per-tuple AM crossings without changing the node interface seen by parents. SeqScanState gains a RowBatch pointer that holds the current batch when batching is active. Batch state is localized to SeqScanState -- no changes to PlanState or ScanState. Add executor_batch_rows GUC (DEVELOPER_OPTIONS, default 64) to control the maximum batch size. Setting it to 0 or 1 disables batching. XXX beyond this on/off check, the value is currently ignored when reading from heapam tables. Wire up runtime selection in ExecInitSeqScan via SeqScanCanUseBatching(). When executor_batch_rows > 1, EPQ is inactive, the scan is forward-only, and the relation's AM supports batching, ExecProcNode is set to a batch-driven variant. Otherwise the non-batch path is used with zero overhead. Plan shape and EXPLAIN output remain unchanged; only the internal tuple flow differs when batching is enabled. 
Reviewed-by: Daniil Davydov <3danissimo@gmail.com> Reviewed-by: ChangAo Chen <2624345507@qq.com> Discussion: https://postgr.es/m/CA+HiwqFfAY_ZFqN8wcAEMw71T9hM_kA8UtyHaZZEZtuT3UyogA@mail.gmail.com --- src/backend/executor/nodeSeqscan.c | 278 ++++++++++++++++++++++ src/backend/utils/init/globals.c | 3 + src/backend/utils/misc/guc_parameters.dat | 9 + src/include/miscadmin.h | 1 + src/include/nodes/execnodes.h | 2 + 5 files changed, 293 insertions(+) diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 04803b0e37d..d0ce8858c49 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -29,12 +29,17 @@ #include "access/relscan.h" #include "access/tableam.h" +#include "executor/execRowBatch.h" #include "executor/execScan.h" #include "executor/executor.h" #include "executor/nodeSeqscan.h" #include "utils/rel.h" static TupleTableSlot *SeqNext(SeqScanState *node); +static TupleTableSlot *ExecSeqScanBatchSlot(PlanState *pstate); +static TupleTableSlot *ExecSeqScanBatchSlotWithQual(PlanState *pstate); +static TupleTableSlot *ExecSeqScanBatchSlotWithProject(PlanState *pstate); +static TupleTableSlot *ExecSeqScanBatchSlotWithQualProject(PlanState *pstate); /* ---------------------------------------------------------------- * Scan Support @@ -205,6 +210,273 @@ ExecSeqScanEPQ(PlanState *pstate) (ExecScanRecheckMtd) SeqRecheck); } +/* ---------------------------------------------------------------- + * Batch Support + * ---------------------------------------------------------------- + */ + +/* + * SeqScanCanUseBatching + * Check whether this SeqScan can use batch mode execution. + * + * Batching requires: the GUC is enabled, no EPQ recheck is active, the scan + * is forward-only, and the table AM supports batching with the current + * snapshot (see table_supports_batching()). 
+ */ +static bool +SeqScanCanUseBatching(SeqScanState *scanstate, int eflags) +{ + Relation relation = scanstate->ss.ss_currentRelation; + + return executor_batch_rows > 1 && + relation && + table_supports_batching(relation, + scanstate->ss.ps.state->es_snapshot) && + !(eflags & EXEC_FLAG_BACKWARD) && + scanstate->ss.ps.state->es_epq_active == NULL; +} + +/* + * SeqScanInitBatching + * Create the RowBatch, attach the scan slot to it, and select the + * ExecProcNode variant matching the qual and projection of the node. + * + * Called from ExecInitSeqScan when SeqScanCanUseBatching returns true. + * Overwrites the ExecProcNode pointer set by the non-batch path. + */ +static void +SeqScanInitBatching(SeqScanState *scanstate) +{ + RowBatch *batch = RowBatchCreate(MaxHeapTuplesPerPage); + + batch->slot = scanstate->ss.ss_ScanTupleSlot; + scanstate->batch = batch; + + /* Pick the wrapper specialized for qual/projection presence */ + if (scanstate->ss.ps.qual == NULL) + { + if (scanstate->ss.ps.ps_ProjInfo == NULL) + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlot; + else + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithProject; + } + else + { + if (scanstate->ss.ps.ps_ProjInfo == NULL) + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQual; + else + scanstate->ss.ps.ExecProcNode = ExecSeqScanBatchSlotWithQualProject; + } +} + +/* + * SeqScanResetBatching + * Reset or tear down batch execution state. + * + * When drop is false (rescan), resets the RowBatch and releases any + * AM-held resources like buffer pins, but keeps allocations for reuse. + * When drop is true (end of node), frees everything. 
+ */ +static void +SeqScanResetBatching(SeqScanState *scanstate, bool drop) +{ + RowBatch *b = scanstate->batch; + + if (b) + { + RowBatchReset(b, drop); + if (b->am_payload) + { + if (drop) + { + table_scan_end_batch(scanstate->ss.ss_currentScanDesc, b); + b->am_payload = NULL; + } + else + table_scan_reset_batch(scanstate->ss.ss_currentScanDesc, b); + } + if (drop) + pfree(b); + } +} + +/* + * SeqNextBatch + * Fetch the next batch of tuples from the table AM. + * + * Lazily initializes the scan descriptor and AM batch payload on first call. + * Returns table_scan_getnextbatch() result, i.e. false at end of scan. + */ +static bool +SeqNextBatch(SeqScanState *node) +{ + TableScanDesc scandesc; + EState *estate; + ScanDirection direction; + RowBatch *b = node->batch; + + Assert(b != NULL); + + /* + * get scan descriptor and direction; batch mode is forward-only (asserted) + */ + scandesc = node->ss.ss_currentScanDesc; + estate = node->ss.ps.state; + direction = estate->es_direction; + Assert(ScanDirectionIsForward(direction)); + + if (scandesc == NULL) + { + /* + * We reach here if the scan is not parallel, or if we're serially + * executing a scan that was planned to be parallel. + */ + scandesc = table_beginscan(node->ss.ss_currentRelation, + estate->es_snapshot, + 0, NULL, + ScanRelIsReadOnly(&node->ss) ? + SO_HINT_REL_READ_ONLY : SO_NONE); + node->ss.ss_currentScanDesc = scandesc; + } + + /* Lazily create the AM batch payload (AM must have scan_begin_batch). */ + if (b->am_payload == NULL) + { + const TableAmRoutine *tam PG_USED_FOR_ASSERTS_ONLY = scandesc->rs_rd->rd_tableam; + + Assert(tam && tam->scan_begin_batch); + table_scan_begin_batch(scandesc, b); + } + + if (!table_scan_getnextbatch(scandesc, b, direction)) + return false; + + return true; +} + +/* + * SeqScanBatchSlot + * Core loop for batch-driven SeqScan variants. + * + * Internally fetches tuples in batches from the table AM, but returns + * one slot at a time to preserve the single-slot interface expected by + * parent nodes. 
When the current batch is exhausted, fetches and + * materializes the next one. + * + * qual and projInfo are passed explicitly so the compiler can eliminate + * dead branches when inlined into the typed wrapper functions (e.g. + * ExecSeqScanBatchSlot passes NULL for both). + * + * EPQ is not supported in the batch path; asserted at entry. + */ +static inline TupleTableSlot * +SeqScanBatchSlot(SeqScanState *node, + ExprState *qual, ProjectionInfo *projInfo) +{ + ExprContext *econtext = node->ss.ps.ps_ExprContext; + RowBatch *b = node->batch; + + /* Batch path does not support EPQ */ + Assert(node->ss.ps.state->es_epq_active == NULL); + Assert(RowBatchIsValid(b)); + + for (;;) + { + TupleTableSlot *in; + + CHECK_FOR_INTERRUPTS(); + + /* Get next input slot from current batch, or refill */ + if (!RowBatchHasMore(b)) + { + if (!SeqNextBatch(node)) + return NULL; + } + + in = RowBatchGetNextSlot(b); + Assert(in); + + /* No qual, no projection: direct return */ + if (qual == NULL && projInfo == NULL) + return in; + + ResetExprContext(econtext); + econtext->ecxt_scantuple = in; + + /* Check qual if present; count and skip tuples that fail it */ + if (qual != NULL && !ExecQual(qual, econtext)) + { + InstrCountFiltered1(node, 1); + continue; + } + + /* Project if needed, otherwise return scan tuple directly */ + if (projInfo != NULL) + return ExecProject(projInfo); + + return in; + } +} + +static TupleTableSlot * +ExecSeqScanBatchSlot(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + Assert(pstate->qual == NULL); + Assert(pstate->ps_ProjInfo == NULL); + + return SeqScanBatchSlot(node, NULL, NULL); +} + +static TupleTableSlot * +ExecSeqScanBatchSlotWithQual(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + /* + * Use pg_assume() for != NULL tests to make the compiler realize no + * runtime check for the field is needed in SeqScanBatchSlot(). 
+ */ + Assert(pstate->state->es_epq_active == NULL); + pg_assume(pstate->qual != NULL); + Assert(pstate->ps_ProjInfo == NULL); + + return SeqScanBatchSlot(node, pstate->qual, NULL); +} + +/* + * Variant of ExecSeqScanBatchSlot() but when projection is required. + */ +static TupleTableSlot * +ExecSeqScanBatchSlotWithProject(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + Assert(pstate->qual == NULL); + pg_assume(pstate->ps_ProjInfo != NULL); + + return SeqScanBatchSlot(node, NULL, pstate->ps_ProjInfo); +} + +/* + * Variant of ExecSeqScanBatchSlot() but when qual evaluation and projection + * are required. + */ +static TupleTableSlot * +ExecSeqScanBatchSlotWithQualProject(PlanState *pstate) +{ + SeqScanState *node = castNode(SeqScanState, pstate); + + Assert(pstate->state->es_epq_active == NULL); + pg_assume(pstate->qual != NULL); + pg_assume(pstate->ps_ProjInfo != NULL); + + return SeqScanBatchSlot(node, pstate->qual, pstate->ps_ProjInfo); +} + /* ---------------------------------------------------------------- * ExecInitSeqScan * ---------------------------------------------------------------- @@ -283,6 +555,9 @@ ExecInitSeqScan(SeqScan *node, EState *estate, int eflags) scanstate->ss.ps.ExecProcNode = ExecSeqScanWithQualProject; } + if (SeqScanCanUseBatching(scanstate, eflags)) + SeqScanInitBatching(scanstate); + return scanstate; } @@ -302,6 +577,8 @@ ExecEndSeqScan(SeqScanState *node) */ scanDesc = node->ss.ss_currentScanDesc; + SeqScanResetBatching(node, true); + /* * close heap scan */ @@ -331,6 +608,7 @@ ExecReScanSeqScan(SeqScanState *node) table_rescan(scan, /* scan desc */ NULL); /* new scan keys */ + SeqScanResetBatching(node, false); ExecScanReScan((ScanState *) node); } diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index 36ad708b360..535e29d7823 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -165,3 +165,6 @@ int 
notify_buffers = 16; int serializable_buffers = 32; int subtransaction_buffers = 0; int transaction_buffers = 0; + +/* executor batching */ +int executor_batch_rows = 64; diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index a315c4ab8ab..a59b5d012a2 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1045,6 +1045,15 @@ boot_val => 'true', }, +{ name => 'executor_batch_rows', type => 'int', context => 'PGC_USERSET', group => 'DEVELOPER_OPTIONS', + short_desc => 'Number of rows to include in batches during execution.', + flags => 'GUC_NOT_IN_SAMPLE', + variable => 'executor_batch_rows', + boot_val => '64', + min => '0', + max => '1024', +}, + { name => 'exit_on_error', type => 'bool', context => 'PGC_USERSET', group => 'ERROR_HANDLING_OPTIONS', short_desc => 'Terminate session on any error.', variable => 'ExitOnAnyError', diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 7277c37e779..302c0e33165 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -288,6 +288,7 @@ extern PGDLLIMPORT double VacuumCostDelay; extern PGDLLIMPORT int VacuumCostBalance; extern PGDLLIMPORT bool VacuumCostActive; +extern PGDLLIMPORT int executor_batch_rows; /* in utils/misc/stack_depth.c */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 3ecae7552fc..0f8431ee854 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -70,6 +70,7 @@ typedef struct TupleTableSlot TupleTableSlot; typedef struct TupleTableSlotOps TupleTableSlotOps; typedef struct WalUsage WalUsage; typedef struct WorkerNodeInstrumentation WorkerNodeInstrumentation; +typedef struct RowBatch RowBatch; /* ---------------- @@ -1670,6 +1671,7 @@ typedef struct SeqScanState { ScanState ss; /* its first field is NodeTag */ Size pscan_len; /* size of parallel heap scan descriptor */ + RowBatch *batch; /* NULL if batching disabled */ } 
SeqScanState; /* ---------------- -- 2.47.3