From 0ed285dc51016e273694e9724e588493ee243564 Mon Sep 17 00:00:00 2001 From: Henson Choi Date: Tue, 16 Jun 2026 13:02:05 +0900 Subject: [PATCH 05/13] Drive RPR row pattern matching once per row Row pattern matching only advanced when a window function read the frame, so a row whose only window function skips the frame (e.g. nth_value() with a NULL offset) left the match state behind the current row, producing silently wrong results and a spurious "cannot fetch row before mark position" error. Advance the match once per row, before the window functions run, so it tracks the row scan rather than frame access. Extract the reduced-frame loop into advance_reduced_frame_nfa() and the mark advance into advance_nav_mark(), advancing the navigation mark from the frontier the match reached rather than from the output row, so tuplestore_trim() frees rows sooner. Add regression coverage for the frame-skipping and PREV-only deferred-frame cases, and assert the contexts' nondecreasing matchStartRow ordering. --- src/backend/executor/execRPR.c | 8 +- src/backend/executor/nodeWindowAgg.c | 254 ++++++++++++++++----------- src/test/regress/expected/rpr.out | 149 ++++++++++++++++ src/test/regress/sql/rpr.sql | 75 ++++++++ 4 files changed, 385 insertions(+), 101 deletions(-) diff --git a/src/backend/executor/execRPR.c b/src/backend/executor/execRPR.c index de78b06d277..cea7e0b2973 100644 --- a/src/backend/executor/execRPR.c +++ b/src/backend/executor/execRPR.c @@ -1666,7 +1666,13 @@ ExecRPRStartContext(WindowAggState *winstate, int64 startPos) ctx->states->isAbsorbable = false; } - /* Add to tail of active context list (doubly-linked, oldest-first) */ + /* + * Add to tail of active context list (doubly-linked, oldest-first). + * matchStartRow is nondecreasing along the list, so the head holds the + * smallest -- an ordering other code relies on. 
+ */ + Assert(winstate->nfaContextTail == NULL || + startPos >= winstate->nfaContextTail->matchStartRow); ctx->prev = winstate->nfaContextTail; ctx->next = NULL; if (winstate->nfaContextTail != NULL) diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index 90f33bdee40..2d97710da8a 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -239,6 +239,10 @@ static int64 row_is_in_reduced_frame(WindowObject winobj, int64 pos); static void clear_reduced_frame(WindowAggState *winstate); static int get_reduced_frame_status(WindowAggState *winstate, int64 pos); +static void advance_nav_mark(WindowAggState *winstate, int64 currentPos); +static void advance_reduced_frame_nfa(WindowObject winobj, + RPRNFAContext *targetCtx, int64 pos, + bool hasLimitedFrame, int64 frameOffset); static void update_reduced_frame(WindowObject winobj, int64 pos); /* Forward declarations - NFA row evaluation */ @@ -2521,6 +2525,16 @@ ExecWindowAgg(PlanState *pstate) { if (winstate->rpSkipTo == ST_NEXT_ROW) clear_reduced_frame(winstate); + + /* + * Drive the row pattern match every row, so it tracks the row + * scan rather than frame access: a window function that skips + * the frame (e.g. nth_value() with a NULL offset) must not + * leave the match state behind currentpos. + */ + Assert(winstate->nav_winobj != NULL); + (void) row_is_in_reduced_frame(winstate->nav_winobj, + winstate->currentpos); } /* @@ -2562,43 +2576,6 @@ ExecWindowAgg(PlanState *pstate) if (winstate->grouptail_ptr >= 0) update_grouptailpos(winstate); - /* - * Advance RPR navigation mark pointer if possible, so that - * tuplestore_trim() can free rows no longer reachable by navigation. 
- */ - if (winstate->nav_winobj && - winstate->rpPattern != NULL && - winstate->navMaxOffsetKind == RPR_NAV_OFFSET_FIXED) - { - int64 navmarkpos; - - /* Backward reach from PREV/LAST/compound PREV_LAST/NEXT_LAST */ - if (winstate->currentpos > winstate->navMaxOffset) - navmarkpos = winstate->currentpos - winstate->navMaxOffset; - else - navmarkpos = 0; - - /* - * If FIRST is used, also consider match_start + navFirstOffset. - * The oldest active context (nfaContext) has the smallest - * matchStartRow. - */ - if (winstate->hasFirstNav && - winstate->navFirstOffsetKind == RPR_NAV_OFFSET_FIXED && - winstate->nfaContext != NULL) - { - int64 firstreach; - - if (!pg_add_s64_overflow(winstate->nfaContext->matchStartRow, - winstate->navFirstOffset, - &firstreach)) - navmarkpos = Min(navmarkpos, Max(firstreach, 0)); - } - - if (navmarkpos > winstate->nav_winobj->markpos) - WinSetMarkPosition(winstate->nav_winobj, navmarkpos); - } - /* * Truncate any no-longer-needed rows from the tuplestore. */ @@ -4382,6 +4359,143 @@ get_reduced_frame_status(WindowAggState *winstate, int64 pos) return RF_SKIPPED; } +/* + * advance_nav_mark + * Advance the RPR navigation mark, derived from the NFA frontier + * (currentPos) but held back by the navigation's backward reach, so + * tuplestore_trim() can free rows no longer reachable by navigation. + * + * The nav read pointer is independent of the aggregate and per-function read + * pointers, so moving its mark does not affect their fetches; it only bounds + * the DEFINE clause's own PREV/LAST/FIRST lookups. Backward reach (PREV/LAST) + * is measured from the frontier. FIRST reaches back from the head context's + * matchStartRow instead, so it is bounded separately; without FIRST the mark + * can follow the frontier freely. 
+ */ +static void +advance_nav_mark(WindowAggState *winstate, int64 currentPos) +{ + int64 navmarkpos; + + /* No RPR navigation read pointer: nothing to advance */ + if (winstate->nav_winobj == NULL) + return; + + /* RETAIN_ALL disables trim for the backward (PREV/LAST) dimension */ + if (winstate->navMaxOffsetKind == RPR_NAV_OFFSET_RETAIN_ALL) + return; + + /* navMax is FIXED here: NEEDS_EVAL resolved, RETAIN_ALL returned */ + Assert(winstate->navMaxOffsetKind == RPR_NAV_OFFSET_FIXED); + + if (currentPos > winstate->navMaxOffset) + navmarkpos = currentPos - winstate->navMaxOffset; + else + navmarkpos = 0; + + if (winstate->hasFirstNav && winstate->nfaContext != NULL) + { + int64 firstreach; + + /* navFirst is always FIXED; it never takes RETAIN_ALL */ + Assert(winstate->navFirstOffsetKind == RPR_NAV_OFFSET_FIXED); + + /* + * Head context has the smallest matchStartRow (contexts appended in + * nondecreasing order), so bounding by it covers every FIRST reach. + */ + if (!pg_add_s64_overflow(winstate->nfaContext->matchStartRow, + winstate->navFirstOffset, + &firstreach)) + navmarkpos = Min(navmarkpos, Max(firstreach, 0)); + } + + if (navmarkpos > winstate->nav_winobj->markpos) + WinSetMarkPosition(winstate->nav_winobj, navmarkpos); +} + +/* + * advance_reduced_frame_nfa + * Drive the NFA forward until targetCtx completes or the partition ends. + * + * This is the match driver, extracted from update_reduced_frame(), which calls + * it to advance the match and then records the resolved result. Row + * evaluations are shared across all active contexts. + */ +static void +advance_reduced_frame_nfa(WindowObject winobj, RPRNFAContext *targetCtx, + int64 pos, bool hasLimitedFrame, int64 frameOffset) +{ + WindowAggState *winstate = winobj->winstate; + int64 currentPos; + int64 startPos; + + /* + * Determine where to start processing. Usually nfaLastProcessedRow+1 >= + * pos since contexts are created at currentPos+1 during processing. 
+ * However, pos can exceed this when rows are skipped (e.g., unmatched + * rows don't update nfaLastProcessedRow). + */ + startPos = Max(pos, winstate->nfaLastProcessedRow + 1); + + /* + * Process rows until target context completes or we hit boundaries. Each + * row evaluation is shared across all active contexts. + */ + for (currentPos = startPos; targetCtx->states != NULL; currentPos++) + { + bool rowExists; + + /* + * Evaluate variables for this row - done only once, shared by all + * contexts. + * + * Set nav_match_start to targetCtx's matchStartRow as the baseline for + * FIRST/LAST navigation. Match_start-dependent variables (FIRST, + * LAST-with-offset) are re-evaluated per-context in ExecRPRProcessRow + * when matchStartRow differs. + */ + winstate->nav_match_start = targetCtx->matchStartRow; + rowExists = nfa_evaluate_row(winobj, currentPos, winstate->nfaVarMatched); + + /* No more rows in partition? Finalize all contexts */ + if (!rowExists) + { + ExecRPRFinalizeAllContexts(winstate, currentPos - 1); + /* Clean up dead contexts from finalization */ + ExecRPRCleanupDeadContexts(winstate, targetCtx); + break; + } + + /* Update last processed row */ + winstate->nfaLastProcessedRow = currentPos; + + /*-------------------------- + * Process all contexts for this row: + * 1. Match all (convergence) + * 2. Absorb redundant + * 3. Advance all (divergence) + */ + ExecRPRProcessRow(winstate, currentPos, hasLimitedFrame, frameOffset); + + /* + * Create a new context for the next potential start position. This + * enables overlapping match detection for SKIP TO NEXT ROW. + */ + ExecRPRStartContext(winstate, currentPos + 1); + + /* + * Clean up dead contexts (failed with no active states and no match). + * This removes contexts that failed during processing and counts them + * appropriately as pruned or mismatched. + */ + ExecRPRCleanupDeadContexts(winstate, targetCtx); + + /* Advance the nav mark to the frontier so trim can free old rows. 
*/ + advance_nav_mark(winstate, currentPos); + } +} + /* * update_reduced_frame * Update reduced frame info using multi-context NFA pattern matching. @@ -4401,8 +4515,6 @@ update_reduced_frame(WindowObject winobj, int64 pos) { WindowAggState *winstate = winobj->winstate; RPRNFAContext *targetCtx; - int64 currentPos; - int64 startPos; int frameOptions = winstate->frameOptions; bool hasLimitedFrame; int64 frameOffset = 0; @@ -4468,67 +4580,9 @@ update_reduced_frame(WindowObject winobj, int64 pos) goto register_result; } - /* - * Determine where to start processing. Usually nfaLastProcessedRow+1 >= - * pos since contexts are created at currentPos+1 during processing. - * However, pos can exceed this when rows are skipped (e.g., unmatched - * rows don't update nfaLastProcessedRow). - */ - startPos = Max(pos, winstate->nfaLastProcessedRow + 1); - - /* - * Process rows until target context completes or we hit boundaries. Each - * row evaluation is shared across all active contexts. - */ - for (currentPos = startPos; targetCtx->states != NULL; currentPos++) - { - bool rowExists; - - /* - * Evaluate variables for this row - done only once, shared by all - * contexts. - * - * Set nav_match_start to the head context's matchStartRow for - * FIRST/LAST navigation. Match_start-dependent variables (FIRST, - * LAST-with-offset) are re-evaluated per-context in ExecRPRProcessRow - * when matchStartRow differs. - */ - winstate->nav_match_start = targetCtx->matchStartRow; - rowExists = nfa_evaluate_row(winobj, currentPos, winstate->nfaVarMatched); - - /* No more rows in partition? Finalize all contexts */ - if (!rowExists) - { - ExecRPRFinalizeAllContexts(winstate, currentPos - 1); - /* Clean up dead contexts from finalization */ - ExecRPRCleanupDeadContexts(winstate, targetCtx); - break; - } - - /* Update last processed row */ - winstate->nfaLastProcessedRow = currentPos; - - /*-------------------------- - * Process all contexts for this row: - * 1. Match all (convergence) - * 2. 
Absorb redundant - * 3. Advance all (divergence) - */ - ExecRPRProcessRow(winstate, currentPos, hasLimitedFrame, frameOffset); - - /* - * Create a new context for the next potential start position. This - * enables overlapping match detection for SKIP TO NEXT ROW. - */ - ExecRPRStartContext(winstate, currentPos + 1); - - /* - * Clean up dead contexts (failed with no active states and no match). - * This removes contexts that failed during processing and counts them - * appropriately as pruned or mismatched. - */ - ExecRPRCleanupDeadContexts(winstate, targetCtx); - } + /* Drive the NFA forward until pos's match is resolved. */ + advance_reduced_frame_nfa(winobj, targetCtx, pos, hasLimitedFrame, + frameOffset); register_result: Assert(pos == targetCtx->matchStartRow); diff --git a/src/test/regress/expected/rpr.out b/src/test/regress/expected/rpr.out index dc5140fecc9..6ad830b9e36 100644 --- a/src/test/regress/expected/rpr.out +++ b/src/test/regress/expected/rpr.out @@ -3297,6 +3297,155 @@ WINDOW w AS ( 4 | 20 | | 0 (4 rows) +-- +-- nth_value with a NULL offset +-- +CREATE TABLE rpr_dormant (id int, price int); +INSERT INTO rpr_dormant SELECT g, g*10 FROM generate_series(1,60) g; +-- reference: first_value(id) is the start row of the match beginning at the +-- current row, count(*) is that match's length over the reduced frame +SELECT * FROM ( + SELECT id, first_value(id) OVER w AS match_start, count(*) OVER w AS match_len + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) +) s WHERE id > 50 ORDER BY id; + id | match_start | match_len +----+-------------+----------- + 51 | 51 | 10 + 52 | | 0 + 53 | | 0 + 54 | | 0 + 55 | | 0 + 56 | | 0 + 57 | | 0 + 58 | | 0 + 59 | | 0 + 60 | | 0 +(10 rows) + +-- nth_value with a NULL offset; FIRST navigation in DEFINE, SKIP PAST LAST ROW +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, 
nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) + ) s +) t WHERE id > 50 ORDER BY id; + id | nv +----+----- + 51 | 510 + 52 | + 53 | + 54 | + 55 | + 56 | + 57 | + 58 | + 59 | + 60 | +(10 rows) + +-- the same window with first_value and count alongside nth_value +SELECT * FROM ( + SELECT id, nv, fv, cnt FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv, + first_value(id) OVER w AS fv, count(*) OVER w AS cnt + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) + ) s +) t WHERE id > 50 ORDER BY id; + id | nv | fv | cnt +----+-----+----+----- + 51 | 510 | 51 | 10 + 52 | | | 0 + 53 | | | 0 + 54 | | | 0 + 55 | | | 0 + 56 | | | 0 + 57 | | | 0 + 58 | | | 0 + 59 | | | 0 + 60 | | | 0 +(10 rows) + +-- the same nth_value with a non-navigation DEFINE +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > 0) + ) s +) t WHERE id > 50 ORDER BY id; + id | nv +----+---- + 51 | + 52 | + 53 | + 54 | + 55 | + 56 | + 57 | + 58 | + 59 | + 60 | +(10 rows) + +-- the same nth_value with a PREV-only DEFINE (no FIRST navigation) +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(price, 50)) + ) s +) t WHERE id > 50 ORDER BY id; + id | nv +----+----- + 51 
| 510 + 52 | + 53 | + 54 | + 55 | + 56 | + 57 | + 58 | + 59 | + 60 | +(10 rows) + +-- nth_value with a NULL offset band in the middle of the partition +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id BETWEEN 20 AND 40 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) + ) s +) t WHERE id BETWEEN 38 AND 46 ORDER BY id; + id | nv +----+---- + 38 | + 39 | + 40 | + 41 | + 42 | + 43 | + 44 | + 45 | + 46 | +(9 rows) + +DROP TABLE rpr_dormant; -- -- NULL handling -- diff --git a/src/test/regress/sql/rpr.sql b/src/test/regress/sql/rpr.sql index e3e9de789db..3363691c041 100644 --- a/src/test/regress/sql/rpr.sql +++ b/src/test/regress/sql/rpr.sql @@ -1812,6 +1812,81 @@ WINDOW w AS ( B AS val IS NULL ); +-- +-- nth_value with a NULL offset +-- + +CREATE TABLE rpr_dormant (id int, price int); +INSERT INTO rpr_dormant SELECT g, g*10 FROM generate_series(1,60) g; + +-- reference: first_value(id) is the start row of the match beginning at the +-- current row, count(*) is that match's length over the reduced frame +SELECT * FROM ( + SELECT id, first_value(id) OVER w AS match_start, count(*) OVER w AS match_len + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) +) s WHERE id > 50 ORDER BY id; + +-- nth_value with a NULL offset; FIRST navigation in DEFINE, SKIP PAST LAST ROW +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) + ) s +) t WHERE id > 50 ORDER BY id; + +-- the same window with 
first_value and count alongside nth_value +SELECT * FROM ( + SELECT id, nv, fv, cnt FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv, + first_value(id) OVER w AS fv, count(*) OVER w AS cnt + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) + ) s +) t WHERE id > 50 ORDER BY id; + +-- the same nth_value with a non-navigation DEFINE +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > 0) + ) s +) t WHERE id > 50 ORDER BY id; + +-- the same nth_value with a PREV-only DEFINE (no FIRST navigation) +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id < 50 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(price, 50)) + ) s +) t WHERE id > 50 ORDER BY id; + +-- nth_value with a NULL offset band in the middle of the partition +SELECT * FROM ( + SELECT id, nv FROM ( + SELECT id, nth_value(price, CASE WHEN id BETWEEN 20 AND 40 THEN NULL ELSE 1 END) OVER w AS nv + FROM rpr_dormant + WINDOW w AS (ORDER BY id ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING + AFTER MATCH SKIP PAST LAST ROW + PATTERN (A+) DEFINE A AS price > PREV(FIRST(price), 50)) + ) s +) t WHERE id BETWEEN 38 AND 46 ORDER BY id; + +DROP TABLE rpr_dormant; + -- -- NULL handling -- -- 2.50.1 (Apple Git-155)