public inbox for [email protected]
From: Chao Li <[email protected]>
To: Arseniy Mukhin <[email protected]>
Cc: Joel Jacobson <[email protected]>
Cc: Tom Lane <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Sun, 26 Oct 2025 12:11:25 +0800
Message-ID: <[email protected]>
In-Reply-To: <CAE7r3MK-3AOdh1mpZ8hw9h6F_i0D5RMoAy7CttnfCJRpB8GJDA@mail.gmail.com>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
<[email protected]>
<[email protected]>
<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAE7r3MLivh1sHWF06hrVXkiQbw-KChPcQsh+9CheXprm5vRVMQ@mail.gmail.com>
<[email protected]>
<CAE7r3MK-3AOdh1mpZ8hw9h6F_i0D5RMoAy7CttnfCJRpB8GJDA@mail.gmail.com>
> On Oct 23, 2025, at 18:02, Arseniy Mukhin <[email protected]> wrote:
>
> Hi,
>
> On Thu, Oct 23, 2025 at 11:17 AM Chao Li <[email protected]> wrote:
>>
>>
>>
>>> On Oct 21, 2025, at 00:43, Arseniy Mukhin <[email protected]> wrote:
>>>
>>>
>>> I managed to reproduce the race with v20-alt3. I tried to write a TAP
>>> test reproducing the issue, so it was easier to validate changes.
>>> Please find the attached TAP test. I added it to some random package
>>> for simplicity.
>>>
>>
>> With alt3, as we already acquire the notification lock after reading every message to update the pos, I think we can do a bit more optimization:
>>
>> The notifier, in SignalBackends():
>> * Currently we check whether a listener's pos equals beforeWritePos, and if so we do "direct advancement".
>> * We could change this to: if a listener's pos is between beforeWritePos and afterWritePos, do the advancement.
>>
>> The listener, in asyncQueueReadAllNotifications():
>> * With alt3, we only lock and update pos.
>> * We can do more: if the current pos in shared memory is after the local pos, some notifier has done a direct advancement, so the listener can stop reading.
>>
>
> I think this would be a reasonable optimization if it weren't for the
> race condition mentioned above. The problem is that if the local pos
> lags behind the shared memory pos, it could point to a truncated queue
> segment, so we shouldn't allow that.
>
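The range check proposed in the quoted message (advance a listener whose pos lies between beforeWritePos and afterWritePos) can be sketched in C as below. This is a hypothetical illustration, not code from any patch: QueuePosition is a trimmed-down copy of the async.c struct, pos_precedes() plays the role of the QUEUE_POS_PRECEDES macro from the attached patch, pos_in_written_range() is a made-up helper, and a plain `<` on page numbers is assumed to stand in for asyncQueuePagePrecedes().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Trimmed-down copy of async.c's QueuePosition: SLRU page plus byte offset. */
typedef struct QueuePosition
{
    int64_t page;
    int     offset;
} QueuePosition;

static bool
pos_equal(QueuePosition x, QueuePosition y)
{
    return x.page == y.page && x.offset == y.offset;
}

/*
 * True if x comes before y in queue order.  Assumption: plain '<' on page
 * numbers stands in for asyncQueuePagePrecedes().
 */
static bool
pos_precedes(QueuePosition x, QueuePosition y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/*
 * Hypothetical helper for the proposed SignalBackends() check: does a
 * listener's pos lie in [beforeWrite, afterWrite), i.e. inside the span
 * this notifier has just written?
 */
static bool
pos_in_written_range(QueuePosition pos, QueuePosition beforeWrite,
                     QueuePosition afterWrite)
{
    /* The written span must be well-formed. */
    assert(!pos_precedes(afterWrite, beforeWrite));

    return (pos_equal(pos, beforeWrite) || pos_precedes(beforeWrite, pos)) &&
           pos_precedes(pos, afterWrite);
}
```

The half-open interval keeps afterWritePos itself out of the range, so a listener that is already at the new head is left alone.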
I figured out a way to resolve the race condition for alt3:
* Add an awakening flag for every listener; this flag is only set by the listener itself.
* Add an advisory pos for every listener, similar to alt1.
* If a listener is awakening (currently reading), the notifier only updates the listener's advisory pos; otherwise it directly advances the listener's position.
* If a running listener sees that its current pos is behind the advisory pos, it stops reading.
See the attached patch file for more details; I added code comments for my changes. Now the TAP test won't hit the race condition.
```
# +++ tap check in src/test/authentication +++
t/008_listen-pos-race.pl .. skipped: Injection points not supported by this build
Files=1, Tests=0, 0 wallclock secs ( 0.00 usr 0.00 sys + 0.03 cusr 0.01 csys = 0.04 CPU)
Result: NOTESTS
```
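The notifier-side rule from the list above can be modeled as a small, single-threaded C sketch that mirrors the branch the patch adds to SignalBackends() for a listener not interested in any pending channel, assuming this notifier did write something. ListenerSlot and notifier_advance_uninterested() are hypothetical names; in async.c these fields live in QueueBackendStatus in shared memory, and the locking is not modeled here.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct QueuePosition
{
    int64_t page;
    int     offset;
} QueuePosition;

/* Hypothetical subset of QueueBackendStatus touched by the patch. */
typedef struct ListenerSlot
{
    QueuePosition pos;          /* shared read position */
    QueuePosition advisoryPos;  /* hint left by notifiers; {0,0} = none */
    bool          awakening;    /* set only by the listener while reading */
} ListenerSlot;

static bool
pos_equal(QueuePosition x, QueuePosition y)
{
    return x.page == y.page && x.offset == y.offset;
}

/* Assumption: plain '<' on pages stands in for asyncQueuePagePrecedes(). */
static bool
pos_precedes(QueuePosition x, QueuePosition y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/*
 * Notifier side, for a listener not interested in any pending channel.
 * before/after correspond to queueHeadBeforeWrite/queueHeadAfterWrite.
 */
static void
notifier_advance_uninterested(ListenerSlot *slot, QueuePosition before,
                              QueuePosition after)
{
    assert(!pos_precedes(after, before));

    if (slot->awakening)
    {
        /* Listener is reading: leave its pos alone, only move the hint. */
        bool in_range = pos_equal(slot->pos, before) ||
                        (pos_precedes(before, slot->pos) &&
                         pos_precedes(slot->pos, after));

        if (in_range && pos_equal(slot->advisoryPos, before))
            slot->advisoryPos = after;
    }
    else if (pos_equal(slot->pos, before))
    {
        /* Idle listener at the old head: direct advancement. */
        slot->pos = after;
        slot->advisoryPos = (QueuePosition) {0, 0};
    }
}
```

As in the patch, an awakening listener's hint only moves when advisoryPos already equals queueHeadBeforeWrite; a hint that is still {0,0} is left untouched.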
With my solution, listeners still use the local pos while reading, so they no longer need to acquire the notification lock on every loop iteration.
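The listener side can be sketched the same way, again as a hypothetical single-threaded model of the patched asyncQueueReadAllNotifications(): the loop advances only a local copy of pos, peeks at advisoryPos after each page, and reconciles once at the end. read_one_page() and listener_read_all() are made-up names, read_one_page() is a crude stand-in for processing one SLRU page, and the comments mark where the patch takes NotifyQueueLock (not modeled here).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef struct QueuePosition
{
    int64_t page;
    int     offset;
} QueuePosition;

typedef struct ListenerSlot
{
    QueuePosition pos;          /* shared read position */
    QueuePosition advisoryPos;  /* hint left by notifiers; {0,0} = none */
    bool          awakening;    /* set only by the listener while reading */
} ListenerSlot;

static bool
pos_equal(QueuePosition x, QueuePosition y)
{
    return x.page == y.page && x.offset == y.offset;
}

/* Assumption: plain '<' on pages stands in for asyncQueuePagePrecedes(). */
static bool
pos_precedes(QueuePosition x, QueuePosition y)
{
    return x.page < y.page || (x.page == y.page && x.offset < y.offset);
}

/* Made-up stand-in for processing one page: move to the next page or to head. */
static QueuePosition
read_one_page(QueuePosition pos, QueuePosition head)
{
    QueuePosition next = {pos.page + 1, 0};

    return pos_precedes(next, head) ? next : head;
}

static QueuePosition
listener_read_all(ListenerSlot *slot, QueuePosition head, int *pagesRead)
{
    QueuePosition pos;

    /* Patch: under NotifyQueueLock, mark ourselves awakening and copy pos. */
    slot->awakening = true;
    pos = slot->pos;
    *pagesRead = 0;

    if (pos_equal(pos, head))
    {
        slot->awakening = false;    /* nothing to read */
        return pos;
    }

    while (pos_precedes(pos, head))
    {
        pos = read_one_page(pos, head);
        (*pagesRead)++;

        /* Unlocked peek in the patch; a stale hint only costs one more page. */
        if (pos_precedes(pos, slot->advisoryPos))
            break;
    }

    /* Patch: under NotifyQueueLock, honor the hint, reset it, publish pos. */
    if (pos_precedes(pos, slot->advisoryPos))
        pos = slot->advisoryPos;
    assert(!pos_precedes(pos, slot->pos));
    slot->advisoryPos = (QueuePosition) {0, 0};
    slot->pos = pos;
    slot->awakening = false;
    return pos;
}
```

Because the shared pos is only written once at the end, a notifier never sees it move mid-read, which is what lets the notifier safely leave a hint instead of overwriting pos.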
The patch stack is: v20 patch -> alt3 patch -> tap patch -> my patch. Please see if my solution works.
I also made a tiny change in the TAP script to allow it to terminate gracefully.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
Attachments:
[application/octet-stream] fix-race.patch (7.2K, 2-fix-race.patch)
commit c7daefa51118d2041623b14b7f26c9177ac0b6cd
Author: Chao Li (Evan) <[email protected]>
Date: Sat Oct 25 15:42:26 2025 +0800
fix race condition
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 6e8d728e9ce..3c8a640ebed 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -250,6 +250,11 @@ typedef struct QueuePosition
#define QUEUE_POS_EQUAL(x,y) \
((x).page == (y).page && (x).offset == (y).offset)
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) || \
+ ((x).page == (y).page && (x).offset < (y).offset))
+
#define QUEUE_POS_IS_ZERO(x) \
((x).page == 0 && (x).offset == 0)
@@ -287,7 +292,9 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
+ QueuePosition advisoryPos; /* advisory position for this backend */
bool wakeupPending; /* signal sent but not yet processed */
+ bool awakening; /* backend is awakening */
} QueueBackendStatus;
/*
@@ -348,7 +355,9 @@ static dshash_table *channelHash = NULL;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_ADVISORY_POS(i) (asyncQueueControl->backend[i].advisoryPos)
#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_AWAKENING(i) (asyncQueueControl->backend[i].awakening)
/*
* The SLRU buffer area through which we access the notification queue
@@ -675,7 +684,9 @@ AsyncShmemInit(void)
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+ QUEUE_BACKEND_AWAKENING(i) = false;
}
}
@@ -1954,6 +1965,7 @@ SignalBackends(void)
ProcNumber *procnos;
int count;
ListCell *lc;
+ List *interestedProcs = NIL;
INJECTION_POINT("listen-notify-signal-backends", NULL);
@@ -2002,6 +2014,12 @@ SignalBackends(void)
int32 pid;
QueuePosition pos;
+ // XXX: Use a list to record listeners interested in any of the pending channels.
+ // List is not the best choice, so if we decide to take this approach, we
+ // can optimize it later by using a hash or bitmap.
+ if (!list_member_int(interestedProcs, i))
+ interestedProcs = lappend_int(interestedProcs, i);
+
if (QUEUE_BACKEND_WAKEUP_PENDING(i))
continue;
@@ -2054,19 +2072,41 @@ SignalBackends(void)
int64 lag;
int32 pid;
- if (QUEUE_BACKEND_WAKEUP_PENDING(i))
- continue;
+ /* XXX we cannot rely on wakeupPending here, because the flag might be set by another notifier. */
+ //if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ // continue;
pos = QUEUE_BACKEND_POS(i);
- /* Direct advancement for idle backends at the old head */
- if (pendingNotifies != NULL &&
- QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+ if (!list_member_int(interestedProcs, i))
{
- QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
- pos = queueHeadAfterWrite;
+ /* Direct advancement for idle backends at the old head */
+ if (pendingNotifies != NULL)
+ {
+ if (QUEUE_BACKEND_AWAKENING(i))
+ {
+ // For awakening backend, advise a new position.
+ if ((QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite) ||
+ (QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos) && QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))) &&
+ QUEUE_POS_EQUAL(QUEUE_BACKEND_ADVISORY_POS(i), queueHeadBeforeWrite))
+ QUEUE_BACKEND_ADVISORY_POS(i) = queueHeadAfterWrite;
+ }
+ else
+ {
+ // For non-awakening backend, directly advance its position.
+ if (QUEUE_POS_EQUAL(pos, queueHeadBeforeWrite))
+ {
+ QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(i), 0, 0);
+ }
+ }
+ pos = queueHeadAfterWrite;
+ }
}
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ continue;
+
/* Signal backends that have fallen too far behind */
lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
QUEUE_POS_PAGE(pos));
@@ -2321,6 +2361,7 @@ asyncQueueReadAllNotifications(void)
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+ QUEUE_BACKEND_AWAKENING(MyProcNumber) = true;
pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
LWLockRelease(NotifyQueueLock);
@@ -2330,6 +2371,9 @@ asyncQueueReadAllNotifications(void)
if (QUEUE_POS_EQUAL(pos, head))
{
/* Nothing to do, we have read all notifications already. */
+ LWLockAcquire(NotifyQueueLock, LW_SHARED);
+ QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+ LWLockRelease(NotifyQueueLock);
return;
}
@@ -2441,21 +2485,29 @@ asyncQueueReadAllNotifications(void)
page_buffer.buf,
snapshot);
- /*
- * Update our position in shared memory. The 'pos' variable now
- * holds our new position (advanced past all messages we just
- * processed). This ensures that if we fail while processing
- * messages from the next page, we won't reprocess the ones we
- * just handled. It also prevents us from overwriting any direct
- * advancement that another backend might have done while we were
- * processing messages.
- */
- LWLockAcquire(NotifyQueueLock, LW_SHARED);
- QUEUE_BACKEND_POS(MyProcNumber) = pos;
- LWLockRelease(NotifyQueueLock);
-
+ // If there is a direct advancement, let's stop reading.
+ // We don't need to lock here because even if the position
+ // changes right after we read it, we just do one more loop.
+ //LWLockAcquire(NotifyQueueLock, LW_SHARED);
+ if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+ {
+ reachedStop = true;
+ }
+ //QUEUE_BACKEND_POS(MyProcNumber) = pos;
+ //LWLockRelease(NotifyQueueLock);
} while (!reachedStop);
+ LWLockAcquire(NotifyQueueLock, LW_SHARED);
+ if (QUEUE_POS_PRECEDES(pos, QUEUE_BACKEND_ADVISORY_POS(MyProcNumber)))
+ {
+ /* respect direct advancement */
+ pos = QUEUE_BACKEND_ADVISORY_POS(MyProcNumber);
+ }
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVISORY_POS(MyProcNumber), 0, 0);
+ QUEUE_BACKEND_POS(MyProcNumber) = pos;
+ QUEUE_BACKEND_AWAKENING(MyProcNumber) = false;
+ LWLockRelease(NotifyQueueLock);
+
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}
diff --git a/src/test/authentication/t/008_listen-pos-race.pl b/src/test/authentication/t/008_listen-pos-race.pl
index 060e33ed391..c858e8da524 100644
--- a/src/test/authentication/t/008_listen-pos-race.pl
+++ b/src/test/authentication/t/008_listen-pos-race.pl
@@ -8,7 +8,7 @@ use PostgreSQL::Test::Utils;
use Time::HiRes qw(usleep);
use Test::More;
-if ($ENV{enable_injection_points} ne 'yes') {
+if (($ENV{enable_injection_points} // '') ne 'yes') {
plan skip_all => 'Injection points not supported by this build';
}
@@ -18,6 +18,7 @@ $node->start;
if (!$node->check_extension('injection_points')) {
+ $node->stop;
plan skip_all => 'Extension injection_points not installed';
}
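The XXX comment in SignalBackends() notes that the interestedProcs List (checked with list_member_int(), a linear scan) could later be replaced by a hash or bitmap. A minimal fixed-size bitmap sketch is below; ProcSet, the procset_* functions and SKETCH_MAX_PROCS are hypothetical names, and inside the backend PostgreSQL's existing Bitmapset API (bms_add_member(), bms_is_member()) would probably be the more natural choice.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical fixed bound; a real version would size the set from MaxBackends. */
#define SKETCH_MAX_PROCS 1024

typedef struct ProcSet
{
    uint64_t words[(SKETCH_MAX_PROCS + 63) / 64];
} ProcSet;

static void
procset_clear(ProcSet *set)
{
    memset(set->words, 0, sizeof(set->words));
}

/* O(1) insert; adding the same procno twice is harmless, so no dedup check. */
static void
procset_add(ProcSet *set, int procno)
{
    assert(procno >= 0 && procno < SKETCH_MAX_PROCS);
    set->words[procno / 64] |= UINT64_C(1) << (procno % 64);
}

/* O(1) membership test, replacing the linear list_member_int() scan. */
static bool
procset_contains(const ProcSet *set, int procno)
{
    assert(procno >= 0 && procno < SKETCH_MAX_PROCS);
    return ((set->words[procno / 64] >> (procno % 64)) & 1) != 0;
}
```

Since insertion is idempotent, the `if (!list_member_int(...))` guard before appending would no longer be needed.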