public inbox for [email protected]  
From: Haibo Yan <[email protected]>
To: PostgreSQL-development <[email protected]>
Subject: [PATCH] DISTINCT in plain aggregate window functions
Date: Tue, 7 Apr 2026 22:31:39 -0700
Message-ID: <CABXr29H2X+HtaPw-R3EheZUgv9fM7nSBjQCCaWCRv62mDYdM3w@mail.gmail.com>

Hi hackers,

I’d like to start a patch series to add support for DISTINCT in plain
aggregate window functions.

PostgreSQL currently rejects cases such as:

---------------------------------------------------------------------------------------------------------

count(DISTINCT x) OVER (PARTITION BY p)

sum(DISTINCT x)   OVER ()

---------------------------------------------------------------------------------------------------------

My plan is to implement this incrementally, by frame class and by feature
dimension, rather than trying to solve every case in a single patch.

For the first step, I’m posting patches 1-2 only and would appreciate your
review on those.

Patch 1 is intentionally very small:


   - add parse/deparse plumbing for DISTINCT in plain aggregate window
   functions
   - carry the information through WindowFunc
   - preserve it in ruleutils / deparse
   - but still reject execution

Patch 1 by itself does not add user-visible execution support, so I think
it is best reviewed together with patch 2.

Patch 2 adds the first real executor support:


   - plain aggregate window functions only
   - single-argument DISTINCT only
   - whole-partition frames only

That is, patch 2 supports cases where the frame is effectively the entire
partition, for example:

---------------------------------------------------------------------------------------------------------

count(DISTINCT x) OVER (PARTITION BY p)
sum(DISTINCT x)   OVER ()
avg(DISTINCT x)   OVER (
    PARTITION BY p
    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
)

---------------------------------------------------------------------------------------------------------

The executor approach in patch 2 is deliberately conservative:


   - collect the partition’s aggregate inputs
   - sort and deduplicate them
   - feed the distinct values into the aggregate transition function
   - finalize once
   - reuse the cached result for all rows in the partition

This avoids the much harder moving-frame cases for now.
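To make the sort-and-dedup step concrete, here is a simplified standalone C sketch of the idea; it is not code from patch 2. It assumes int8 inputs and a strict transition function, and every name in it (`NullableInput`, `TransFn`, `distinct_whole_partition`, `count_step`, `sum_step`) is hypothetical. Patch 2 itself uses tuplesort and the aggregate's real transition function rather than qsort() and a function pointer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical stand-in for a (Datum, isnull) pair of int8 type. */
typedef struct
{
	int64_t		value;
	bool		isnull;
} NullableInput;

/* Hypothetical transition function signature: new_state = f(state, input). */
typedef int64_t (*TransFn) (int64_t state, int64_t input);

/* Order non-NULL values ascending and place NULLs last (NULLS LAST). */
static int
cmp_nulls_last(const void *a, const void *b)
{
	const NullableInput *x = a;
	const NullableInput *y = b;

	if (x->isnull || y->isnull)
		return (int) x->isnull - (int) y->isnull;
	return (x->value > y->value) - (x->value < y->value);
}

/*
 * Compute agg(DISTINCT x) once for a whole partition: sort the collected
 * inputs in place, skip values equal to their sorted predecessor, and feed
 * each remaining non-NULL value to the transition function (NULLs are
 * skipped, as a strict transfn would).  The caller reuses the result for
 * every row of the partition.
 */
static int64_t
distinct_whole_partition(NullableInput *vals, size_t n,
						 TransFn transfn, int64_t initstate)
{
	int64_t		state = initstate;
	bool		have_prev = false;
	NullableInput prev = {0, true};

	assert(vals != NULL || n == 0);
	if (n > 0)
		qsort(vals, n, sizeof(NullableInput), cmp_nulls_last);

	for (size_t i = 0; i < n; i++)
	{
		NullableInput cur = vals[i];
		bool		dup = have_prev &&
			((prev.isnull && cur.isnull) ||
			 (!prev.isnull && !cur.isnull && prev.value == cur.value));

		if (dup)
			continue;
		prev = cur;
		have_prev = true;
		if (!cur.isnull)
			state = transfn(state, cur.value);
	}
	return state;
}

/* Example transition steps for count(DISTINCT x) and sum(DISTINCT x). */
static int64_t
count_step(int64_t state, int64_t input)
{
	(void) input;
	return state + 1;
}

static int64_t
sum_step(int64_t state, int64_t input)
{
	return state + input;
}
```

For example, a partition with inputs 3, 1, NULL, 3, 2, 1 yields 3 with `count_step` and 6 with `sum_step`. The sketch does not model SQL's NULL result for sum() over only NULL inputs; in the executor that is left to the aggregate's own transition and final functions.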

My proposed overall roadmap is below:

Patch 1


   - parse/deparse plumbing only
   - allow DISTINCT to be represented on plain aggregate window functions
   - preserve it through deparse / view definition
   - still reject execution

Patch 2


   - executor support for whole-partition frames
   - plain aggregate window functions only
   - single-argument DISTINCT only
   - sort-and-dedup implementation

Patch 3


   - executor support for non-shrinking frames
   - frames starting at UNBOUNDED PRECEDING with no EXCLUDE
   - incremental hash-based seen-set
   - covers the default ORDER BY frame and the supported ... CURRENT ROW /
   ... FOLLOWING cases
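Because a frame that starts at UNBOUNDED PRECEDING never loses rows, the seen-set only ever needs insertion. A minimal sketch of that idea for count(DISTINCT x), assuming int8 inputs, a ROWS frame ending at CURRENT ROW (so exactly one row enters per output row), and a fixed-size open-addressing table; `SeenSet`, `seen_insert`, `mix64`, and `running_count_distinct` are hypothetical names. A real implementation would need a growable (and possibly spillable) hash table, and for the default RANGE frame it would add all peers of the current row before emitting a result.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SEEN_CAPACITY 64		/* hypothetical fixed size; power of two */

/* Open-addressing set of the non-NULL values already fed to the transfn. */
typedef struct
{
	int64_t		keys[SEEN_CAPACITY];
	bool		used[SEEN_CAPACITY];
	size_t		count;
} SeenSet;

/* MurmurHash3 fmix64 finalizer, used here only to spread the bits. */
static uint64_t
mix64(uint64_t h)
{
	h ^= h >> 33;
	h *= 0xff51afd7ed558ccdULL;
	h ^= h >> 33;
	h *= 0xc4ceb9fe1a85ec53ULL;
	h ^= h >> 33;
	return h;
}

/* Insert v; return true only if v was not already in the set. */
static bool
seen_insert(SeenSet *set, int64_t v)
{
	size_t		mask = SEEN_CAPACITY - 1;
	size_t		i = (size_t) (mix64((uint64_t) v) & mask);

	while (set->used[i])
	{
		if (set->keys[i] == v)
			return false;
		i = (i + 1) & mask;		/* linear probing */
	}
	/* Keep one slot free so probing always terminates; no growth here. */
	assert(set->count < SEEN_CAPACITY - 1);
	set->used[i] = true;
	set->keys[i] = v;
	set->count++;
	return true;
}

/*
 * Running count(DISTINCT x) for a frame that starts at UNBOUNDED PRECEDING
 * and gains one row per output row.  Rows never leave the frame, so each
 * value is checked against the seen-set once, when it enters.
 */
static void
running_count_distinct(const int64_t *vals, const bool *isnull, size_t n,
					   int64_t *out)
{
	SeenSet		seen = {0};
	int64_t		state = 0;

	for (size_t row = 0; row < n; row++)
	{
		/* NULL inputs are skipped, as count(x) ignores them. */
		if (!isnull[row] && seen_insert(&seen, vals[row]))
			state++;
		out[row] = state;
	}
}
```

For inputs 5, 3, 5, NULL, 7, 3 the running results are 1, 2, 2, 2, 3, 3.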

Patch 4


   - executor support for sliding ROWS frames
   - refcounted DISTINCT state
   - add/remove distinct contributions as rows enter and leave the frame
   - fall back to restart/recompute for aggregates without inverse
   transition support
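The refcounted model can be illustrated with a small standalone sketch (again, not code from any posted patch). Each value's refcount is the number of rows in the current frame carrying it; the aggregate is advanced only when a count goes from 0 to 1 and inverse-transitioned only when it goes from 1 to 0. It assumes sum(DISTINCT x) over int8 with no NULL inputs, a fixed-size table, and an aggregate that has an inverse transition; `RefMap`, `refmap_slot`, `SlidingDistinctSum`, `frame_add`, `frame_remove`, and `sliding_sum_distinct` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define REFMAP_CAPACITY 64		/* hypothetical fixed size; power of two */

/* Maps each value to the number of rows in the current frame carrying it. */
typedef struct
{
	int64_t		keys[REFMAP_CAPACITY];
	int64_t		refs[REFMAP_CAPACITY];
	bool		used[REFMAP_CAPACITY];
	size_t		nslots;			/* slots claimed so far (never released) */
} RefMap;

/* Find (or claim) the refcount slot for v, using Fibonacci hashing. */
static int64_t *
refmap_slot(RefMap *map, int64_t v)
{
	size_t		mask = REFMAP_CAPACITY - 1;
	size_t		i = (size_t) (((uint64_t) v * 0x9E3779B97F4A7C15ULL) >> 32) & mask;

	while (map->used[i] && map->keys[i] != v)
		i = (i + 1) & mask;
	if (!map->used[i])
	{
		assert(map->nslots < REFMAP_CAPACITY - 1);	/* no growth in a sketch */
		map->used[i] = true;
		map->keys[i] = v;
		map->refs[i] = 0;
		map->nslots++;
	}
	return &map->refs[i];
}

/* sum(DISTINCT x) state over a sliding frame. */
typedef struct
{
	RefMap		map;
	int64_t		sum;			/* sum of the distinct values in the frame */
	int64_t		ndistinct;		/* number of distinct values in the frame */
} SlidingDistinctSum;

/* A row enters the frame: only a 0 -> 1 refcount change affects the sum. */
static void
frame_add(SlidingDistinctSum *st, int64_t v)
{
	int64_t    *ref = refmap_slot(&st->map, v);

	if ((*ref)++ == 0)
	{
		st->sum += v;			/* forward transition */
		st->ndistinct++;
	}
}

/* A row leaves the frame: only a 1 -> 0 refcount change affects the sum. */
static void
frame_remove(SlidingDistinctSum *st, int64_t v)
{
	int64_t    *ref = refmap_slot(&st->map, v);

	assert(*ref > 0);
	if (--(*ref) == 0)
	{
		st->sum -= v;			/* inverse transition */
		st->ndistinct--;
	}
}

/* sum(DISTINCT x) OVER (ROWS BETWEEN k PRECEDING AND CURRENT ROW). */
static void
sliding_sum_distinct(const int64_t *vals, size_t n, size_t k, int64_t *out)
{
	SlidingDistinctSum st = {0};

	for (size_t row = 0; row < n; row++)
	{
		frame_add(&st, vals[row]);
		if (row > k)
			frame_remove(&st, vals[row - k - 1]);
		out[row] = st.sum;
	}
}
```

With inputs 1, 2, 1, 3, 3, 2 and ROWS BETWEEN 2 PRECEDING AND CURRENT ROW, the results are 1, 3, 3, 6, 4, 5. The `ndistinct` counter is where an empty frame could be detected (to return NULL rather than 0); aggregates without an inverse transition function would instead take the restart/recompute path.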

Patch 5


   - extend the sliding DISTINCT machinery to sliding RANGE and GROUPS
   - keep the same refcounted model
   - no EXCLUDE yet

Patch 6


   - support EXCLUDE clauses
   - likely correctness-first, with restart/recompute where incremental
   maintenance is too awkward

Patch 7


   - support multi-argument DISTINCT
   - upgrade DISTINCT keys from single datum to tuple/composite key
   representation
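For multi-argument DISTINCT, the main change is that dedup must compare whole argument tuples instead of single datums. A minimal sketch of that comparison, assuming nullable int8 arguments and treating two NULLs as not distinct (as DISTINCT does); `CompositeKey`, `MAX_DISTINCT_ARGS`, `composite_cmp`, and `dedup_composite` are hypothetical names. An executor version would presumably compare column by column using each argument type's own sort and equality operators.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

#define MAX_DISTINCT_ARGS 4		/* hypothetical limit for the sketch */

/* A DISTINCT key made of several nullable int8 arguments. */
typedef struct
{
	int			ncols;
	int64_t		values[MAX_DISTINCT_ARGS];
	bool		isnull[MAX_DISTINCT_ARGS];
} CompositeKey;

/* Column-by-column comparison, NULLS LAST; two NULLs compare as equal. */
static int
composite_cmp(const void *a, const void *b)
{
	const CompositeKey *x = a;
	const CompositeKey *y = b;

	assert(x->ncols == y->ncols);
	for (int i = 0; i < x->ncols; i++)
	{
		if (x->isnull[i] != y->isnull[i])
			return x->isnull[i] ? 1 : -1;
		if (x->isnull[i])
			continue;
		if (x->values[i] != y->values[i])
			return x->values[i] < y->values[i] ? -1 : 1;
	}
	return 0;
}

/* Sort the keys and compact them in place to the distinct set. */
static size_t
dedup_composite(CompositeKey *keys, size_t n)
{
	size_t		nkept = 0;

	if (n == 0)
		return 0;
	qsort(keys, n, sizeof(CompositeKey), composite_cmp);
	for (size_t i = 0; i < n; i++)
	{
		/* A key is a duplicate if it matches the last key we kept. */
		if (nkept > 0 && composite_cmp(&keys[nkept - 1], &keys[i]) == 0)
			continue;
		keys[nkept++] = keys[i];
	}
	return nkept;
}
```

For argument tuples (1,2), (1,NULL), (1,2), (2,1), (1,NULL) the sketch keeps three keys: (1,2), (1,NULL), (2,1).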

Patch 8


   - support aggregate ORDER BY inside window aggregates
   - left until last because it is orthogonal to frame-shape support and
   substantially complicates both parse representation and executor behavior

In short, the roadmap is:


   1. plumbing
   2. whole-partition
   3. non-shrinking
   4. sliding ROWS
   5. sliding RANGE / GROUPS
   6. EXCLUDE
   7. multi-arg DISTINCT
   8. aggregate ORDER BY

For this posting, I’d especially appreciate feedback on:


   - whether patch 1 + patch 2 is a reasonable first split
   - whether whole-partition-only executor support is a good first
   executable step
   - whether the proposed long-term breakdown seems sensible

Thanks in advance for any review or comments.

Best regards,

Haibo Yan


Attachments:

  [application/octet-stream] v1-0001-Support-parsing-and-deparsing-DISTINCT-in-window-.patch (8.8K, 3-v1-0001-Support-parsing-and-deparsing-DISTINCT-in-window-.patch)
From 0259fdb1c5a725cd9f6b587b80298baf9e3b6220 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Mon, 6 Apr 2026 23:50:50 -0700
Subject: [PATCH v1 1/5] Support parsing and deparsing DISTINCT in window
 aggregate calls

PostgreSQL has long rejected syntax like

  count(DISTINCT x) OVER (...)
  sum(DISTINCT x) OVER (...)

at parse time with:

  DISTINCT is not implemented for window functions

That parser-time rejection makes it impossible to carry the syntax
through parse analysis, node serialization, ruleutils deparse, or view
round-tripping, and it also forces all future work on execution support
to start by reopening the parser decision.

This patch does not implement execution semantics for DISTINCT in window
aggregates yet.  Instead, it performs the first, intentionally narrow,
step: preserve the syntax in the parse tree and move the rejection to
executor initialization.

Specifically, this patch:

* adds a new boolean field, `windistinct`, to `WindowFunc`, recording
  whether DISTINCT was syntactically specified on a window aggregate
  call;

* removes the existing parser-time FEATURE_NOT_SUPPORTED error in
  `parse_func.c` for DISTINCT on window aggregates, and sets
  `wfunc->windistinct` instead;

* teaches `ruleutils.c` to print DISTINCT for `WindowFunc` nodes with
  `windistinct = true`, so stored rules and `pg_get_viewdef()` preserve
  the original syntax;

* adds a temporary executor-side FEATURE_NOT_SUPPORTED error in the
  WindowAgg initialization path:

    DISTINCT is not yet implemented for window aggregates

  so execution still fails cleanly until real executor support is added.

As a result, queries using DISTINCT in window aggregate calls now parse
successfully, can be preserved in stored representations, and deparse
correctly, but still fail at execution time rather than parse time.

This patch is intentionally executor-strategy-agnostic.  In particular,
it does not yet decide any of the following:

* whether v1 execution support should be limited to whole-partition
  frames, or also include grow-only frames;

* whether DISTINCT tracking should be sort-based, hash-based, or a
  hybrid depending on frame shape and type capabilities;

* whether any future execution path should impose hashability
  restrictions for some frame classes.

Deferring those questions keeps this patch small and reviewable, while
creating the minimal infrastructure needed for follow-up executor work.

Regression tests cover:

* successful parsing of a DISTINCT window aggregate call;
* preservation of DISTINCT through deparse / view definition output;
* the new executor-side FEATURE_NOT_SUPPORTED failure.

Future work will replace the temporary executor-side rejection with real
WindowAgg support for DISTINCT semantics.
---
 src/backend/executor/nodeWindowAgg.c |  9 +++++++++
 src/backend/optimizer/util/clauses.c |  1 +
 src/backend/parser/parse_func.c      |  6 ++++--
 src/backend/utils/adt/ruleutils.c    |  2 ++
 src/include/nodes/primnodes.h        |  2 ++
 src/test/regress/expected/window.out | 23 +++++++++++++++++++++++
 src/test/regress/sql/window.sql      | 18 ++++++++++++++++++
 7 files changed, 59 insertions(+), 2 deletions(-)

diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 784ceeb8246..9431cae9ae0 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -2908,6 +2908,15 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 	int			i;
 	ListCell   *lc;
 
+	/*
+	 * Temporary: reject DISTINCT window aggregates until executor support
+	 * lands.  Patch 2 will replace this with actual DISTINCT handling.
+	 */
+	if (wfunc->windistinct)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("DISTINCT is not yet implemented for window aggregates")));
+
 	numArguments = list_length(wfunc->args);
 
 	i = 0;
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index 9fb266d089d..9749056e2d3 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -2814,6 +2814,7 @@ eval_const_expressions_mutator(Node *node,
 				newexpr->winref = expr->winref;
 				newexpr->winstar = expr->winstar;
 				newexpr->winagg = expr->winagg;
+				newexpr->windistinct = expr->windistinct;
 				newexpr->ignore_nulls = expr->ignore_nulls;
 				newexpr->location = expr->location;
 
diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c
index 35ff6427147..d216d53e530 100644
--- a/src/backend/parser/parse_func.c
+++ b/src/backend/parser/parse_func.c
@@ -847,6 +847,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
 		/* winref will be set by transformWindowFuncCall */
 		wfunc->winstar = agg_star;
 		wfunc->winagg = (fdresult == FUNCDETAIL_AGGREGATE);
+		wfunc->windistinct = agg_distinct;
 		wfunc->aggfilter = agg_filter;
 		wfunc->ignore_nulls = ignore_nulls;
 		wfunc->runCondition = NIL;
@@ -854,11 +855,12 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs,
 
 		/*
 		 * agg_star is allowed for aggregate functions but distinct isn't
+		 * allowed for non-aggregate window functions.
 		 */
-		if (agg_distinct)
+		if (agg_distinct && !wfunc->winagg)
 			ereport(ERROR,
 					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-					 errmsg("DISTINCT is not implemented for window functions"),
+					 errmsg("DISTINCT is not implemented for non-aggregate window functions"),
 					 parser_errposition(pstate, location)));
 
 		/*
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 35083fcc733..602446eefad 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -11659,6 +11659,8 @@ get_windowfunc_expr_helper(WindowFunc *wfunc, deparse_context *context,
 		appendStringInfoChar(buf, '*');
 	else
 	{
+		if (wfunc->windistinct)
+			appendStringInfoString(buf, "DISTINCT ");
 		if (is_json_objectagg)
 		{
 			get_rule_expr((Node *) linitial(wfunc->args), context, false);
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 6dfc946c20b..1e984dfbfda 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -614,6 +614,8 @@ typedef struct WindowFunc
 	bool		winstar pg_node_attr(query_jumble_ignore);
 	/* is function a simple aggregate? */
 	bool		winagg pg_node_attr(query_jumble_ignore);
+	/* true if aggregate arguments were marked DISTINCT */
+	bool		windistinct;
 	/* ignore nulls. One of the Null Treatment options */
 	int			ignore_nulls;
 	/* token location, or -1 if unknown */
diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out
index 7a04d3a7a9f..0f4dc2fe96f 100644
--- a/src/test/regress/expected/window.out
+++ b/src/test/regress/expected/window.out
@@ -5876,3 +5876,26 @@ WINDOW w AS (ORDER BY x ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
 --cleanup
 DROP TABLE planets CASCADE;
 NOTICE:  drop cascades to view planets_view
+--
+-- Test DISTINCT in window aggregates (parse/deparse plumbing only;
+-- execution support is not yet implemented)
+--
+-- Should parse successfully and round-trip through a view definition
+CREATE TEMP VIEW window_distinct_view AS
+SELECT count(DISTINCT four) OVER (PARTITION BY ten) AS cnt
+FROM tenk1;
+SELECT pg_get_viewdef('window_distinct_view') LIKE '%DISTINCT%' AS has_distinct;
+ has_distinct 
+--------------
+ t
+(1 row)
+
+DROP VIEW window_distinct_view;
+-- DISTINCT on a non-aggregate window function is still a parse error
+SELECT ntile(DISTINCT 4) OVER () FROM tenk1; -- error
+ERROR:  DISTINCT is not implemented for non-aggregate window functions
+LINE 1: SELECT ntile(DISTINCT 4) OVER () FROM tenk1;
+               ^
+-- Execution fails with a clear executor-side error
+SELECT count(DISTINCT four) OVER (PARTITION BY ten) FROM tenk1; -- error
+ERROR:  DISTINCT is not yet implemented for window aggregates
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql
index 37d837a2f66..be45bd5f14f 100644
--- a/src/test/regress/sql/window.sql
+++ b/src/test/regress/sql/window.sql
@@ -2133,3 +2133,21 @@ WINDOW w AS (ORDER BY x ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
 
 --cleanup
 DROP TABLE planets CASCADE;
+
+--
+-- Test DISTINCT in window aggregates (parse/deparse plumbing only;
+-- execution support is not yet implemented)
+--
+
+-- Should parse successfully and round-trip through a view definition
+CREATE TEMP VIEW window_distinct_view AS
+SELECT count(DISTINCT four) OVER (PARTITION BY ten) AS cnt
+FROM tenk1;
+SELECT pg_get_viewdef('window_distinct_view') LIKE '%DISTINCT%' AS has_distinct;
+DROP VIEW window_distinct_view;
+
+-- DISTINCT on a non-aggregate window function is still a parse error
+SELECT ntile(DISTINCT 4) OVER () FROM tenk1; -- error
+
+-- Execution fails with a clear executor-side error
+SELECT count(DISTINCT four) OVER (PARTITION BY ten) FROM tenk1; -- error
-- 
2.52.0



  [application/octet-stream] v1-0002-Implement-executor-support-for-DISTINCT-in-whole-.patch (27.1K, 4-v1-0002-Implement-executor-support-for-DISTINCT-in-whole-.patch)
From ee7ca6d6a27b1872c7fa0d43d2df90631b5df1b0 Mon Sep 17 00:00:00 2001
From: Haibo Yan <[email protected]>
Date: Tue, 7 Apr 2026 11:57:50 -0700
Subject: [PATCH v1 2/5] Implement executor support for DISTINCT in
 whole-partition window aggregates

Patch 1 taught PostgreSQL to parse and deparse DISTINCT in window
aggregate calls, while still rejecting execution with a temporary
FEATURE_NOT_SUPPORTED error.

This patch implements the first executor support for that feature, but
keeps the scope intentionally narrow: DISTINCT is now supported for
plain aggregate window functions only when the effective window frame is
equivalent to the entire partition.

Examples now supported include:

  count(DISTINCT x) OVER (PARTITION BY p)
  sum(DISTINCT x)   OVER (PARTITION BY p)
  avg(DISTINCT x)   OVER ()
  count(DISTINCT x) OVER (
      PARTITION BY p
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)

The implementation does not yet support:

  count(DISTINCT x) OVER (PARTITION BY p ORDER BY o)
  count(DISTINCT x) OVER (
      PARTITION BY p
      ORDER BY o
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
  count(DISTINCT x) OVER (
      PARTITION BY p
      ROWS BETWEEN 3 PRECEDING AND CURRENT ROW)

nor any other moving/grow-only/sliding frame cases, EXCLUDE clauses,
aggregate ORDER BY within window functions, or multi-argument DISTINCT.

The key design choice is to cut scope by frame shape rather than by
aggregate kind.  The hard part of DISTINCT in window aggregates is not
whether the aggregate is count(), sum(), or avg(), but whether the
window frame changes from row to row.  Once the frame can move, DISTINCT
needs incremental membership tracking and, for true sliding frames,
correct handling of values leaving the frame.  That is a much larger
executor problem.

For frames equivalent to the whole partition, the semantics are much
cleaner: the DISTINCT aggregate result is constant for every row in the
partition.  The executor can therefore:

  1. collect the aggregate input values for the partition,
  2. apply FILTER during collection,
  3. sort the collected values,
  4. deduplicate them,
  5. invoke the aggregate transition function once per distinct value,
  6. finalize once, and
  7. reuse the cached final result for all rows in the partition.

This patch implements that model inside WindowAgg using a dedicated
sort-and-dedup path for DISTINCT aggregates in scope.  Existing
finalization and result caching logic is reused.  Ordinary non-DISTINCT
window aggregates continue to use the existing row-by-row accumulation
path, so mixed queries containing both DISTINCT and non-DISTINCT window
aggregates are supported.

The implementation currently supports only single-argument DISTINCT.
That keeps the first executor patch small and avoids introducing tuple-
sorting and multi-column equality machinery into this step.  Multi-
argument DISTINCT can be added later as a follow-up.

Out-of-scope cases continue to fail at executor init time with
FEATURE_NOT_SUPPORTED, replacing the temporary blanket rejection from
Patch 1 with a narrower scope check.

Regression tests cover:

* basic count/sum/avg DISTINCT window aggregate execution;
* default no-ORDER-BY whole-partition frames;
* explicit whole-partition frames;
* NULL and duplicate-heavy inputs;
* mixed DISTINCT and non-DISTINCT window aggregates;
* multiple DISTINCT window aggregates in the same query; and
* rejection of ORDER BY default-frame, ROWS CURRENT ROW, sliding, EXCLUDE,
  and multi-argument DISTINCT cases.

This is intended as the first executor step for DISTINCT in window
aggregates.  More complex frame classes can be added in later patches.
---
 src/backend/executor/nodeWindowAgg.c | 393 ++++++++++++++++++++++++++-
 src/test/regress/expected/window.out | 166 ++++++++++-
 src/test/regress/sql/window.sql      |  64 ++++-
 3 files changed, 609 insertions(+), 14 deletions(-)

diff --git a/src/backend/executor/nodeWindowAgg.c b/src/backend/executor/nodeWindowAgg.c
index 9431cae9ae0..9e9115141b7 100644
--- a/src/backend/executor/nodeWindowAgg.c
+++ b/src/backend/executor/nodeWindowAgg.c
@@ -46,6 +46,7 @@
 #include "optimizer/optimizer.h"
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
+#include "parser/parse_oper.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
@@ -54,6 +55,7 @@
 #include "utils/memutils.h"
 #include "utils/regproc.h"
 #include "utils/syscache.h"
+#include "utils/tuplesort.h"
 #include "utils/tuplestore.h"
 #include "windowapi.h"
 
@@ -170,6 +172,14 @@ typedef struct WindowStatePerAggData
 
 	/* Data local to eval_windowaggregates() */
 	bool		restart;		/* need to restart this agg in this cycle? */
+
+	/* DISTINCT support */
+	bool		windistinct;	/* DISTINCT specified on this aggregate */
+	Oid			inputtypeOid;	/* OID of the single DISTINCT argument type */
+	Oid			sortOperator;	/* btree < operator for sorting */
+	Oid			sortCollation;	/* collation for sort/equality */
+	bool		sortNullsFirst; /* NULLS FIRST? */
+	FmgrInfo	equalfn;		/* equality comparison function */
 } WindowStatePerAggData;
 
 static void initialize_windowaggregate(WindowAggState *winstate,
@@ -206,6 +216,11 @@ static WindowStatePerAggData *initialize_peragg(WindowAggState *winstate,
 												WindowStatePerAgg peraggstate);
 static Datum GetAggInitVal(Datum textInitVal, Oid transtype);
 
+static bool is_whole_partition_frame(WindowAggState *winstate);
+static void eval_windowaggregate_distinct(WindowAggState *winstate,
+										  WindowStatePerFunc perfuncstate,
+										  WindowStatePerAgg peraggstate);
+
 static bool are_peers(WindowAggState *winstate, TupleTableSlot *slot1,
 					  TupleTableSlot *slot2);
 static bool window_gettupleslot(WindowObject winobj, int64 pos,
@@ -693,6 +708,297 @@ finalize_windowaggregate(WindowAggState *winstate,
 	MemoryContextSwitchTo(oldContext);
 }
 
+/*
+ * is_whole_partition_frame
+ *
+ * Returns true if the window frame is guaranteed to cover the entire
+ * partition.  This is the case when the start is UNBOUNDED PRECEDING,
+ * there is no EXCLUSION clause, and the end is either UNBOUNDED FOLLOWING
+ * or CURRENT ROW with no ORDER BY in RANGE or GROUPS mode (which means
+ * all rows are peers, so CURRENT ROW extends to the partition boundary).
+ * In ROWS mode, CURRENT ROW always means exactly the current row.
+ */
+static bool
+is_whole_partition_frame(WindowAggState *winstate)
+{
+	WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
+	int			frameOptions = winstate->frameOptions;
+
+	/* Must start at UNBOUNDED PRECEDING */
+	if (!(frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING))
+		return false;
+
+	/* Must not have an EXCLUSION clause */
+	if (frameOptions & FRAMEOPTION_EXCLUSION)
+		return false;
+
+	/* End must be UNBOUNDED FOLLOWING ... */
+	if (frameOptions & FRAMEOPTION_END_UNBOUNDED_FOLLOWING)
+		return true;
+
+	/*
+	 * ... or CURRENT ROW with no ORDER BY (all rows are peers), but only
+	 * for RANGE or GROUPS mode.  In ROWS mode, CURRENT ROW means exactly
+	 * the current row regardless of peers.
+	 */
+	if ((frameOptions & FRAMEOPTION_END_CURRENT_ROW) &&
+		!(frameOptions & FRAMEOPTION_ROWS) &&
+		node->ordNumCols == 0)
+		return true;
+
+	return false;
+}
+
+/*
+ * eval_windowaggregate_distinct
+ *
+ * Compute a single-argument DISTINCT window aggregate over the whole
+ * partition.  We collect all argument values (applying any FILTER clause),
+ * sort them, skip duplicates, and feed the distinct values into the
+ * aggregate's transition function.
+ *
+ * This follows the pattern of process_ordered_aggregate_single() in
+ * nodeAgg.c.
+ */
+static void
+eval_windowaggregate_distinct(WindowAggState *winstate,
+							  WindowStatePerFunc perfuncstate,
+							  WindowStatePerAgg peraggstate)
+{
+	WindowObject agg_winobj = winstate->agg_winobj;
+	TupleTableSlot *temp_slot = winstate->temp_slot_1;
+	ExprContext *econtext = winstate->tmpcontext;
+	WindowFuncExprState *wfuncstate = perfuncstate->wfuncstate;
+	ExprState  *filter = wfuncstate->aggfilter;
+	int			numArguments = perfuncstate->numArguments;
+	LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
+	Tuplesortstate *sortstate;
+	Datum		newVal;
+	bool		newIsNull;
+	Datum		newAbbrevVal;
+	Datum		oldVal = (Datum) 0;
+	bool		oldIsNull = true;
+	bool		haveOldVal = false;
+	Datum		oldAbbrevVal = (Datum) 0;
+	MemoryContext oldContext;
+	int64		total_rows;
+	int64		row;
+
+	/* Ensure all partition rows are spooled */
+	spool_tuples(winstate, -1);
+	total_rows = winstate->spooled_rows;
+
+	/* Create a tuplesort for the single DISTINCT argument */
+	sortstate = tuplesort_begin_datum(peraggstate->inputtypeOid,
+									  peraggstate->sortOperator,
+									  peraggstate->sortCollation,
+									  peraggstate->sortNullsFirst,
+									  work_mem, NULL, TUPLESORT_NONE);
+
+	/*
+	 * Loop over all rows in the partition, evaluate FILTER and the argument,
+	 * and feed values into the sort.
+	 */
+	for (row = 0; row < total_rows; row++)
+	{
+		if (!window_gettupleslot(agg_winobj, row, temp_slot))
+			break;
+
+		econtext->ecxt_outertuple = temp_slot;
+
+		oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+		/* Skip anything FILTERed out */
+		if (filter)
+		{
+			bool		isnull;
+			Datum		res = ExecEvalExpr(filter, econtext, &isnull);
+
+			if (isnull || !DatumGetBool(res))
+			{
+				MemoryContextSwitchTo(oldContext);
+				ResetExprContext(econtext);
+				ExecClearTuple(temp_slot);
+				continue;
+			}
+		}
+
+		/* Evaluate the single argument */
+		{
+			ExprState  *argstate = (ExprState *) linitial(wfuncstate->args);
+			Datum		val;
+			bool		isnull;
+
+			val = ExecEvalExpr(argstate, econtext, &isnull);
+
+			MemoryContextSwitchTo(oldContext);
+
+			/* Feed into sort */
+			tuplesort_putdatum(sortstate, val, isnull);
+		}
+
+		ResetExprContext(econtext);
+		ExecClearTuple(temp_slot);
+	}
+
+	/* Sort */
+	tuplesort_performsort(sortstate);
+
+	/*
+	 * Read back sorted values, skip duplicates, and feed distinct values
+	 * into the transition function.  This mirrors
+	 * process_ordered_aggregate_single() in nodeAgg.c.
+	 */
+	while (tuplesort_getdatum(sortstate, true, false,
+							  &newVal, &newIsNull, &newAbbrevVal))
+	{
+		ResetExprContext(econtext);
+		oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+		/*
+		 * If DISTINCT mode, skip if not distinct from prior value.
+		 */
+		if (haveOldVal &&
+			((oldIsNull && newIsNull) ||
+			 (!oldIsNull && !newIsNull &&
+			  oldAbbrevVal == newAbbrevVal &&
+			  DatumGetBool(FunctionCall2Coll(&peraggstate->equalfn,
+											 peraggstate->sortCollation,
+											 oldVal, newVal)))))
+		{
+			MemoryContextSwitchTo(oldContext);
+			continue;
+		}
+
+		/*
+		 * Advance the transition function with this distinct value.
+		 * This replicates the strict-function handling from
+		 * advance_windowaggregate().
+		 */
+		if (peraggstate->transfn.fn_strict)
+		{
+			/* For strict transfn, skip NULL inputs */
+			if (newIsNull)
+			{
+				MemoryContextSwitchTo(oldContext);
+				goto remember_value;
+			}
+
+			/*
+			 * For strict transition functions with initial value NULL,
+			 * use the first non-NULL input as the initial state.
+			 */
+			if (peraggstate->transValueCount == 0 &&
+				peraggstate->transValueIsNull)
+			{
+				MemoryContextSwitchTo(peraggstate->aggcontext);
+				peraggstate->transValue = datumCopy(newVal,
+													peraggstate->transtypeByVal,
+													peraggstate->transtypeLen);
+				peraggstate->transValueIsNull = false;
+				peraggstate->transValueCount = 1;
+				MemoryContextSwitchTo(oldContext);
+				goto remember_value;
+			}
+
+			if (peraggstate->transValueIsNull)
+			{
+				/*
+				 * Don't call a strict function with NULL inputs.
+				 */
+				MemoryContextSwitchTo(oldContext);
+				goto remember_value;
+			}
+		}
+
+		/* OK to call the transition function */
+		InitFunctionCallInfoData(*fcinfo, &(peraggstate->transfn),
+								 numArguments + 1,
+								 perfuncstate->winCollation,
+								 (Node *) winstate, NULL);
+		fcinfo->args[0].value = peraggstate->transValue;
+		fcinfo->args[0].isnull = peraggstate->transValueIsNull;
+		fcinfo->args[1].value = newVal;
+		fcinfo->args[1].isnull = newIsNull;
+		winstate->curaggcontext = peraggstate->aggcontext;
+
+		{
+			Datum		result;
+
+			result = FunctionCallInvoke(fcinfo);
+			winstate->curaggcontext = NULL;
+
+			peraggstate->transValueCount++;
+
+			/*
+			 * If pass-by-ref datatype, must copy the new value into
+			 * aggcontext and free the prior transValue.  But if transfn
+			 * returned a pointer to its first input, we don't need to do
+			 * anything.  Also, if transfn returned a pointer to a R/W
+			 * expanded object that is already a child of the aggcontext,
+			 * assume we can adopt that value without copying it.
+			 *
+			 * This must match advance_windowaggregate's logic exactly.
+			 */
+			if (!peraggstate->transtypeByVal &&
+				DatumGetPointer(result) != DatumGetPointer(peraggstate->transValue))
+			{
+				if (!fcinfo->isnull)
+				{
+					MemoryContextSwitchTo(peraggstate->aggcontext);
+					if (DatumIsReadWriteExpandedObject(result,
+													   false,
+													   peraggstate->transtypeLen) &&
+						MemoryContextGetParent(DatumGetEOHP(result)->eoh_context) == CurrentMemoryContext)
+						 /* do nothing */ ;
+					else
+						result = datumCopy(result,
+										   peraggstate->transtypeByVal,
+										   peraggstate->transtypeLen);
+				}
+				if (!peraggstate->transValueIsNull)
+				{
+					if (DatumIsReadWriteExpandedObject(peraggstate->transValue,
+													   false,
+													   peraggstate->transtypeLen))
+						DeleteExpandedObject(peraggstate->transValue);
+					else
+						pfree(DatumGetPointer(peraggstate->transValue));
+				}
+			}
+
+			MemoryContextSwitchTo(oldContext);
+			peraggstate->transValue = result;
+			peraggstate->transValueIsNull = fcinfo->isnull;
+		}
+
+remember_value:
+		/*
+		 * Remember the current value for subsequent duplicate checks.
+		 */
+		if (!peraggstate->inputtypeByVal)
+		{
+			if (!oldIsNull)
+				pfree(DatumGetPointer(oldVal));
+			if (!newIsNull)
+				oldVal = datumCopy(newVal, peraggstate->inputtypeByVal,
+								   peraggstate->inputtypeLen);
+			else
+				oldVal = (Datum) 0;
+		}
+		else
+			oldVal = newVal;
+		oldAbbrevVal = newAbbrevVal;
+		oldIsNull = newIsNull;
+		haveOldVal = true;
+	}
+
+	if (!oldIsNull && !peraggstate->inputtypeByVal)
+		pfree(DatumGetPointer(oldVal));
+
+	tuplesort_end(sortstate);
+}
+
 /*
  * eval_windowaggregates
  * evaluate plain aggregates being used as window functions
@@ -946,6 +1252,22 @@ eval_windowaggregates(WindowAggState *winstate)
 		}
 	}
 
+	/*
+	 * Compute DISTINCT aggregates for the whole partition.  These are handled
+	 * separately via sort-based deduplication rather than the main
+	 * accumulation loop below.
+	 */
+	for (i = 0; i < numaggs; i++)
+	{
+		peraggstate = &winstate->peragg[i];
+		if (!peraggstate->windistinct || !peraggstate->restart)
+			continue;
+		wfuncno = peraggstate->wfuncno;
+		eval_windowaggregate_distinct(winstate,
+									  &winstate->perfunc[wfuncno],
+									  peraggstate);
+	}
+
 	/*
 	 * Non-restarted aggregates now contain the rows between aggregatedbase
 	 * (i.e., frameheadpos) and aggregatedupto, while restarted aggregates
@@ -1002,6 +1324,10 @@ eval_windowaggregates(WindowAggState *winstate)
 		{
 			peraggstate = &winstate->peragg[i];
 
+			/* DISTINCT aggregates are handled separately */
+			if (peraggstate->windistinct)
+				continue;
+
 			/* Non-restarted aggs skip until aggregatedupto_nonrestarted */
 			if (!peraggstate->restart &&
 				winstate->aggregatedupto < aggregatedupto_nonrestarted)
@@ -1169,6 +1495,26 @@ prepare_tuplestore(WindowAggState *winstate)
 			readptr_flags |= EXEC_FLAG_BACKWARD;
 		}
 
+		/*
+		 * If any aggregate uses DISTINCT, the read pointer also needs
+		 * BACKWARD capability.  The DISTINCT helper reads through the
+		 * entire partition to collect values for sorting, which advances
+		 * the read pointer to the end.  The main accumulation loop (for
+		 * non-DISTINCT aggregates in the same WindowAgg node) then needs
+		 * to rewind back to the frame head.
+		 */
+		if (!(readptr_flags & EXEC_FLAG_BACKWARD))
+		{
+			for (int i = 0; i < winstate->numaggs; i++)
+			{
+				if (winstate->peragg[i].windistinct)
+				{
+					readptr_flags |= EXEC_FLAG_BACKWARD;
+					break;
+				}
+			}
+		}
+
 		agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
 															readptr_flags);
 	}
@@ -2882,7 +3228,8 @@ ExecReScanWindowAgg(WindowAggState *node)
 /*
  * initialize_peragg
  *
- * Almost same as in nodeAgg.c, except we don't support DISTINCT currently.
+ * Almost same as in nodeAgg.c, except we only support DISTINCT for
+ * whole-partition frames and single-argument aggregates.
  */
 static WindowStatePerAggData *
 initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
@@ -2909,13 +3256,20 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 	ListCell   *lc;
 
 	/*
-	 * Temporary: reject DISTINCT window aggregates until executor support
-	 * lands.  Patch 2 will replace this with actual DISTINCT handling.
+	 * Validate DISTINCT usage.  Currently we only support DISTINCT for
+	 * whole-partition frames (where the result is constant across the
+	 * partition) and single-argument aggregates only.
 	 */
-	if (wfunc->windistinct)
+	if (wfunc->windistinct && !is_whole_partition_frame(winstate))
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-				 errmsg("DISTINCT is not yet implemented for window aggregates")));
+				 errmsg("DISTINCT is only supported for window functions with a frame that covers the entire partition"),
+				 errhint("Remove ORDER BY from the window definition, or use ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.")));
+
+	if (wfunc->windistinct && list_length(wfunc->args) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("DISTINCT is not supported for window aggregate functions with more than one argument")));
 
 	numArguments = list_length(wfunc->args);
 
@@ -3161,6 +3515,35 @@ initialize_peragg(WindowAggState *winstate, WindowFunc *wfunc,
 	else
 		peraggstate->aggcontext = winstate->aggcontext;
 
+	/*
+	 * Set up DISTINCT state if needed.  We need sort and equality operators
+	 * for the single argument type, plus its type length and by-value info
+	 * for datum copying during the dedup loop.
+	 */
+	if (wfunc->windistinct)
+	{
+		Oid			ltOpr,
+					eqOpr;
+		Oid			inputType = inputTypes[0];
+
+		peraggstate->windistinct = true;
+		peraggstate->inputtypeOid = inputType;
+
+		get_sort_group_operators(inputType,
+								 true, true, false,
+								 &ltOpr, &eqOpr, NULL,
+								 NULL);
+
+		peraggstate->sortOperator = ltOpr;
+		peraggstate->sortCollation = wfunc->inputcollid;
+		peraggstate->sortNullsFirst = false;
+		fmgr_info(get_opcode(eqOpr), &peraggstate->equalfn);
+
+		get_typlenbyval(inputType,
+						&peraggstate->inputtypeLen,
+						&peraggstate->inputtypeByVal);
+	}
+
 	ReleaseSysCache(aggTuple);
 
 	return peraggstate;
diff --git a/src/test/regress/expected/window.out b/src/test/regress/expected/window.out
index 0f4dc2fe96f..d2db09f83b8 100644
--- a/src/test/regress/expected/window.out
+++ b/src/test/regress/expected/window.out
@@ -5877,8 +5877,7 @@ WINDOW w AS (ORDER BY x ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
 DROP TABLE planets CASCADE;
 NOTICE:  drop cascades to view planets_view
 --
--- Test DISTINCT in window aggregates (parse/deparse plumbing only;
--- execution support is not yet implemented)
+-- Test DISTINCT in window aggregates
 --
 -- Should parse successfully and round-trip through a view definition
 CREATE TEMP VIEW window_distinct_view AS
@@ -5896,6 +5895,163 @@ SELECT ntile(DISTINCT 4) OVER () FROM tenk1; -- error
 ERROR:  DISTINCT is not implemented for non-aggregate window functions
 LINE 1: SELECT ntile(DISTINCT 4) OVER () FROM tenk1;
                ^
--- Execution fails with a clear executor-side error
-SELECT count(DISTINCT four) OVER (PARTITION BY ten) FROM tenk1; -- error
-ERROR:  DISTINCT is not yet implemented for window aggregates
+-- Basic DISTINCT whole-partition cases (should succeed)
+SELECT count(DISTINCT four) OVER (PARTITION BY ten)
+FROM tenk1 LIMIT 20;
+ count 
+-------
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+     2
+(20 rows)
+
+-- DISTINCT with no PARTITION BY (whole single partition)
+SELECT x, sum(DISTINCT x % 3) OVER ()
+FROM generate_series(1, 9) g(x);
+ x | sum 
+---+-----
+ 1 |   3
+ 2 |   3
+ 3 |   3
+ 4 |   3
+ 5 |   3
+ 6 |   3
+ 7 |   3
+ 8 |   3
+ 9 |   3
+(9 rows)
+
+-- DISTINCT with explicit UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING
+SELECT x, avg(DISTINCT x % 4) OVER (PARTITION BY x > 5
+  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
+FROM generate_series(1, 10) g(x);
+ x  |        avg         
+----+--------------------
+  1 | 1.5000000000000000
+  2 | 1.5000000000000000
+  3 | 1.5000000000000000
+  4 | 1.5000000000000000
+  5 | 1.5000000000000000
+  6 | 1.5000000000000000
+  7 | 1.5000000000000000
+  8 | 1.5000000000000000
+  9 | 1.5000000000000000
+ 10 | 1.5000000000000000
+(10 rows)
+
+-- DISTINCT with FILTER
+SELECT x,
+       count(DISTINCT x % 3) FILTER (WHERE x > 3) OVER (PARTITION BY x > 5)
+FROM generate_series(1, 10) g(x);
+ x  | count 
+----+-------
+  1 |     2
+  2 |     2
+  3 |     2
+  4 |     2
+  5 |     2
+  6 |     3
+  7 |     3
+  8 |     3
+  9 |     3
+ 10 |     3
+(10 rows)
+
+-- NULL handling
+SELECT x,
+       count(DISTINCT x) OVER ()
+FROM (VALUES (1),(2),(NULL),(2),(NULL),(1),(3)) v(x);
+ x | count 
+---+-------
+ 1 |     3
+ 2 |     3
+   |     3
+ 2 |     3
+   |     3
+ 1 |     3
+ 3 |     3
+(7 rows)
+
+-- Mixed DISTINCT and non-DISTINCT aggregates in same window
+SELECT x,
+       count(DISTINCT x % 3) OVER w,
+       sum(x) OVER w
+FROM generate_series(1, 9) g(x)
+WINDOW w AS (PARTITION BY x > 5);
+ x | count | sum 
+---+-------+-----
+ 1 |     3 |  15
+ 2 |     3 |  15
+ 3 |     3 |  15
+ 4 |     3 |  15
+ 5 |     3 |  15
+ 6 |     3 |  30
+ 7 |     3 |  30
+ 8 |     3 |  30
+ 9 |     3 |  30
+(9 rows)
+
+-- Multiple DISTINCT aggregates in same query
+SELECT x,
+       count(DISTINCT x % 2) OVER (PARTITION BY x > 5),
+       sum(DISTINCT x % 3) OVER (PARTITION BY x > 5)
+FROM generate_series(1, 10) g(x);
+ x  | count | sum 
+----+-------+-----
+  1 |     2 |   3
+  2 |     2 |   3
+  3 |     2 |   3
+  4 |     2 |   3
+  5 |     2 |   3
+  6 |     2 |   3
+  7 |     2 |   3
+  8 |     2 |   3
+  9 |     2 |   3
+ 10 |     2 |   3
+(10 rows)
+
+-- Error: non-whole-partition frame (has ORDER BY -> RANGE UNBOUNDED PRECEDING to CURRENT ROW)
+SELECT count(DISTINCT x) OVER (PARTITION BY x > 5 ORDER BY x)
+FROM generate_series(1, 10) g(x); -- error
+ERROR:  DISTINCT is only supported for window functions with a frame that covers the entire partition
+HINT:  Remove ORDER BY from the window definition, or use ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+-- Error: partial ROWS frame
+SELECT count(DISTINCT x) OVER (ROWS 3 PRECEDING)
+FROM generate_series(1, 10) g(x); -- error
+ERROR:  DISTINCT is only supported for window functions with a frame that covers the entire partition
+HINT:  Remove ORDER BY from the window definition, or use ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+-- Error: EXCLUDE clause
+SELECT count(DISTINCT x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING EXCLUDE CURRENT ROW)
+FROM generate_series(1, 10) g(x); -- error
+ERROR:  DISTINCT is only supported for window functions with a frame that covers the entire partition
+HINT:  Remove ORDER BY from the window definition, or use ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+-- Error: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (not whole-partition
+-- even without ORDER BY, because ROWS CURRENT ROW means exactly one row)
+SELECT x,
+       count(DISTINCT x % 3) OVER (
+           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+       )
+FROM generate_series(1, 10) g(x); -- error
+ERROR:  DISTINCT is only supported for window functions with a frame that covers the entire partition
+HINT:  Remove ORDER BY from the window definition, or use ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING.
+-- Error: multi-argument DISTINCT window aggregate (not yet supported)
+SELECT string_agg(DISTINCT four::text, ',') OVER (PARTITION BY ten)
+FROM tenk1; -- error
+ERROR:  DISTINCT is not supported for window aggregate functions with more than one argument
diff --git a/src/test/regress/sql/window.sql b/src/test/regress/sql/window.sql
index be45bd5f14f..9c05c778108 100644
--- a/src/test/regress/sql/window.sql
+++ b/src/test/regress/sql/window.sql
@@ -2135,8 +2135,7 @@ WINDOW w AS (ORDER BY x ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING);
 DROP TABLE planets CASCADE;
 
 --
--- Test DISTINCT in window aggregates (parse/deparse plumbing only;
--- execution support is not yet implemented)
+-- Test DISTINCT in window aggregates
 --
 
 -- Should parse successfully and round-trip through a view definition
@@ -2149,5 +2148,62 @@ DROP VIEW window_distinct_view;
 -- DISTINCT on a non-aggregate window function is still a parse error
 SELECT ntile(DISTINCT 4) OVER () FROM tenk1; -- error
 
--- Execution fails with a clear executor-side error
-SELECT count(DISTINCT four) OVER (PARTITION BY ten) FROM tenk1; -- error
+-- Basic DISTINCT whole-partition cases (should succeed)
+SELECT count(DISTINCT four) OVER (PARTITION BY ten)
+FROM tenk1 LIMIT 20;
+
+-- DISTINCT with no PARTITION BY (whole single partition)
+SELECT x, sum(DISTINCT x % 3) OVER ()
+FROM generate_series(1, 9) g(x);
+
+-- DISTINCT with explicit UNBOUNDED PRECEDING to UNBOUNDED FOLLOWING
+SELECT x, avg(DISTINCT x % 4) OVER (PARTITION BY x > 5
+  ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
+FROM generate_series(1, 10) g(x);
+
+-- DISTINCT with FILTER
+SELECT x,
+       count(DISTINCT x % 3) FILTER (WHERE x > 3) OVER (PARTITION BY x > 5)
+FROM generate_series(1, 10) g(x);
+
+-- NULL handling
+SELECT x,
+       count(DISTINCT x) OVER ()
+FROM (VALUES (1),(2),(NULL),(2),(NULL),(1),(3)) v(x);
+
+-- Mixed DISTINCT and non-DISTINCT aggregates in same window
+SELECT x,
+       count(DISTINCT x % 3) OVER w,
+       sum(x) OVER w
+FROM generate_series(1, 9) g(x)
+WINDOW w AS (PARTITION BY x > 5);
+
+-- Multiple DISTINCT aggregates in same query
+SELECT x,
+       count(DISTINCT x % 2) OVER (PARTITION BY x > 5),
+       sum(DISTINCT x % 3) OVER (PARTITION BY x > 5)
+FROM generate_series(1, 10) g(x);
+
+-- Error: non-whole-partition frame (has ORDER BY -> RANGE UNBOUNDED PRECEDING to CURRENT ROW)
+SELECT count(DISTINCT x) OVER (PARTITION BY x > 5 ORDER BY x)
+FROM generate_series(1, 10) g(x); -- error
+
+-- Error: partial ROWS frame
+SELECT count(DISTINCT x) OVER (ROWS 3 PRECEDING)
+FROM generate_series(1, 10) g(x); -- error
+
+-- Error: EXCLUDE clause
+SELECT count(DISTINCT x) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING EXCLUDE CURRENT ROW)
+FROM generate_series(1, 10) g(x); -- error
+
+-- Error: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (not whole-partition
+-- even without ORDER BY, because ROWS CURRENT ROW means exactly one row)
+SELECT x,
+       count(DISTINCT x % 3) OVER (
+           ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+       )
+FROM generate_series(1, 10) g(x); -- error
+
+-- Error: multi-argument DISTINCT window aggregate (not yet supported)
+SELECT string_agg(DISTINCT four::text, ',') OVER (PARTITION BY ten)
+FROM tenk1; -- error
-- 
2.52.0


