From: Joel Jacobson <[email protected]>
To: Tom Lane <[email protected]>
Cc: Arseniy Mukhin <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: [PATCH] Fix LISTEN startup race with direct advancement
Date: Wed, 27 May 2026 12:43:54 +0200
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
<CAE7r3ML+6-pokm+GHsfr+pJyZ_RUi=VY8Bp=qKx=3tcYunborA@mail.gmail.com>
<[email protected]>
<CAE7r3MLi9MRfysi+0AcEBzO-tR1PCDr40KT7E+Qgpa9WZHZhmg@mail.gmail.com>
<[email protected]>
<[email protected]>
On Tue, May 26, 2026, at 17:40, Tom Lane wrote:
> I agree with this fix, but it seems to me that it changes the meaning
> of the ListenerEntry.listening flag in a rather fundamental way.
> I'm tempted to rename that flag to "committed" or something like that.
>
> (Both "listening" and "committed" appear in dozens of places in this
> file that are not references to this flag, so TBH I'd rather use a
> flag name that is not either of those words. But I can't think of
> a better name.)
How about renaming "listening" to "removeOnAbort" and inverting its meaning?
> Also, while the proposed test cases are good for showing that there's
> a bug here, I'm disinclined to commit them. I do not think there is
> value in them proportionate to the cost of two new isolation-test
> instances.
I agree; I should have mentioned that you should feel free to drop them.
(It would be nice to have a way to attach tests that are meant to be thrown
away but that cfbot still includes in its testing.)
/Joel
Attachments:
[application/octet-stream] v3-0001-Fix-NOTIFY-wakeups-for-pre-commit-LISTEN-entries.patch (6.7K, 2-v3-0001-Fix-NOTIFY-wakeups-for-pre-commit-LISTEN-entries.patch)
From 7fc2c6dfeb9eaf963aeb7db46fc9ae31db84cdf2 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Wed, 27 May 2026 12:30:22 +0200
Subject: [PATCH] Fix NOTIFY wakeups for pre-commit LISTEN entries
SignalBackends() used to ignore ListenerEntry entries whose flag
said that the listener was not yet committed. That can be true for
a LISTEN that has already registered its queue position but has not
yet reached AtCommit_Notify(). If another backend notifies the same
channel in that window, it could advance this listener's queue pointer
past the notification, making the LISTEN miss it after commit.
Fix this by treating all channel entries as possible wakeup targets.
Rename the flag to removeOnAbort to reflect its remaining purpose:
identifying preallocated LISTEN entries that abort cleanup must
remove.
---
src/backend/commands/async.c | 54 +++++++++++++++---------------------
1 file changed, 23 insertions(+), 31 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index db6a9a6561b..1aae70303d0 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -114,15 +114,15 @@
* If the current transaction has executed any LISTEN/UNLISTEN actions,
* PreCommit_Notify() prepares to commit those. For LISTEN, it
* pre-allocates entries in both the per-backend localChannelTable and the
- * shared globalChannelTable (with listening=false so that these entries
- * are no-ops for the moment). It also records the final per-channel
- * intent in pendingListenActions, so post-commit/abort processing can
- * apply that in a single step. Since all these allocations happen before
- * committing to clog, we can safely abort the transaction on failure.
+ * shared globalChannelTable, marking new shared entries removeOnAbort.
+ * It also records the final per-channel intent in pendingListenActions,
+ * so post-commit/abort processing can apply that in a single step.
+ * Since all these allocations happen before committing to clog, we can
+ * safely abort the transaction on failure.
*
* After commit, AtCommit_Notify() runs through pendingListenActions and
- * updates the backend's per-channel listening flags to activate or
- * deactivate listening. This happens before sending signals.
+ * applies the final per-channel listen/unlisten state. This happens
+ * before sending signals.
*
* SignalBackends() consults the shared global channel table to identify
* listeners for the channels that the current transaction sent
@@ -384,10 +384,9 @@ static SlruDesc NotifySlruDesc;
* Global channel table definitions
*
* This hash table maps (database OID, channel name) keys to arrays of
- * ProcNumbers representing the backends listening or about to listen
- * on each channel. The "listening" flags allow us to create hash table
- * entries pre-commit and not have to assume that creating them post-commit
- * will succeed.
+ * ProcNumbers representing the backends listening or about to listen on each
+ * channel. The removeOnAbort flags allow us to create hash table entries
+ * pre-commit and discard them later if the transaction aborts.
*/
#define INITIAL_LISTENERS_ARRAY_SIZE 4
@@ -400,7 +399,7 @@ typedef struct GlobalChannelKey
typedef struct ListenerEntry
{
ProcNumber procNo; /* listener's ProcNumber */
- bool listening; /* true if committed listener */
+ bool removeOnAbort; /* remove entry if current xact aborts */
} ListenerEntry;
typedef struct GlobalChannelEntry
@@ -1523,9 +1522,8 @@ BecomeRegisteredListener(void)
*
* Prepare a LISTEN by recording it in pendingListenActions, pre-allocating
* an entry in localChannelTable, and pre-allocating an entry in the shared
- * globalChannelTable with listening=false. The listening flag will be set
- * to true in AtCommit_Notify. If we abort later, unwanted table entries
- * will be removed.
+ * globalChannelTable with removeOnAbort set. AtCommit_Notify clears
+ * removeOnAbort; abort processing removes entries still marked so.
*/
static void
PrepareTableEntriesForListen(const char *channel)
@@ -1557,7 +1555,7 @@ PrepareTableEntriesForListen(const char *channel)
*/
(void) hash_search(localChannelTable, channel, HASH_ENTER, NULL);
- /* Pre-allocate entry in shared globalChannelTable with listening=false */
+ /* Pre-allocate entry in shared globalChannelTable */
GlobalChannelKeyInit(&key, MyDatabaseId, channel);
entry = dshash_find_or_insert(globalChannelTable, &key, &found);
@@ -1592,7 +1590,7 @@ PrepareTableEntriesForListen(const char *channel)
{
if (listeners[i].procNo == MyProcNumber)
{
- /* Already have an entry; listening flag stays as-is until commit */
+ /* Already have an entry; leave removeOnAbort as-is */
dshash_release_lock(globalChannelTable, entry);
return;
}
@@ -1615,8 +1613,7 @@ PrepareTableEntriesForListen(const char *channel)
}
listeners[entry->numListeners].procNo = MyProcNumber;
- listeners[entry->numListeners].listening = false; /* staged, not yet
- * committed */
+ listeners[entry->numListeners].removeOnAbort = true;
entry->numListeners++;
dshash_release_lock(globalChannelTable, entry);
@@ -1766,11 +1763,10 @@ ApplyPendingListenActions(bool isCommit)
if (pending->action == PENDING_LISTEN)
{
/*
- * LISTEN being committed: set listening=true.
- * localChannelTable entry was created during
- * PreCommit and should be kept.
+ * LISTEN being committed; localChannelTable entry
+ * was created during PreCommit and should be kept.
*/
- listeners[i].listening = true;
+ listeners[i].removeOnAbort = false;
removeLocal = false;
}
else
@@ -1790,20 +1786,19 @@ ApplyPendingListenActions(bool isCommit)
* pendingListenActions entries, so it's pretty hard to
* test.
*/
- if (!listeners[i].listening)
+ if (listeners[i].removeOnAbort)
{
/*
* Staged LISTEN (or LISTEN+UNLISTEN) being aborted,
- * and we weren't listening before, so remove
- * pre-allocated entries from both tables.
+ * so remove pre-allocated entries from both tables.
*/
RemoveListenerFromChannel(&entry, listeners, i);
}
else
{
/*
- * We're aborting, but the previous state was that
- * we're listening, so keep localChannelTable entry.
+ * Entry pre-existed this transaction, so keep the
+ * localChannelTable entry.
*/
removeLocal = false;
}
@@ -2304,9 +2299,6 @@ SignalBackends(void)
int32 pid;
QueuePosition pos;
- if (!listeners[j].listening)
- continue; /* ignore not-yet-committed listeners */
-
i = listeners[j].procNo;
if (QUEUE_BACKEND_WAKEUP_PENDING(i))
--
2.52.0