From: Joel Jacobson <[email protected]>
To: Tom Lane <[email protected]>
To: Arseniy Mukhin <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Thu, 16 Oct 2025 11:39:02 +0200
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAE7r3M+E94vsA_D2iV3R=hq_Gq5k+4wYPzS11u1kDrUaLr1-iQ@mail.gmail.com>
	<[email protected]>

On Wed, Oct 15, 2025, at 16:16, Tom Lane wrote:
> Arseniy Mukhin <[email protected]> writes:
>> I think "Direct advancement" is a good idea. But the way it's
>> implemented now has a concurrency bug. Listeners store its current
>> position in the local variable 'pos' during the reading in
>> asyncQueueReadAllNotifications() and don't hold NotifyQueueLock. It
>> means that some notifier can directly advance the listener's position
>> while the listener has an old value in the local variable. The same
>> time we use listener positions to find out the limit we can truncate
>> the queue in asyncQueueAdvanceTail().
>
> Good catch!

I've implemented the three ideas presented below, attached as .txt files
that are diffs on top of v19, which has these changes since v17:

0002-optimize_listen_notify-v19.patch:
* Improve wording of top comment per request from Chao Li.
* Add initChannelHash call to top of SignalBackends,
  to fix bug reported by Arseniy Mukhin.

> I think we can perhaps salvage the idea if we invent a separate
> "advisory" queue position field, which tells its backend "hey,
> you could skip as far as here if you want", but is not used for
> purposes of SLRU truncation.

The above idea is implemented in 0002-optimize_listen_notify-v19-alt1.txt
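
To make the advisory-position idea concrete, here is a minimal standalone
sketch. It is not the patch code: the names QueuePos, ListenerSlot,
advise_skip and adopt_advice are made up for illustration, positions are
compared as plain integers (no page wraparound), and locking is left out
entirely.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified queue position; the real one handles page wraparound. */
typedef struct QueuePos
{
	int64_t		page;
	int			offset;
} QueuePos;

/* Hypothetical per-listener slot in shared memory. */
typedef struct ListenerSlot
{
	QueuePos	pos;			/* owned by the listener, bounds truncation */
	QueuePos	advisoryPos;	/* written by notifiers, a hint only */
} ListenerSlot;

/* true if a comes strictly before b (assumes no wraparound) */
static bool
pos_precedes(QueuePos a, QueuePos b)
{
	return a.page < b.page ||
		(a.page == b.page && a.offset < b.offset);
}

/*
 * Notifier side: if the listener still sits at the head recorded before
 * our write, suggest that it skip to the head recorded after it.  Note
 * that pos itself is never touched here.
 */
static void
advise_skip(ListenerSlot *slot, QueuePos headBefore, QueuePos headAfter)
{
	if (slot->pos.page == headBefore.page &&
		slot->pos.offset == headBefore.offset)
		slot->advisoryPos = headAfter;
}

/*
 * Listener side: called before reading starts.  Adopt the advice only if
 * it lies ahead of our own position; return the position to read from.
 */
static QueuePos
adopt_advice(ListenerSlot *slot)
{
	if (pos_precedes(slot->pos, slot->advisoryPos))
		slot->pos = slot->advisoryPos;
	return slot->pos;
}
```

The point of the split is that only the listener ever writes pos, so a
notifier can never move the value that truncation depends on while the
listener is in the middle of a read.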

> Alternatively, split the queue pos
> into "this is where to read next" and "this is as much as I'm
> definitively done with", where the second field gets advanced at
> the end of asyncQueueReadAllNotifications.  Not sure which
> view would be less confusing (in the end I guess they're nearly
> the same thing, differently explained).

The above idea is implemented in 0002-optimize_listen_notify-v19-alt2.txt
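
A minimal standalone sketch of the split between "where to read next" and
"definitively done with" could look like the following. It is only an
illustration under simplifying assumptions: QueuePos, ListenerSlot,
finish_read and safe_tail are hypothetical names, positions are compared as
plain integers (no wraparound), and no locks are taken.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified queue position; the real one handles page wraparound. */
typedef struct QueuePos
{
	int64_t		page;
	int			offset;
} QueuePos;

/* Hypothetical per-listener slot in shared memory. */
typedef struct ListenerSlot
{
	QueuePos	pos;		/* next position to read; notifiers may advance it */
	QueuePos	donePos;	/* definitively processed; only the owner sets it */
} ListenerSlot;

static bool
pos_precedes(QueuePos a, QueuePos b)
{
	return a.page < b.page ||
		(a.page == b.page && a.offset < b.offset);
}

static QueuePos
pos_max(QueuePos a, QueuePos b)
{
	return pos_precedes(a, b) ? b : a;
}

static QueuePos
pos_min(QueuePos a, QueuePos b)
{
	return pos_precedes(a, b) ? a : b;
}

/*
 * Listener side, at the end of a read: publish what was actually read as
 * donePos, and never move pos backwards past a concurrent advancement.
 */
static void
finish_read(ListenerSlot *slot, QueuePos local)
{
	slot->donePos = local;
	slot->pos = pos_max(local, slot->pos);
}

/* Truncation limit: the minimum donePos over all listeners, capped at head. */
static QueuePos
safe_tail(const ListenerSlot *slots, int nslots, QueuePos head)
{
	QueuePos	min = head;

	for (int i = 0; i < nslots; i++)
		min = pos_min(min, slots[i].donePos);
	return min;
}
```

With this layout, a notifier that bumps pos while the listener is still
reading an older page does not let the tail move past that page, because
the tail is computed from donePos only.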

> A different line of thought could be to get rid of
> asyncQueueReadAllNotifications's optimization of moving the
> queue pos only once, per
>
> 	 * (We could alternatively retake NotifyQueueLock and move the position
> 	 * before handling each individual message, but that seems like too much
> 	 * lock traffic.)
>
> Since we only need shared lock to advance our own queue pos,
> maybe that wouldn't be too awful.  Not sure.

The above idea is implemented in 0002-optimize_listen_notify-v19-alt3.txt
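
As a rough standalone illustration of publishing the position after every
page instead of once at the end, consider the sketch below. It is a toy
model, not the patch: SharedSlot, PageHandler, read_pages and
fail_on_page_three are hypothetical names, a position is reduced to a page
number, and the lock that would protect the shared update is only marked
by a comment.

```c
#include <stdbool.h>

/* Stand-in for the listener's position in shared memory (page only). */
typedef struct SharedSlot
{
	int			page;
} SharedSlot;

/* Hypothetical per-page handler; returns false to simulate a failure. */
typedef bool (*PageHandler) (int page);

/*
 * Process pages from the shared position up to (not including) headPage,
 * publishing progress after each completed page.  Returns false if the
 * handler failed; pages completed before the failure stay published.
 */
static bool
read_pages(SharedSlot *shared, int headPage, PageHandler handle)
{
	int			pos = shared->page;

	while (pos < headPage)
	{
		if (!handle(pos))
			return false;

		pos++;

		/* In a real backend the queue lock would be held around this. */
		shared->page = pos;
	}
	return true;
}

/* Example handler that fails when it reaches page 3. */
static bool
fail_on_page_three(int page)
{
	return page != 3;
}
```

For example, starting at page 0 with the head at page 5 and using
fail_on_page_three as the handler, read_pages returns false and leaves the
shared position at page 3, so pages 0 through 2 are not processed again on
the next attempt.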

/Joel
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..44442e927ff 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* backend could skip queue to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +675,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
@@ -2009,9 +2017,14 @@ SignalBackends(void)
 	 * Even though we may take and release NotifyQueueLock multiple times
 	 * while writing, the heavyweight lock guarantees this region contains
 	 * only our messages.  Therefore, any backend still positioned at the
-	 * queue head from before our write can be safely advanced to the current
+	 * queue head from before our write can be advised to skip to the current
 	 * queue head without waking it.
 	 *
+	 * We use the advisoryPos field rather than directly modifying pos,
+	 * because the listening backend might be concurrently reading
+	 * notifications using its local copy of pos.  The backend controls its
+	 * own pos field and will check advisoryPos when it's safe to do so.
+	 *
 	 * False-positive possibility: if a backend was previously signaled but
 	 * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
 	 * true).  This is safe - the backend will advance its pointer when it
@@ -2038,7 +2051,7 @@ SignalBackends(void)
 		if (pendingNotifies != NULL &&
 			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+			QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
 			pos = queueHeadAfterWrite;
 		}
 
@@ -2297,6 +2310,26 @@ asyncQueueReadAllNotifications(void)
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
+
+	/*
+	 * Check if another backend has set an advisory position for us.
+	 * If so, and if we haven't yet read past that point, we can safely
+	 * adopt the advisory position and skip the intervening notifications.
+	 * This is safe because the advisory position is only set when we're
+	 * positioned at a known point and the skipped region contains only
+	 * notifications we're not interested in.
+	 */
+	{
+		QueuePosition advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+		if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+			QUEUE_POS_PRECEDES(pos, advisoryPos))
+		{
+			pos = advisoryPos;
+			QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		}
+	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..e201deb5e54 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -70,14 +70,14 @@
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the local listen state (listenChannels) and
- *	  shared channel hash table (channelHash).  Then we signal any backends
- *	  that may be interested in our messages (including our own backend,
- *	  if listening).  This is done by SignalBackends(), which consults the
- *	  shared channel hash table to identify listeners for the channels that
- *	  have pending notifications in the current database.  Each selected
- *	  backend is marked as having a wakeup pending to avoid duplicate signals,
- *	  and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *	  make any actual updates to the effective listen state (channelHash).
+ *	  Then we signal any backends that may be interested in our messages
+ *	  (including our own backend, if listening).  This is done by
+ *	  SignalBackends(), which consults the shared channel hash table to
+ *	  identify listeners for the channels that have pending notifications
+ *	  in the current database.  Each selected backend is marked as having a
+ *	  wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ *	  signal is sent to it.
  *
  *	  When writing notifications, PreCommit_Notify() records the queue head
  *	  position both before and after the write.  Because all writers serialize
@@ -2282,6 +2282,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		reachedStop;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2350,77 +2351,83 @@ asyncQueueReadAllNotifications(void)
 	 * It is possible that we fail while trying to send a message to our
 	 * frontend (for example, because of encoding conversion failure).  If
 	 * that happens it is critical that we not try to send the same message
-	 * over and over again.  Therefore, we place a PG_TRY block here that will
-	 * forcibly advance our queue position before we lose control to an error.
-	 * (We could alternatively retake NotifyQueueLock and move the position
-	 * before handling each individual message, but that seems like too much
-	 * lock traffic.)
+	 * over and over again.  Therefore, we must advance our queue position
+	 * regularly as we process messages.
+	 *
+	 * We must also be careful about concurrency: SignalBackends() can
+	 * directly advance our position while we're reading.  To prevent
+	 * overwriting such an advancement with a stale value, we update our
+	 * position in shared memory after processing messages from each page,
+	 * while holding NotifyQueueLock.  Shared lock is sufficient since we're
+	 * only updating our own position.
 	 */
-	PG_TRY();
+	do
 	{
-		bool		reachedStop;
+		int64		curpage = QUEUE_POS_PAGE(pos);
+		int			curoffset = QUEUE_POS_OFFSET(pos);
+		int			slotno;
+		int			copysize;
 
-		do
+		/*
+		 * We copy the data from SLRU into a local buffer, so as to avoid
+		 * holding the SLRU lock while we are examining the entries and
+		 * possibly transmitting them to our frontend.  Copy only the part
+		 * of the page we will actually inspect.
+		 */
+		slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+											InvalidTransactionId);
+		if (curpage == QUEUE_POS_PAGE(head))
 		{
-			int64		curpage = QUEUE_POS_PAGE(pos);
-			int			curoffset = QUEUE_POS_OFFSET(pos);
-			int			slotno;
-			int			copysize;
+			/* we only want to read as far as head */
+			copysize = QUEUE_POS_OFFSET(head) - curoffset;
+			if (copysize < 0)
+				copysize = 0;	/* just for safety */
+		}
+		else
+		{
+			/* fetch all the rest of the page */
+			copysize = QUEUE_PAGESIZE - curoffset;
+		}
+		memcpy(page_buffer.buf + curoffset,
+			   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+			   copysize);
+		/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+		LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
 
-			/*
-			 * We copy the data from SLRU into a local buffer, so as to avoid
-			 * holding the SLRU lock while we are examining the entries and
-			 * possibly transmitting them to our frontend.  Copy only the part
-			 * of the page we will actually inspect.
-			 */
-			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-												InvalidTransactionId);
-			if (curpage == QUEUE_POS_PAGE(head))
-			{
-				/* we only want to read as far as head */
-				copysize = QUEUE_POS_OFFSET(head) - curoffset;
-				if (copysize < 0)
-					copysize = 0;	/* just for safety */
-			}
-			else
-			{
-				/* fetch all the rest of the page */
-				copysize = QUEUE_PAGESIZE - curoffset;
-			}
-			memcpy(page_buffer.buf + curoffset,
-				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-				   copysize);
-			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+		/*
+		 * Process messages up to the stop position, end of page, or an
+		 * uncommitted message.
+		 *
+		 * Our stop position is what we found to be the head's position
+		 * when we entered this function. It might have changed already.
+		 * But if it has, we will receive (or have already received and
+		 * queued) another signal and come here again.
+		 *
+		 * We are not holding NotifyQueueLock here! The queue can only
+		 * extend beyond the head pointer (see above).  We update our
+		 * backend's position after processing messages from each page to
+		 * ensure we don't reprocess messages if we fail partway through,
+		 * and to avoid overwriting any direct advancement that
+		 * SignalBackends() might perform concurrently.
+		 */
+		reachedStop = asyncQueueProcessPageEntries(&pos, head,
+												   page_buffer.buf,
+												   snapshot);
 
-			/*
-			 * Process messages up to the stop position, end of page, or an
-			 * uncommitted message.
-			 *
-			 * Our stop position is what we found to be the head's position
-			 * when we entered this function. It might have changed already.
-			 * But if it has, we will receive (or have already received and
-			 * queued) another signal and come here again.
-			 *
-			 * We are not holding NotifyQueueLock here! The queue can only
-			 * extend beyond the head pointer (see above) and we leave our
-			 * backend's pointer where it is so nobody will truncate or
-			 * rewrite pages under us. Especially we don't want to hold a lock
-			 * while sending the notifications to the frontend.
-			 */
-			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
-		} while (!reachedStop);
-	}
-	PG_FINALLY();
-	{
-		/* Update shared state */
+		/*
+		 * Update our position in shared memory.  The 'pos' variable now
+		 * holds our new position (advanced past all messages we just
+		 * processed).  This ensures that if we fail while processing
+		 * messages from the next page, we won't reprocess the ones we
+		 * just handled.  It also prevents us from overwriting any direct
+		 * advancement that another backend might have done while we were
+		 * processing messages.
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyProcNumber) = pos;
 		LWLockRelease(NotifyQueueLock);
-	}
-	PG_END_TRY();
+
+	} while (!reachedStop);
 
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..751400b8315 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -285,7 +285,8 @@ typedef struct QueueBackendStatus
 	int32		pid;			/* either a PID or InvalidPid */
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
-	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition pos;			/* next position to read from */
+	QueuePosition donePos;		/* backend has definitively processed up to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DONEPOS(i)	(asyncQueueControl->backend[i].donePos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +670,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
@@ -1290,6 +1293,7 @@ Exec_ListenPreCommit(void)
 			prevListener = i;
 	}
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
+	QUEUE_BACKEND_DONEPOS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
 	/* Insert backend into list of listeners at correct position */
@@ -2415,9 +2419,19 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_FINALLY();
 	{
-		/* Update shared state */
+		/*
+		 * Update shared state.
+		 *
+		 * We update donePos to what we actually read (the local pos variable),
+		 * as this is used for truncation safety.  For the read position (pos),
+		 * we use the maximum of our local position and the current shared
+		 * position, in case another backend used direct advancement to skip us
+		 * ahead while we were reading.  This prevents us from going backwards
+		 * and potentially pointing to a truncated page.
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+		QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber));
 		LWLockRelease(NotifyQueueLock);
 	}
 	PG_END_TRY();
@@ -2567,7 +2581,13 @@ asyncQueueAdvanceTail(void)
 	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 	{
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+		/*
+		 * Use donePos rather than pos for truncation safety.  The donePos
+		 * field represents what the backend has definitively processed, while
+		 * pos can be advanced by other backends via direct advancement.  This
+		 * prevents truncating pages that a backend is still reading from.
+		 */
+		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i));
 	}
 	QUEUE_TAIL = min;
 	oldtailpage = QUEUE_STOP_PAGE;


Attachments:

  [application/octet-stream] 0001-optimize_listen_notify-v19.patch (9.3K, 2-0001-optimize_listen_notify-v19.patch)
From f37095250521d0a29d812997b7b79d938ed9c894 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Wed, 8 Oct 2025 09:30:54 +0200
Subject: [PATCH 1/2] Improve LISTEN/NOTIFY test coverage

This adds isolation tests to cover previously untested code paths:

* Check simple NOTIFY reparenting when parent has no action
* Check LISTEN reparenting in subtransaction
* Check LISTEN merge path when both outer and inner transactions have actions
* Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions)
* Check notification_match function (triggered by hash table duplicate detection)
* Check that notifications sent from a backend that has not done LISTEN
  are properly delivered to a listener in another backend

This also adds a test to prepare for the next patch:

* Check ChannelHashAddListener array growth
---
 src/test/isolation/expected/async-notify.out | 114 ++++++++++++++++++-
 src/test/isolation/specs/async-notify.spec   |  68 +++++++++++
 2 files changed, 181 insertions(+), 1 deletion(-)

diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..443a6eb669f 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 7 sessions
 
 starting permutation: listenc notify1 notify2 notify3 notifyf
 step listenc: LISTEN c1; LISTEN c2;
@@ -47,6 +47,105 @@ notifier: NOTIFY "c2" with payload "payload" from notifier
 notifier: NOTIFY "c1" with payload "payloads" from notifier
 notifier: NOTIFY "c2" with payload "payloads" from notifier
 
+starting permutation: listenc notifys_simple
+step listenc: LISTEN c1; LISTEN c2;
+step notifys_simple: 
+	BEGIN;
+	SAVEPOINT s1;
+	NOTIFY c1, 'simple1';
+	NOTIFY c2, 'simple2';
+	RELEASE SAVEPOINT s1;
+	COMMIT;
+
+notifier: NOTIFY "c1" with payload "simple1" from notifier
+notifier: NOTIFY "c2" with payload "simple2" from notifier
+
+starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lslisten_outer: LISTEN c3;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrollback: ROLLBACK TO SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify_check: NOTIFY c1, 'should_not_receive';
+
+starting permutation: listenc notify_many_with_dup
+step listenc: LISTEN c1; LISTEN c2;
+step notify_many_with_dup: 
+	BEGIN;
+	SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+	SELECT pg_notify('c1', 'msg1');
+	COMMIT;
+
+pg_notify
+---------
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+(17 rows)
+
+pg_notify
+---------
+         
+(1 row)
+
+notifier: NOTIFY "c1" with payload "msg1" from notifier
+notifier: NOTIFY "c1" with payload "msg2" from notifier
+notifier: NOTIFY "c1" with payload "msg3" from notifier
+notifier: NOTIFY "c1" with payload "msg4" from notifier
+notifier: NOTIFY "c1" with payload "msg5" from notifier
+notifier: NOTIFY "c1" with payload "msg6" from notifier
+notifier: NOTIFY "c1" with payload "msg7" from notifier
+notifier: NOTIFY "c1" with payload "msg8" from notifier
+notifier: NOTIFY "c1" with payload "msg9" from notifier
+notifier: NOTIFY "c1" with payload "msg10" from notifier
+notifier: NOTIFY "c1" with payload "msg11" from notifier
+notifier: NOTIFY "c1" with payload "msg12" from notifier
+notifier: NOTIFY "c1" with payload "msg13" from notifier
+notifier: NOTIFY "c1" with payload "msg14" from notifier
+notifier: NOTIFY "c1" with payload "msg15" from notifier
+notifier: NOTIFY "c1" with payload "msg16" from notifier
+notifier: NOTIFY "c1" with payload "msg17" from notifier
+
+starting permutation: listenc llisten l2listen l3listen lslisten
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step l2listen: LISTEN c1;
+step l3listen: LISTEN c1;
+step lslisten: LISTEN c1; LISTEN c2;
+
 starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
 step llisten: LISTEN c1; LISTEN c2;
 step notify1: NOTIFY c1;
@@ -95,6 +194,8 @@ listener: NOTIFY "c2" with payload "" from notifier
 
 starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
 step l2listen: LISTEN c1;
+listener2: NOTIFY "c1" with payload "" from notifier
+listener2: NOTIFY "c1" with payload "" from notifier
 step l2begin: BEGIN;
 step notify1: NOTIFY c1;
 step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
@@ -104,6 +205,17 @@ step l2commit: COMMIT;
 listener2: NOTIFY "c1" with payload "" from notifier
 step l2stop: UNLISTEN *;
 
+starting permutation: lch_listen nch_notify lch_check
+step lch_listen: LISTEN ch;
+step nch_notify: NOTIFY ch, 'aa';
+step lch_check: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener_ch: NOTIFY "ch" with payload "aa" from notifier_ch
+
 starting permutation: llisten lbegin usage bignotify usage
 step llisten: LISTEN c1; LISTEN c2;
 step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..0a01e777b98 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -31,6 +31,20 @@ step notifys1	{
 	ROLLBACK TO SAVEPOINT s2;
 	COMMIT;
 }
+step notifys_simple	{
+	BEGIN;
+	SAVEPOINT s1;
+	NOTIFY c1, 'simple1';
+	NOTIFY c2, 'simple2';
+	RELEASE SAVEPOINT s1;
+	COMMIT;
+}
+step notify_many_with_dup	{
+	BEGIN;
+	SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+	SELECT pg_notify('c1', 'msg1');
+	COMMIT;
+}
 step usage		{ SELECT pg_notification_queue_usage() > 0 AS nonzero; }
 step bignotify	{ SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
 teardown		{ UNLISTEN *; }
@@ -53,6 +67,38 @@ step l2begin	{ BEGIN; }
 step l2commit	{ COMMIT; }
 step l2stop		{ UNLISTEN *; }
 
+# Third listener session for testing array growth.
+
+session listener3
+step l3listen	{ LISTEN c1; }
+teardown		{ UNLISTEN *; }
+
+# Listener session for cross-session notification test with channel 'ch'.
+
+session listener_ch
+step lch_listen	{ LISTEN ch; }
+step lch_check	{ SELECT 1 AS x; }
+teardown		{ UNLISTEN *; }
+
+# Notifier session for cross-session notification test with channel 'ch'.
+
+session notifier_ch
+step nch_notify	{ NOTIFY ch, 'aa'; }
+
+# Session for testing LISTEN in subtransaction with separate steps.
+
+session listen_subxact
+step lsbegin	{ BEGIN; }
+step lslisten_outer	{ LISTEN c3; }
+step lssavepoint	{ SAVEPOINT s1; }
+step lslisten	{ LISTEN c1; LISTEN c2; }
+step lsrelease	{ RELEASE SAVEPOINT s1; }
+step lsrollback	{ ROLLBACK TO SAVEPOINT s1; }
+step lscommit	{ COMMIT; }
+step lsnotify	{ NOTIFY c1, 'subxact_test'; }
+step lsnotify_check	{ NOTIFY c1, 'should_not_receive'; }
+teardown		{ UNLISTEN *; }
+
 
 # Trivial cases.
 permutation listenc notify1 notify2 notify3 notifyf
@@ -60,6 +106,24 @@ permutation listenc notify1 notify2 notify3 notifyf
 # Check simple and less-simple deduplication.
 permutation listenc notifyd1 notifyd2 notifys1
 
+# Check simple NOTIFY reparenting when parent has no action.
+permutation listenc notifys_simple
+
+# Check LISTEN reparenting in subtransaction.
+permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN merge path when both outer and inner transactions have actions.
+permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions).
+permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+
+# Check notification_match function (triggered by hash table duplicate detection).
+permutation listenc notify_many_with_dup
+
+# Check ChannelHashAddListener array growth.
+permutation listenc llisten l2listen l3listen lslisten
+
 # Cross-backend notification delivery.  We use a "select 1" to force the
 # listener session to check for notifies.  In principle we could just wait
 # for delivery, but that would require extra support in isolationtester
@@ -73,6 +137,10 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
 # and notify queue is not empty
 permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
 
+# Check that notifications sent from a backend that has not done LISTEN
+# are properly delivered to a listener in another backend.
+permutation lch_listen nch_notify lch_check
+
 # Verify that pg_notification_queue_usage correctly reports a non-zero result,
 # after submitting notifications while another connection is listening for
 # those notifications and waiting inside an active transaction.  We have to
-- 
2.50.1



  [application/octet-stream] 0002-optimize_listen_notify-v19.patch (31.3K, 3-0002-optimize_listen_notify-v19.patch)
From 8d77fa4296f530b0381cf2e612774f0feaf8b506 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Tue, 14 Oct 2025 08:03:19 +0200
Subject: [PATCH 2/2] Optimize LISTEN/NOTIFY with shared channel map and direct
 advancement

This patch reworks the LISTEN/NOTIFY signaling path to avoid the
long-standing inefficiency where every commit wakes all listening
backends in the same database, even those that are listening on
completely different channels.

Problem
-------

At present, SignalBackends has no central knowledge of which backend
listens on which channel. When a backend commits a transaction that
issued NOTIFY, it simply iterates over all registered listeners in the
same database and sends each one a PROCSIG_NOTIFY_INTERRUPT signal.

That behavior is fine when all listeners are on the same channel, but
when many backends are listening on different channels, each NOTIFY
triggers a storm of unnecessary wakeups and context switches. As the
number of idle listeners grows, this often becomes the bottleneck and
throughput drops sharply.

Overview of the solution
------------------------

This patch introduces a lazily-created dynamic shared hash (dshash)
backed by dynamic shared memory (DSA) that maps (dboid, channel) to
arrays of listening backends (ProcNumbers). This allows the sender to
target only those backends actually listening on the channels for which
it has queued notifications.

At commit time:

* AtCommit_Notify updates the shared channelHash to reflect any LISTEN
  or UNLISTEN actions performed in the transaction.
* SignalBackends consults this hash to find the backends that are
  listening on the channels being notified in the current database, and
  signals only those.

Each backend's entry in AsyncQueueControl now includes a wakeupPending
flag to prevent duplicate signals while a previous wakeup is still being
processed.

Direct advancement
------------------

A further optimization avoids signaling idle backends that are not
listening on any of the channels notified within the transaction.

While queuing notifications, PreCommit_Notify records the queue head
position both before and after writing its notifications. Because all
writers are serialized by the existing cluster-wide heavyweight lock on
"database 0", no backend (from any database) can insert entries between
those two points. This guarantees that the region [oldHead, newHead)
contains only the entries written by our commit.

SignalBackends uses this fact to directly advance any backend still
positioned at oldHead up to newHead, avoiding a needless wakeup for
listeners that would otherwise not find any notifies of interest.

Queue health
------------

If a backend has fallen too far behind (lag >= QUEUE_CLEANUP_DELAY
pages), it is signaled to catch up so the global queue tail can advance.

Other notes
-----------

* Maintains dual data structures: a shared channelHash for determining
  which backends to signal, and a local per-backend listenChannels list
  for fast lock-free lookups during notification processing. This avoids
  contention on the shared hash during the high-frequency IsListeningOn
  checks that occur for every notification read from the queue.
* Backends remain registered in the global listener list as long as
  listenChannels is non-empty.
* Adds LWLock tranche NOTIFY_CHANNEL_HASH and wait event
  NotifyChannelHash for visibility.
* No user-visible behavioral changes; this is an internal optimization
  only.
---
 src/backend/commands/async.c                  | 538 ++++++++++++++++--
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/storage/lwlocklist.h              |   1 +
 src/tools/pgindent/typedefs.list              |   2 +
 4 files changed, 495 insertions(+), 47 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..90a530cfc61 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,10 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  We also maintain a dynamic shared hash table (dshash) that maps channel
+ *	  names to the set of backends listening on each channel. This table is
+ *	  created lazily on the first LISTEN command and grows dynamically as
+ *	  needed.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -68,16 +70,27 @@
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the effective listen state (listenChannels).
- *	  Then we signal any backends that may be interested in our messages
- *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  make any actual updates to the local listen state (listenChannels) and
+ *	  shared channel hash table (channelHash).  Then we signal any backends
+ *	  that may be interested in our messages (including our own backend,
+ *	  if listening).  This is done by SignalBackends(), which consults the
+ *	  shared channel hash table to identify listeners for the channels that
+ *	  have pending notifications in the current database.  Each selected
+ *	  backend is marked as having a wakeup pending to avoid duplicate signals,
+ *	  and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *
+ *	  When writing notifications, PreCommit_Notify() records the queue head
+ *	  position both before and after the write.  Because all writers serialize
+ *	  on a cluster-wide heavyweight lock, no backend can insert entries between
+ *	  these two points.  SignalBackends() uses this fact to directly advance any
+ *	  backend that is still positioned at the old head, avoiding unnecessary
+ *	  wakeups for idle listeners that have nothing to read.
+ *
+ *	  To maintain queue health, SignalBackends() also wakes one backend
+ *	  positioned at the global queue tail to help advance it, and signals
+ *	  any backend that has fallen too far behind to catch up.  These measures
+ *	  prevent the notification queue from growing indefinitely, while mostly
+ *	  limiting wakeups to the backends that actually need them.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -128,6 +141,7 @@
 #include <limits.h>
 #include <unistd.h>
 #include <signal.h>
+#include <string.h>
 
 #include "access/parallel.h"
 #include "access/slru.h"
@@ -137,14 +151,17 @@
 #include "commands/async.h"
 #include "common/hashfn.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "storage/dsm_registry.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/procsignal.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/dsa.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -162,6 +179,29 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Channel hash table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ProcNumbers representing the backends listening on each channel.
+ */
+
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+} ChannelHashKey;
+
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	dsa_pointer listenersArray; /* DSA pointer to ProcNumber array */
+	int			numListeners;	/* Number of listeners currently stored */
+	int			allocatedListeners; /* Allocated size of array */
+} ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -227,8 +267,8 @@ typedef struct QueuePosition
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * also the distance by which a backend needs to be behind before we'll
+ * decide we need to wake it up to advance its pointer.
  *
  * Resist the temptation to make this really large.  While that would save
  * work in some places, it would add cost in others.  In particular, this
@@ -246,6 +286,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
 /*
@@ -260,9 +301,9 @@ typedef struct QueueBackendStatus
  * (since no other backend will inspect it).
  *
  * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
- * entries of other backends and also change the head pointer. When holding
- * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointers.
+ * entries of other backends, change the head pointer, and advance other
+ * backends' queue positions. When holding both NotifyQueueLock and
+ * NotifyQueueTailLock in EXCLUSIVE mode, backends can change the tail pointers.
  *
  * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
  * the control lock for the pg_notify SLRU buffers.
@@ -288,11 +329,16 @@ typedef struct AsyncQueueControl
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
+	dsa_handle	channelHashDSA;
+	dshash_table_handle channelHashDSH;
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
 static AsyncQueueControl *asyncQueueControl;
 
+static dsa_area *channelDSA = NULL;
+static dshash_table *channelHash = NULL;
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -301,6 +347,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -418,6 +465,20 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;
 
+/*
+ * Queue head positions for direct advancement.
+ * These are captured during PreCommit_Notify while holding the heavyweight
+ * lock on database 0, ensuring no other backend can insert notifications
+ * between them.  SignalBackends uses these to advance idle backends.
+ */
+static QueuePosition queueHeadBeforeWrite;
+static QueuePosition queueHeadAfterWrite;
+
+/*
+ * List of channels with pending notifications in the current transaction.
+ */
+static List *pendingNotifyChannels = NIL;
+
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
 static bool tryAdvanceTail = false;
 
@@ -457,6 +518,9 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel);
+static dshash_hash channelHashFunc(const void *key, size_t size, void *arg);
+static void initChannelHash(void);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -478,6 +542,80 @@ asyncQueuePagePrecedes(int64 p, int64 q)
 	return p < q;
 }
 
+/*
+ * channelHashFunc
+ *		Hash function for channel keys.
+ */
+static dshash_hash
+channelHashFunc(const void *key, size_t size, void *arg)
+{
+	const ChannelHashKey *k = (const ChannelHashKey *) key;
+	dshash_hash h;
+
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/* parameters for the channel hash table */
+static const dshash_parameters channelDSHParams = {
+	sizeof(ChannelHashKey),
+	sizeof(ChannelEntry),
+	dshash_memcmp,
+	channelHashFunc,
+	dshash_memcpy,
+	LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * initChannelHash
+ *		Lazy initialization of the channel hash table.
+ */
+static void
+initChannelHash(void)
+{
+	MemoryContext oldcontext;
+
+	/* Quick exit if we already did this */
+	if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID &&
+		channelHash != NULL)
+		return;
+
+	/* Otherwise, use a lock to ensure only one process creates the table */
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	/* Be sure any local memory allocated by DSA routines is persistent */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for channel hash */
+		channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+		dsa_pin(channelDSA);
+		dsa_pin_mapping(channelDSA);
+		channelHash = dshash_create(channelDSA, &channelDSHParams, NULL);
+
+		/* Store handles in shared memory for other backends to use */
+		asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA);
+		asyncQueueControl->channelHashDSH =
+			dshash_get_hash_table_handle(channelHash);
+	}
+	else if (!channelHash)
+	{
+		/* Attach to existing dynamic shared hash table */
+		channelDSA = dsa_attach(asyncQueueControl->channelHashDSA);
+		dsa_pin_mapping(channelDSA);
+		channelHash = dshash_attach(channelDSA, &channelDSHParams,
+									asyncQueueControl->channelHashDSH,
+									NULL);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+	LWLockRelease(NotifyQueueLock);
+}
+
 /*
  * Report space needed for our shared memory area
  */
@@ -521,12 +659,16 @@ AsyncShmemInit(void)
 		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
+		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
+		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
 
@@ -894,6 +1036,7 @@ PreCommit_Notify(void)
 	if (pendingNotifies)
 	{
 		ListCell   *nextNotify;
+		bool		firstIteration = true;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
@@ -922,6 +1065,35 @@ PreCommit_Notify(void)
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
 						 AccessExclusiveLock);
 
+		/*
+		 * Build list of unique channels for SignalBackends().
+		 */
+		pendingNotifyChannels = NIL;
+		foreach_ptr(Notification, n, pendingNotifies->events)
+		{
+			char	   *channel = n->data;
+
+			/* Add if not already in list */
+			if (!list_member_ptr(pendingNotifyChannels, channel))
+				pendingNotifyChannels = lappend(pendingNotifyChannels, channel);
+		}
+
+		/*
+		 * For the direct advancement optimization in SignalBackends(), we
+		 * need to ensure that no other backend can insert queue entries
+		 * between queueHeadBeforeWrite and queueHeadAfterWrite.  The
+		 * heavyweight lock above provides this guarantee, since it serializes
+		 * all writers.
+		 *
+		 * Note: if the heavyweight lock were ever removed for scalability
+		 * reasons, we could achieve the same guarantee by holding
+		 * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
+		 * than releasing and reacquiring it for each page as we do below.
+		 */
+
+		/* Initialize queueHeadBeforeWrite to a safe default */
+		SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
+
 		/* Now push the notifications into the queue */
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
@@ -939,12 +1111,33 @@ PreCommit_Notify(void)
 			 * point in time we can still roll the transaction back.
 			 */
 			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+			/*
+			 * On the first iteration, save the queue head position before we
+			 * write any notifications.  This is used by SignalBackends() to
+			 * identify backends that can be advanced directly without waking
+			 * them up.
+			 */
+			if (firstIteration)
+			{
+				queueHeadBeforeWrite = QUEUE_HEAD;
+				firstIteration = false;
+			}
+
 			asyncQueueFillWarning();
 			if (asyncQueueIsFull())
 				ereport(ERROR,
 						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
 						 errmsg("too many notifications in the NOTIFY queue")));
 			nextNotify = asyncQueueAddEntries(nextNotify);
+
+			/*
+			 * Capture the queue head after each batch of entries.  On the
+			 * last iteration, this gives us the final queue head position for
+			 * SignalBackends() to use when advancing idle backends.
+			 */
+			queueHeadAfterWrite = QUEUE_HEAD;
+
 			LWLockRelease(NotifyQueueLock);
 		}
 
@@ -1135,6 +1328,10 @@ Exec_ListenPreCommit(void)
 static void
 Exec_ListenCommit(const char *channel)
 {
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	bool		found;
+	ProcNumber *listeners;
 	MemoryContext oldcontext;
 
 	/* Do nothing if we are already listening on this channel */
@@ -1152,21 +1349,84 @@ Exec_ListenCommit(const char *channel)
 	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
 	listenChannels = lappend(listenChannels, pstrdup(channel));
 	MemoryContextSwitchTo(oldcontext);
+
+	/* Now update the shared channelHash for SignalBackends() to use */
+	initChannelHash();
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/*
+	 * For new entries, we initialize listenersArray to InvalidDsaPointer as a
+	 * marker. This handles both the initial creation and potential retry
+	 * after OOM.
+	 */
+	entry = dshash_find_or_insert(channelHash, &key, &found);
+
+	if (!found)
+		entry->listenersArray = InvalidDsaPointer;
+
+	if (!DsaPointerIsValid(entry->listenersArray))
+	{
+		/* First listener for this channel */
+		entry->listenersArray = dsa_allocate(channelDSA,
+											 sizeof(ProcNumber) * INITIAL_LISTENERS_ARRAY_SIZE);
+		entry->numListeners = 0;
+		entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+	}
+
+	listeners = (ProcNumber *) dsa_get_address(channelDSA,
+											   entry->listenersArray);
+
+	for (int i = 0; i < entry->numListeners; i++)
+	{
+		if (listeners[i] == MyProcNumber)
+		{
+			dshash_release_lock(channelHash, entry);
+			return;				/* Already registered */
+		}
+	}
+
+	if (entry->numListeners >= entry->allocatedListeners)
+	{
+		int			new_size = entry->allocatedListeners * 2;
+		dsa_pointer new_array = dsa_allocate(channelDSA,
+											 sizeof(ProcNumber) * new_size);
+		ProcNumber *new_listeners = (ProcNumber *) dsa_get_address(channelDSA,
+																   new_array);
+
+		memcpy(new_listeners, listeners,
+			   sizeof(ProcNumber) * entry->numListeners);
+
+		dsa_free(channelDSA, entry->listenersArray);
+		entry->listenersArray = new_array;
+		entry->allocatedListeners = new_size;
+		listeners = new_listeners;
+	}
+
+	listeners[entry->numListeners] = MyProcNumber;
+	entry->numListeners++;
+
+	dshash_release_lock(channelHash, entry);
 }
 
 /*
  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
  *
- * Remove the specified channel name from listenChannels.
+ * Remove the specified channel from the list of channels we are listening on.
  */
 static void
 Exec_UnlistenCommit(const char *channel)
 {
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	ProcNumber *listeners;
 	ListCell   *q;
+	int			i;
 
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
 
+	/* Remove from our local cache */
 	foreach(q, listenChannels)
 	{
 		char	   *lchan = (char *) lfirst(q);
@@ -1179,6 +1439,46 @@ Exec_UnlistenCommit(const char *channel)
 		}
 	}
 
+	/* Now remove from the shared channelHash */
+	if (channelHash == NULL)
+		return;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/* Look up the channel with exclusive lock so we can modify it */
+	entry = dshash_find(channelHash, &key, true);
+	if (entry == NULL)
+		return;
+
+	listeners = (ProcNumber *) dsa_get_address(channelDSA,
+											   entry->listenersArray);
+
+	for (i = 0; i < entry->numListeners; i++)
+	{
+		if (listeners[i] == MyProcNumber)
+		{
+			entry->numListeners--;
+			if (i < entry->numListeners)
+				memmove(&listeners[i], &listeners[i + 1],
+						sizeof(ProcNumber) * (entry->numListeners - i));
+
+			if (entry->numListeners == 0)
+			{
+				/* Last listener for this channel */
+				dsa_free(channelDSA, entry->listenersArray);
+				dshash_delete_entry(channelHash, entry);
+			}
+			else
+			{
+				dshash_release_lock(channelHash, entry);
+			}
+
+			return;
+		}
+	}
+
+	dshash_release_lock(channelHash, entry);
+
 	/*
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
@@ -1193,11 +1493,51 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	dshash_seq_status status;
+	ChannelEntry *entry;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
+	/* Clear our local cache */
 	list_free_deep(listenChannels);
 	listenChannels = NIL;
+
+	/* Now clear from shared channelHash */
+	if (channelHash == NULL)
+		return;
+
+	dshash_seq_init(&status, channelHash, true);
+	while ((entry = dshash_seq_next(&status)) != NULL)
+	{
+		if (entry->key.dboid == MyDatabaseId)
+		{
+			ProcNumber *listeners;
+			int			i;
+
+			listeners = (ProcNumber *) dsa_get_address(channelDSA,
+													   entry->listenersArray);
+
+			for (i = 0; i < entry->numListeners; i++)
+			{
+				if (listeners[i] == MyProcNumber)
+				{
+					entry->numListeners--;
+					if (i < entry->numListeners)
+						memmove(&listeners[i], &listeners[i + 1],
+								sizeof(ProcNumber) * (entry->numListeners - i));
+
+					if (entry->numListeners == 0)
+					{
+						dsa_free(channelDSA, entry->listenersArray);
+						dshash_delete_current(&status);
+					}
+					break;
+				}
+			}
+		}
+	}
+	dshash_seq_term(&status);
 }
 
 /*
@@ -1565,12 +1905,19 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * Normally we signal only backends in our own database that are
+ * listening on channels with pending notifies, since only those
+ * backends are interested in the notifies we send.
+ *
+ * Backends that are still positioned at the queue head from before our
+ * commit can be safely advanced directly to the new head, since the
+ * queue region we wrote is known to contain only our own notifications.
+ * This avoids unnecessary wakeups when there is nothing of interest to
+ * them.
+ *
+ * In addition, if a backend has fallen too far behind in the queue, we
+ * signal it so that it will advance its position and allow the global
+ * tail pointer to move forward.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1583,6 +1930,13 @@ SignalBackends(void)
 	int32	   *pids;
 	ProcNumber *procnos;
 	int			count;
+	ListCell   *lc;
+
+	/*
+	 * Attach to the channel hash if needed.  We might not have one if this
+	 * backend hasn't done LISTEN, but we need it to find listeners.
+	 */
+	initChannelHash();
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1597,36 +1951,111 @@ SignalBackends(void)
 	count = 0;
 
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	foreach(lc, pendingNotifyChannels)
+	{
+		char	   *channel = (char *) lfirst(lc);
+		ChannelEntry *entry = NULL;
+		ProcNumber *listeners;
+
+		if (channelHash != NULL)
+		{
+			ChannelHashKey key;
+
+			ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+			entry = dshash_find(channelHash, &key, false);
+		}
+
+		if (entry == NULL)
+			continue;			/* No listeners registered for this channel */
+
+		listeners = (ProcNumber *) dsa_get_address(channelDSA,
+												   entry->listenersArray);
+
+		for (int j = 0; j < entry->numListeners; j++)
+		{
+			ProcNumber	i = listeners[j];
+			int32		pid;
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+			pid = QUEUE_BACKEND_PID(i);
+
+			/* Skip if caught up */
+			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+				continue;
+
+			Assert(pid != InvalidPid);
+
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			pids[count] = pid;
+			procnos[count] = i;
+			count++;
+		}
+
+		dshash_release_lock(channelHash, entry);
+	}
+
+	/*
+	 * Direct advancement and lagging backend detection.
+	 *
+	 * Direct advancement: avoid waking backends still positioned at the old
+	 * queue head that aren't interested in our notifications.
+	 *
+	 * The heavyweight lock on database 0 (held in PreCommit_Notify) ensures
+	 * no other backend can insert notifications in the region we just wrote.
+	 * Even though we may take and release NotifyQueueLock multiple times
+	 * while writing, the heavyweight lock guarantees this region contains
+	 * only our messages.  Therefore, any backend still positioned at the
+	 * queue head from before our write can be safely advanced to the current
+	 * queue head without waking it.
+	 *
+	 * False-positive possibility: if a backend was previously signaled but
+	 * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
+	 * true).  This is safe - the backend will advance its pointer when it
+	 * does wake up.  The alternative (advancing it anyway) would risk
+	 * advancing over notifications from whoever signaled it.
+	 *
+	 * Lagging backends: we also check if any backend has fallen too far
+	 * behind and signal it to catch up, allowing the global tail to advance.
+	 */
+	for (ProcNumber i = QUEUE_FIRST_LISTENER;
+		 i != INVALID_PROC_NUMBER;
+		 i = QUEUE_NEXT_LISTENER(i))
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
 		QueuePosition pos;
+		int64		lag;
+		int32		pid;
+
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
 
-		Assert(pid != InvalidPid);
 		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+
+		/* Direct advancement for idle backends at the old head */
+		if (pendingNotifies != NULL &&
+			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
 		{
-			/*
-			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
-			 */
-			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
-				continue;
+			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+			pos = queueHeadAfterWrite;
 		}
-		else
+
+		/* Signal backends that have fallen too far behind */
+		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+								 QUEUE_POS_PAGE(pos));
+
+		if (lag >= QUEUE_CLEANUP_DELAY)
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
-				continue;
+			pid = QUEUE_BACKEND_PID(i);
+			Assert(pid != InvalidPid);
+
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			pids[count] = pid;
+			procnos[count] = i;
+			count++;
 		}
-		/* OK, need to signal this one */
-		pids[count] = pid;
-		procnos[count] = i;
-		count++;
 	}
 	LWLockRelease(NotifyQueueLock);
 
@@ -1865,6 +2294,7 @@ asyncQueueReadAllNotifications(void)
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2373,7 +2803,7 @@ notification_match(const void *key1, const void *key2, Size keysize)
 	return 1;					/* not equal */
 }
 
-/* Clear the pendingActions and pendingNotifies lists. */
+/* Clear the pendingActions, pendingNotifies, and pendingNotifyChannels lists. */
 static void
 ClearPendingActionsAndNotifies(void)
 {
@@ -2385,6 +2815,7 @@ ClearPendingActionsAndNotifies(void)
 	 */
 	pendingActions = NULL;
 	pendingNotifies = NULL;
+	pendingNotifyChannels = NIL;
 }
 
 /*
@@ -2395,3 +2826,16 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ * ChannelHashPrepareKey
+ *		Prepare a channel key for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..a4fadbd0767 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -366,6 +366,7 @@ SubtransBuffer	"Waiting for I/O on a sub-transaction SLRU buffer."
 MultiXactOffsetBuffer	"Waiting for I/O on a multixact offset SLRU buffer."
 MultiXactMemberBuffer	"Waiting for I/O on a multixact member SLRU buffer."
 NotifyBuffer	"Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash	"Waiting to access the <command>NOTIFY</command> channel hash table."
 SerialBuffer	"Waiting for I/O on a serializable transaction conflict SLRU buffer."
 WALInsert	"Waiting to insert WAL data into a memory buffer."
 BufferContent	"Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..2768ddf4414 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -100,6 +100,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
 PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
 PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
 PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
 PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5290b91e83e..5ccdd4043e8 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -412,6 +412,8 @@ CatalogIdMapEntry
 CatalogIndexState
 ChangeVarNodes_callback
 ChangeVarNodes_context
+ChannelEntry
+ChannelHashKey
 CheckPoint
 CheckPointStmt
 CheckpointStatsData
-- 
2.50.1



  [text/plain] 0002-optimize_listen_notify-v19-alt1.txt (3.9K, 4-0002-optimize_listen_notify-v19-alt1.txt)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..44442e927ff 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -264,6 +264,11 @@ typedef struct QueuePosition
 	 (x).page != (y).page ? (x) : \
 	 (x).offset > (y).offset ? (x) : (y))
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
@@ -286,6 +291,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* backend could skip queue to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +353,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +675,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
@@ -2009,9 +2017,14 @@ SignalBackends(void)
 	 * Even though we may take and release NotifyQueueLock multiple times
 	 * while writing, the heavyweight lock guarantees this region contains
 	 * only our messages.  Therefore, any backend still positioned at the
-	 * queue head from before our write can be safely advanced to the current
+	 * queue head from before our write can be advised to skip to the current
 	 * queue head without waking it.
 	 *
+	 * We use the advisoryPos field rather than directly modifying pos,
+	 * because the listening backend might be concurrently reading
+	 * notifications using its local copy of pos.  The backend controls its
+	 * own pos field and will check advisoryPos when it's safe to do so.
+	 *
 	 * False-positive possibility: if a backend was previously signaled but
 	 * hasn't yet awoken, we'll skip advancing it (because wakeupPending is
 	 * true).  This is safe - the backend will advance its pointer when it
@@ -2038,7 +2051,7 @@ SignalBackends(void)
 		if (pendingNotifies != NULL &&
 			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+			QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
 			pos = queueHeadAfterWrite;
 		}
 
@@ -2297,6 +2310,26 @@ asyncQueueReadAllNotifications(void)
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
+
+	/*
+	 * Check if another backend has set an advisory position for us.
+	 * If so, and if we haven't yet read past that point, we can safely
+	 * adopt the advisory position and skip the intervening notifications.
+	 * This is safe because the advisory position is only set when we're
+	 * positioned at a known point and the skipped region contains only
+	 * notifications we're not interested in.
+	 */
+	{
+		QueuePosition advisoryPos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+
+		if (!QUEUE_POS_EQUAL(advisoryPos, pos) &&
+			QUEUE_POS_PRECEDES(pos, advisoryPos))
+		{
+			pos = advisoryPos;
+			QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		}
+	}
+
 	LWLockRelease(NotifyQueueLock);
 
 	if (QUEUE_POS_EQUAL(pos, head))
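
For readers skimming the diff above, the advisory-position handshake can be sketched as a small standalone model. This is only an illustration under simplifying assumptions, not the patch itself: the names QueuePos, ListenerSlot, notifier_advise and listener_begin_read are hypothetical, locking is omitted (in the patch both sides run under NotifyQueueLock), position comparison ignores page wraparound, and the wakeupPending test is inferred from the "False-positive possibility" comment rather than from code shown in the hunk.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-in for QueuePosition (no wraparound handling). */
typedef struct
{
	int64_t		page;
	int			offset;
} QueuePos;

/* Hypothetical per-listener slot, loosely modeled on QueueBackendStatus. */
typedef struct
{
	QueuePos	pos;			/* written only by the owning listener */
	QueuePos	advisoryPos;	/* written by notifiers as a hint */
	bool		wakeupPending;	/* signal sent but not yet processed */
} ListenerSlot;

static bool
pos_equal(QueuePos a, QueuePos b)
{
	return a.page == b.page && a.offset == b.offset;
}

static bool
pos_precedes(QueuePos a, QueuePos b)
{
	return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

/*
 * Notifier side: if the listener still sits at the head from before our
 * write, leave a hint instead of touching its pos.  Returns true when the
 * listener was advised and therefore needs no wakeup.
 */
static bool
notifier_advise(ListenerSlot *slot, QueuePos headBefore, QueuePos headAfter)
{
	assert(!pos_precedes(headAfter, headBefore));

	if (!slot->wakeupPending && pos_equal(slot->pos, headBefore))
	{
		slot->advisoryPos = headAfter;
		return true;
	}
	return false;
}

/*
 * Listener side: at the start of a read, adopt the hint only if it lies
 * ahead of our own position.  Returns the position to start reading from.
 */
static QueuePos
listener_begin_read(ListenerSlot *slot)
{
	QueuePos	pos = slot->pos;

	slot->wakeupPending = false;
	if (pos_precedes(pos, slot->advisoryPos))
	{
		pos = slot->advisoryPos;
		slot->pos = pos;
	}
	return pos;
}
```

The point of the split is that only the listener ever writes its own pos, so a reader holding a local copy cannot be overtaken behind its back; the hint takes effect only the next time the listener enters the read path.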


  [text/plain] 0002-optimize_listen_notify-v19-alt3.txt (7.4K, 5-0002-optimize_listen_notify-v19-alt3.txt)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..e201deb5e54 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -70,14 +70,14 @@
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the local listen state (listenChannels) and
- *	  shared channel hash table (channelHash).  Then we signal any backends
- *	  that may be interested in our messages (including our own backend,
- *	  if listening).  This is done by SignalBackends(), which consults the
- *	  shared channel hash table to identify listeners for the channels that
- *	  have pending notifications in the current database.  Each selected
- *	  backend is marked as having a wakeup pending to avoid duplicate signals,
- *	  and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *	  make any actual updates to the effective listen state (channelHash).
+ *	  Then we signal any backends that may be interested in our messages
+ *	  (including our own backend, if listening).  This is done by
+ *	  SignalBackends(), which consults the shared channel hash table to
+ *	  identify listeners for the channels that have pending notifications
+ *	  in the current database.  Each selected backend is marked as having a
+ *	  wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ *	  signal is sent to it.
  *
  *	  When writing notifications, PreCommit_Notify() records the queue head
  *	  position both before and after the write.  Because all writers serialize
@@ -2282,6 +2282,7 @@ asyncQueueReadAllNotifications(void)
 	volatile QueuePosition pos;
 	QueuePosition head;
 	Snapshot	snapshot;
+	bool		reachedStop;
 
 	/* page_buffer must be adequately aligned, so use a union */
 	union
@@ -2350,77 +2351,83 @@ asyncQueueReadAllNotifications(void)
 	 * It is possible that we fail while trying to send a message to our
 	 * frontend (for example, because of encoding conversion failure).  If
 	 * that happens it is critical that we not try to send the same message
-	 * over and over again.  Therefore, we place a PG_TRY block here that will
-	 * forcibly advance our queue position before we lose control to an error.
-	 * (We could alternatively retake NotifyQueueLock and move the position
-	 * before handling each individual message, but that seems like too much
-	 * lock traffic.)
+	 * over and over again.  Therefore, we must advance our queue position
+	 * regularly as we process messages.
+	 *
+	 * We must also be careful about concurrency: SignalBackends() can
+	 * directly advance our position while we're reading.  To prevent
+	 * overwriting such an advancement with a stale value, we update our
+	 * position in shared memory after processing messages from each page,
+	 * while holding NotifyQueueLock.  Shared lock is sufficient since we're
+	 * only updating our own position.
 	 */
-	PG_TRY();
+	do
 	{
-		bool		reachedStop;
+		int64		curpage = QUEUE_POS_PAGE(pos);
+		int			curoffset = QUEUE_POS_OFFSET(pos);
+		int			slotno;
+		int			copysize;
 
-		do
+		/*
+		 * We copy the data from SLRU into a local buffer, so as to avoid
+		 * holding the SLRU lock while we are examining the entries and
+		 * possibly transmitting them to our frontend.  Copy only the part
+		 * of the page we will actually inspect.
+		 */
+		slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
+											InvalidTransactionId);
+		if (curpage == QUEUE_POS_PAGE(head))
 		{
-			int64		curpage = QUEUE_POS_PAGE(pos);
-			int			curoffset = QUEUE_POS_OFFSET(pos);
-			int			slotno;
-			int			copysize;
+			/* we only want to read as far as head */
+			copysize = QUEUE_POS_OFFSET(head) - curoffset;
+			if (copysize < 0)
+				copysize = 0;	/* just for safety */
+		}
+		else
+		{
+			/* fetch all the rest of the page */
+			copysize = QUEUE_PAGESIZE - curoffset;
+		}
+		memcpy(page_buffer.buf + curoffset,
+			   NotifyCtl->shared->page_buffer[slotno] + curoffset,
+			   copysize);
+		/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
+		LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
 
-			/*
-			 * We copy the data from SLRU into a local buffer, so as to avoid
-			 * holding the SLRU lock while we are examining the entries and
-			 * possibly transmitting them to our frontend.  Copy only the part
-			 * of the page we will actually inspect.
-			 */
-			slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
-												InvalidTransactionId);
-			if (curpage == QUEUE_POS_PAGE(head))
-			{
-				/* we only want to read as far as head */
-				copysize = QUEUE_POS_OFFSET(head) - curoffset;
-				if (copysize < 0)
-					copysize = 0;	/* just for safety */
-			}
-			else
-			{
-				/* fetch all the rest of the page */
-				copysize = QUEUE_PAGESIZE - curoffset;
-			}
-			memcpy(page_buffer.buf + curoffset,
-				   NotifyCtl->shared->page_buffer[slotno] + curoffset,
-				   copysize);
-			/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-			LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
+		/*
+		 * Process messages up to the stop position, end of page, or an
+		 * uncommitted message.
+		 *
+		 * Our stop position is what we found to be the head's position
+		 * when we entered this function. It might have changed already.
+		 * But if it has, we will receive (or have already received and
+		 * queued) another signal and come here again.
+		 *
+		 * We are not holding NotifyQueueLock here! The queue can only
+		 * extend beyond the head pointer (see above).  We update our
+		 * backend's position after processing messages from each page to
+		 * ensure we don't reprocess messages if we fail partway through,
+		 * and to avoid overwriting any direct advancement that
+		 * SignalBackends() might perform concurrently.
+		 */
+		reachedStop = asyncQueueProcessPageEntries(&pos, head,
+												   page_buffer.buf,
+												   snapshot);
 
-			/*
-			 * Process messages up to the stop position, end of page, or an
-			 * uncommitted message.
-			 *
-			 * Our stop position is what we found to be the head's position
-			 * when we entered this function. It might have changed already.
-			 * But if it has, we will receive (or have already received and
-			 * queued) another signal and come here again.
-			 *
-			 * We are not holding NotifyQueueLock here! The queue can only
-			 * extend beyond the head pointer (see above) and we leave our
-			 * backend's pointer where it is so nobody will truncate or
-			 * rewrite pages under us. Especially we don't want to hold a lock
-			 * while sending the notifications to the frontend.
-			 */
-			reachedStop = asyncQueueProcessPageEntries(&pos, head,
-													   page_buffer.buf,
-													   snapshot);
-		} while (!reachedStop);
-	}
-	PG_FINALLY();
-	{
-		/* Update shared state */
+		/*
+		 * Update our position in shared memory.  The 'pos' variable now
+		 * holds our new position (advanced past all messages we just
+		 * processed).  This ensures that if we fail while processing
+		 * messages from the next page, we won't reprocess the ones we
+		 * just handled.  It also prevents us from overwriting any direct
+		 * advancement that another backend might have done while we were
+		 * processing messages.
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
 		QUEUE_BACKEND_POS(MyProcNumber) = pos;
 		LWLockRelease(NotifyQueueLock);
-	}
-	PG_END_TRY();
+
+	} while (!reachedStop);
 
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
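
The failure-recovery half of the alt3 change (publishing the position after every page instead of once in PG_FINALLY) can be illustrated with a toy model. Everything here is an assumption made for the sketch: PAGE_ENTRIES, shared_pos, process_page and read_all are hypothetical names, the error is simulated with a fail_page argument, and the lock acquisition around the shared update is only marked by a comment. The sketch does not model a concurrent SignalBackends() advance, so it says nothing about whether that race is actually closed.

```c
#include <assert.h>
#include <stdbool.h>

#define PAGE_ENTRIES 4			/* hypothetical entries per queue page */

typedef struct
{
	int			page;
	int			offset;
} QueuePos;

/* Stands in for QUEUE_BACKEND_POS(MyProcNumber) in shared memory. */
static QueuePos shared_pos;

/*
 * Consume the rest of the current page, or up to head if head is on this
 * page.  Returns true once head has been reached.
 */
static bool
process_page(QueuePos *pos, QueuePos head)
{
	if (pos->page == head.page)
	{
		pos->offset = head.offset;
		return true;
	}
	pos->page++;
	pos->offset = 0;
	return false;
}

/*
 * Read from shared_pos up to head, publishing progress after each page.
 * fail_page simulates an error raised while processing that page (-1 for
 * no failure).  Returns false if the simulated error fired.
 */
static bool
read_all(QueuePos head, int fail_page)
{
	QueuePos	pos = shared_pos;
	bool		reachedStop;

	assert(pos.page <= head.page);

	do
	{
		if (pos.page == fail_page)
			return false;		/* error: progress so far is already published */

		reachedStop = process_page(&pos, head);

		/* In the patch this store happens while holding NotifyQueueLock. */
		shared_pos = pos;
	} while (!reachedStop);

	return true;
}
```

With a failure on the second page, the shared position stays at the start of that page, so a retry resumes there rather than resending the first page's messages.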


  [text/plain] 0002-optimize_listen_notify-v19-alt2.txt (3.2K, 6-0002-optimize_listen_notify-v19-alt2.txt)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 90a530cfc61..751400b8315 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -285,7 +285,8 @@ typedef struct QueueBackendStatus
 	int32		pid;			/* either a PID or InvalidPid */
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
-	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition pos;			/* next position to read from */
+	QueuePosition donePos;		/* backend has definitively processed up to here */
 	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
@@ -347,6 +348,7 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_DONEPOS(i)	(asyncQueueControl->backend[i].donePos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
@@ -668,6 +670,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_DONEPOS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
@@ -1290,6 +1293,7 @@ Exec_ListenPreCommit(void)
 			prevListener = i;
 	}
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
+	QUEUE_BACKEND_DONEPOS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
 	/* Insert backend into list of listeners at correct position */
@@ -2415,9 +2419,19 @@ asyncQueueReadAllNotifications(void)
 	}
 	PG_FINALLY();
 	{
-		/* Update shared state */
+		/*
+		 * Update shared state.
+		 *
+		 * We update donePos to what we actually read (the local pos variable),
+		 * as this is used for truncation safety.  For the read position (pos),
+		 * we use the maximum of our local position and the current shared
+		 * position, in case another backend used direct advancement to skip us
+		 * ahead while we were reading.  This prevents us from going backwards
+		 * and potentially pointing to a truncated page.
+		 */
 		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		QUEUE_BACKEND_DONEPOS(MyProcNumber) = pos;
+		QUEUE_BACKEND_POS(MyProcNumber) = QUEUE_POS_MAX(pos, QUEUE_BACKEND_POS(MyProcNumber));
 		LWLockRelease(NotifyQueueLock);
 	}
 	PG_END_TRY();
@@ -2567,7 +2581,13 @@ asyncQueueAdvanceTail(void)
 	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
 	{
 		Assert(QUEUE_BACKEND_PID(i) != InvalidPid);
-		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i));
+		/*
+		 * Use donePos rather than pos for truncation safety.  The donePos
+		 * field represents what the backend has definitively processed, while
+		 * pos can be advanced by other backends via direct advancement.  This
+		 * prevents truncating pages that a backend is still reading from.
+		 */
+		min = QUEUE_POS_MIN(min, QUEUE_BACKEND_DONEPOS(i));
 	}
 	QUEUE_TAIL = min;
 	oldtailpage = QUEUE_STOP_PAGE;
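
The alt2 idea of keeping two positions per listener can be sketched in isolation as follows. This is a simplified model with hypothetical names (QueuePos, Slot, listener_finish_read, compute_tail); it omits locking and page wraparound, and compute_tail starts from the head as an assumed upper bound on the tail.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct
{
	int64_t		page;
	int			offset;
} QueuePos;

/* Hypothetical slot carrying both positions from the alt2 patch. */
typedef struct
{
	QueuePos	pos;			/* next position to read; notifiers may advance it */
	QueuePos	donePos;		/* what the listener has definitively processed */
} Slot;

static bool
pos_precedes(QueuePos a, QueuePos b)
{
	return a.page < b.page || (a.page == b.page && a.offset < b.offset);
}

static QueuePos
pos_max(QueuePos a, QueuePos b)
{
	return pos_precedes(a, b) ? b : a;
}

static QueuePos
pos_min(QueuePos a, QueuePos b)
{
	return pos_precedes(a, b) ? a : b;
}

/*
 * Listener finishes a read at localPos: donePos records real progress,
 * while pos never moves backwards past a direct advancement.
 */
static void
listener_finish_read(Slot *slot, QueuePos localPos)
{
	slot->donePos = localPos;
	slot->pos = pos_max(localPos, slot->pos);
}

/* Tail computation consults donePos only, never the advanceable pos. */
static QueuePos
compute_tail(const Slot *slots, int nslots, QueuePos head)
{
	QueuePos	min = head;

	assert(nslots >= 0);
	for (int i = 0; i < nslots; i++)
		min = pos_min(min, slots[i].donePos);
	return min;
}
```

In this model a notifier may push pos far ahead while the listener is still reading page 0, yet the computed tail stays on page 0 until the listener itself reports progress through donePos.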

