public inbox for [email protected]  
help / color / mirror / Atom feed
From: Joel Jacobson <[email protected]>
To: pgsql-hackers <[email protected]>
Cc: Thomas Munro <[email protected]>
Cc: Heikki Linnakangas <[email protected]>
Cc: Rishu Bagga <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Wed, 23 Jul 2025 03:39:30 +0200
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
	<[email protected]>

On Thu, Jul 17, 2025, at 09:43, Joel Jacobson wrote:
> On Wed, Jul 16, 2025, at 02:20, Rishu Bagga wrote:
>> If we are doing this optimization, why not maintain a list of backends
>> for each channel, and only wake up those channels?
>
> Thanks for a contributing a great idea, it actually turned out to work
> really well in practice!
>
> The attached new v4 of the patch implements your multicast idea:

Hi hackers,

While my previous attempts of $subject has only focused on optimizing
the multi-channel scenario, I thought it would be really nice if
LISTEN/NOTIFY could be optimize in the general case, benefiting all
users, including those who just listen on a single channel.

To my surprise, this was not only possible, but actually quite simple.

The main idea in this patch, is to introduce an atomic state machine,
with three states, IDLE, SIGNALLED, and PROCESSED, so that we don't
interrupt backends that are already in the process of catching up.

Thanks to Thomas Munro for making me aware of his, Heikki Linnakanga's
and others work in the  "Interrupts vs signals" [1] thread.

Maybe my patch is redundant due to their patch set, I'm not really sure?

Their patch seems to refactors the underlying wakeup mechanism. It
replaces the old, complex chain of events (SIGUSR1 signal -> handler ->
flag -> latch) with a single, direct function call: SendInterrupt(). For
async.c, this seems to be a low-level plumbing change that simplifies
how a notification wakeup is delivered.

My patch optimizes the high-level notification protocol. It introduces a
state machine (IDLE, SIGNALLED, PROCESSING) to only signal backends when
needed.

In their patch, in asyn.c's SignalBackends(), they do
SendInterrupt(INTERRUPT_ASYNC_NOTIFY, procno) instead of
SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]). They don't
seem to check if the backend is already signalled or not, but maybe
SendInterrupt() has signal coalescing built-in so it would be a noop
with almost no cost?

I'm happy to rebase my LISTEN/NOTIFY work on top of [1], but I could
also see benefits of doing the opposite.

I'm also happy to help with benchmarking of your work in [1].

Note that this patch doesn't contain the hash table to keep track of
listeners per backend, as proposed in earlier patches. I will propose
such a patch again later, but first we need to figure out if I should
rebase onto [1] or master (HEAD).

--- PATCH ---

    Optimize NOTIFY signaling to avoid redundant backend signals

    Previously, a NOTIFY would send SIGUSR1 to all listening backends, which
    could lead to a "thundering herd" of redundant signals under high
    traffic. To address this inefficiency, this patch replaces the simple
    volatile notifyInterruptPending flag with a per-backend atomic state
    machine, stored in asyncQueueControl->backend[i].state. This state
    variable can be in one of three states: IDLE (awaiting signal),
    SIGNALLED (signal received, work pending), or PROCESSING (actively
    reading the queue).

    From the notifier's perspective, SignalBackends now uses an atomic
    compare-and-swap (CAS) to transition a listener from IDLE to SIGNALLED.
    Only on a successful transition is a signal sent. If the listener is
    already SIGNALLED or another notifier wins the race, no redundant signal
    is sent. If the listener is in the PROCESSING state, the notifier will
    also transition it to SIGNALLED to ensure the listener re-scans the
    queue after its current work is done.

    On the listener side, ProcessIncomingNotify first transitions its state
    from SIGNALLED to PROCESSING. After reading notifications, it attempts
    to transition from PROCESSING back to IDLE. If this CAS fails, it means
    a new notification arrived during processing and a notifier has already
    set the state back to SIGNALLED. The listener then simply re-latches
    itself to process the new notifications, avoiding a tight loop.

    The primary benefit is a significant reduction in syscall overhead and
    unnecessary kernel wakeups in high-traffic scenarios. This dramatically
    improves performance for workloads with many concurrent notifiers.
    Benchmarks show a substantial increase in NOTIFY-only transaction
    throughput, with gains exceeding 200% at higher
    concurrency levels.

 src/backend/commands/async.c | 209 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
 src/backend/tcop/postgres.c  |   4 ++--
 src/include/commands/async.h |   4 +++-
 3 files changed, 185 insertions(+), 32 deletions(-)

--- BENCHMARK ---

The attached benchmark script does LISTEN on one connection,
and then uses pgbench to send NOTIFY on a varying number of
connections and jobs, to cause a high procsignal load.

I've run the benchmark on my MacBook Pro M3 Max,
10 seconds per run, 3 runs.

(I reused the same benchmark script as in the other thread, "Optimize ProcSignal to avoid redundant SIGUSR1 signals")

 Connections=Jobs | TPS (master) | TPS (patch) | Relative Diff (%) | StdDev (master) | StdDev (patch)
------------------+--------------+-------------+-------------------+-----------------+----------------
                1 |       118833 |      151510 | 27.50%            |             484 |            923
                2 |       156005 |      239051 | 53.23%            |            3145 |           1596
                4 |       177351 |      250910 | 41.48%            |            4305 |           4891
                8 |       116597 |      171944 | 47.47%            |            1549 |           2752
               16 |        40835 |      165482 | 305.25%           |            2695 |           2825
               32 |        37940 |      145150 | 282.58%           |            2533 |           1566
               64 |        35495 |      131836 | 271.42%           |            1837 |            573
              128 |        40193 |      121333 | 201.88%           |            2254 |            874
(8 rows)

/Joel

https://www.postgresql.org/message-id/flat/CA%2BhUKG%2B3MkS21yK4jL4cgZywdnnGKiBg0jatoV6kzaniBmcqbQ%4...

Attachments:

  [application/octet-stream] 0001-Optimize-NOTIFY-signaling-to-avoid-redundant-backend.patch (14.4K, 2-0001-Optimize-NOTIFY-signaling-to-avoid-redundant-backend.patch)
  download | inline diff:
From d4f01cda8bcd4042f0d751d73e13b561d8b1eaab Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Tue, 22 Jul 2025 10:32:34 +0200
Subject: [PATCH] Optimize NOTIFY signaling to avoid redundant backend signals

Previously, a NOTIFY would send SIGUSR1 to all listening backends, which
could lead to a "thundering herd" of redundant signals under high
traffic. To address this inefficiency, this patch replaces the simple
volatile notifyInterruptPending flag with a per-backend atomic state
machine, stored in asyncQueueControl->backend[i].state. This state
variable can be in one of three states: IDLE (awaiting signal),
SIGNALLED (signal received, work pending), or PROCESSING (actively
reading the queue).

From the notifier's perspective, SignalBackends now uses an atomic
compare-and-swap (CAS) to transition a listener from IDLE to SIGNALLED.
Only on a successful transition is a signal sent. If the listener is
already SIGNALLED or another notifier wins the race, no redundant signal
is sent. If the listener is in the PROCESSING state, the notifier will
also transition it to SIGNALLED to ensure the listener re-scans the
queue after its current work is done.

On the listener side, ProcessIncomingNotify first transitions its state
from SIGNALLED to PROCESSING. After reading notifications, it attempts
to transition from PROCESSING back to IDLE. If this CAS fails, it means
a new notification arrived during processing and a notifier has already
set the state back to SIGNALLED. The listener then simply re-latches
itself to process the new notifications, avoiding a tight loop.

The primary benefit is a significant reduction in syscall overhead and
unnecessary kernel wakeups in high-traffic scenarios. This dramatically
improves performance for workloads with many concurrent notifiers.
Benchmarks show a substantial increase in NOTIFY-only transaction
throughput, with gains exceeding 200% at higher
concurrency levels.
---
 src/backend/commands/async.c | 209 ++++++++++++++++++++++++++++++-----
 src/backend/tcop/postgres.c  |   4 +-
 src/include/commands/async.h |   4 +-
 3 files changed, 185 insertions(+), 32 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..ae20017af9b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -150,8 +150,19 @@
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "port/atomics.h"
 
 
+/*
+ * Async notification state machine states
+ */
+typedef enum AsyncListenerState
+{
+	ASYNC_STATE_IDLE = 0,		/* Backend is idle, waiting for signal */
+	ASYNC_STATE_SIGNALLED = 1,	/* Backend has been signaled, will process soon */
+	ASYNC_STATE_PROCESSING = 2	/* Backend is actively processing notifications */
+} AsyncListenerState;
+
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
  * must be kept small enough so that a notification message fits on one
@@ -246,6 +257,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	pg_atomic_uint32 state;		/* async state machine state */
 } QueueBackendStatus;
 
 /*
@@ -301,6 +313,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_STATE(i)		(asyncQueueControl->backend[i].state)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -405,12 +418,10 @@ static NotificationList *pendingNotifies = NULL;
 
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
- * called from inside a signal handler. That just sets the
- * notifyInterruptPending flag and sets the process
+ * called from inside a signal handler. That just sets the process
  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
  * actually deal with the interrupt.
  */
-volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
@@ -527,6 +538,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			pg_atomic_init_u32(&QUEUE_BACKEND_STATE(i), ASYNC_STATE_IDLE);
 		}
 	}
 
@@ -1099,6 +1111,8 @@ Exec_ListenPreCommit(void)
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+	/* Initialize the atomic state to IDLE */
+	pg_atomic_write_u32(&QUEUE_BACKEND_STATE(MyProcNumber), ASYNC_STATE_IDLE);
 	/* Insert backend into list of listeners at correct position */
 	if (prevListener != INVALID_PROC_NUMBER)
 	{
@@ -1242,6 +1256,8 @@ asyncQueueUnregister(void)
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+	/* Reset state to IDLE to prevent zombie listeners */
+	pg_atomic_write_u32(&QUEUE_BACKEND_STATE(MyProcNumber), ASYNC_STATE_IDLE);
 	/* and remove it from the list */
 	if (QUEUE_FIRST_LISTENER == MyProcNumber)
 		QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1634,25 +1650,84 @@ SignalBackends(void)
 	for (int i = 0; i < count; i++)
 	{
 		int32		pid = pids[i];
+		ProcNumber	procno = procnos[i];
+		uint32		expected;
+		bool		signal_needed = false;
 
 		/*
-		 * If we are signaling our own process, no need to involve the kernel;
-		 * just set the flag directly.
+		 * Implement state machine transitions for the notifier.
+		 * We use a loop to handle race conditions where the state
+		 * changes between our read and the CAS operation.
 		 */
-		if (pid == MyProcPid)
+		uint32	current_state = pg_atomic_read_membarrier_u32(&QUEUE_BACKEND_STATE(procno));
+
+		switch (current_state)
 		{
-			notifyInterruptPending = true;
-			continue;
+			case ASYNC_STATE_IDLE:
+				/* Try to transition from IDLE to SIGNALLED */
+				expected = ASYNC_STATE_IDLE;
+				if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(procno),
+													&expected,
+													ASYNC_STATE_SIGNALLED))
+				{
+					/* Success - need to send signal */
+					signal_needed = true;
+					if (Trace_notify)
+						elog(DEBUG1, "SignalBackends: transitioned backend %d from IDLE to SIGNALLED", pid);
+				}
+				/* Another notifier already signaled - we're done */
+				break;
+
+			case ASYNC_STATE_SIGNALLED:
+				/* Backend is already signaled - nothing to do */
+				if (Trace_notify)
+					elog(DEBUG1, "SignalBackends: backend %d already in SIGNALLED state, skipping", pid);
+				break;
+
+			case ASYNC_STATE_PROCESSING:
+				/* Try to transition from PROCESSING to SIGNALLED */
+				expected = ASYNC_STATE_PROCESSING;
+				if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(procno),
+													&expected,
+													ASYNC_STATE_SIGNALLED))
+				{
+					/* Success - need to send signal for re-scan */
+					signal_needed = true;
+					if (Trace_notify)
+						elog(DEBUG1, "SignalBackends: transitioned backend %d from PROCESSING to SIGNALLED for re-scan", pid);
+					break;
+				}
+				/* Another notifier already signaled - we're done */
+				break;
+
+			default:
+				/* Should never happen */
+				elog(ERROR, "unexpected async state %u for backend %d",
+						current_state, pid);
 		}
 
-		/*
-		 * Note: assuming things aren't broken, a signal failure here could
-		 * only occur if the target backend exited since we released
-		 * NotifyQueueLock; which is unlikely but certainly possible. So we
-		 * just log a low-level debug message if it happens.
-		 */
-		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
-			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
+		/* Send signal if needed */
+		if (signal_needed)
+		{
+			/*
+			 * For our own process, no need to involve the kernel
+			 */
+			if (pid == MyProcPid)
+			{
+				SetLatch(MyLatch);
+			}
+			else
+			{
+				/*
+				 * Note: assuming things aren't broken, a signal failure here could
+				 * only occur if the target backend exited since we released
+				 * NotifyQueueLock; which is unlikely but certainly possible. So we
+				 * just log a low-level debug message if it happens.
+				 */
+				if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procno) < 0)
+					elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
+			}
+		}
 	}
 
 	pfree(pids);
@@ -1805,20 +1880,43 @@ HandleNotifyInterrupt(void)
 {
 	/*
 	 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
-	 * you do here.
+	 * you do here. The actual state transition has already been done by
+	 * the notifier before sending the signal, so we only need to set the
+	 * latch to ensure the backend wakes up and processes the notification.
 	 */
 
-	/* signal that work needs to be done */
-	notifyInterruptPending = true;
-
 	/* make sure the event is processed in due course */
 	SetLatch(MyLatch);
 }
 
+/*
+ * IsNotifyInterruptPending
+ *
+ *		Check if there's a pending notify interrupt for this backend
+ */
+bool
+IsNotifyInterruptPending(void)
+{
+	uint32		state;
+
+	/* If not registered as a listener, no notifications are pending */
+	if (!amRegisteredListener)
+		return false;
+
+	/*
+	 * Read the current state with a memory barrier to ensure we see
+	 * the most recent value written by notifiers.
+	 */
+	state = pg_atomic_read_membarrier_u32(&QUEUE_BACKEND_STATE(MyProcNumber));
+
+	/* Notification is pending if state is SIGNALLED */
+	return (state == ASYNC_STATE_SIGNALLED);
+}
+
 /*
  * ProcessNotifyInterrupt
  *
- *		This is called if we see notifyInterruptPending set, just before
+ *		This is called if we see a notification interrupt is pending, just before
  *		transmitting ReadyForQuery at the end of a frontend command, and
  *		also if a notify signal occurs while reading from the frontend.
  *		HandleNotifyInterrupt() will cause the read to be interrupted
@@ -1837,7 +1935,7 @@ ProcessNotifyInterrupt(bool flush)
 		return;					/* not really idle */
 
 	/* Loop in case another signal arrives while sending messages */
-	while (notifyInterruptPending)
+	while (IsNotifyInterruptPending())
 		ProcessIncomingNotify(flush);
 }
 
@@ -2182,28 +2280,81 @@ asyncQueueAdvanceTail(void)
 static void
 ProcessIncomingNotify(bool flush)
 {
-	/* We *must* reset the flag */
-	notifyInterruptPending = false;
+	uint32		expected;
 
-	/* Do nothing else if we aren't actively listening */
+	/* Do nothing if we aren't actively listening */
 	if (listenChannels == NIL)
 		return;
 
+	/*
+	 * Perform state transition from SIGNALLED to PROCESSING.
+	 * This is the "acquire lock" operation for the listener.
+	 */
+	expected = ASYNC_STATE_SIGNALLED;
+	if (!pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+										&expected,
+										ASYNC_STATE_PROCESSING))
+	{
+		/*
+		 * CAS failed - the state was not SIGNALLED. This should not happen
+		 * as ProcessNotifyInterrupt only calls us when state is SIGNALLED.
+		 */
+		elog(ERROR, "unexpected async state %u in ProcessIncomingNotify, expected SIGNALLED",
+			 expected);
+	}
+
 	if (Trace_notify)
-		elog(DEBUG1, "ProcessIncomingNotify");
+		elog(DEBUG1, "ProcessIncomingNotify: transitioned to PROCESSING");
 
 	set_ps_display("notify interrupt");
 
 	/*
-	 * We must run asyncQueueReadAllNotifications inside a transaction, else
-	 * bad things happen if it gets an error.
-	 */
+		* We must run asyncQueueReadAllNotifications inside a transaction, else
+		* bad things happen if it gets an error.
+		*/
 	StartTransactionCommand();
 
 	asyncQueueReadAllNotifications();
 
 	CommitTransactionCommand();
 
+	/*
+	 * Try to transition from PROCESSING back to IDLE.
+	 * This is the "release lock" operation for the listener.
+	 */
+	expected = ASYNC_STATE_PROCESSING;
+	if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+										&expected,
+										ASYNC_STATE_IDLE))
+	{
+		/* Success - we're done, transitioned to IDLE */
+		if (Trace_notify)
+			elog(DEBUG1, "ProcessIncomingNotify: transitioned to IDLE");
+	}
+	else
+	{
+		/* CAS failed - check what the new state is */
+		if (expected == ASYNC_STATE_SIGNALLED)
+		{
+			/*
+				* A notifier set our state to SIGNALLED while we were processing.
+				* We are done with this batch of work, but we know there is more
+				* to do. Rather than loop here and risk starving other backend
+				* activity, we set our own latch to ensure we are woken up again
+				* to re-process, and then exit. The state is left as SIGNALLED.
+				*/
+			if (Trace_notify)
+				elog(DEBUG1, "ProcessIncomingNotify: signalled while processing");
+			SetLatch(MyLatch);
+		}
+		else
+		{
+			/* Any other state is an error */
+			elog(ERROR, "unexpected async state %u when trying to return to IDLE",
+					expected);
+		}
+	}
+
 	/*
 	 * If this isn't an end-of-command case, we must flush the notify messages
 	 * to ensure frontend gets them promptly.
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 2f8c3d5f918..3216247a58b 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -512,7 +512,7 @@ ProcessClientReadInterrupt(bool blocked)
 			ProcessCatchupInterrupt();
 
 		/* Process notify interrupts, if any */
-		if (notifyInterruptPending)
+		if (IsNotifyInterruptPending())
 			ProcessNotifyInterrupt(true);
 	}
 	else if (ProcDiePending)
@@ -4603,7 +4603,7 @@ PostgresMain(const char *dbname, const char *username)
 				 * were received during the just-finished transaction, they'll
 				 * be seen by the client before ReadyForQuery is.
 				 */
-				if (notifyInterruptPending)
+				if (IsNotifyInterruptPending())
 					ProcessNotifyInterrupt(false);
 
 				/*
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7f2e0ac0b9f 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -17,7 +17,6 @@
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
-extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
@@ -46,4 +45,7 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* check if notification interrupt is pending */
+extern bool IsNotifyInterruptPending(void);
+
 #endif							/* ASYNC_H */
-- 
2.47.1



reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected]
  Subject: Re: Optimize LISTEN/NOTIFY
  In-Reply-To: <[email protected]>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox