public inbox for [email protected]  
help / color / mirror / Atom feed
From: Chao Li <[email protected]>
To: Arseniy Mukhin <[email protected]>
Cc: Joel Jacobson <[email protected]>
Cc: Tom Lane <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Sun, 26 Oct 2025 12:11:25 +0800
Message-ID: <[email protected]> (raw)
In-Reply-To: <CAE7r3MK-3AOdh1mpZ8hw9h6F_i0D5RMoAy7CttnfCJRpB8GJDA@mail.gmail.com>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAE7r3MLivh1sHWF06hrVXkiQbw-KChPcQsh+9CheXprm5vRVMQ@mail.gmail.com>
	<[email protected]>
	<CAE7r3MK-3AOdh1mpZ8hw9h6F_i0D5RMoAy7CttnfCJRpB8GJDA@mail.gmail.com>



> On Oct 23, 2025, at 18:02, Arseniy Mukhin <[email protected]> wrote:
> 
> Hi,
> 
> On Thu, Oct 23, 2025 at 11:17 AM Chao Li <[email protected]> wrote:
>> 
>> 
>> 
>>> On Oct 21, 2025, at 00:43, Arseniy Mukhin <[email protected]> wrote:
>>> 
>>> 
>>> I managed to reproduce the race with v20-alt3. I tried to write a TAP
>>> test reproducing the issue, so it was easier to validate changes.
>>> Please find the attached TAP test. I added it to some random package
>>> for simplicity.
>>> 
>> 
>> With alt3, as we have acquired the notification lock after reading every message to update the POS, I think we can do a little bit more optimization:
>> 
>> The notifier: in SignalBackend()
>>    * Now we check if a listener’s pos equals to beforeWritePos, then we do “directly advancement”
>>    * We can change to if a listener’s pos is between beforeWritePos and afterWritePos, then we can do the advancement.
>> 
>> The listener: in asyncQueueReadAllNotifications():
>>    * With alt3, we only lock and update pos
>>    * We can do more. If current pos in shared memory is after that local pos, then meaning some notifier has done an advancement, so it can stop reading.
>> 
> 
> I think this would be a reasonable optimization if it weren't for the
> race condition mentioned above. The problem is that if the local pos
> lags behind the shared memory pos, it could point to a truncated queue
> segment, so we shouldn't allow that.
> 

I figured out a way to resolve the race condition for alt3:

* Add an "awakening" flag for each listener; this flag is set only by the listener itself.
* Add an advisory position for each listener, similar to alt1.
* If a listener is awakening (i.e. currently reading the queue), a notifier only updates that listener's advisory position; otherwise the notifier directly advances the listener's position.
* If a running listener sees that its current position is behind its advisory position, it stops reading.

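See the attached patch file for more details; I added code comments for my changes. With this change the TAP test should no longer hit the race condition (note that in the run below the test was skipped, so this particular run did not exercise the race):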
```
# +++ tap check in src/test/authentication +++
t/008_listen-pos-race.pl .. skipped: Injection points not supported by this build
Files=1, Tests=0,  0 wallclock secs ( 0.00 usr  0.00 sys +  0.03 cusr  0.01 csys =  0.04 CPU)
Result: NOTESTS
```

Also, with my solution listeners keep using their local position while reading, so they no longer need to acquire the notification lock on every loop iteration.

The patch stack is: v20 patch -> alt3 patch -> TAP patch -> my patch. Please check whether my solution works.

I also made a small change to the TAP script so that it terminates gracefully: it now stops the node before skipping, and it tolerates an unset enable_injection_points variable.

Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/








Attachments:

  [application/octet-stream] fix-race.patch (7.2K, 2-fix-race.patch)
  download | inline diff:
commit c7daefa51118d2041623b14b7f26c9177ac0b6cd
Author: Chao Li (Evan) <[email protected]>
Date:   Sat Oct 25 15:42:26 2025 +0800

    fix race condition

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e8d728e9ce..3c8a640ebed 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -250,6 +250,11 @@ typedef struct QueuePosition
 #define QUEUE_POS_EQUAL(x,y) \
 	((x).page == (y).page && (x).offset == (y).offset)
 
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+	(asyncQueuePagePrecedes((x).page, (y).page) || \
+	 ((x).page == (y).page && (x).offset < (y).offset))
+
 #define QUEUE_POS_IS_ZERO(x) \
 	((x).page == 0 && (x).offset == 0)
 
@@ -287,7 +292,9 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	QueuePosition advisoryPos;	/* advisory position for this backend */
 	bool		wakeupPending;	/* signal sent but not yet processed */
+	bool        awakening;	/* backend is awakening */
 } QueueBackendStatus;
 
 /*
@@ -348,7 +355,9 @@ static dshash_table *channelHash = NULL;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i)	(asyncQueueControl->backend[i].advisoryPos)
 #define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_AWAKENING(i)	(asyncQueueControl->backend[i].awakening)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -675,7 +684,9 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
 			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+			QUEUE_BACKEND_AWAKENING(i) = false;
 		}
 	}
 
@@ -1954,6 +1965,7 @@ SignalBackends(void)
 	ProcNumber *procnos;
 	int			count;
 	ListCell   *lc;
+	List *interestedProcs = NIL;
 
 	INJECTION_POINT("listen-notify-signal-backends", NULL);
 
@@ -2002,6 +2014,12 @@ SignalBackends(void)
 			int32		pid;
 			QueuePosition pos;
 
+			// XXX: Use a list to record listeners interested in any of the pending channels.
+			// List is not the best choice, so it we decide to take this apprach, we
+			// can optimize it later by using a hash or bitmap.
+			if (!list_member_int(interestedProcs, i))
+				interestedProcs = lappend_int(interestedProcs, i);
+
 			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
 
@@ -2054,19 +2072,41 @@ SignalBackends(void)
 		int64		lag;
 		int32		pid;
 
-		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
-			continue;
+		/* XXX we cannot rely on wakeupPending here, because the flag might be set by another notifier. */
+		//if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+		//	continue;
 
 		pos = QUEUE_BACKEND_POS(i);
 
-		/* Direct advancement for idle backends at the old head */
-		if (pendingNotifies != NULL &&
-			QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+		if (!list_member_int(interestedProcs, i))
 		{
-			QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
-			pos = queueHeadAfterWrite;
+			/* Direct advancement for idle backends at the old head */
+			if (pendingNotifies != NULL)
+			{
+				if (QUEUE_BACKEND_AWAKENING(i))
+				{
+					// For awakening backend, advice a new position.
+					if ((QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite) ||
+					     (QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos) && QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))) &&
+						 QUEUE_POS_EQUAL(QUEUE_BACKEND_ADVISORY_POS(i), queueHeadBeforeWrite))
+						QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+				}
+				else
+				{
+					// For non-awakening backend, directly advance its position.
+					if (QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+					{
+						QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+						SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
+					}
+				}
+				pos = queueHeadAfterWrite;
+			}
 		}
 
+		if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+			continue;
+
 		/* Signal backends that have fallen too far behind */
 		lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 								 QUEUE_POS_PAGE(pos));
@@ -2321,6 +2361,7 @@ asyncQueueReadAllNotifications(void)
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
 	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = true;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2330,6 +2371,9 @@ asyncQueueReadAllNotifications(void)
 	if (QUEUE_POS_EQUAL(pos, head))
 	{
 		/* Nothing to do, we have read all notifications already. */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+		LWLockRelease(NotifyQueueLock);
 		return;
 	}
 
@@ -2441,21 +2485,29 @@ asyncQueueReadAllNotifications(void)
 												   page_buffer.buf,
 												   snapshot);
 
-		/*
-		 * Update our position in shared memory.  The 'pos' variable now
-		 * holds our new position (advanced past all messages we just
-		 * processed).  This ensures that if we fail while processing
-		 * messages from the next page, we won't reprocess the ones we
-		 * just handled.  It also prevents us from overwriting any direct
-		 * advancement that another backend might have done while we were
-		 * processing messages.
-		 */
-		LWLockAcquire(NotifyQueueLock, LW_SHARED);
-		QUEUE_BACKEND_POS(MyProcNumber) = pos;
-		LWLockRelease(NotifyQueueLock);
-
+		// If there is a direct advancement, let's stop reading.
+		// We don't need to lock here because even if the position
+		// changes right after we read it, we just do one more loop.
+		//LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+		{
+			reachedStop = true;
+		}
+		//QUEUE_BACKEND_POS(MyProcNumber) = pos;
+		//LWLockRelease(NotifyQueueLock);
 	} while (!reachedStop);
 
+	LWLockAcquire(NotifyQueueLock, LW_SHARED);
+	if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+	{
+		/* respect direct advancement */
+		pos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+	}
+	SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(MyProcNumber), 0, 0);
+	QUEUE_BACKEND_POS(MyProcNumber) = pos;
+	QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+	LWLockRelease(NotifyQueueLock);
+
 	/* Done with snapshot */
 	UnregisterSnapshot(snapshot);
 }
diff --git a/src/test/authentication/t/008_listen-pos-race.pl b/src/test/authentication/t/008_listen-pos-race.pl
index 060e33ed391..c858e8da524 100644
--- a/src/test/authentication/t/008_listen-pos-race.pl
+++ b/src/test/authentication/t/008_listen-pos-race.pl
@@ -8,7 +8,7 @@ use PostgreSQL::Test::Utils;
 use Time::HiRes qw(usleep);
 use Test::More;
 
-if ($ENV{enable_injection_points} ne 'yes') {
+if (($ENV{enable_injection_points} // '') ne 'yes') {  # parenthesized: '//' binds looser than 'ne'
     plan skip_all => 'Injection points not supported by this build';
 }
 
@@ -18,6 +18,7 @@ $node->start;
 
 
 if (!$node->check_extension('injection_points')) {
+	$node->stop;
     plan skip_all => 'Extension injection_points not installed';
 }
 


reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected]
  Subject: Re: Optimize LISTEN/NOTIFY
  In-Reply-To: <[email protected]>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox