From 9ac8600d3ef3e91f7bebb5a2862b0f357278eacd Mon Sep 17 00:00:00 2001 From: Tatsuo Ishii Date: Sun, 15 Feb 2026 17:47:49 +0900 Subject: [PATCH v43 5/8] Row pattern recognition patch (executor and commands). --- src/backend/commands/explain.c | 461 +++++ src/backend/executor/nodeWindowAgg.c | 2343 +++++++++++++++++++++++++- src/backend/utils/adt/windowfuncs.c | 25 +- src/include/catalog/pg_proc.dat | 6 + src/include/nodes/execnodes.h | 119 ++ 5 files changed, 2943 insertions(+), 11 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index b9587983f88..575236f0bc6 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -29,6 +29,7 @@ #include "nodes/extensible.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "optimizer/rpr.h" #include "parser/analyze.h" #include "parser/parsetree.h" #include "rewrite/rewriteHandler.h" @@ -117,6 +118,20 @@ static void show_window_def(WindowAggState *planstate, static void show_window_keys(StringInfo buf, PlanState *planstate, int nkeys, AttrNumber *keycols, List *ancestors, ExplainState *es); +static void append_rpr_quantifier(StringInfo buf, RPRPatternElement *elem); +static char *deparse_rpr_pattern(RPRPattern *pattern); +static void deparse_rpr_elements(RPRPattern *pattern, int *idx, + StringInfoData *buf, RPRDepth groupDepth, + RPRDepth *prevDepth, bool *needSpace); +static void deparse_rpr_group(RPRPattern *pattern, int *idx, + StringInfoData *buf, RPRDepth *prevDepth, + bool *needSpace); +static void deparse_rpr_alt(RPRPattern *pattern, int *idx, + StringInfoData *buf, RPRDepth *prevDepth, + bool *needSpace, List **altSeps); +static void deparse_rpr_var(RPRPattern *pattern, int *idx, + StringInfoData *buf, RPRDepth *prevDepth, + bool *needSpace, List **altSeps); static void show_storage_info(char *maxStorageType, int64 maxSpaceUsed, ExplainState *es); static void show_tablesample(TableSampleClause *tsc, PlanState *planstate, @@ -127,6 +142,7 @@ static void show_incremental_sort_info(IncrementalSortState *incrsortstate, static void show_hash_info(HashState *hashstate, ExplainState *es); static void show_material_info(MaterialState *mstate, ExplainState *es); static void show_windowagg_info(WindowAggState *winstate, ExplainState *es); +static void show_rpr_nfa_stats(WindowAggState *winstate, ExplainState *es); static void show_ctescan_info(CteScanState *ctescanstate, ExplainState *es); static void show_table_func_scan_info(TableFuncScanState *tscanstate, ExplainState *es); @@ -2889,6 +2905,284 @@ show_sortorder_options(StringInfo buf, Node *sortexpr, } } +/* + * Append quantifier suffix for a pattern element. + */ +static void +append_rpr_quantifier(StringInfo buf, RPRPatternElement *elem) +{ + /* Append quantifier if not {1,1} */ + if (elem->min == 0 && elem->max == RPR_QUANTITY_INF) + appendStringInfoChar(buf, '*'); + else if (elem->min == 1 && elem->max == RPR_QUANTITY_INF) + appendStringInfoChar(buf, '+'); + else if (elem->min == 0 && elem->max == 1) + appendStringInfoChar(buf, '?'); + else if (elem->max == RPR_QUANTITY_INF) + appendStringInfo(buf, "{%d,}", elem->min); + else if (elem->min == elem->max && elem->min != 1) + appendStringInfo(buf, "{%d}", elem->min); + else if (elem->min != 1 || elem->max != 1) + appendStringInfo(buf, "{%d,%d}", elem->min, elem->max); + + if (RPRElemIsReluctant(elem)) + { + if (elem->min == 1 && elem->max == 1) + appendStringInfo(buf, "{1}"); /* make reluctant ? unambiguous */ + appendStringInfoChar(buf, '?'); + } + + /* Append absorption markers: " for judgment point, ' for branch only */ + if (RPRElemIsAbsorbable(elem)) + { + Assert(elem->max == RPR_QUANTITY_INF); + appendStringInfoChar(buf, '"'); + } + else if (RPRElemIsAbsorbableBranch(elem)) + appendStringInfoChar(buf, '\''); +} + +/* + * Deparse a compiled RPRPattern (bytecode) back to pattern string. + * + * Walks the flat bytecode array using mutual recursion: deparse_rpr_elements + * processes sequential elements, and deparse_rpr_group handles BEGIN...END + * groups by recursing back into deparse_rpr_elements for the group content. + */ +static char * +deparse_rpr_pattern(RPRPattern *pattern) +{ + StringInfoData buf; + int idx = 0; + RPRDepth prevDepth = 0; + bool needSpace = false; + + Assert(pattern != NULL && pattern->numElements >= 2); + + initStringInfo(&buf); + + deparse_rpr_elements(pattern, &idx, &buf, RPR_DEPTH_NONE, + &prevDepth, &needSpace); + + /* Close remaining open parens */ + while (prevDepth > 0) + { + appendStringInfoChar(&buf, ')'); + prevDepth--; + } + + return buf.data; +} + +/* + * Process pattern elements sequentially until FIN or END at groupDepth. + * + * When groupDepth >= 0, stops at the matching END element (leaving idx + * pointing to it) so the caller (deparse_rpr_group) can consume it. + * When groupDepth < 0, processes until FIN (top-level call). + */ +static void +deparse_rpr_elements(RPRPattern *pattern, int *idx, StringInfoData *buf, + RPRDepth groupDepth, RPRDepth *prevDepth, + bool *needSpace) +{ + List *altSeps = NIL; /* pending alternation separator indices */ + + while (*idx < pattern->numElements) + { + RPRPatternElement *elem = &pattern->elements[*idx]; + + if (RPRElemIsFin(elem)) + break; + + /* Stop at END matching our group depth; caller handles it */ + if (RPRElemIsEnd(elem) && elem->depth == groupDepth) + break; + + /* Alternation separator */ + if (list_member_int(altSeps, *idx)) + { + /* Close parens to match separator depth first */ + while (*prevDepth > elem->depth) + { + appendStringInfoChar(buf, ')'); + (*prevDepth)--; + } + appendStringInfoString(buf, " | "); + *needSpace = false; + altSeps = list_delete_int(altSeps, *idx); + } + + /* Dispatch to element-type handlers */ + if (RPRElemIsAlt(elem)) + deparse_rpr_alt(pattern, idx, buf, prevDepth, + needSpace, &altSeps); + else if (RPRElemIsBegin(elem)) + deparse_rpr_group(pattern, idx, buf, prevDepth, + needSpace); + else if (RPRElemIsVar(elem)) + deparse_rpr_var(pattern, idx, buf, prevDepth, + needSpace, &altSeps); + } + list_free(altSeps); +} + +/* + * Process a BEGIN...END group. + * + * Consumes BEGIN, recurses into deparse_rpr_elements for group content, + * then consumes END and outputs the group quantifier. + * + * When the group wraps a single ALT with no siblings, the group-level + * parenthesis is suppressed since the ALT-to-children depth transition + * already provides it (avoids double parens like "((a | b))+"). + */ +static void +deparse_rpr_group(RPRPattern *pattern, int *idx, StringInfoData *buf, + RPRDepth *prevDepth, bool *needSpace) +{ + RPRPatternElement *begin = &pattern->elements[*idx]; + RPRDepth childDepth = begin->depth + 1; + bool singleAlt = false; + RPRPatternElement *end; + + /* + * Check if this group wraps a single ALT with no siblings. Scan from + * after ALT to END: if no element at childDepth exists, the ALT is the + * sole child. + */ + if (*idx + 1 < pattern->numElements && + RPRElemIsAlt(&pattern->elements[*idx + 1])) + { + int j; + + singleAlt = true; + for (j = *idx + 2; j < pattern->numElements; j++) + { + RPRPatternElement *e = &pattern->elements[j]; + + if (RPRElemIsEnd(e) && e->depth == begin->depth) + break; + if (e->depth <= childDepth) + { + singleAlt = false; + break; + } + } + } + + /* Open group paren (unless single ALT provides it) */ + if (!singleAlt) + { + if (*needSpace) + appendStringInfoChar(buf, ' '); + appendStringInfoChar(buf, '('); + *needSpace = false; + } + *prevDepth = childDepth; + (*idx)++; /* consume BEGIN */ + + /* Process group children; stops at matching END */ + deparse_rpr_elements(pattern, idx, buf, begin->depth, + prevDepth, needSpace); + + /* Consume END and output quantifier */ + Assert(*idx < pattern->numElements); + end = &pattern->elements[*idx]; + Assert(RPRElemIsEnd(end) && end->depth == begin->depth); + + while (*prevDepth > end->depth + 1) + { + appendStringInfoChar(buf, ')'); + (*prevDepth)--; + } + if (!singleAlt) + appendStringInfoChar(buf, ')'); + append_rpr_quantifier(buf, end); + *prevDepth = end->depth; + *needSpace = true; + (*idx)++; /* consume END */ +} + +/* + * Process an ALT element: adjust depth parens and register separator positions. + */ +static void +deparse_rpr_alt(RPRPattern *pattern, int *idx, StringInfoData *buf, + RPRDepth *prevDepth, bool *needSpace, List **altSeps) +{ + RPRPatternElement *elem = &pattern->elements[*idx]; + + /* Close parens for depth decrease */ + while (*prevDepth > elem->depth) + { + appendStringInfoChar(buf, ')'); + (*prevDepth)--; + *needSpace = true; + } + + /* Open parens up to ALT's depth */ + while (*prevDepth < elem->depth) + { + if (*needSpace) + appendStringInfoChar(buf, ' '); + appendStringInfoChar(buf, '('); + (*prevDepth)++; + *needSpace = false; + } + + /* Register next alternation separator position */ + if (elem->next != RPR_ELEMIDX_INVALID) + { + RPRPatternElement *firstElem = &pattern->elements[elem->next]; + + if (firstElem->jump != RPR_ELEMIDX_INVALID) + *altSeps = lappend_int(*altSeps, firstElem->jump); + } + if (elem->jump != RPR_ELEMIDX_INVALID) + *altSeps = lappend_int(*altSeps, elem->jump); + (*idx)++; +} + +/* + * Process a VAR element: adjust depth parens and output variable name. + */ +static void +deparse_rpr_var(RPRPattern *pattern, int *idx, StringInfoData *buf, + RPRDepth *prevDepth, bool *needSpace, List **altSeps) +{ + RPRPatternElement *elem = &pattern->elements[*idx]; + + /* Open parens for depth increase */ + while (*prevDepth < elem->depth) + { + if (*needSpace) + appendStringInfoChar(buf, ' '); + appendStringInfoChar(buf, '('); + (*prevDepth)++; + *needSpace = false; + } + + /* Close parens for depth decrease */ + while (*prevDepth > elem->depth) + { + appendStringInfoChar(buf, ')'); + (*prevDepth)--; + } + + if (*needSpace) + appendStringInfoChar(buf, ' '); + + Assert(elem->varId < pattern->numVars); + appendStringInfoString(buf, pattern->varNames[elem->varId]); + append_rpr_quantifier(buf, elem); + *needSpace = true; + + if (elem->jump != RPR_ELEMIDX_INVALID) + *altSeps = lappend_int(*altSeps, elem->jump); + (*idx)++; +} + /* * Show the window definition for a WindowAgg node. */ @@ -2947,6 +3241,18 @@ show_window_def(WindowAggState *planstate, List *ancestors, ExplainState *es) appendStringInfoChar(&wbuf, ')'); ExplainPropertyText("Window", wbuf.data, es); pfree(wbuf.data); + + /* Show Row Pattern Recognition pattern if present */ + if (wagg->rpPattern != NULL) + { + char *patternStr = deparse_rpr_pattern(wagg->rpPattern); + + if (patternStr != NULL) + { + ExplainPropertyText("Pattern", patternStr, es); + pfree(patternStr); + } + } } /* @@ -3499,6 +3805,7 @@ show_windowagg_info(WindowAggState *winstate, ExplainState *es) { char *maxStorageType; int64 maxSpaceUsed; + WindowAgg *wagg = (WindowAgg *) winstate->ss.ps.plan; Tuplestorestate *tupstore = winstate->buffer; @@ -3511,6 +3818,160 @@ show_windowagg_info(WindowAggState *winstate, ExplainState *es) tuplestore_get_stats(tupstore, &maxStorageType, &maxSpaceUsed); show_storage_info(maxStorageType, maxSpaceUsed, es); + + /* Show NFA statistics for Row Pattern Recognition */ + if (wagg->rpPattern != NULL) + show_rpr_nfa_stats(winstate, es); +} + +/* + * Show NFA statistics for Row Pattern Recognition on WindowAgg node. + */ +static void +show_rpr_nfa_stats(WindowAggState *winstate, ExplainState *es) +{ + if (es->format != EXPLAIN_FORMAT_TEXT) + { + /* State and context counters */ + ExplainPropertyInteger("NFA States Peak", NULL, winstate->nfaStatesMax, es); + ExplainPropertyInteger("NFA States Total", NULL, winstate->nfaStatesTotalCreated, es); + ExplainPropertyInteger("NFA States Merged", NULL, winstate->nfaStatesMerged, es); + ExplainPropertyInteger("NFA Contexts Peak", NULL, winstate->nfaContextsMax, es); + ExplainPropertyInteger("NFA Contexts Total", NULL, winstate->nfaContextsTotalCreated, es); + ExplainPropertyInteger("NFA Contexts Absorbed", NULL, winstate->nfaContextsAbsorbed, es); + ExplainPropertyInteger("NFA Contexts Skipped", NULL, winstate->nfaContextsSkipped, es); + ExplainPropertyInteger("NFA Contexts Pruned", NULL, winstate->nfaContextsPruned, es); + + /* Match/mismatch counts and length statistics */ + ExplainPropertyInteger("NFA Matched", NULL, winstate->nfaMatchesSucceeded, es); + ExplainPropertyInteger("NFA Mismatched", NULL, winstate->nfaMatchesFailed, es); + if (winstate->nfaMatchesSucceeded > 0) + { + ExplainPropertyInteger("NFA Match Length Min", NULL, winstate->nfaMatchLen.min, es); + ExplainPropertyInteger("NFA Match Length Max", NULL, winstate->nfaMatchLen.max, es); + ExplainPropertyFloat("NFA Match Length Avg", NULL, + (double) winstate->nfaMatchLen.total / winstate->nfaMatchesSucceeded, 1, + es); + } + if (winstate->nfaMatchesFailed > 0) + { + ExplainPropertyInteger("NFA Mismatch Length Min", NULL, winstate->nfaFailLen.min, es); + ExplainPropertyInteger("NFA Mismatch Length Max", NULL, winstate->nfaFailLen.max, es); + ExplainPropertyFloat("NFA Mismatch Length Avg", NULL, + (double) winstate->nfaFailLen.total / winstate->nfaMatchesFailed, 1, + es); + } + + /* Absorbed/skipped context length statistics */ + if (winstate->nfaContextsAbsorbed > 0) + { + ExplainPropertyInteger("NFA Absorbed Length Min", NULL, winstate->nfaAbsorbedLen.min, es); + ExplainPropertyInteger("NFA Absorbed Length Max", NULL, winstate->nfaAbsorbedLen.max, es); + ExplainPropertyFloat("NFA Absorbed Length Avg", NULL, + (double) winstate->nfaAbsorbedLen.total / winstate->nfaContextsAbsorbed, 1, + es); + } + if (winstate->nfaContextsSkipped > 0) + { + ExplainPropertyInteger("NFA Skipped Length Min", NULL, winstate->nfaSkippedLen.min, es); + ExplainPropertyInteger("NFA Skipped Length Max", NULL, winstate->nfaSkippedLen.max, es); + ExplainPropertyFloat("NFA Skipped Length Avg", NULL, + (double) winstate->nfaSkippedLen.total / winstate->nfaContextsSkipped, 1, + es); + } + } + else + { + /* State and context counters */ + ExplainIndentText(es); + appendStringInfo(es->str, + "NFA States: " INT64_FORMAT " peak, " INT64_FORMAT " total, " INT64_FORMAT " merged\n", + winstate->nfaStatesMax, + winstate->nfaStatesTotalCreated, + winstate->nfaStatesMerged); + ExplainIndentText(es); + appendStringInfo(es->str, + "NFA Contexts: " INT64_FORMAT " peak, " INT64_FORMAT " total, " INT64_FORMAT " pruned\n", + winstate->nfaContextsMax, + winstate->nfaContextsTotalCreated, + winstate->nfaContextsPruned); + + /* Match/mismatch counts with length min/max/avg */ + ExplainIndentText(es); + appendStringInfo(es->str, "NFA: "); + if (winstate->nfaMatchesSucceeded > 0) + { + double avgLen = (double) winstate->nfaMatchLen.total / winstate->nfaMatchesSucceeded; + + appendStringInfo(es->str, + INT64_FORMAT " matched (len " INT64_FORMAT "/" INT64_FORMAT "/%.1f)", + winstate->nfaMatchesSucceeded, + winstate->nfaMatchLen.min, + winstate->nfaMatchLen.max, + avgLen); + } + else + { + appendStringInfo(es->str, "0 matched"); + } + if (winstate->nfaMatchesFailed > 0) + { + double avgFail = (double) winstate->nfaFailLen.total / winstate->nfaMatchesFailed; + + appendStringInfo(es->str, + ", " INT64_FORMAT " mismatched (len " INT64_FORMAT "/" INT64_FORMAT "/%.1f)", + winstate->nfaMatchesFailed, + winstate->nfaFailLen.min, + winstate->nfaFailLen.max, + avgFail); + } + else + { + appendStringInfo(es->str, ", 0 mismatched"); + } + appendStringInfoChar(es->str, '\n'); + + /* Absorbed/skipped context length statistics */ + if (winstate->nfaContextsAbsorbed > 0 || winstate->nfaContextsSkipped > 0) + { + ExplainIndentText(es); + appendStringInfo(es->str, "NFA: "); + + if (winstate->nfaContextsAbsorbed > 0) + { + double avgAbsorbed = (double) winstate->nfaAbsorbedLen.total / winstate->nfaContextsAbsorbed; + + appendStringInfo(es->str, + INT64_FORMAT " absorbed (len " INT64_FORMAT "/" INT64_FORMAT "/%.1f)", + winstate->nfaContextsAbsorbed, + winstate->nfaAbsorbedLen.min, + winstate->nfaAbsorbedLen.max, + avgAbsorbed); + } + else + { + appendStringInfo(es->str, "0 absorbed"); + } + + if (winstate->nfaContextsSkipped > 0) + { + double avgSkipped = (double) winstate->nfaSkippedLen.total / winstate->nfaContextsSkipped; + + appendStringInfo(es->str, + ", " INT64_FORMAT " skipped (len " INT64_FORMAT "/" INT64_FORMAT "/%.1f)", + winstate->nfaContextsSkipped, + winstate->nfaSkippedLen.min, + winstate->nfaSkippedLen.max, + avgSkipped); + } + else + { + appendStringInfo(es->str, ", 0 skipped"); + } + + appendStringInfoChar(es->str, '\n'); + } + } } /* diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c index d9b64b0f465..9b2c4b6a1d7 100644 --- a/src/backend/executor/nodeWindowAgg.c +++ b/src/backend/executor/nodeWindowAgg.c @@ -36,18 +36,23 @@ #include "access/htup_details.h" #include "catalog/objectaccess.h" #include "catalog/pg_aggregate.h" +#include "catalog/pg_collation_d.h" #include "catalog/pg_proc.h" #include "executor/executor.h" #include "executor/nodeWindowAgg.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" +#include "nodes/plannodes.h" #include "optimizer/clauses.h" #include "optimizer/optimizer.h" +#include "optimizer/rpr.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" +#include "regex/regex.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/datum.h" +#include "utils/fmgroids.h" #include "utils/expandeddatum.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -170,6 +175,15 @@ typedef struct WindowStatePerAggData bool restart; /* need to restart this agg in this cycle? */ } WindowStatePerAggData; +/* + * Structure used by check_rpr_navigation() and rpr_navigation_walker(). + */ +typedef struct NavigationInfo +{ + bool is_prev; /* true if PREV */ + int num_vars; /* number of var nodes */ +} NavigationInfo; + static void initialize_windowaggregate(WindowAggState *winstate, WindowStatePerFunc perfuncstate, WindowStatePerAgg peraggstate); @@ -206,6 +220,9 @@ static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1, TupleTableSlot *slot2); +static int WinGetSlotInFrame(WindowObject winobj, TupleTableSlot *slot, + int relpos, int seektype, bool set_mark, + bool *isnull, bool *isout); static bool window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot); @@ -224,6 +241,91 @@ static uint8 get_notnull_info(WindowObject winobj, int64 pos, int argno); static void put_notnull_info(WindowObject winobj, int64 pos, int argno, bool isnull); +static void attno_map(Node *node); +static bool attno_map_walker(Node *node, void *context); +static int row_is_in_reduced_frame(WindowObject winobj, int64 pos); +static bool rpr_is_defined(WindowAggState *winstate); + +static void create_reduced_frame_map(WindowAggState *winstate); +static int get_reduced_frame_map(WindowAggState *winstate, int64 pos); +static void register_reduced_frame_map(WindowAggState *winstate, int64 pos, + int val); +static void clear_reduced_frame_map(WindowAggState *winstate); +static void update_reduced_frame(WindowObject winobj, int64 pos); + +static void check_rpr_navigation(Node *node, bool is_prev); +static bool rpr_navigation_walker(Node *node, void *context); + +/* Forward declarations - NFA row processing */ +static void nfa_process_row(WindowAggState *winstate, int64 currentPos, + bool hasLimitedFrame, int64 frameOffset); + +/* Forward declarations - NFA state management */ +static RPRNFAState *nfa_state_alloc(WindowAggState *winstate); +static void nfa_state_free(WindowAggState *winstate, RPRNFAState *state); +static void nfa_state_free_list(WindowAggState *winstate, RPRNFAState *list); +static RPRNFAState *nfa_state_create(WindowAggState *winstate, int16 elemIdx, + int16 altPriority, int32 *counts, + bool sourceAbsorbable); +static bool nfa_states_equal(WindowAggState *winstate, RPRNFAState *s1, + RPRNFAState *s2); +static bool nfa_add_state_unique(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state); +static void nfa_add_matched_state(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, int64 matchEndRow); + +/* Forward declarations - NFA context management */ +static RPRNFAContext *nfa_context_alloc(WindowAggState *winstate); +static void nfa_unlink_context(WindowAggState *winstate, RPRNFAContext *ctx); +static void nfa_context_free(WindowAggState *winstate, RPRNFAContext *ctx); +static RPRNFAContext *nfa_start_context(WindowAggState *winstate, int64 startPos); +static RPRNFAContext *nfa_get_head_context(WindowAggState *winstate, int64 pos); + +/* Forward declarations - NFA statistics */ +static void nfa_update_length_stats(int64 count, NFALengthStats *stats, int64 newLen); +static void nfa_record_context_success(WindowAggState *winstate, int64 matchLen); +static void nfa_record_context_failure(WindowAggState *winstate, int64 failedLen); +static void nfa_record_context_skipped(WindowAggState *winstate, int64 skippedLen); +static void nfa_record_context_absorbed(WindowAggState *winstate, int64 absorbedLen); + +/* Forward declarations - NFA row evaluation */ +static bool nfa_evaluate_row(WindowObject winobj, int64 pos, bool *varMatched); + +/* Forward declarations - NFA context lifecycle */ +static void nfa_cleanup_dead_contexts(WindowAggState *winstate, RPRNFAContext *excludeCtx); +static void nfa_finalize_all_contexts(WindowAggState *winstate, int64 lastPos); + +/* Forward declarations - NFA absorption */ +static void nfa_update_absorption_flags(RPRNFAContext *ctx); +static bool nfa_states_covered(RPRPattern *pattern, RPRNFAContext *older, + RPRNFAContext *newer); +static bool nfa_try_absorb_context(WindowAggState *winstate, RPRNFAContext *ctx); +static void nfa_absorb_contexts(WindowAggState *winstate); + +/* Forward declarations - NFA match and advance */ +static inline bool nfa_eval_var_match(WindowAggState *winstate, + RPRPatternElement *elem, bool *varMatched); +static void nfa_match(WindowAggState *winstate, RPRNFAContext *ctx, + bool *varMatched); +static void nfa_advance_state(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, int64 currentPos, bool initialAdvance); +static void nfa_route_to_elem(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *nextElem, + int64 currentPos, bool initialAdvance); +static void nfa_advance_alt(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance); +static void nfa_advance_begin(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance); +static void nfa_advance_end(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance); +static void nfa_advance_var(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance); +static void nfa_advance(WindowAggState *winstate, RPRNFAContext *ctx, + int64 currentPos, bool initialAdvance); /* * Not null info bit array consists of 2-bit items @@ -817,6 +919,7 @@ eval_windowaggregates(WindowAggState *winstate) * transition function, or * - we have an EXCLUSION clause, or * - if the new frame doesn't overlap the old one + * - if RPR is enabled * * Note that we don't strictly need to restart in the last case, but if * we're going to remove all rows from the aggregation anyway, a restart @@ -831,7 +934,8 @@ eval_windowaggregates(WindowAggState *winstate) (winstate->aggregatedbase != winstate->frameheadpos && !OidIsValid(peraggstate->invtransfn_oid)) || (winstate->frameOptions & FRAMEOPTION_EXCLUSION) || - winstate->aggregatedupto <= winstate->frameheadpos) + winstate->aggregatedupto <= winstate->frameheadpos || + rpr_is_defined(winstate)) { peraggstate->restart = true; numaggs_restart++; @@ -905,7 +1009,22 @@ eval_windowaggregates(WindowAggState *winstate) * head, so that tuplestore can discard unnecessary rows. */ if (agg_winobj->markptr >= 0) - WinSetMarkPosition(agg_winobj, winstate->frameheadpos); + { + int64 markpos = winstate->frameheadpos; + + if (rpr_is_defined(winstate)) + { + /* + * If RPR is used, it is possible PREV wants to look at the + * previous row. So the mark pos should be frameheadpos - 1 + * unless it is below 0. + */ + markpos -= 1; + if (markpos < 0) + markpos = 0; + } + WinSetMarkPosition(agg_winobj, markpos); + } /* * Now restart the aggregates that require it. @@ -960,6 +1079,14 @@ eval_windowaggregates(WindowAggState *winstate) { winstate->aggregatedupto = winstate->frameheadpos; ExecClearTuple(agg_row_slot); + + /* + * If RPR is defined, we do not use aggregatedupto_nonrestarted. To + * avoid assertion failure below, we reset aggregatedupto_nonrestarted + * to frameheadpos. + */ + if (rpr_is_defined(winstate)) + aggregatedupto_nonrestarted = winstate->frameheadpos; } /* @@ -973,6 +1100,12 @@ eval_windowaggregates(WindowAggState *winstate) { int ret; +#ifdef RPR_DEBUG + printf("===== loop in frame starts: aggregatedupto: " INT64_FORMAT " aggregatedbase: " INT64_FORMAT "\n", + winstate->aggregatedupto, + winstate->aggregatedbase); +#endif + /* Fetch next row if we didn't already */ if (TupIsNull(agg_row_slot)) { @@ -989,9 +1122,53 @@ eval_windowaggregates(WindowAggState *winstate) agg_row_slot, false); if (ret < 0) break; + if (ret == 0) goto next_tuple; + if (rpr_is_defined(winstate)) + { +#ifdef RPR_DEBUG + printf("reduced_frame_map: %d aggregatedupto: " INT64_FORMAT " aggregatedbase: " INT64_FORMAT "\n", + get_reduced_frame_map(winstate, + winstate->aggregatedupto), + winstate->aggregatedupto, + winstate->aggregatedbase); +#endif + + /* + * If the row status at currentpos is already decided and current + * row status is not decided yet, it means we passed the last + * reduced frame. Time to break the loop. + */ + if (get_reduced_frame_map(winstate, winstate->currentpos) + != RF_NOT_DETERMINED && + get_reduced_frame_map(winstate, winstate->aggregatedupto) + == RF_NOT_DETERMINED) + break; + + /* + * Otherwise we need to calculate the reduced frame. + */ + ret = row_is_in_reduced_frame(winstate->agg_winobj, + winstate->aggregatedupto); + if (ret == -1) /* unmatched row */ + break; + + /* + * Check if current row needs to be skipped due to no match. + */ + if (get_reduced_frame_map(winstate, + winstate->aggregatedupto) == RF_SKIPPED && + winstate->aggregatedupto == winstate->aggregatedbase) + { +#ifdef RPR_DEBUG + printf("skip current row for aggregation\n"); +#endif + break; + } + } + /* Set tuple context for evaluation of aggregate arguments */ winstate->tmpcontext->ecxt_outertuple = agg_row_slot; @@ -1020,6 +1197,7 @@ next_tuple: ExecClearTuple(agg_row_slot); } + /* The frame's end is not supposed to move backwards, ever */ Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto); @@ -1243,6 +1421,7 @@ begin_partition(WindowAggState *winstate) winstate->framehead_valid = false; winstate->frametail_valid = false; winstate->grouptail_valid = false; + create_reduced_frame_map(winstate); winstate->spooled_rows = 0; winstate->currentpos = 0; winstate->frameheadpos = 0; @@ -1464,6 +1643,15 @@ release_partition(WindowAggState *winstate) tuplestore_clear(winstate->buffer); winstate->partition_spooled = false; winstate->next_partition = true; + + /* Reset NFA state for new partition */ + winstate->nfaContext = NULL; + winstate->nfaContextTail = NULL; + winstate->nfaContextFree = NULL; + winstate->nfaStateFree = NULL; + winstate->nfaLastProcessedRow = -1; + winstate->nfaStatesActive = 0; + winstate->nfaContextsActive = 0; } /* @@ -2237,6 +2425,11 @@ ExecWindowAgg(PlanState *pstate) CHECK_FOR_INTERRUPTS(); +#ifdef RPR_DEBUG + printf("ExecWindowAgg called. pos: " INT64_FORMAT "\n", + winstate->currentpos); +#endif + if (winstate->status == WINDOWAGG_DONE) return NULL; @@ -2345,6 +2538,17 @@ ExecWindowAgg(PlanState *pstate) /* don't evaluate the window functions when we're in pass-through mode */ if (winstate->status == WINDOWAGG_RUN) { + /* + * If RPR is defined and skip mode is next row, we need to clear + * existing reduced frame info so that we newly calculate the info + * starting from current row. + */ + if (rpr_is_defined(winstate)) + { + if (winstate->rpSkipTo == ST_NEXT_ROW) + clear_reduced_frame_map(winstate); + } + /* * Evaluate true window functions */ @@ -2511,6 +2715,9 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) TupleDesc scanDesc; ListCell *l; + TargetEntry *te; + Expr *expr; + /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -2609,6 +2816,16 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) winstate->temp_slot_2 = ExecInitExtraTupleSlot(estate, scanDesc, &TTSOpsMinimalTuple); + winstate->prev_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + + winstate->next_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + + winstate->null_slot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + winstate->null_slot = ExecStoreAllNullTuple(winstate->null_slot); + /* * create frame head and tail slots only if needed (must create slots in * exactly the same cases that update_frameheadpos and update_frametailpos @@ -2795,6 +3012,67 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) winstate->inRangeAsc = node->inRangeAsc; winstate->inRangeNullsFirst = node->inRangeNullsFirst; + /* Set up SKIP TO type */ + winstate->rpSkipTo = node->rpSkipTo; + /* Set up row pattern recognition PATTERN clause (compiled NFA) */ + winstate->rpPattern = node->rpPattern; + + /* Calculate NFA state size for allocation */ + if (node->rpPattern != NULL) + { + winstate->nfaStateSize = offsetof(RPRNFAState, counts) + + sizeof(int32) * node->rpPattern->maxDepth; + } + + /* Set up row pattern recognition DEFINE clause */ + winstate->defineVariableList = NIL; + winstate->defineClauseList = NIL; + if (node->defineClause != NIL) + { + /* + * Tweak arg var of PREV/NEXT so that it refers to scan/inner slot. + */ + foreach(l, node->defineClause) + { + char *name; + ExprState *exps; + + te = lfirst(l); + name = te->resname; + expr = te->expr; + +#ifdef RPR_DEBUG + printf("defineVariable name: %s\n", name); +#endif + winstate->defineVariableList = + lappend(winstate->defineVariableList, + makeString(pstrdup(name))); + attno_map((Node *) expr); + exps = ExecInitExpr(expr, (PlanState *) winstate); + winstate->defineClauseList = + lappend(winstate->defineClauseList, exps); + } + } + + /* Initialize NFA free lists for row pattern matching */ + winstate->nfaContext = NULL; + winstate->nfaContextTail = NULL; + winstate->nfaContextFree = NULL; + winstate->nfaStateFree = NULL; + winstate->nfaLastProcessedRow = -1; + winstate->nfaStatesActive = 0; + winstate->nfaContextsActive = 0; + + /* + * Allocate varMatched array for NFA evaluation. With the new varNames + * ordering (DEFINE order first), varId == defineIdx for all defined + * variables, so no mapping is needed. + */ + if (list_length(winstate->defineVariableList) > 0) + winstate->nfaVarMatched = palloc0(sizeof(bool) * + list_length(winstate->defineVariableList)); + else + winstate->nfaVarMatched = NULL; winstate->all_first = true; winstate->partition_spooled = false; winstate->more_partitions = false; @@ -2803,6 +3081,111 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags) return winstate; } +/* + * Rewrite varno of Var nodes that are the argument of PREV/NET so that they + * see scan tuple (PREV) or inner tuple (NEXT). Also we check the arguments + * of PREV/NEXT include at least 1 column reference. This is required by the + * SQL standard. + */ +static void +attno_map(Node *node) +{ + (void) expression_tree_walker(node, attno_map_walker, NULL); +} + +static bool +attno_map_walker(Node *node, void *context) +{ + FuncExpr *func; + int nargs; + bool is_prev; + + if (node == NULL) + return false; + + if (IsA(node, FuncExpr)) + { + func = (FuncExpr *) node; + + if (func->funcid == F_PREV || func->funcid == F_NEXT) + { + /* + * The SQL standard allows to have two more arguments form of + * PREV/NEXT. But currently we allow only 1 argument form. + */ + nargs = list_length(func->args); + if (list_length(func->args) != 1) + elog(ERROR, "PREV/NEXT must have 1 argument but function %d has %d args", + func->funcid, nargs); + + /* + * Check expr of PREV/NEXT aruguments and replace varno. + */ + is_prev = (func->funcid == F_PREV) ? true : false; + check_rpr_navigation(node, is_prev); + } + } + return expression_tree_walker(node, attno_map_walker, NULL); +} + +/* + * Rewrite varno of Var of RPR navigation operations (PREV/NEXT). + * If is_prev is true, we take care PREV, otherwise NEXT. + */ +static void +check_rpr_navigation(Node *node, bool is_prev) +{ + NavigationInfo context; + + context.is_prev = is_prev; + context.num_vars = 0; + (void) expression_tree_walker(node, rpr_navigation_walker, &context); + if (context.num_vars < 1) + ereport(ERROR, + errmsg("row pattern navigation operation's argument must include at least one column reference")); +} + +static bool +rpr_navigation_walker(Node *node, void *context) +{ + NavigationInfo *nav = (NavigationInfo *) context; + + if (node == NULL) + return false; + + switch (nodeTag(node)) + { + case T_Var: + { + Var *var = (Var *) node; + + nav->num_vars++; + + if (nav->is_prev) + { + /* + * Rewrite varno from OUTER_VAR to regular var no so that + * the var references scan tuple. + */ + var->varno = var->varnosyn; + } + else + var->varno = INNER_VAR; + } + break; + case T_Const: + case T_FuncExpr: + case T_OpExpr: + break; + + default: + ereport(ERROR, + errmsg("row pattern navigation operation's argument includes unsupported expression")); + } + return expression_tree_walker(node, rpr_navigation_walker, context); +} + + /* ----------------- * ExecEndWindowAgg * ----------------- @@ -2860,6 +3243,8 @@ ExecReScanWindowAgg(WindowAggState *node) ExecClearTuple(node->agg_row_slot); ExecClearTuple(node->temp_slot_1); ExecClearTuple(node->temp_slot_2); + ExecClearTuple(node->prev_slot); + ExecClearTuple(node->next_slot); if (node->framehead_slot) ExecClearTuple(node->framehead_slot); if (node->frametail_slot) @@ -3220,7 +3605,8 @@ window_gettupleslot(WindowObject winobj, int64 pos, TupleTableSlot *slot) return false; if (pos < winobj->markpos) - elog(ERROR, "cannot fetch row before WindowObject's mark position"); + elog(ERROR, "cannot fetch row: " INT64_FORMAT " before WindowObject's mark position: " INT64_FORMAT, + pos, winobj->markpos); oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory); @@ -3337,6 +3723,7 @@ ignorenulls_getfuncarginframe(WindowObject winobj, int argno, int notnull_offset; int notnull_relpos; int forward; + int num_reduced_frame; Assert(WindowObjectIsValid(winobj)); winstate = winobj->winstate; @@ -3365,6 +3752,13 @@ ignorenulls_getfuncarginframe(WindowObject winobj, int argno, /* rejecting relpos > 0 is easy and simplifies code below */ if (relpos > 0) goto out_of_frame; + + /* + * RPR cares about frame head pos. Need to call + * update_frameheadpos + */ + update_frameheadpos(winstate); + update_frametailpos(winstate); abs_pos = winstate->frametailpos - 1; mark_pos = 0; /* keep compiler quiet */ @@ -3380,6 +3774,35 @@ ignorenulls_getfuncarginframe(WindowObject winobj, int argno, * Get the next nonnull value in the frame, moving forward or backward * until we find a value or reach the frame's end. */ + + /* + * Check whether current row is in reduced frame. + */ + num_reduced_frame = row_is_in_reduced_frame(winobj, winstate->frameheadpos); + if (num_reduced_frame < 0) /* unmatched or skipped row */ + goto out_of_frame; + else if (num_reduced_frame > 0) /* the first row of the reduced frame */ + { + /* + * Early check if row could be out of reduced frame. When RPR is + * enabled, EXCUDE clause cannot be specified and the frame is always + * contiguous. So we can do the check followings safely. Note, + * however, it is possible that a row is out of reduced frame if + * there's a NULL in the middle. So we need to check it in the + * following do lopp. + */ + if (seektype == WINDOW_SEEK_HEAD && relpos >= num_reduced_frame) + goto out_of_frame; + if (seektype == WINDOW_SEEK_TAIL) + { + if (notnull_relpos >= num_reduced_frame) + goto out_of_frame; + + /* not out of reduced frame. Set abspos as a starting point */ + abs_pos = winstate->frameheadpos + num_reduced_frame - 1; + } + } + do { int inframe; @@ -3441,6 +3864,16 @@ ignorenulls_getfuncarginframe(WindowObject winobj, int argno, } advance: abs_pos += forward; + if (rpr_is_defined(winstate)) + { + /* + * Check whether we are still in the reduced frame. (also check + * if we succeeded in getting the target row). + */ + num_reduced_frame--; + if (num_reduced_frame <= 0 && notnull_offset <= notnull_relpos) + goto out_of_frame; + } } while (notnull_offset <= notnull_relpos); if (set_mark) @@ -3922,8 +4355,6 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, WindowAggState *winstate; ExprContext *econtext; TupleTableSlot *slot; - int64 abs_pos; - int64 mark_pos; Assert(WindowObjectIsValid(winobj)); winstate = winobj->winstate; @@ -3934,6 +4365,48 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, return ignorenulls_getfuncarginframe(winobj, argno, relpos, seektype, set_mark, isnull, isout); + if (WinGetSlotInFrame(winobj, slot, + relpos, seektype, set_mark, + isnull, isout) == 0) + { + econtext->ecxt_outertuple = slot; + return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), + econtext, isnull); + } + + if (isout) + *isout = true; + *isnull = true; + return (Datum) 0; +} + +/* + * WinGetSlotInFrame + * slot: TupleTableSlot to store the result + * relpos: signed rowcount offset from the seek position + * seektype: WINDOW_SEEK_HEAD or WINDOW_SEEK_TAIL + * set_mark: If the row is found/in frame and set_mark is true, the mark is + * moved to the row as a side-effect. + * isnull: output argument, receives isnull status of result + * isout: output argument, set to indicate whether target row position + * is out of frame (can pass NULL if caller doesn't care about this) + * + * Returns 0 if we successfullt got the slot. false if out of frame. + * (also isout is set) + */ +static int +WinGetSlotInFrame(WindowObject winobj, TupleTableSlot *slot, + int relpos, int seektype, bool set_mark, + bool *isnull, bool *isout) +{ + WindowAggState *winstate; + int64 abs_pos; + int64 mark_pos; + int num_reduced_frame; + + Assert(WindowObjectIsValid(winobj)); + winstate = winobj->winstate; + switch (seektype) { case WINDOW_SEEK_CURRENT: @@ -4000,11 +4473,25 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, winstate->frameOptions); break; } + num_reduced_frame = row_is_in_reduced_frame(winobj, + winstate->frameheadpos); + if (num_reduced_frame < 0) + goto out_of_frame; + else if (num_reduced_frame > 0) + if (relpos >= num_reduced_frame) + goto out_of_frame; break; case WINDOW_SEEK_TAIL: /* rejecting relpos > 0 is easy and simplifies code below */ if (relpos > 0) goto out_of_frame; + + /* + * RPR cares about frame head pos. Need to call + * update_frameheadpos + */ + update_frameheadpos(winstate); + update_frametailpos(winstate); abs_pos = winstate->frametailpos - 1 + relpos; @@ -4071,6 +4558,14 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, mark_pos = 0; /* keep compiler quiet */ break; } + + num_reduced_frame = row_is_in_reduced_frame(winobj, + winstate->frameheadpos + relpos); + if (num_reduced_frame < 0) + goto out_of_frame; + else if (num_reduced_frame > 0) + abs_pos = winstate->frameheadpos + relpos + + num_reduced_frame - 1; break; default: elog(ERROR, "unrecognized window seek type: %d", seektype); @@ -4089,15 +4584,13 @@ WinGetFuncArgInFrame(WindowObject winobj, int argno, *isout = false; if (set_mark) WinSetMarkPosition(winobj, mark_pos); - econtext->ecxt_outertuple = slot; - return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), - econtext, isnull); + return 0; out_of_frame: if (isout) *isout = true; *isnull = true; - return (Datum) 0; + return -1; } /* @@ -4128,3 +4621,1835 @@ WinGetFuncArgCurrent(WindowObject winobj, int argno, bool *isnull) return ExecEvalExpr((ExprState *) list_nth(winobj->argstates, argno), econtext, isnull); } + +/* + * rpr_is_defined + * return true if Row pattern recognition is defined. + */ +static bool +rpr_is_defined(WindowAggState *winstate) +{ + return winstate->rpPattern != NULL; +} + +/* + * ----------------- + * row_is_in_reduced_frame + * Determine whether a row is in the current row's reduced window frame + * according to row pattern matching + * + * The row must has been already determined that it is in a full window frame + * and fetched it into slot. + * + * Returns: + * = 0, RPR is not defined. + * >0, if the row is the first in the reduced frame. Return the number of rows + * in the reduced frame. + * -1, if the row is unmatched row + * -2, if the row is in the reduced frame but needed to be skipped because of + * AFTER MATCH SKIP PAST LAST ROW + * ----------------- + */ +static int +row_is_in_reduced_frame(WindowObject winobj, int64 pos) +{ + WindowAggState *winstate = winobj->winstate; + int state; + int rtn; + + if (!rpr_is_defined(winstate)) + { + /* + * RPR is not defined. Assume that we are always in the the reduced + * window frame. + */ + rtn = 0; +#ifdef RPR_DEBUG + printf("row_is_in_reduced_frame returns %d: pos: " INT64_FORMAT "\n", + rtn, pos); +#endif + return rtn; + } + + state = get_reduced_frame_map(winstate, pos); + + if (state == RF_NOT_DETERMINED) + { + update_frameheadpos(winstate); + update_reduced_frame(winobj, pos); + } + + state = get_reduced_frame_map(winstate, pos); + + switch (state) + { + int64 i; + int num_reduced_rows; + + case RF_FRAME_HEAD: + num_reduced_rows = 1; + for (i = pos + 1; + get_reduced_frame_map(winstate, i) == RF_SKIPPED; i++) + num_reduced_rows++; + rtn = num_reduced_rows; + break; + + case RF_SKIPPED: + rtn = -2; + break; + + case RF_UNMATCHED: + rtn = -1; + break; + + default: + elog(ERROR, "Unrecognized state: %d at: " INT64_FORMAT, + state, pos); + break; + } + +#ifdef RPR_DEBUG + printf("row_is_in_reduced_frame returns %d: pos: " INT64_FORMAT "\n", + rtn, pos); +#endif + return rtn; +} + +#define REDUCED_FRAME_MAP_INIT_SIZE 1024L + +/* + * create_reduced_frame_map + * Create reduced frame map + */ +static void +create_reduced_frame_map(WindowAggState *winstate) +{ + winstate->reduced_frame_map = + MemoryContextAlloc(winstate->partcontext, + REDUCED_FRAME_MAP_INIT_SIZE); + winstate->alloc_sz = REDUCED_FRAME_MAP_INIT_SIZE; + clear_reduced_frame_map(winstate); +} + +/* + * clear_reduced_frame_map + * Clear reduced frame map + */ +static void +clear_reduced_frame_map(WindowAggState *winstate) +{ + Assert(winstate->reduced_frame_map != NULL); + MemSet(winstate->reduced_frame_map, RF_NOT_DETERMINED, + winstate->alloc_sz); +} + +/* + * get_reduced_frame_map + * Get reduced frame map specified by pos + */ +static int +get_reduced_frame_map(WindowAggState *winstate, int64 pos) +{ + Assert(winstate->reduced_frame_map != NULL); + Assert(pos >= 0); + + /* + * If pos is not in the reduced frame map, it means that any info + * regarding the pos has not been registered yet. So we return + * RF_NOT_DETERMINED. + */ + if (pos >= winstate->alloc_sz) + return RF_NOT_DETERMINED; + + return winstate->reduced_frame_map[pos]; +} + +/* + * register_reduced_frame_map + * Add/replace reduced frame map member at pos. + * If there's no enough space, expand the map. + */ +static void +register_reduced_frame_map(WindowAggState *winstate, int64 pos, int val) +{ + int64 realloc_sz; + + Assert(winstate->reduced_frame_map != NULL); + + if (pos < 0) + elog(ERROR, "wrong pos: " INT64_FORMAT, pos); + + while (pos > winstate->alloc_sz - 1) + { + realloc_sz = winstate->alloc_sz * 2; + + winstate->reduced_frame_map = + repalloc(winstate->reduced_frame_map, realloc_sz); + + MemSet(winstate->reduced_frame_map + winstate->alloc_sz, + RF_NOT_DETERMINED, realloc_sz - winstate->alloc_sz); + + winstate->alloc_sz = realloc_sz; + } + + winstate->reduced_frame_map[pos] = val; +} + +/* + * update_reduced_frame + * Update reduced frame info using multi-context NFA pattern matching. + * + * Maintains multiple NFA contexts simultaneously, one for each potential + * match start position. This allows sharing row evaluations across contexts, + * avoiding redundant DEFINE clause evaluations when rewinding for SKIP TO + * NEXT ROW mode. + * + * Key optimizations: + * - Row evaluations (expensive DEFINE clauses) happen only once per row + * - All active contexts share the same evaluation results + * - Contexts persist across calls, enabling O(n) DEFINE evaluations + */ +static void +update_reduced_frame(WindowObject winobj, int64 pos) +{ + WindowAggState *winstate = winobj->winstate; + RPRNFAContext *targetCtx; + int64 currentPos; + int64 startPos; + int frameOptions = winstate->frameOptions; + bool hasLimitedFrame; + int64 frameOffset = 0; + int64 matchLen; + + /* + * Check if we have a limited frame (ROWS ... N FOLLOWING). Each context + * needs its own frame end based on matchStartRow + offset. + */ + hasLimitedFrame = (frameOptions & FRAMEOPTION_ROWS) && + !(frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING); + if (hasLimitedFrame && winstate->endOffsetValue != 0) + frameOffset = DatumGetInt64(winstate->endOffsetValue); + + /* + * Case 1: pos is before any existing context's start position. This means + * the position was already processed and determined unmatched. Head is + * the oldest context (lowest matchStartRow) since contexts are added at + * tail with increasing positions. + */ + if (winstate->nfaContext != NULL && + pos < winstate->nfaContext->matchStartRow) + { + register_reduced_frame_map(winstate, pos, RF_UNMATCHED); + return; + } + + /* + * Case 2: Find existing context for this pos, or create new one. + */ + targetCtx = nfa_get_head_context(winstate, pos); + if (targetCtx == NULL) + { + /* + * No context exists. If pos is already processed, it means this row + * was already determined to be unmatched or skipped - no need to + * reprocess. + */ + if (pos <= winstate->nfaLastProcessedRow) + { + register_reduced_frame_map(winstate, pos, RF_UNMATCHED); + return; + } + /* Not yet processed - create new context and start fresh */ + targetCtx = nfa_start_context(winstate, pos); + } + else if (targetCtx->states == NULL) + { + /* Context already completed - skip to result registration */ + goto register_result; + } + + /* + * Determine where to start processing. Usually nfaLastProcessedRow+1 >= + * pos since contexts are created at currentPos+1 during processing. + * However, pos can exceed this when rows are skipped (e.g., unmatched + * rows don't update nfaLastProcessedRow). + */ + startPos = Max(pos, winstate->nfaLastProcessedRow + 1); + + /* + * Process rows until target context completes or we hit boundaries. Each + * row evaluation is shared across all active contexts. + */ + for (currentPos = startPos; targetCtx->states != NULL; currentPos++) + { + bool rowExists; + + /* + * Evaluate variables for this row - done only once, shared by all + * contexts + */ + rowExists = nfa_evaluate_row(winobj, currentPos, winstate->nfaVarMatched); + + /* No more rows in partition? Finalize all contexts */ + if (!rowExists) + { + nfa_finalize_all_contexts(winstate, currentPos - 1); + /* Clean up dead contexts from finalization */ + nfa_cleanup_dead_contexts(winstate, targetCtx); + /* Absorb contexts at partition boundary */ + if (winstate->rpPattern->isAbsorbable) + { + nfa_absorb_contexts(winstate); + } + break; + } + + /* Update last processed row */ + winstate->nfaLastProcessedRow = currentPos; + + /*-------------------------- + * Process all contexts for this row: + * 1. Match all (convergence) + * 2. Absorb redundant + * 3. Advance all (divergence) + */ + nfa_process_row(winstate, currentPos, hasLimitedFrame, frameOffset); + + /* + * Create a new context for the next potential start position. This + * enables overlapping match detection for SKIP TO NEXT ROW. + */ + nfa_start_context(winstate, currentPos + 1); + + /* + * Clean up dead contexts (failed with no active states and no match). + * This removes contexts that failed during processing and counts them + * appropriately as pruned or mismatched. + */ + nfa_cleanup_dead_contexts(winstate, targetCtx); + } + +register_result: + Assert(pos == targetCtx->matchStartRow); + + /* + * Register reduced frame map based on match result. + */ + if (targetCtx->matchEndRow < targetCtx->matchStartRow) + { + matchLen = targetCtx->lastProcessedRow - targetCtx->matchStartRow + 1; + + register_reduced_frame_map(winstate, targetCtx->matchStartRow, RF_UNMATCHED); + nfa_record_context_failure(winstate, matchLen); + nfa_context_free(winstate, targetCtx); + return; + } + + /* Match succeeded - register frame map and record statistics */ + matchLen = targetCtx->matchEndRow - targetCtx->matchStartRow + 1; + + register_reduced_frame_map(winstate, targetCtx->matchStartRow, RF_FRAME_HEAD); + for (int64 i = targetCtx->matchStartRow + 1; i <= targetCtx->matchEndRow; i++) + { + register_reduced_frame_map(winstate, i, RF_SKIPPED); + } + nfa_record_context_success(winstate, matchLen); + + /* Remove the matched context */ + nfa_context_free(winstate, targetCtx); +} + +/* + * NFA-based pattern matching implementation + * + * These functions implement direct NFA execution using the compiled + * RPRPattern structure, avoiding regex compilation overhead. + * + * Execution Flow: match -> absorb -> advance + * ----------------------------------------- + * The NFA execution follows a three-phase cycle for each row: + * + * 1. MATCH (convergence): Evaluate all waiting states against current row. + * States on VAR elements are checked against their defining conditions. + * Failed matches are removed, successful ones may transition forward. + * This is a "convergence" phase - the number of states tends to decrease. + * + * 2. ABSORB: After matching, check if any context can absorb another. + * Context absorption is an optimization that merges equivalent contexts. + * A context can only be absorbed if ALL its states are absorbable. + * + * 3. ADVANCE (divergence): Expand states through epsilon transitions. + * States advance through ALT (alternation), END (group end), and + * optional elements until reaching VAR or FIN elements where they wait. + * This is a "divergence" phase - ALT creates multiple branch states. + * + * Key Design Decisions: + * --------------------- + * - VAR->END transition in match phase: When a simple VAR (max=1) matches + * and the next element is END, we transition immediately in the match + * phase rather than waiting for advance. This is necessary for correct + * absorption: states must be at END to be marked absorbable before the + * absorption check occurs. + * + * - Optional VAR skip paths: When advance lands on a VAR with min=0, + * we create both a waiting state AND a skip state (like ALT branches). + * This ensures patterns like "A B? C" work correctly - we need a state + * waiting for B AND a state that has already skipped to C. + * + * - END->END count increment: When transitioning from one END to another + * END within advance, we must increment the outer END's count. This + * handles nested groups like "((A|B)+)+" correctly - exiting the inner + * group counts as one iteration of the outer group. + * + * - initialAdvance flag: The first advance after context creation must + * skip FIN recording. Reaching FIN without evaluating any VAR would + * create a zero-length match, which is invalid. + * + * Context Absorption Runtime: + * --------------------------- + * Absorption uses flags computed at planning time (in rpr.c) and two + * context-level flags maintained at runtime: + * + * State-level: + * state.isAbsorbable: true if state is in the absorbable region. + * - Set at creation: elem->flags & RPR_ELEM_ABSORBABLE_BRANCH + * - At transition: prevAbsorbable && (newElem->flags & ABSORBABLE_BRANCH) + * - Monotonic: once false, stays false forever + * + * Context-level: + * ctx.hasAbsorbableState: can this context absorb others? + * - True if at least one state has isAbsorbable=true + * - Monotonic: true->false only (optimization: skip recalc when false) + * + * ctx.allStatesAbsorbable: can this context be absorbed? + * - True if ALL states have isAbsorbable=true + * - Dynamic: can change false->true (when non-absorbable states die) + * + * Absorption Algorithm: + * For each pair (older Ctx1, newer Ctx2): + * 1. Pre-check: Ctx1.hasAbsorbableState && Ctx2.allStatesAbsorbable + * -> If false, skip (fast filter) + * 2. Coverage check: For each Ctx2 state with isAbsorbable=true, + * find Ctx1 state with same elemIdx and count >= Ctx2.count + * 3. If all Ctx2 absorbable states are covered, absorb Ctx2 + * + * Example: Pattern A+ B + * Row 1: Ctx1 at A (count=1) + * Row 2: Ctx1 at A (count=2), Ctx2 at A (count=1) + * -> Both at same elemIdx (A), Ctx1.count >= Ctx2.count + * -> Ctx2 absorbed + * + * The asymmetric design (Ctx1 needs hasAbsorbable, Ctx2 needs allAbsorbable) + * allows absorption even when Ctx1 has extra non-absorbable states. + */ + +/* + * nfa_process_row + * + * Process all contexts for one row: + * 1. Match all contexts (convergence) - evaluate VARs, prune dead states + * 2. Absorb redundant contexts - ideal timing after convergence + * 3. Advance all contexts (divergence) - create new states for next row + */ +static void +nfa_process_row(WindowAggState *winstate, int64 currentPos, + bool hasLimitedFrame, int64 frameOffset) +{ + RPRNFAContext *ctx; + bool *varMatched = winstate->nfaVarMatched; + + /* + * Phase 1: Match all contexts (convergence) Evaluate VAR elements, update + * counts, remove dead states. + */ + for (ctx = winstate->nfaContext; ctx != NULL; ctx = ctx->next) + { + if (ctx->states == NULL) + continue; + + /* Check frame boundary - finalize if exceeded */ + if (hasLimitedFrame) + { + int64 ctxFrameEnd = ctx->matchStartRow + frameOffset + 1; + + if (currentPos >= ctxFrameEnd) + { + /* Frame boundary exceeded: force mismatch */ + nfa_match(winstate, ctx, NULL); + continue; + } + } + + nfa_match(winstate, ctx, varMatched); + ctx->lastProcessedRow = currentPos; + } + + /* + * Phase 2: Absorb redundant contexts After match phase, states have + * converged - ideal for absorption. First update absorption flags that + * may have changed due to state removal. + */ + if (winstate->rpPattern->isAbsorbable) + { + for (ctx = winstate->nfaContext; ctx != NULL; ctx = ctx->next) + nfa_update_absorption_flags(ctx); + + nfa_absorb_contexts(winstate); + } + + /* + * Phase 3: Advance all contexts (divergence) Create new states + * (loop/exit) from surviving matched states. + */ + for (ctx = winstate->nfaContext; ctx != NULL; ctx = ctx->next) + { + if (ctx->states == NULL) + continue; + + /* + * Phase 1 already handled frame boundary exceeded contexts by forcing + * mismatch (nfa_match with NULL), which removes all states (all + * states are at VAR positions after advance). So any surviving + * context here must be within its frame boundary. + */ + Assert(!hasLimitedFrame || + currentPos < ctx->matchStartRow + frameOffset + 1); + + nfa_advance(winstate, ctx, currentPos, false); + } +} + +/* + * nfa_state_alloc + * + * Allocate an NFA state, reusing from freeList if available. + * freeList is stored in WindowAggState for reuse across match attempts. + * Uses flexible array member for counts[]. + */ +static RPRNFAState * +nfa_state_alloc(WindowAggState *winstate) +{ + RPRNFAState *state; + + /* Try to reuse from free list first */ + if (winstate->nfaStateFree != NULL) + { + state = winstate->nfaStateFree; + winstate->nfaStateFree = state->next; + } + else + { + /* Allocate in partition context for proper lifetime */ + state = MemoryContextAlloc(winstate->partcontext, winstate->nfaStateSize); + } + + /* Initialize entire state to zero */ + memset(state, 0, winstate->nfaStateSize); + + /* Update statistics */ + winstate->nfaStatesActive++; + winstate->nfaStatesTotalCreated++; + if (winstate->nfaStatesActive > winstate->nfaStatesMax) + winstate->nfaStatesMax = winstate->nfaStatesActive; + + return state; +} + +/* + * nfa_state_free + * + * Return a state to the free list for later reuse. + */ +static void +nfa_state_free(WindowAggState *winstate, RPRNFAState *state) +{ + winstate->nfaStatesActive--; + state->next = winstate->nfaStateFree; + winstate->nfaStateFree = state; +} + +/* + * nfa_state_free_list + * + * Return all states in a list to the free list. + */ +static void +nfa_state_free_list(WindowAggState *winstate, RPRNFAState *list) +{ + RPRNFAState *next; + + for (; list != NULL; list = next) + { + next = list->next; + nfa_state_free(winstate, list); + } +} + +/* + * nfa_state_create + * + * Create a new state with given elemIdx, altPriority and counts. + * isAbsorbable is computed immediately: inherited AND new element's flag. + * Monotonic property: once false, stays false through all transitions. + * + * Caller is responsible for linking the returned state. + */ +static RPRNFAState * +nfa_state_create(WindowAggState *winstate, int16 elemIdx, int16 altPriority, + int32 *counts, bool sourceAbsorbable) +{ + RPRPattern *pattern = winstate->rpPattern; + int maxDepth = pattern->maxDepth; + RPRNFAState *state = nfa_state_alloc(winstate); + RPRPatternElement *elem = &pattern->elements[elemIdx]; + + state->elemIdx = elemIdx; + state->altPriority = altPriority; + if (counts != NULL && maxDepth > 0) + memcpy(state->counts, counts, sizeof(int32) * maxDepth); + + /* + * Compute isAbsorbable immediately at transition time. isAbsorbable = + * sourceAbsorbable && (elem->flags & ABSORBABLE_BRANCH) Monotonic: once + * false, stays false (can't re-enter absorbable region). + */ + state->isAbsorbable = sourceAbsorbable && RPRElemIsAbsorbableBranch(elem); + + return state; +} + +/* + * nfa_states_equal + * + * Check if two states are equivalent (same elemIdx and counts). + */ +static bool +nfa_states_equal(WindowAggState *winstate, RPRNFAState *s1, RPRNFAState *s2) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elem; + int compareDepth; + + if (s1->elemIdx != s2->elemIdx) + return false; + + /* Compare counts up to current element's depth */ + elem = &pattern->elements[s1->elemIdx]; + compareDepth = elem->depth + 1; /* depth 0 needs 1 count, etc. */ + + if (compareDepth > 0 && + memcmp(s1->counts, s2->counts, sizeof(int32) * compareDepth) != 0) + return false; + + return true; +} + +/* + * nfa_add_state_unique + * + * Add a state to ctx->states at the END, only if no duplicate exists. + * Returns true if state was added, false if duplicate found (state is freed). + * Earlier states have lower altPriority (lexical order), so existing wins. + */ +static bool +nfa_add_state_unique(WindowAggState *winstate, RPRNFAContext *ctx, RPRNFAState *state) +{ + RPRNFAState *s; + RPRNFAState *tail = NULL; + + /* Check for duplicate and find tail */ + for (s = ctx->states; s != NULL; s = s->next) + { + if (nfa_states_equal(winstate, s, state)) + { + /* + * Duplicate found - existing has better lexical order, discard + * new + */ + nfa_state_free(winstate, state); + winstate->nfaStatesMerged++; + return false; + } + tail = s; + } + + /* No duplicate, add at end */ + state->next = NULL; + if (tail == NULL) + ctx->states = state; + else + tail->next = state; + + return true; +} + +/* + * nfa_add_matched_state + * + * Record a matched state following SQL standard semantics. + * Lexical order (lower altPriority) wins first. Among same lexical order, + * longer match wins (greedy). + * + * FIXME: altPriority is a single value that only tracks the last ALT choice. + * For patterns with repeated or nested ALTs like (A|B)+, this cannot correctly + * implement SQL standard lexical order, which requires comparing the full path + * from left to right. For example: + * Pattern: (A | B)+ + * Path "A B A" vs "B A B" + * Current: compares last choice (A vs B) → altPriority 10 vs 20 + * Correct: should compare first choice (A < B) → "A B A" wins + * + * A classifier structure tracking the entire ALT path is required for correct + * implementation. Without it, patterns with repeated or nested ALTs will + * produce incorrect match selection. + */ +static void +nfa_add_matched_state(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, int64 matchEndRow) +{ + bool shouldUpdate = false; + + if (ctx->matchedState == NULL) + shouldUpdate = true; + else if (state->altPriority < ctx->matchedState->altPriority) + shouldUpdate = true; /* Better lexical order wins */ + else if (state->altPriority == ctx->matchedState->altPriority && + matchEndRow > ctx->matchEndRow) + shouldUpdate = true; /* Same lexical order, longer wins */ + + if (shouldUpdate) + { + /* Free old matchedState if exists */ + if (ctx->matchedState != NULL) + nfa_state_free(winstate, ctx->matchedState); + + /* Take ownership of the new state */ + ctx->matchedState = state; + state->next = NULL; + ctx->matchEndRow = matchEndRow; + + /*---------- + * SKIP PAST LAST ROW: eagerly prune contexts within match range. + * + * This function is called whenever a FIN state is reached, including + * during greedy matching when intermediate (shorter) matches are + * found. Each time we update matchEndRow (whether extending a greedy + * match or finding a new match), we can prune pending contexts that + * started within the current match range. + * + * SKIP PAST LAST ROW uses lexical order (matchStartRow). Therefore, + * any pending context that started at or before matchEndRow can never + * produce a valid output row - it would be skipped anyway per SQL + * standard. + * + * Example (greedy matching in progress): + * Pattern: START UP+ + * Rows: 1 2 3 4 5 + * Context A starts at row 1: + * - Matches START UP (rows 1-2) → matchEndRow=2 → prune Context B(row 2) + * - Matches START UP UP (rows 1-3) → matchEndRow=3 → prune Context C(row 3) + * - Continues greedy extension while pruning incrementally + *---------- + */ + if (winstate->rpSkipTo == ST_PAST_LAST_ROW) + { + RPRNFAContext *nextCtx; + int64 skippedLen; + + while (ctx->next != NULL && + ctx->next->matchStartRow <= matchEndRow) + { + nextCtx = ctx->next; + ctx->next = ctx->next->next; + + Assert(nextCtx->lastProcessedRow >= nextCtx->matchStartRow); + skippedLen = nextCtx->lastProcessedRow - nextCtx->matchStartRow + 1; + nfa_record_context_skipped(winstate, skippedLen); + + nfa_context_free(winstate, nextCtx); + } + if (ctx->next == NULL) + winstate->nfaContextTail = ctx; + } + } + else + { + /* This state didn't win, free it */ + nfa_state_free(winstate, state); + } +} + +/* + * nfa_context_alloc + * + * Allocate an NFA context, reusing from free list if available. + */ +static RPRNFAContext * +nfa_context_alloc(WindowAggState *winstate) +{ + RPRNFAContext *ctx; + + if (winstate->nfaContextFree != NULL) + { + ctx = winstate->nfaContextFree; + winstate->nfaContextFree = ctx->next; + } + else + { + /* Allocate in partition context for proper lifetime */ + ctx = MemoryContextAlloc(winstate->partcontext, sizeof(RPRNFAContext)); + } + + ctx->next = NULL; + ctx->prev = NULL; + ctx->states = NULL; + ctx->matchStartRow = -1; + ctx->matchEndRow = -1; + ctx->lastProcessedRow = -1; + ctx->matchedState = NULL; + /* Initialize two-flag absorption design based on pattern */ + ctx->hasAbsorbableState = (winstate->rpPattern != NULL && + winstate->rpPattern->isAbsorbable); + ctx->allStatesAbsorbable = (winstate->rpPattern != NULL && + winstate->rpPattern->isAbsorbable); + + /* Update statistics */ + winstate->nfaContextsActive++; + winstate->nfaContextsTotalCreated++; + if (winstate->nfaContextsActive > winstate->nfaContextsMax) + winstate->nfaContextsMax = winstate->nfaContextsActive; + + return ctx; +} + +/* + * nfa_unlink_context + * + * Remove a context from the doubly-linked active context list. + * Updates head (nfaContext) and tail (nfaContextTail) as needed. + */ +static void +nfa_unlink_context(WindowAggState *winstate, RPRNFAContext *ctx) +{ + if (ctx->prev != NULL) + ctx->prev->next = ctx->next; + else + winstate->nfaContext = ctx->next; /* was head */ + + if (ctx->next != NULL) + ctx->next->prev = ctx->prev; + else + winstate->nfaContextTail = ctx->prev; /* was tail */ + + ctx->next = NULL; + ctx->prev = NULL; +} + +/* + * nfa_context_free + * + * Unlink context from active list and return it to free list. + * Also frees any states in the context. + */ +static void +nfa_context_free(WindowAggState *winstate, RPRNFAContext *ctx) +{ + /* Unlink from active list first */ + nfa_unlink_context(winstate, ctx); + + /* Update statistics */ + winstate->nfaContextsActive--; + + if (ctx->states != NULL) + nfa_state_free_list(winstate, ctx->states); + if (ctx->matchedState != NULL) + nfa_state_free(winstate, ctx->matchedState); + + ctx->states = NULL; + ctx->matchedState = NULL; + ctx->next = winstate->nfaContextFree; + winstate->nfaContextFree = ctx; +} + +/* + * nfa_start_context + * + * Start a new match context at given position. + * Initializes context, state absorption flags, and performs initial advance + * to expand epsilon transitions (ALT branches, optional elements). + * Adds context to the tail of winstate->nfaContext list. + */ +static RPRNFAContext * +nfa_start_context(WindowAggState *winstate, int64 startPos) +{ + RPRNFAContext *ctx; + RPRPattern *pattern = winstate->rpPattern; + + ctx = nfa_context_alloc(winstate); + ctx->matchStartRow = startPos; + ctx->states = nfa_state_alloc(winstate); /* initial state at elem 0 */ + + /*-------------------------- + * Initialize two-flag absorption design: + * hasAbsorbableState: can this context absorb others? (>= 1 absorbable state) + * allStatesAbsorbable: can this context be absorbed? (ALL states absorbable) + * Both initialized from pattern->isAbsorbable at context start. + */ + ctx->hasAbsorbableState = (pattern != NULL && pattern->isAbsorbable); + ctx->allStatesAbsorbable = (pattern != NULL && pattern->isAbsorbable); + + if (ctx->states != NULL && pattern != NULL && pattern->numElements > 0) + { + RPRPatternElement *elem = &pattern->elements[0]; + + /* + * Initial state at element 0. Check if element 0 is in absorbable + * branch. + */ + if (RPRElemIsAbsorbableBranch(elem)) + { + /* Element 0 is in absorbable branch - flags stay true */ + ctx->states->isAbsorbable = true; + } + else + { + /* Element 0 is NOT in absorbable branch - turn flags OFF */ + ctx->hasAbsorbableState = false; + ctx->allStatesAbsorbable = false; + ctx->states->isAbsorbable = false; + } + } + + /* Add to tail of active context list (doubly-linked, oldest-first) */ + ctx->prev = winstate->nfaContextTail; + ctx->next = NULL; + if (winstate->nfaContextTail != NULL) + winstate->nfaContextTail->next = ctx; + else + winstate->nfaContext = ctx; /* first context becomes head */ + winstate->nfaContextTail = ctx; + + /* + * Initial advance (divergence): expand ALT branches and create exit + * states for VAR elements with min=0. This prepares the context for the + * first row's match phase. + * + * Pass initialAdvance=true to prevent recording zero-length matches when + * optional patterns can skip all VARs to reach FIN immediately. + */ + nfa_advance(winstate, ctx, startPos, true); + + return ctx; +} + +/* + * nfa_get_head_context + * + * Return the head context if its start position matches pos. + * Returns NULL if no context exists or head doesn't match pos. + */ +static RPRNFAContext * +nfa_get_head_context(WindowAggState *winstate, int64 pos) +{ + RPRNFAContext *ctx = winstate->nfaContext; + + /* + * Contexts are sorted by matchStartRow ascending. If the head context + * doesn't match pos, no context exists for this position. + */ + if (ctx == NULL || ctx->matchStartRow != pos) + return NULL; + + return ctx; +} + +/* + * nfa_update_length_stats + * + * Helper function to update min/max/total length statistics. + * Called when tracking match/mismatch/absorbed/skipped lengths. + */ +static void +nfa_update_length_stats(int64 count, NFALengthStats *stats, int64 newLen) +{ + if (count == 1) + { + stats->min = newLen; + stats->max = newLen; + } + else + { + if (newLen < stats->min) + stats->min = newLen; + if (newLen > stats->max) + stats->max = newLen; + } + stats->total += newLen; +} + +/* + * nfa_record_context_success + * + * Record a successful context in statistics. + */ +static void +nfa_record_context_success(WindowAggState *winstate, int64 matchLen) +{ + winstate->nfaMatchesSucceeded++; + nfa_update_length_stats(winstate->nfaMatchesSucceeded, + &winstate->nfaMatchLen, + matchLen); +} + +/* + * nfa_record_context_failure + * + * Record a failed context in statistics. + * If failedLen == 1, count as pruned (failed on first row). + * If failedLen > 1, count as mismatched and update length stats. + */ +static void +nfa_record_context_failure(WindowAggState *winstate, int64 failedLen) +{ + if (failedLen == 1) + { + winstate->nfaContextsPruned++; + } + else + { + winstate->nfaMatchesFailed++; + nfa_update_length_stats(winstate->nfaMatchesFailed, + &winstate->nfaFailLen, + failedLen); + } +} + +/* + * nfa_record_context_skipped + * + * Record a skipped context in statistics. + */ +static void +nfa_record_context_skipped(WindowAggState *winstate, int64 skippedLen) +{ + winstate->nfaContextsSkipped++; + nfa_update_length_stats(winstate->nfaContextsSkipped, + &winstate->nfaSkippedLen, + skippedLen); +} + +/* + * nfa_record_context_absorbed + * + * Record an absorbed context in statistics. + */ +static void +nfa_record_context_absorbed(WindowAggState *winstate, int64 absorbedLen) +{ + winstate->nfaContextsAbsorbed++; + nfa_update_length_stats(winstate->nfaContextsAbsorbed, + &winstate->nfaAbsorbedLen, + absorbedLen); +} + +/* + * nfa_evaluate_row + * + * Evaluate all DEFINE variables for current row. + * Returns true if the row exists, false if out of partition. + * If row exists, fills varMatched array. + * varMatched[i] = true if variable i matched at current row. + */ +static bool +nfa_evaluate_row(WindowObject winobj, int64 pos, bool *varMatched) +{ + WindowAggState *winstate = winobj->winstate; + ExprContext *econtext = winstate->ss.ps.ps_ExprContext; + int numDefineVars = list_length(winstate->defineVariableList); + ListCell *lc; + int varIdx = 0; + TupleTableSlot *slot; + + /* + * Set up slots for current, previous, and next rows. We don't call + * get_slots() here to avoid recursion through row_is_in_frame -> + * update_reduced_frame -> nfa_process_row. + */ + + /* Current row -> ecxt_outertuple */ + slot = winstate->temp_slot_1; + if (!window_gettupleslot(winobj, pos, slot)) + return false; /* No row exists */ + econtext->ecxt_outertuple = slot; + + /* Previous row -> ecxt_scantuple (for PREV) */ + if (pos > 0) + { + slot = winstate->prev_slot; + if (!window_gettupleslot(winobj, pos - 1, slot)) + econtext->ecxt_scantuple = winstate->null_slot; + else + econtext->ecxt_scantuple = slot; + } + else + econtext->ecxt_scantuple = winstate->null_slot; + + /* Next row -> ecxt_innertuple (for NEXT) */ + slot = winstate->next_slot; + if (!window_gettupleslot(winobj, pos + 1, slot)) + econtext->ecxt_innertuple = winstate->null_slot; + else + econtext->ecxt_innertuple = slot; + + foreach(lc, winstate->defineClauseList) + { + ExprState *exprState = (ExprState *) lfirst(lc); + Datum result; + bool isnull; + + /* Evaluate DEFINE expression */ + result = ExecEvalExpr(exprState, econtext, &isnull); + + varMatched[varIdx] = (!isnull && DatumGetBool(result)); + + varIdx++; + if (varIdx >= numDefineVars) + break; + } + + return true; /* Row exists */ +} + +/* + * nfa_cleanup_dead_contexts + * + * Remove contexts that have failed (no active states and no match). + * These are contexts that failed during normal processing and should be + * counted as pruned (if length 1) or mismatched (if length > 1). + */ +static void +nfa_cleanup_dead_contexts(WindowAggState *winstate, RPRNFAContext *excludeCtx) +{ + RPRNFAContext *ctx; + RPRNFAContext *next; + + for (ctx = winstate->nfaContext; ctx != NULL; ctx = next) + { + next = ctx->next; + + /* Skip the target context and contexts still processing */ + if (ctx == excludeCtx || ctx->states != NULL) + continue; + + /* Skip successfully matched contexts (will be handled by SKIP logic) */ + if (ctx->matchEndRow >= ctx->matchStartRow) + continue; + + /* + * This is a failed context - count and remove it. Only count if it + * actually processed its start row. Contexts created for + * beyond-partition rows are silently removed. + */ + if (ctx->lastProcessedRow >= ctx->matchStartRow) + { + int64 failedLen = ctx->lastProcessedRow - ctx->matchStartRow + 1; + + nfa_record_context_failure(winstate, failedLen); + } + /* else: context was never processed (beyond-partition), just remove */ + + nfa_context_free(winstate, ctx); + } +} + +/* + * nfa_finalize_all_contexts + * + * Finalize all active contexts when partition ends. + * Match with NULL to force mismatch, then advance to process epsilon transitions. + */ +static void +nfa_finalize_all_contexts(WindowAggState *winstate, int64 lastPos) +{ + RPRNFAContext *ctx; + + for (ctx = winstate->nfaContext; ctx != NULL; ctx = ctx->next) + { + if (ctx->states != NULL) + { + nfa_match(winstate, ctx, NULL); + nfa_advance(winstate, ctx, lastPos, false); + } + } +} + +/* + * nfa_update_absorption_flags + * + * Update context's absorption flags after state changes. + * + * Two flags control absorption behavior: + * hasAbsorbableState: true if context has at least one absorbable state. + * This flag is monotonic (true -> false only). Once all absorbable states + * die, no new absorbable states can be created through transitions. + * allStatesAbsorbable: true if ALL states in context are absorbable. + * This flag is dynamic and can change false -> true when non-absorbable + * states die off. + * + * Optimization: Once hasAbsorbableState becomes false, both flags remain false + * permanently, so we skip recalculation. + */ +static void +nfa_update_absorption_flags(RPRNFAContext *ctx) +{ + RPRNFAState *state; + bool hasAbsorbable = false; + bool allAbsorbable = true; + + /* + * Optimization: Once hasAbsorbableState becomes false, it stays false. No + * need to recalculate - both flags remain false permanently. + */ + if (!ctx->hasAbsorbableState) + { + ctx->allStatesAbsorbable = false; + return; + } + + /* No states means no absorbable states */ + if (ctx->states == NULL) + { + ctx->hasAbsorbableState = false; + ctx->allStatesAbsorbable = false; + return; + } + + /* + * Iterate through all states to check absorption status. Uses + * state->isAbsorbable which tracks if state is in absorbable region. This + * is different from RPRElemIsAbsorbable(elem) which checks judgment + * point. + */ + for (state = ctx->states; state != NULL; state = state->next) + { + if (state->isAbsorbable) + hasAbsorbable = true; + else + allAbsorbable = false; + } + + ctx->hasAbsorbableState = hasAbsorbable; + ctx->allStatesAbsorbable = allAbsorbable; +} + +/* + * nfa_states_covered + * + * Check if all states in newer context are "covered" by older context. + * + * A newer state is covered when older context has an absorbable state at the + * same pattern element (elemIdx) with count >= newer's count at that depth. + * The covering state must be absorbable because only absorbable states can + * guarantee to produce superset matches. + * + * If all newer states are covered, newer context's eventual matches will be + * a subset of older context's matches, making newer redundant. + */ +static bool +nfa_states_covered(RPRPattern *pattern, RPRNFAContext *older, RPRNFAContext *newer) +{ + RPRNFAState *newerState; + + for (newerState = newer->states; newerState != NULL; newerState = newerState->next) + { + RPRNFAState *olderState; + RPRPatternElement *elem; + int depth; + bool found = false; + + /* All states are absorbable (caller checks allStatesAbsorbable) */ + elem = &pattern->elements[newerState->elemIdx]; + depth = elem->depth; + + for (olderState = older->states; olderState != NULL; olderState = olderState->next) + { + /* Covering state must also be absorbable */ + if (olderState->isAbsorbable && + olderState->elemIdx == newerState->elemIdx && + olderState->counts[depth] >= newerState->counts[depth]) + { + found = true; + break; + } + } + + if (!found) + return false; + } + + return true; +} + +/* + * nfa_try_absorb_context + * + * Try to absorb ctx (newer) into an older in-progress context. + * Returns true if ctx was absorbed and freed. + * + * Absorption requires three conditions: + * 1. ctx must have all states absorbable (allStatesAbsorbable). + * If ctx has any non-absorbable state, it may produce unique matches. + * 2. older must have at least one absorbable state (hasAbsorbableState). + * Without absorbable states, older cannot cover newer's states. + * 3. All ctx states must be covered by older's absorbable states. + * This ensures older will produce all matches that ctx would produce. + * + * Context list is ordered by creation time (oldest first via prev chain). + * Each row creates at most one context, so earlier contexts have smaller + * matchStartRow values. + */ +static bool +nfa_try_absorb_context(WindowAggState *winstate, RPRNFAContext *ctx) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRNFAContext *older; + + /* Early exit: ctx must have all states absorbable */ + if (!ctx->allStatesAbsorbable) + return false; + + for (older = ctx->prev; older != NULL; older = older->prev) + { + /* + * By invariant: ctx->prev chain is in creation order (oldest first), + * and each row creates at most one context. So all contexts in this + * chain have matchStartRow < ctx->matchStartRow. + */ + + /* Older must also be in-progress */ + if (older->states == NULL) + continue; + + /* Older must have at least one absorbable state */ + if (!older->hasAbsorbableState) + continue; + + /* Check if all newer states are covered by older */ + if (nfa_states_covered(pattern, older, ctx)) + { + int64 absorbedLen = ctx->lastProcessedRow - ctx->matchStartRow + 1; + + nfa_context_free(winstate, ctx); + nfa_record_context_absorbed(winstate, absorbedLen); + return true; + } + } + + return false; +} + +/* + * nfa_absorb_contexts + * + * Absorb redundant contexts to reduce memory usage and computation. + * + * For patterns like A+, newer contexts starting later will produce subset + * matches of older contexts with higher counts. By absorbing these redundant + * contexts early, we avoid duplicate work. + * + * Iterates from tail (newest) toward head (oldest) via prev chain. + * Only in-progress contexts (states != NULL) are candidates for absorption; + * completed contexts represent valid match results. + */ +static void +nfa_absorb_contexts(WindowAggState *winstate) +{ + RPRNFAContext *ctx; + RPRNFAContext *nextCtx; + + for (ctx = winstate->nfaContextTail; ctx != NULL; ctx = nextCtx) + { + nextCtx = ctx->prev; + + /* + * Only absorb in-progress contexts; completed contexts are valid + * results + */ + if (ctx->states != NULL) + nfa_try_absorb_context(winstate, ctx); + } +} + +/* + * nfa_eval_var_match + * + * Evaluate if a VAR element matches the current row. + * Undefined variables (varId >= defineVariableList length) default to TRUE. + */ +static inline bool +nfa_eval_var_match(WindowAggState *winstate, RPRPatternElement *elem, + bool *varMatched) +{ + /* This function should only be called for VAR elements */ + Assert(RPRElemIsVar(elem)); + + if (varMatched == NULL) + return false; + if (elem->varId >= list_length(winstate->defineVariableList)) + return true; + return varMatched[elem->varId]; +} + +/* + * nfa_match + * + * Match phase (convergence): evaluate VAR elements against current row. + * Only updates counts and removes dead states. Minimal transitions. + * + * For VAR elements: + * - matched: count++, keep state (unless count > max) + * - not matched: remove state (exit alternatives already exist from + * previous advance when count >= min was satisfied) + * + * For simple VARs (min=max=1) followed by END: + * - Advance to END and update group count before absorb phase + * - This ensures absorption can compare states by group completion + * + * Non-VAR elements (ALT, END, FIN) are kept as-is for advance phase. + */ +static void +nfa_match(WindowAggState *winstate, RPRNFAContext *ctx, bool *varMatched) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elements = pattern->elements; + RPRNFAState **prevPtr = &ctx->states; + RPRNFAState *state; + RPRNFAState *nextState; + + /* + * Evaluate VAR elements against current row. For simple VARs with END + * next, advance to END and update group count inline so absorb phase can + * compare states properly. + */ + for (state = ctx->states; state != NULL; state = nextState) + { + RPRPatternElement *elem = &elements[state->elemIdx]; + + nextState = state->next; + + if (RPRElemIsVar(elem)) + { + bool matched; + int depth = elem->depth; + int32 count = state->counts[depth]; + + matched = nfa_eval_var_match(winstate, elem, varMatched); + + if (matched) + { + /* Increment count */ + if (count < RPR_COUNT_MAX) + count++; + + /* Max constraint should not be exceeded */ + Assert(elem->max == RPR_QUANTITY_INF || count <= elem->max); + + state->counts[depth] = count; + + /* + * For simple VAR (min=max=1) with END next, advance to END + * and update group count inline. This keeps state in place, + * preserving lexical order. + */ + if (elem->min == 1 && elem->max == 1 && + RPRElemIsEnd(&elements[elem->next])) + { + RPRPatternElement *endElem = &elements[elem->next]; + int endDepth = endElem->depth; + int32 endCount = state->counts[endDepth]; + + Assert(count == 1); + + /* Increment group count with overflow protection */ + if (endCount < RPR_COUNT_MAX) + endCount++; + + /* + * END's max can never be exceeded here because + * nfa_advance_end only loops when count < max, so + * endCount entering inline advance is at most max-1, and + * incrementing yields at most max. + */ + Assert(endElem->max == RPR_QUANTITY_INF || + endCount <= endElem->max); + + state->elemIdx = elem->next; + state->counts[endDepth] = endCount; + } + /* else: stay at VAR for advance phase */ + } + else + { + /* + * Not matched - remove state. Exit alternatives were already + * created by advance phase when count >= min was satisfied. + */ + *prevPtr = nextState; + nfa_state_free(winstate, state); + continue; + } + } + /* Non-VAR elements: keep as-is for advance phase */ + + prevPtr = &state->next; + } +} + +/* + * nfa_route_to_elem + * + * Route state to next element. If VAR, add to ctx->states and process + * skip path if optional. Otherwise, continue epsilon expansion via recursion. + */ +static void +nfa_route_to_elem(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *nextElem, + int64 currentPos, bool initialAdvance) +{ + if (RPRElemIsVar(nextElem)) + { + nfa_add_state_unique(winstate, ctx, state); + if (RPRElemCanSkip(nextElem)) + { + RPRNFAState *skipState; + + skipState = nfa_state_create(winstate, nextElem->next, + state->altPriority, state->counts, + state->isAbsorbable); + nfa_advance_state(winstate, ctx, skipState, currentPos, initialAdvance); + } + } + else + { + nfa_advance_state(winstate, ctx, state, currentPos, initialAdvance); + } +} + +/* + * nfa_advance_alt + * + * Handle ALT element: expand all branches in lexical order (DFS). + * Sets altPriority to element index to preserve lexical order for match selection. + */ +static void +nfa_advance_alt(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elements = pattern->elements; + RPRElemIdx altIdx = elem->next; + bool first = true; + + while (altIdx >= 0 && altIdx < pattern->numElements) + { + RPRPatternElement *altElem = &elements[altIdx]; + RPRNFAState *newState; + + /* Stop if element is outside ALT scope (not a branch) */ + if (altElem->depth <= elem->depth) + break; + + if (first) + { + state->elemIdx = altIdx; + state->altPriority = altIdx; + newState = state; + first = false; + } + else + { + newState = nfa_state_create(winstate, altIdx, altIdx, + state->counts, state->isAbsorbable); + } + + /* Recursively process this branch before next */ + nfa_advance_state(winstate, ctx, newState, currentPos, initialAdvance); + altIdx = altElem->jump; + } + + /* ALT must have at least one branch */ + Assert(!first); +} + +/* + * nfa_advance_begin + * + * Handle BEGIN element: group entry logic. + * BEGIN is only visited at initial group entry (count is always 0). + * If min=0, creates a skip path past the group. + * Loop-back from END goes directly to first child, bypassing BEGIN. + */ +static void +nfa_advance_begin(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elements = pattern->elements; + RPRNFAState *skipState = NULL; + + state->counts[elem->depth] = 0; + + /* Optional group: create skip path (but don't route yet) */ + if (elem->min == 0) + { + skipState = nfa_state_create(winstate, elem->jump, state->altPriority, + state->counts, state->isAbsorbable); + } + + /* Enter group: route to first child (lexically first) */ + state->elemIdx = elem->next; + nfa_route_to_elem(winstate, ctx, state, + &elements[state->elemIdx], currentPos, initialAdvance); + + /* Now route skip path (lexically second) */ + if (skipState != NULL) + { + nfa_route_to_elem(winstate, ctx, skipState, + &elements[elem->jump], currentPos, initialAdvance); + } +} + +/* + * nfa_advance_end + * + * Handle END element: group repetition logic. + * Decides whether to loop back or exit based on count vs min/max. + */ +static void +nfa_advance_end(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elements = pattern->elements; + int depth = elem->depth; + int32 count = state->counts[depth]; + + if (count < elem->min) + { + /* Must loop back */ + RPRPatternElement *jumpElem; + + for (int d = depth + 1; d < pattern->maxDepth; d++) + state->counts[d] = 0; + state->elemIdx = elem->jump; + jumpElem = &elements[state->elemIdx]; + + nfa_route_to_elem(winstate, ctx, state, jumpElem, currentPos, initialAdvance); + } + else if ((elem->max != RPR_QUANTITY_INF && count >= elem->max) || + (count == 0 && elem->min == 0)) + { + /*---------- + * Must exit: either reached max iterations, or group matched empty. + * + * FIXME: The (count == 0 && min == 0) condition is insufficient for + * cycle prevention. Cycles can occur at any count value when loop back + * happens without consuming rows. For example: + * Pattern: (A*)* + * After matching 3 A's (count=3), loop back at a B row + * Inner A* matches 0 times (skip path) → same (elemIdx, count=3) + * Infinite cycle at count=3, not count=0 + * + * Currently, cycles are silently prevented by nfa_add_state_unique + * detecting duplicate states, but this is implicit and not guaranteed + * for all code paths. Explicit cycle detection is needed. + *---------- + */ + RPRPatternElement *nextElem; + + state->counts[depth] = 0; + state->elemIdx = elem->next; + nextElem = &elements[state->elemIdx]; + + /* END->END: increment outer END's count */ + if (RPRElemIsEnd(nextElem) && state->counts[nextElem->depth] < RPR_COUNT_MAX) + state->counts[nextElem->depth]++; + + nfa_route_to_elem(winstate, ctx, state, nextElem, currentPos, initialAdvance); + } + else + { + /* + * Between min and max (with at least one iteration) - can exit or + * loop + */ + RPRElemIdx exitAltPriority; + RPRNFAState *exitState; + RPRPatternElement *jumpElem; + RPRPatternElement *nextElem; + + /* Preserve altPriority for greedy extension */ + exitAltPriority = state->altPriority; + if (ctx->matchedState != NULL) + exitAltPriority = ctx->matchedState->altPriority; + + /* + * Create exit state first (need original counts before modifying + * state) + */ + exitState = nfa_state_create(winstate, elem->next, exitAltPriority, + state->counts, state->isAbsorbable); + exitState->counts[depth] = 0; + nextElem = &elements[exitState->elemIdx]; + + /* END->END: increment outer END's count */ + if (RPRElemIsEnd(nextElem) && exitState->counts[nextElem->depth] < RPR_COUNT_MAX) + exitState->counts[nextElem->depth]++; + + /* Route loop state first (earlier in pattern = lexical order) */ + for (int d = depth + 1; d < pattern->maxDepth; d++) + state->counts[d] = 0; + state->elemIdx = elem->jump; + jumpElem = &elements[state->elemIdx]; + + nfa_route_to_elem(winstate, ctx, state, jumpElem, currentPos, initialAdvance); + + /* Then route exit state */ + nfa_route_to_elem(winstate, ctx, exitState, nextElem, currentPos, initialAdvance); + } +} + +/* + * nfa_advance_var + * + * Handle VAR element: loop/exit transitions. + * After match phase, all VAR states have matched - decide next action. + */ +static void +nfa_advance_var(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, RPRPatternElement *elem, + int64 currentPos, bool initialAdvance) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elements = pattern->elements; + int depth = elem->depth; + int32 count = state->counts[depth]; + bool canLoop = (elem->max == RPR_QUANTITY_INF || count < elem->max); + bool canExit = (count >= elem->min); + + /* After a successful match, count >= 1, so at least one must be true */ + Assert(canLoop || canExit); + + if (canLoop && canExit) + { + /* Both: clone for loop, modify original for exit */ + RPRNFAState *loopState; + RPRPatternElement *nextElem; + + loopState = nfa_state_create(winstate, state->elemIdx, state->altPriority, + state->counts, state->isAbsorbable); + nfa_add_state_unique(winstate, ctx, loopState); + + /* Exit: advance to next element */ + state->counts[depth] = 0; + state->elemIdx = elem->next; + nextElem = &elements[state->elemIdx]; + + nfa_route_to_elem(winstate, ctx, state, nextElem, currentPos, initialAdvance); + } + else if (canLoop) + { + /* Loop only: keep state as-is */ + nfa_add_state_unique(winstate, ctx, state); + } + else if (canExit) + { + /* Exit only: advance to next element */ + RPRPatternElement *nextElem; + + state->counts[depth] = 0; + state->elemIdx = elem->next; + nextElem = &elements[state->elemIdx]; + + nfa_route_to_elem(winstate, ctx, state, nextElem, currentPos, initialAdvance); + } +} + +/* + * nfa_advance_state + * + * Recursively process a single state through epsilon transitions. + * Uses DFS traversal to maintain lexical order: lower altPriority paths + * are fully processed before higher altPriority paths, ensuring states + * are added to ctx->states in lexical order. + */ +static void +nfa_advance_state(WindowAggState *winstate, RPRNFAContext *ctx, + RPRNFAState *state, int64 currentPos, bool initialAdvance) +{ + RPRPattern *pattern = winstate->rpPattern; + RPRPatternElement *elem; + + Assert(state->elemIdx >= 0 && state->elemIdx < pattern->numElements); + elem = &pattern->elements[state->elemIdx]; + + switch (elem->varId) + { + case RPR_VARID_FIN: + /* FIN: record match (skip for initial advance) */ + if (!initialAdvance) + nfa_add_matched_state(winstate, ctx, state, currentPos); + else + nfa_state_free(winstate, state); + break; + + case RPR_VARID_ALT: + nfa_advance_alt(winstate, ctx, state, elem, currentPos, initialAdvance); + break; + + case RPR_VARID_BEGIN: + nfa_advance_begin(winstate, ctx, state, elem, currentPos, initialAdvance); + break; + + case RPR_VARID_END: + nfa_advance_end(winstate, ctx, state, elem, currentPos, initialAdvance); + break; + + default: + /* VAR element */ + nfa_advance_var(winstate, ctx, state, elem, currentPos, initialAdvance); + break; + } +} + +/* + * nfa_advance + * + * Advance phase (divergence): transition from all surviving states. + * Called after match phase with matched VAR states, or at context creation + * for initial epsilon expansion (initialAdvance=true skips FIN matches). + * + * Processes states in order, using recursive DFS to maintain lexical order. + */ +static void +nfa_advance(WindowAggState *winstate, RPRNFAContext *ctx, int64 currentPos, + bool initialAdvance) +{ + RPRNFAState *states = ctx->states; + RPRNFAState *state; + + ctx->states = NULL; /* Will rebuild */ + + /* Process each state in order */ + while (states != NULL) + { + state = states; + states = states->next; + state->next = NULL; + + nfa_advance_state(winstate, ctx, state, currentPos, initialAdvance); + } +} diff --git a/src/backend/utils/adt/windowfuncs.c b/src/backend/utils/adt/windowfuncs.c index 78b7f05aba2..efb60c99052 100644 --- a/src/backend/utils/adt/windowfuncs.c +++ b/src/backend/utils/adt/windowfuncs.c @@ -41,7 +41,6 @@ static bool rank_up(WindowObject winobj); static Datum leadlag_common(FunctionCallInfo fcinfo, bool forward, bool withoffset, bool withdefault); - /* * utility routine for *_rank functions. */ @@ -683,7 +682,7 @@ window_last_value(PG_FUNCTION_ARGS) WinCheckAndInitializeNullTreatment(winobj, true, fcinfo); result = WinGetFuncArgInFrame(winobj, 0, - 0, WINDOW_SEEK_TAIL, true, + 0, WINDOW_SEEK_TAIL, false, &isnull, NULL); if (isnull) PG_RETURN_NULL(); @@ -724,3 +723,25 @@ window_nth_value(PG_FUNCTION_ARGS) PG_RETURN_DATUM(result); } + +/* + * prev + * Dummy function to invoke RPR's navigation operator "PREV". + * This is *not* a window function. + */ +Datum +window_prev(PG_FUNCTION_ARGS) +{ + PG_RETURN_DATUM(PG_GETARG_DATUM(0)); +} + +/* + * next + * Dummy function to invoke RPR's navigation operation "NEXT". + * This is *not* a window function. + */ +Datum +window_next(PG_FUNCTION_ARGS) +{ + PG_RETURN_DATUM(PG_GETARG_DATUM(0)); +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 83f6501df38..ea35b9cb1de 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -10853,6 +10853,12 @@ { oid => '3114', descr => 'fetch the Nth row value', proname => 'nth_value', prokind => 'w', prorettype => 'anyelement', proargtypes => 'anyelement int4', prosrc => 'window_nth_value' }, +{ oid => '8126', descr => 'previous value', + proname => 'prev', provolatile => 's', prorettype => 'anyelement', + proargtypes => 'anyelement', prosrc => 'window_prev' }, +{ oid => '8127', descr => 'next value', + proname => 'next', provolatile => 's', prorettype => 'anyelement', + proargtypes => 'anyelement', prosrc => 'window_next' }, # functions for range types { oid => '3832', descr => 'I/O', diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 63c067d5aae..cd6f794f62b 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -41,6 +41,7 @@ #include "nodes/plannodes.h" #include "nodes/tidbitmap.h" #include "partitioning/partdefs.h" +#include "regex/regex.h" #include "storage/condition_variable.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" @@ -2513,6 +2514,76 @@ typedef enum WindowAggStatus * tuples during spool */ } WindowAggStatus; +#define RF_NOT_DETERMINED 0 +#define RF_FRAME_HEAD 1 +#define RF_SKIPPED 2 +#define RF_UNMATCHED 3 + +/* + * RPRNFAState - single NFA state for pattern matching + * + * counts[] tracks repetition counts at each nesting depth. + * altPriority tracks lexical order for alternation (lower = earlier alternative). + * + * isAbsorbable tracks if state is in absorbable region (ABSORBABLE_BRANCH). + * Monotonic property: once false, stays false (can't re-enter region). + * + * Absorption comparison uses elemIdx and counts[depth] directly because: + * - VAR elements consume a row, forcing states to wait for next row + * - END loop puts states at group start with iteration count preserved + * - At row end, comparable states are at the same position (elemIdx) + */ +typedef struct RPRNFAState +{ + struct RPRNFAState *next; /* next state in linked list */ + int16 elemIdx; /* current pattern element index */ + int16 altPriority; /* lexical order priority (lower = preferred) */ + bool isAbsorbable; /* true if state is in absorbable region */ + int32 counts[FLEXIBLE_ARRAY_MEMBER]; /* repetition counts by depth */ +} RPRNFAState; + +/* + * RPRNFAContext - context for NFA pattern matching execution + * + * Two-flag absorption design: + * hasAbsorbableState: can this context absorb others? (>=1 absorbable state) + * - Monotonic: true->false only, cannot recover once false + * - Used to skip absorption attempts once all absorbable states are gone + * allStatesAbsorbable: can this context be absorbed? (ALL states absorbable) + * - Dynamic: can change false->true (when non-absorbable states die) + * - Used to determine if this context is eligible for absorption + */ +typedef struct RPRNFAContext +{ + struct RPRNFAContext *next; /* next context in linked list */ + struct RPRNFAContext *prev; /* previous context (for reverse traversal) */ + RPRNFAState *states; /* active states (linked list) */ + + int64 matchStartRow; /* row where match started */ + int64 matchEndRow; /* row where match ended (-1 = no match) */ + int64 lastProcessedRow; /* last row processed (for fail depth) */ + RPRNFAState *matchedState; /* FIN state for greedy fallback (cloned) */ + + /* Two-flag absorption optimization */ + bool hasAbsorbableState; /* can absorb others (>=1 absorbable + * state) */ + bool allStatesAbsorbable; /* can be absorbed (ALL states + * absorbable) */ +} RPRNFAContext; + +/* + * NFALengthStats + * + * Statistics for length measurements (min/max/total) used for computing + * average lengths in EXPLAIN ANALYZE output. + */ +typedef struct NFALengthStats +{ + int64 min; /* minimum length */ + int64 max; /* maximum length */ + int64 total; /* total length (for computing average) */ +} NFALengthStats; + typedef struct WindowAggState { ScanState ss; /* its first field is NodeTag */ @@ -2572,6 +2643,42 @@ typedef struct WindowAggState int64 groupheadpos; /* current row's peer group head position */ int64 grouptailpos; /* " " " " tail position (group end+1) */ + /* these fields are used in Row pattern recognition: */ + RPSkipTo rpSkipTo; /* Row Pattern Skip To type */ + struct RPRPattern *rpPattern; /* compiled pattern for NFA execution */ + List *defineVariableList; /* list of row pattern definition + * variables (list of String) */ + List *defineClauseList; /* expression for row pattern definition + * search conditions ExprState list */ + RPRNFAContext *nfaContext; /* active matching contexts (head) */ + RPRNFAContext *nfaContextTail; /* tail of active contexts (for reverse + * traversal) */ + RPRNFAContext *nfaContextFree; /* recycled NFA context nodes */ + RPRNFAState *nfaStateFree; /* recycled NFA state nodes */ + Size nfaStateSize; /* pre-calculated RPRNFAState size */ + bool *nfaVarMatched; /* per-row cache: varMatched[varId] for varId + * < numDefines */ + int64 nfaLastProcessedRow; /* last row processed by NFA (-1 = + * none) */ + + /* NFA statistics for EXPLAIN ANALYZE */ + int64 nfaStatesActive; /* current active states (internal) */ + int64 nfaStatesMax; /* peak active states */ + int64 nfaStatesTotalCreated; /* total states allocated */ + int64 nfaStatesMerged; /* states merged (deduplicated) */ + int64 nfaContextsActive; /* current active contexts (internal) */ + int64 nfaContextsMax; /* peak active contexts */ + int64 nfaContextsTotalCreated; /* total contexts allocated */ + int64 nfaContextsAbsorbed; /* contexts absorbed (optimization) */ + int64 nfaContextsSkipped; /* contexts skipped (SKIP PAST LAST ROW) */ + int64 nfaContextsPruned; /* contexts pruned on first row */ + int64 nfaMatchesSucceeded; /* successful pattern matches */ + int64 nfaMatchesFailed; /* failed pattern matches */ + NFALengthStats nfaMatchLen; /* successful match length stats */ + NFALengthStats nfaFailLen; /* mismatch length stats */ + NFALengthStats nfaAbsorbedLen; /* absorbed context length stats */ + NFALengthStats nfaSkippedLen; /* skipped context length stats */ + MemoryContext partcontext; /* context for partition-lifespan data */ MemoryContext aggcontext; /* shared context for aggregate working data */ MemoryContext curaggcontext; /* current aggregate's working data */ @@ -2599,6 +2706,18 @@ typedef struct WindowAggState TupleTableSlot *agg_row_slot; TupleTableSlot *temp_slot_1; TupleTableSlot *temp_slot_2; + + /* temporary slots for RPR */ + TupleTableSlot *prev_slot; /* PREV row navigation operator */ + TupleTableSlot *next_slot; /* NEXT row navigation operator */ + TupleTableSlot *null_slot; /* all NULL slot */ + + /* + * Each byte corresponds to a row positioned at absolute its pos in + * partition. See above definition for RF_*. Used for RPR. + */ + char *reduced_frame_map; + int64 alloc_sz; /* size of the map */ } WindowAggState; /* ---------------- -- 2.43.0