public inbox for [email protected]  
help / color / mirror / Atom feed
From: Ilmar Yunusov <[email protected]>
To: [email protected]
Cc: Ilmar Yunusov <[email protected]>
Subject: [RFC PATCH v0 2/7] Aggregate EXPLAIN WAITS from parallel workers
Date: Sat,  9 May 2026 04:22:32 +0500
Message-ID: <eb5f9c83a0f248f2def60ff6e93e998149047f06.1778280923.git.tanswis42@gmail.com> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>

---
 src/backend/commands/explain.c          |   4 +
 src/backend/executor/execMain.c         |   1 +
 src/backend/executor/execParallel.c     | 129 ++++++++++++++++++++++++
 src/backend/executor/execUtils.c        |   1 +
 src/backend/utils/activity/wait_event.c |  22 +++-
 src/include/executor/execParallel.h     |   2 +
 src/include/nodes/execnodes.h           |   2 +
 src/include/utils/wait_event.h          |   3 +
 src/test/regress/expected/explain.out   |  17 ++++
 src/test/regress/sql/explain.sql        |  12 +++
 10 files changed, 190 insertions(+), 3 deletions(-)

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 0b7cc5c15c6..9d7372f5415 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -594,6 +594,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 			waitEventUsagePtr = &waitEventUsage;
 			pgstat_begin_wait_event_usage(waitEventUsagePtr,
 										  queryDesc->estate->es_query_cxt);
+			queryDesc->estate->es_wait_event_usage = waitEventUsagePtr;
 		}
 
 		/* run the plan */
@@ -607,7 +608,10 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es,
 		PG_FINALLY();
 		{
 			if (waitEventUsagePtr)
+			{
 				pgstat_end_wait_event_usage(waitEventUsagePtr);
+				queryDesc->estate->es_wait_event_usage = NULL;
+			}
 		}
 		PG_END_TRY();
 
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4b30f768680..86ab124c1c0 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -3066,6 +3066,7 @@ EvalPlanQualStart(EPQState *epqstate, Plan *planTree)
 	/* es_trig_target_relations must NOT be copied */
 	rcestate->es_top_eflags = parentestate->es_top_eflags;
 	rcestate->es_instrument = parentestate->es_instrument;
+	rcestate->es_wait_event_usage = parentestate->es_wait_event_usage;
 	/* es_auxmodifytables must NOT be copied */
 
 	/*
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 81b87d82fab..8213565a708 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -51,6 +51,7 @@
 #include "utils/dsa.h"
 #include "utils/lsyscache.h"
 #include "utils/snapmgr.h"
+#include "utils/wait_event.h"
 
 /*
  * Magic numbers for parallel executor communication.  We use constants
@@ -67,6 +68,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_WAIT_EVENT_USAGE	UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -114,6 +116,18 @@ struct SharedExecutorInstrumentation
 	(StaticAssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
 	 (NodeInstrumentation *) (((char *) sei) + sei->instrument_offset))
 
+typedef struct SharedWaitEventUsageWorker	/* one worker's published wait event usage */
+{
+	int			nentries;	/* number of entries published; 0 when the slot is empty */
+	dsa_pointer entries;	/* DSA chunk holding the copied entries, or InvalidDsaPointer */
+} SharedWaitEventUsageWorker;
+
+struct SharedWaitEventUsage	/* per-query array of worker slots, stored under PARALLEL_KEY_WAIT_EVENT_USAGE */
+{
+	int			num_workers;	/* number of elements in worker_usage[] */
+	SharedWaitEventUsageWorker worker_usage[FLEXIBLE_ARRAY_MEMBER];	/* indexed by ParallelWorkerNumber */
+};
+
 /* Context object for ExecParallelEstimate. */
 typedef struct ExecParallelEstimateContext
 {
@@ -141,6 +155,10 @@ static bool ExecParallelReInitializeDSM(PlanState *planstate,
 										ParallelContext *pcxt);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 												SharedExecutorInstrumentation *instrumentation);
+static void ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei);
+static void ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
+											 dsa_area *area,
+											 const WaitEventUsage *usage);
 
 /* Helper function that runs in the parallel worker. */
 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
@@ -664,10 +682,12 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	char	   *paramlistinfo_space;
 	BufferUsage *bufusage_space;
 	WalUsage   *walusage_space;
+	SharedWaitEventUsage *wait_event_usage = NULL;
 	SharedExecutorInstrumentation *instrumentation = NULL;
 	SharedJitInstrumentation *jit_instrumentation = NULL;
 	int			pstmt_len;
 	int			paramlistinfo_len;
+	int			wait_event_usage_len = 0;
 	int			instrumentation_len = 0;
 	int			jit_instrumentation_len = 0;
 	int			instrument_offset = 0;
@@ -744,6 +764,16 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 						   mul_size(sizeof(WalUsage), pcxt->nworkers));
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/* Estimate space for per-worker wait event usage metadata. */
+	if (estate->es_wait_event_usage != NULL)
+	{
+		wait_event_usage_len =
+			offsetof(SharedWaitEventUsage, worker_usage) +
+			mul_size(sizeof(SharedWaitEventUsageWorker), pcxt->nworkers);
+		shm_toc_estimate_chunk(&pcxt->estimator, wait_event_usage_len);
+		shm_toc_estimate_keys(&pcxt->estimator, 1);
+	}
+
 	/* Estimate space for tuple queues. */
 	shm_toc_estimate_chunk(&pcxt->estimator,
 						   mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
@@ -839,6 +869,21 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
 	pei->wal_usage = walusage_space;
 
+	/* Allocate metadata for each worker's wait event usage, if requested. */
+	if (estate->es_wait_event_usage != NULL)
+	{
+		wait_event_usage = shm_toc_allocate(pcxt->toc, wait_event_usage_len);
+		wait_event_usage->num_workers = nworkers;
+		for (int i = 0; i < nworkers; i++)
+		{
+			wait_event_usage->worker_usage[i].nentries = 0;
+			wait_event_usage->worker_usage[i].entries = InvalidDsaPointer;
+		}
+		shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAIT_EVENT_USAGE,
+					   wait_event_usage);
+		pei->wait_event_usage = wait_event_usage;
+	}
+
 	/* Set up the tuple queues that the workers will write into. */
 	pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
 
@@ -1213,6 +1258,68 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
 }
 
+static void	/* merge every worker's published wait event entries into the EState's accumulator */
+ExecParallelRetrieveWaitEventUsage(ParallelExecutorInfo *pei)
+{
+	SharedWaitEventUsage *shared = pei->wait_event_usage;
+	WaitEventUsage *usage;
+
+	if (shared == NULL)	/* pei has no shared wait event usage area */
+		return;
+
+	usage = pei->planstate->state->es_wait_event_usage;
+	if (usage == NULL)	/* no accumulator attached to the EState; nothing to merge into */
+		return;
+
+	for (int i = 0; i < shared->num_workers; i++)
+	{
+		SharedWaitEventUsageWorker *worker = &shared->worker_usage[i];
+		WaitEventUsageEntry *entries;
+
+		if (worker->nentries <= 0 || !DsaPointerIsValid(worker->entries))
+			continue;	/* this slot holds no published entries */
+
+		entries = dsa_get_address(pei->area, worker->entries);
+		pgstat_accumulate_wait_event_usage(usage,
+										   entries,
+										   worker->nentries);
+		dsa_free(pei->area, worker->entries);	/* entries are merged; release the DSA copy */
+		worker->nentries = 0;	/* mark the slot empty so it is not merged or freed again */
+		worker->entries = InvalidDsaPointer;
+	}
+}
+
+static void	/* copy this worker's wait event entries into DSA and publish them in its slot */
+ExecParallelReportWaitEventUsage(SharedWaitEventUsage *shared,
+								 dsa_area *area,
+								 const WaitEventUsage *usage)
+{
+	SharedWaitEventUsageWorker *worker;
+	WaitEventUsageEntry *entries;
+	dsa_pointer entries_dsa;
+	Size		entries_size;
+
+	Assert(shared != NULL);
+	Assert(area != NULL);
+	Assert(usage != NULL);
+	Assert(IsParallelWorker());	/* must only be called from a parallel worker */
+	Assert(ParallelWorkerNumber < shared->num_workers);	/* slot index must be within the array */
+
+	if (usage->nentries <= 0)
+		return;	/* nothing recorded; leave the slot untouched */
+
+	worker = &shared->worker_usage[ParallelWorkerNumber];	/* this worker's own slot */
+	entries_size = mul_size(sizeof(WaitEventUsageEntry), usage->nentries);
+	entries_dsa = dsa_allocate(area, entries_size);
+	entries = dsa_get_address(area, entries_dsa);
+	memcpy(entries, usage->entries, entries_size);	/* copy all entries into the DSA chunk */
+
+	if (DsaPointerIsValid(worker->entries))
+		dsa_free(area, worker->entries);	/* drop a previously published copy before replacing it */
+	worker->nentries = usage->nentries;
+	worker->entries = entries_dsa;
+}
+
 /*
  * Finish parallel execution.  We wait for parallel workers to finish, and
  * accumulate their buffer/WAL usage.
@@ -1261,6 +1368,9 @@ ExecParallelFinish(ParallelExecutorInfo *pei)
 	for (i = 0; i < nworkers; i++)
 		InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
 
+	/* Accumulate wait event usage, if requested. */
+	ExecParallelRetrieveWaitEventUsage(pei);
+
 	pei->finished = true;
 }
 
@@ -1516,10 +1626,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	FixedParallelExecutorState *fpes;
 	BufferUsage *buffer_usage;
 	WalUsage   *wal_usage;
+	SharedWaitEventUsage *wait_event_usage;
 	DestReceiver *receiver;
 	QueryDesc  *queryDesc;
 	SharedExecutorInstrumentation *instrumentation;
 	SharedJitInstrumentation *jit_instrumentation;
+	WaitEventUsage waitEventUsage;
+	WaitEventUsage *waitEventUsagePtr = NULL;
 	int			instrument_options = 0;
 	void	   *area_space;
 	dsa_area   *area;
@@ -1535,6 +1648,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 		instrument_options = instrumentation->instrument_options;
 	jit_instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_JIT_INSTRUMENTATION,
 										 true);
+	wait_event_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAIT_EVENT_USAGE, true);
 	queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options);
 
 	/* Setting debug_query_string for individual workers */
@@ -1576,6 +1690,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	 */
 	InstrStartParallelQuery();
 
+	if (wait_event_usage != NULL)
+	{
+		waitEventUsagePtr = &waitEventUsage;
+		pgstat_begin_wait_event_usage(waitEventUsagePtr,
+									  queryDesc->estate->es_query_cxt);
+	}
+
 	/*
 	 * Run the plan.  If we specified a tuple bound, be careful not to demand
 	 * more tuples than that.
@@ -1587,6 +1708,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	/* Shut down the executor */
 	ExecutorFinish(queryDesc);
 
+	if (waitEventUsagePtr != NULL)
+	{
+		pgstat_end_wait_event_usage(waitEventUsagePtr);
+		ExecParallelReportWaitEventUsage(wait_event_usage,
+										 area,
+										 waitEventUsagePtr);
+	}
+
 	/* Report buffer/WAL usage during parallel execution. */
 	buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
 	wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index 1eb6b9f1f40..80ea777632b 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -151,6 +151,7 @@ CreateExecutorState(void)
 
 	estate->es_top_eflags = 0;
 	estate->es_instrument = 0;
+	estate->es_wait_event_usage = NULL;
 	estate->es_finished = false;
 
 	estate->es_exprcontexts = NIL;
diff --git a/src/backend/utils/activity/wait_event.c b/src/backend/utils/activity/wait_event.c
index 60d37ccbb73..eb01bc3d88c 100644
--- a/src/backend/utils/activity/wait_event.c
+++ b/src/backend/utils/activity/wait_event.c
@@ -37,7 +37,7 @@ static const char *pgstat_get_wait_ipc(WaitEventIPC w);
 static const char *pgstat_get_wait_timeout(WaitEventTimeout w);
 static const char *pgstat_get_wait_io(WaitEventIO w);
 static void WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
-							  const instr_time *elapsed);
+							  uint64 calls, const instr_time *elapsed);
 
 
 static uint32 local_my_wait_event_info;
@@ -442,15 +442,31 @@ pgstat_count_wait_event_end(void)
 
 	WaitEventUsageAdd(pgstat_wait_event_usage,
 					  pgstat_wait_event_usage_current,
+					  1,
 					  &elapsed);
 
 	pgstat_wait_event_usage_current = 0;
 	INSTR_TIME_SET_ZERO(pgstat_wait_event_usage_start);
 }
 
+void	/* add each of the given entries (calls and time) into usage */
+pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
+								   const WaitEventUsageEntry *entries,
+								   int nentries)
+{
+	Assert(usage != NULL);
+	Assert(nentries == 0 || entries != NULL);	/* entries may be NULL only when nentries is 0 */
+
+	for (int i = 0; i < nentries; i++)
+		WaitEventUsageAdd(usage,
+						  entries[i].wait_event_info,
+						  entries[i].calls,	/* add the entry's whole call count, not just 1 */
+						  &entries[i].time);
+}
+
 static void
 WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
-				  const instr_time *elapsed)
+				  uint64 calls, const instr_time *elapsed)
 {
 	WaitEventUsageEntry *entry = NULL;
 
@@ -494,7 +510,7 @@ WaitEventUsageAdd(WaitEventUsage *usage, uint32 wait_event_info,
 		INSTR_TIME_SET_ZERO(entry->time);
 	}
 
-	entry->calls++;
+	entry->calls += calls;
 	INSTR_TIME_ADD(entry->time, *elapsed);
 }
 
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a2034811d5..71df2c2511c 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -20,6 +20,7 @@
 #include "utils/dsa.h"
 
 typedef struct SharedExecutorInstrumentation SharedExecutorInstrumentation;
+typedef struct SharedWaitEventUsage SharedWaitEventUsage;
 
 typedef struct ParallelExecutorInfo
 {
@@ -27,6 +28,7 @@ typedef struct ParallelExecutorInfo
 	ParallelContext *pcxt;		/* parallel context we're using */
 	BufferUsage *buffer_usage;	/* points to bufusage area in DSM */
 	WalUsage   *wal_usage;		/* walusage area in DSM */
+	SharedWaitEventUsage *wait_event_usage;	/* optional */
 	SharedExecutorInstrumentation *instrumentation; /* optional */
 	struct SharedJitInstrumentation *jit_instrumentation;	/* optional */
 	dsa_area   *area;			/* points to DSA area in DSM */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 13359180d25..781c8c79132 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -69,6 +69,7 @@ typedef struct Tuplestorestate Tuplestorestate;
 typedef struct TupleTableSlot TupleTableSlot;
 typedef struct TupleTableSlotOps TupleTableSlotOps;
 typedef struct WalUsage WalUsage;
+typedef struct WaitEventUsage WaitEventUsage;
 typedef struct WorkerNodeInstrumentation WorkerNodeInstrumentation;
 
 
@@ -754,6 +755,7 @@ typedef struct EState
 
 	int			es_top_eflags;	/* eflags passed to ExecutorStart */
 	int			es_instrument;	/* OR of InstrumentOption flags */
+	WaitEventUsage *es_wait_event_usage;	/* EXPLAIN WAITS accumulator */
 	bool		es_finished;	/* true when ExecutorFinish is done */
 
 	List	   *es_exprcontexts;	/* List of ExprContexts within EState */
diff --git a/src/include/utils/wait_event.h b/src/include/utils/wait_event.h
index f7fab5736bb..63992137ee7 100644
--- a/src/include/utils/wait_event.h
+++ b/src/include/utils/wait_event.h
@@ -39,6 +39,9 @@ extern void pgstat_reset_wait_event_storage(void);
 extern void pgstat_begin_wait_event_usage(WaitEventUsage *usage,
 										  MemoryContext memcontext);
 extern void pgstat_end_wait_event_usage(WaitEventUsage *usage);
+extern void pgstat_accumulate_wait_event_usage(WaitEventUsage *usage,
+											   const WaitEventUsageEntry *entries,
+											   int nentries);
 extern void pgstat_count_wait_event_start(uint32 wait_event_info);
 extern void pgstat_count_wait_event_end(void);
 
diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
index 2c7a7e1d4c6..e3847e222be 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -114,6 +114,23 @@ select explain_filter_to_json('explain (analyze, waits, costs off, summary off,
  {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"}
 (1 row)
 
+begin;
+create function pg_temp.parallel_pg_sleep(float8) returns void
+  language internal volatile parallel safe as 'pg_sleep';
+set local debug_parallel_query = on;
+set local max_parallel_workers_per_gather = 1;
+select jsonb_path_query_first(
+  explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json)
+                         select pg_temp.parallel_pg_sleep(0.01)
+                         from tenk1 where unique1 = 1') #> '{0,Wait Events}',
+  '$[*] ? (@."Wait Event" == "PgSleep")'
+);
+                              jsonb_path_query_first                              
+----------------------------------------------------------------------------------
+ {"Time": 0.0, "Calls": 0, "Wait Event": "PgSleep", "Wait Event Type": "Timeout"}
+(1 row)
+
+rollback;
 explain (waits) select 1;
 ERROR:  EXPLAIN option WAITS requires ANALYZE
 \a
diff --git a/src/test/regress/sql/explain.sql b/src/test/regress/sql/explain.sql
index fe025ddeac5..8821250bcef 100644
--- a/src/test/regress/sql/explain.sql
+++ b/src/test/regress/sql/explain.sql
@@ -71,6 +71,18 @@ select explain_filter('explain (buffers, format text) select * from int8_tbl i8'
 -- WAITS option
 select explain_filter('explain (analyze, waits, costs off, summary off, timing off, buffers off) select pg_sleep(0.01)');
 select explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json) select pg_sleep(0.01)') #> '{0,Wait Events,0}';
+begin;
+create function pg_temp.parallel_pg_sleep(float8) returns void
+  language internal volatile parallel safe as 'pg_sleep';
+set local debug_parallel_query = on;
+set local max_parallel_workers_per_gather = 1;
+select jsonb_path_query_first(
+  explain_filter_to_json('explain (analyze, waits, costs off, summary off, timing off, buffers off, format json)
+                         select pg_temp.parallel_pg_sleep(0.01)
+                         from tenk1 where unique1 = 1') #> '{0,Wait Events}',
+  '$[*] ? (@."Wait Event" == "PgSleep")'
+);
+rollback;
 explain (waits) select 1;
 
 \a
-- 
2.52.0






reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected]
  Subject: Re: [RFC PATCH v0 2/7] Aggregate EXPLAIN WAITS from parallel workers
  In-Reply-To: <eb5f9c83a0f248f2def60ff6e93e998149047f06.1778280923.git.tanswis42@gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox