public inbox for [email protected]  
From: Joel Jacobson <[email protected]>
To: Chao Li <[email protected]>
To: Arseniy Mukhin <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Thu, 06 Nov 2025 09:33:18 +0100
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAE7r3MLivh1sHWF06hrVXkiQbw-KChPcQsh+9CheXprm5vRVMQ@mail.gmail.com>
	<[email protected]>
	<CAE7r3MK-3AOdh1mpZ8hw9h6F_i0D5RMoAy7CttnfCJRpB8GJDA@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAE7r3MJxxycYwDFJdTJ+HmzeNJ_JGKyr4HpT5njN1ouVX56OUg@mail.gmail.com>
	<[email protected]>
	<CAE7r3MJ0VoAJzdLzX0dgfPJpBJxW+wg_pYaCi6mQJi47+qukhg@mail.gmail.com>
	<[email protected]>

On Thu, Nov 6, 2025, at 00:21, Chao Li wrote:
> That’s what we don’t know. We now lack a performance test for 
> evaluating how “direct advancement” efficiently helps if it only 
> handles sleeping listeners. So what I was suggesting is that we should 
> first create some tests, maybe also add a few more statistics, so that 
> we can evaluate different solutions. If a simple implementation that 
> only handles sleeping listeners would have performed good enough, of 
> course we can take it; otherwise we may need to either pursue a better 
> solution.

Just for the sake of evaluating this patch, I've added instrumentation
to async.c that increments counters for the different branches in
asyncQueueReadAllNotifications and SignalBackends. (I'm just using
atomics without any locking, but that should be fine since these are
just statistics.)
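As a rough illustration of why unlocked atomic counters are adequate for this kind of statistics, here is a standalone sketch. It uses plain C11 `<stdatomic.h>` and pthreads rather than PostgreSQL's `pg_atomic_*` API, and the names `wakeup_stats`, `bump`, `worker` and `run_workers` are hypothetical. Each increment is atomic, so no updates are lost, but a reader that loads several counters one after another does not get a consistent snapshot across them.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical stand-ins for two of the patch's shared counters. */
typedef struct
{
	atomic_uint_fast32_t necessary;
	atomic_uint_fast32_t unnecessary;
} wakeup_stats;

static wakeup_stats stats;

/*
 * Increment a counter without taking any lock.  Relaxed ordering is
 * enough because the counters do not guard any other shared data.
 */
static void
bump(atomic_uint_fast32_t *counter)
{
	atomic_fetch_add_explicit(counter, 1, memory_order_relaxed);
}

/* Simulate `iterations` wakeups; every fourth one counts as unnecessary. */
static void *
worker(void *p)
{
	const int	iterations = *(const int *) p;

	for (int i = 0; i < iterations; i++)
	{
		if (i % 4 == 0)
			bump(&stats.unnecessary);
		else
			bump(&stats.necessary);
	}
	return NULL;
}

/* Run up to 16 workers concurrently; returns 0 if all of them ran. */
static int
run_workers(int nthreads, int iterations)
{
	pthread_t	threads[16];
	int			created = 0;

	if (nthreads < 0 || nthreads > 16)
		return -1;
	atomic_store(&stats.necessary, 0);
	atomic_store(&stats.unnecessary, 0);
	for (; created < nthreads; created++)
		if (pthread_create(&threads[created], NULL, worker, &iterations) != 0)
			break;
	/* Join every thread that was started; `iterations` outlives them all. */
	for (int t = 0; t < created; t++)
		pthread_join(threads[t], NULL);
	return created == nthreads ? 0 : -1;
}
```

After `run_workers(4, 100000)`, the counters hold exactly 300000 and 100000: no increments are lost even though no lock is taken. A reader loading `necessary` and then `unnecessary` while workers are still running may, however, see values from slightly different moments, which is acceptable for evaluation-only numbers.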

pg_get_async_wakeup_stats-patch.txt adds the SQL-callable
catalog functions pg_reset_async_wakeup_stats() and
pg_get_async_wakeup_stats(). They are only meant for evaluation and
should not be included in the patch itself. The file applies on top of
the v23 patch.

Below is an example of how to compile the test program, followed by a
run with an arbitrary mix of command-line options. I've tried a lot of
combinations, and we seem to be holding up fine in every case so far.

async-notify-test-5.c detects whether the pg_*_async_wakeup_stats()
functions exist, and shows the extra histograms only if they do.

% gcc -Wall -Wextra -O2 -pthread -I/Users/joel/pg19/include/postgresql/server -I/Users/joel/pg19/include -o async-notify-test-5 async-notify-test-5.c -L/Users/joel/pg19/lib -lpq -pthread -lm

% ./async-notify-test-5 --listeners 10 --notifiers 10 --channels 10 --sleep 0.1 --sleep-exp 2.0 --batch 10
10 s: 38100 sent (3690/s), 381000 received (36900/s)
Notification Latency Distribution:
 0.00-0.01ms                0 (0.0%) avg: 0.000ms
 0.01-0.10ms     #          23 (0.0%) avg: 0.092ms
 0.10-1.00ms     #######    298002 (78.2%) avg: 0.563ms
 1.00-10.00ms    ##         82975 (21.8%) avg: 1.506ms
 10.00-100.00ms             0 (0.0%) avg: 0.000ms
>100.00ms                  0 (0.0%) avg: 0.000ms

asyncQueueReadAllNotifications Statistics:
necessary_wakeups    ########   35469 (88.2%)
unnecessary_wakeups  #          4762 (11.8%)

SignalBackends Statistics:
signaled_needed     #          34983 (9.5%)
avoided_wakeups     ########   325874 (88.9%)
already_advancing   #          3 (0.0%)
signaled_uncertain  #          5347 (1.5%)
already_ahead       #          375 (0.1%)
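The histogram rows and percentages above can be reproduced with simple arithmetic. The sketch below is hypothetical and is not taken from async-notify-test-5.c (whose source is only attached): it assigns a latency to one of the decade-wide buckets shown in the output, assuming each bucket's upper bound is exclusive, and rounds a counter's share of a total to tenths of a percent.

```c
#include <stdint.h>

/*
 * Upper bounds (in ms) of the first five buckets printed above; anything
 * at or above 100 ms lands in the ">100.00ms" bucket.  Treating each
 * bound as exclusive is an assumption.
 */
static const double bucket_upper_ms[] = {0.01, 0.1, 1.0, 10.0, 100.0};

#define NUM_BOUNDS ((int) (sizeof(bucket_upper_ms) / sizeof(bucket_upper_ms[0])))

/* Return the bucket index: 0 = "0.00-0.01ms", NUM_BOUNDS = ">100.00ms". */
static int
latency_bucket(double ms)
{
	for (int i = 0; i < NUM_BOUNDS; i++)
		if (ms < bucket_upper_ms[i])
			return i;
	return NUM_BOUNDS;
}

/* count's share of total in tenths of a percent, rounded to nearest. */
static uint64_t
share_permille(uint64_t count, uint64_t total)
{
	if (total == 0)
		return 0;
	return (count * 1000 + total / 2) / total;
}
```

For example, `share_permille(325874, 366582)` yields 889, matching the 88.9% shown for avoided_wakeups, where 366582 is the sum of the five SignalBackends counters.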

Thoughts on how to interpret results:

- Is the notification latency distribution good enough for the given
  workload? Naturally, if the workload is too high, we cannot expect to
  achieve sub-millisecond latency anyway, so it's a judgement call.

- Even if "unnecessary_wakeups" is high relative to
  "necessary_wakeups", that's not necessarily a problem, as long as the
  latency distribution is still good enough. We should also consider the
  ratio between "unnecessary_wakeups" and "avoided_wakeups": even if
  "unnecessary_wakeups" is high in absolute numbers, if
  "avoided_wakeups" is orders of magnitude larger, the cost of context
  switching has already been dramatically reduced. I think there is
  always a risk, when optimizing, of forgetting what problem one was
  trying to solve in the first place, usually a bottleneck. Once the
  bottleneck is gone and has moved somewhere else, the effort should
  IMO usually be spent elsewhere, especially if further optimizations
  would require a non-trivial increase in code complexity.

- It's mainly "signaled_uncertain" that contributes to
  "unnecessary_wakeups".
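With the counters from the run above, the ratio between "avoided_wakeups" and "unnecessary_wakeups" works out to 325874 / 4762, i.e. roughly 68 avoided wakeups for every unnecessary one. A minimal helper for computing it from pg_get_async_wakeup_stats() output is sketched below; the helper's name is hypothetical, and the ratio is only a rough indicator, since avoided_wakeups is counted per listener inside SignalBackends while unnecessary_wakeups is counted per call to asyncQueueReadAllNotifications.

```c
#include <math.h>
#include <stdint.h>

/*
 * Avoided wakeups per unnecessary wakeup.  Returns INFINITY when there
 * were no unnecessary wakeups at all, so callers can tell that case
 * apart from a genuinely small ratio.
 */
static double
avoided_per_unnecessary(uint64_t avoided, uint64_t unnecessary)
{
	if (unnecessary == 0)
		return INFINITY;
	return (double) avoided / (double) unnecessary;
}
```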

/Joel
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dac12f8124..7e8e0b14f42 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,6 +137,7 @@
 #include <signal.h>
 #include <string.h>
 
+#include "access/htup_details.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
@@ -332,6 +333,13 @@ typedef struct AsyncQueueControl
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
 	dsa_handle	channelHashDSA;
 	dshash_table_handle channelHashDSH;
+	pg_atomic_uint32 signaledNeeded;	/* listening to some of the channels; signal needed */
+	pg_atomic_uint32 avoidedWakeups;	/* directly advanced */
+	pg_atomic_uint32 alreadyAdvancing;	/* already advancing its position */
+	pg_atomic_uint32 signaledUncertain;	/* signaled due to uncertain need */
+	pg_atomic_uint32 alreadyAhead;	/* already ahead, no action needed */
+	pg_atomic_uint32 necessaryWakeups;		/* wakeups where at least one message was interesting */
+	pg_atomic_uint32 unnecessaryWakeups;	/* wakeups where no messages were interesting */
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
@@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 QueuePosition stop,
 										 char *page_buffer,
-										 Snapshot snapshot);
+										 Snapshot snapshot,
+										 bool *interested);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -683,6 +692,13 @@ AsyncShmemInit(void)
 		asyncQueueControl->lastQueueFillWarn = 0;
 		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
 		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+		pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0);
+		pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0);
+		pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0);
+		pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+		pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+		pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
 
 		for (int i = 0; i < MaxBackends; i++)
 		{
@@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS)
 	SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * SQL function: return NOTIFY wakeup statistics as a single row:
+ *
+ * - signaled_needed: listening to some of the channels; signal needed
+ * - avoided_wakeups: directly advanced (wakeup avoided)
+ * - already_advancing: already advancing its position
+ * - signaled_uncertain: signaled due to uncertain need
+ * - already_ahead: already ahead, no action needed
+ * - necessary_wakeups: wakeups where at least one message was interesting
+ * - unnecessary_wakeups: wakeups where no messages were interesting
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	TupleDesc	tupdesc;
+	Datum		values[7];
+	bool		nulls[7];
+	HeapTuple	tuple;
+	uint32		signaled_needed;
+	uint32		direct_advancements_success;
+	uint32		already_advancing;
+	uint32		signaled_uncertain;
+	uint32		already_ahead;
+	uint32		necessary_wakeups;
+	uint32		unnecessary_wakeups;
+
+	/* Build a tuple descriptor for our result type */
+	if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("function returning record called in context that cannot accept type record")));
+
+	/* Read the atomic counters */
+	signaled_needed = pg_atomic_read_u32(&asyncQueueControl->signaledNeeded);
+	direct_advancements_success = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups);
+	already_advancing = pg_atomic_read_u32(&asyncQueueControl->alreadyAdvancing);
+	signaled_uncertain = pg_atomic_read_u32(&asyncQueueControl->signaledUncertain);
+	already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead);
+	necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups);
+	unnecessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups);
+
+	/* Fill in the values */
+	memset(nulls, 0, sizeof(nulls));
+	values[0] = Int64GetDatum((int64) signaled_needed);
+	values[1] = Int64GetDatum((int64) direct_advancements_success);
+	values[2] = Int64GetDatum((int64) already_advancing);
+	values[3] = Int64GetDatum((int64) signaled_uncertain);
+	values[4] = Int64GetDatum((int64) already_ahead);
+	values[5] = Int64GetDatum((int64) necessary_wakeups);
+	values[6] = Int64GetDatum((int64) unnecessary_wakeups);
+
+	tuple = heap_form_tuple(tupdesc, values, nulls);
+	PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: reset NOTIFY wakeup statistics
+ *
+ * This function resets all the async wakeup counters to zero.
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+	/* Reset all the atomic counters to zero */
+	pg_atomic_write_u32(&asyncQueueControl->signaledNeeded, 0);
+	pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0);
+	pg_atomic_write_u32(&asyncQueueControl->alreadyAdvancing, 0);
+	pg_atomic_write_u32(&asyncQueueControl->signaledUncertain, 0);
+	pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0);
+	pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0);
+	pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
+	PG_RETURN_VOID();
+}
+
 /*
  * Async_UnlistenOnExit
  *
@@ -2014,6 +2105,7 @@ SignalBackends(void)
 
 			Assert(pid != InvalidPid);
 
+			pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
 			pids[count] = pid;
 			procnos[count] = i;
@@ -2049,7 +2141,14 @@ SignalBackends(void)
 				 * currently advancing its position.
 				 */
 				if (!QUEUE_BACKEND_ADVANCING_POS(i))
+				{
 					QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+					pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+				}
+				else
+				{
+					pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1);
+				}
 			}
 			else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
 			{
@@ -2060,6 +2159,7 @@ SignalBackends(void)
 				 */
 				Assert(pid != InvalidPid);
 
+				pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1);
 				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
 				pids[count] = pid;
 				procnos[count] = i;
@@ -2071,6 +2171,7 @@ SignalBackends(void)
 				 * The backend is already ahead of the notifications we wrote.
 				 * No need to do anything.
 				 */
+				pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
 				Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos));
 			}
 		}
@@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		interested = false;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void)
 			 */
 			reachedStop = asyncQueueProcessPageEntries(&pos, head,
 													   page_buffer.buf,
-													   snapshot);
+													   snapshot,
+													   &interested);
 		} while (!reachedStop);
 	}
 	PG_FINALLY();
@@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_END_TRY();
 
+	if (interested)
+		pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+	else
+		pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
@@ -2474,7 +2582,8 @@ static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
 							 QueuePosition stop,
 							 char *page_buffer,
-							 Snapshot snapshot)
+							 Snapshot snapshot,
+							 bool *interested)
 {
 	bool		reachedStop = false;
 	bool		reachedEndOfPage;
@@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 					char	   *payload = qe->data + strlen(channel) + 1;
 
 					NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+					/* Mark that we were interested in at least one message */
+					*interested = true;
 				}
 			}
 			else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..0bbd7db39c7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
   proname => 'pg_notification_queue_usage', provolatile => 'v',
   proparallel => 'r', prorettype => 'float8', proargtypes => '',
   prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315',
+  descr => 'get statistics about NOTIFY wakeups',
+  proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+  prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316',
+  descr => 'reset statistics about NOTIFY wakeups',
+  proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_reset_async_wakeup_stats' },
 # shared memory usage
 { oid => '5052', descr => 'allocations from the main shared memory segment',
   proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',


Attachments:

  [application/octet-stream] async-notify-test-5.c (24.9K, 2-async-notify-test-5.c)

  [text/plain] pg_get_async_wakeup_stats-patch.txt (9.2K, 3-pg_get_async_wakeup_stats-patch.txt)