public inbox for [email protected]
From: Joel Jacobson <[email protected]>
To: Tom Lane <[email protected]>
Cc: Chao Li <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Sat, 27 Dec 2025 13:40:29 +0100
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
<[email protected]>
<[email protected]>
<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAE7r3MLivh1sHWF06hrVXkiQbw-KChPcQsh+9CheXprm5vRVMQ@mail.gmail.com>
<14865EB6-0BF4-462B-9072-10BDAC10>
<[email protected]>
<[email protected]>
On Fri, Dec 26, 2025, at 21:12, Joel Jacobson wrote:
> On Tue, Nov 25, 2025, at 21:17, Tom Lane wrote:
>> "Joel Jacobson" <[email protected]> writes:
>>> It looks to me like it would be best with two boolean fields; one
>>> boolean to stage the updates during PreCommit_Notify, that each
>>> pendingActions could flip back and forth, and another boolean that
>>> represents the current value, which we would overwrite with the staged
>>> value during AtCommit_Notify.
>>
>> +1, I had a feeling that a single boolean wouldn't quite do it.
>> (There are various ways we could define the states, but what
>> you say above seems pretty reasonable.)
>
> I've implemented the two boolean approach and think it's good.
>
> The signals arrays are now preallocated during PreCommit_Notify.
>
> More details in the patch message under "Two-phase staging pattern".
New version with some fixes.
I should have mentioned that v31 is based on v28 (v29 and v30 were discarded).
Here is also a write-up of changes from v28 to v31:
0001: No changes.
0002:
* To avoid post-commit OOM hazards, we now allocate hash table entries
during PreCommit_Notify. Each listener entry has two boolean flags:
staged and current. For each LISTEN/UNLISTEN action, the staged flag
is set/unset during PreCommit_Notify. The staged value left by the
last action on each channel is then copied to current during
AtCommit_Notify.
* On abort, AtAbort_Notify reverts staged changes.
* The signal arrays are now preallocated during PreCommit_Notify.
* Renamed Exec_UnlistenAllCommit to CleanupListenersOnExit for the
exit-handler path, since it has different semantics (unconditional
removal rather than staged/current handling).
In case someone has already started reviewing v31,
these are the changes I made in v32:
0001:
* Added test: Check UNLISTEN * cancels a LISTEN in the same transaction
0002:
* Fixed initialization of QueueBackendStatus fields, corrected the
LISTEN + UNLISTEN same-transaction case, restructured AtAbort_Notify
to mirror AtCommit_Notify, and added a guard for OOM during staging.
/Joel
Attachments:
[application/octet-stream] 0001-optimize_listen_notify-v32.patch (9.9K, 2-0001-optimize_listen_notify-v32.patch)
From 9550c98af2f24fb7653e9f18e451cf0131224a72 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sat, 27 Dec 2025 08:06:21 +0100
Subject: [PATCH 1/2] Improve LISTEN/NOTIFY test coverage
This adds isolation tests to cover previously untested code paths:
* Check simple NOTIFY reparenting when parent has no action
* Check LISTEN reparenting in subtransaction
* Check LISTEN merge path when both outer and inner transactions have actions
* Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions)
* Check notification_match function (triggered by hash table duplicate detection)
* Check that notifications sent from a backend that has not done LISTEN
are properly delivered to a listener in another backend
* Check UNLISTEN * cancels a LISTEN in the same transaction
This also adds a test to prepare for the next patch:
* Check ChannelHashAddListener array growth
---
src/test/isolation/expected/async-notify.out | 124 ++++++++++++++++++-
src/test/isolation/specs/async-notify.spec | 72 +++++++++++
2 files changed, 195 insertions(+), 1 deletion(-)
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..5d6bcce2b02 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 7 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -47,6 +47,115 @@ notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payloads" from notifier
notifier: NOTIFY "c2" with payload "payloads" from notifier
+starting permutation: listenc notifys_simple
+step listenc: LISTEN c1; LISTEN c2;
+step notifys_simple:
+ BEGIN;
+ SAVEPOINT s1;
+ NOTIFY c1, 'simple1';
+ NOTIFY c2, 'simple2';
+ RELEASE SAVEPOINT s1;
+ COMMIT;
+
+notifier: NOTIFY "c1" with payload "simple1" from notifier
+notifier: NOTIFY "c2" with payload "simple2" from notifier
+
+starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lslisten_outer: LISTEN c3;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrollback: ROLLBACK TO SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify_check: NOTIFY c1, 'should_not_receive';
+
+starting permutation: lunlisten_all notify1 lcheck
+step lunlisten_all: BEGIN; LISTEN c1; UNLISTEN *; COMMIT;
+step notify1: NOTIFY c1;
+step lcheck: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+
+starting permutation: listenc notify_many_with_dup
+step listenc: LISTEN c1; LISTEN c2;
+step notify_many_with_dup:
+ BEGIN;
+ SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+ SELECT pg_notify('c1', 'msg1');
+ COMMIT;
+
+pg_notify
+---------
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+(17 rows)
+
+pg_notify
+---------
+
+(1 row)
+
+notifier: NOTIFY "c1" with payload "msg1" from notifier
+notifier: NOTIFY "c1" with payload "msg2" from notifier
+notifier: NOTIFY "c1" with payload "msg3" from notifier
+notifier: NOTIFY "c1" with payload "msg4" from notifier
+notifier: NOTIFY "c1" with payload "msg5" from notifier
+notifier: NOTIFY "c1" with payload "msg6" from notifier
+notifier: NOTIFY "c1" with payload "msg7" from notifier
+notifier: NOTIFY "c1" with payload "msg8" from notifier
+notifier: NOTIFY "c1" with payload "msg9" from notifier
+notifier: NOTIFY "c1" with payload "msg10" from notifier
+notifier: NOTIFY "c1" with payload "msg11" from notifier
+notifier: NOTIFY "c1" with payload "msg12" from notifier
+notifier: NOTIFY "c1" with payload "msg13" from notifier
+notifier: NOTIFY "c1" with payload "msg14" from notifier
+notifier: NOTIFY "c1" with payload "msg15" from notifier
+notifier: NOTIFY "c1" with payload "msg16" from notifier
+notifier: NOTIFY "c1" with payload "msg17" from notifier
+
+starting permutation: listenc llisten l2listen l3listen lslisten
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step l2listen: LISTEN c1;
+step l3listen: LISTEN c1;
+step lslisten: LISTEN c1; LISTEN c2;
+
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
step llisten: LISTEN c1; LISTEN c2;
step notify1: NOTIFY c1;
@@ -95,6 +204,8 @@ listener: NOTIFY "c2" with payload "" from notifier
starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
step l2listen: LISTEN c1;
+listener2: NOTIFY "c1" with payload "" from notifier
+listener2: NOTIFY "c1" with payload "" from notifier
step l2begin: BEGIN;
step notify1: NOTIFY c1;
step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
@@ -104,6 +215,17 @@ step l2commit: COMMIT;
listener2: NOTIFY "c1" with payload "" from notifier
step l2stop: UNLISTEN *;
+starting permutation: lch_listen nch_notify lch_check
+step lch_listen: LISTEN ch;
+step nch_notify: NOTIFY ch, 'aa';
+step lch_check: SELECT 1 AS x;
+x
+-
+1
+(1 row)
+
+listener_ch: NOTIFY "ch" with payload "aa" from notifier_ch
+
starting permutation: llisten lbegin usage bignotify usage
step llisten: LISTEN c1; LISTEN c2;
step lbegin: BEGIN;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..d09c2297f09 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -31,6 +31,20 @@ step notifys1 {
ROLLBACK TO SAVEPOINT s2;
COMMIT;
}
+step notifys_simple {
+ BEGIN;
+ SAVEPOINT s1;
+ NOTIFY c1, 'simple1';
+ NOTIFY c2, 'simple2';
+ RELEASE SAVEPOINT s1;
+ COMMIT;
+}
+step notify_many_with_dup {
+ BEGIN;
+ SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+ SELECT pg_notify('c1', 'msg1');
+ COMMIT;
+}
step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
teardown { UNLISTEN *; }
@@ -43,6 +57,7 @@ step lcheck { SELECT 1 AS x; }
step lbegin { BEGIN; }
step lbegins { BEGIN ISOLATION LEVEL SERIALIZABLE; }
step lcommit { COMMIT; }
+step lunlisten_all { BEGIN; LISTEN c1; UNLISTEN *; COMMIT; }
teardown { UNLISTEN *; }
# In some tests we need a second listener, just to block the queue.
@@ -53,6 +68,38 @@ step l2begin { BEGIN; }
step l2commit { COMMIT; }
step l2stop { UNLISTEN *; }
+# Third listener session for testing array growth.
+
+session listener3
+step l3listen { LISTEN c1; }
+teardown { UNLISTEN *; }
+
+# Listener session for cross-session notification test with channel 'ch'.
+
+session listener_ch
+step lch_listen { LISTEN ch; }
+step lch_check { SELECT 1 AS x; }
+teardown { UNLISTEN *; }
+
+# Notifier session for cross-session notification test with channel 'ch'.
+
+session notifier_ch
+step nch_notify { NOTIFY ch, 'aa'; }
+
+# Session for testing LISTEN in subtransaction with separate steps.
+
+session listen_subxact
+step lsbegin { BEGIN; }
+step lslisten_outer { LISTEN c3; }
+step lssavepoint { SAVEPOINT s1; }
+step lslisten { LISTEN c1; LISTEN c2; }
+step lsrelease { RELEASE SAVEPOINT s1; }
+step lsrollback { ROLLBACK TO SAVEPOINT s1; }
+step lscommit { COMMIT; }
+step lsnotify { NOTIFY c1, 'subxact_test'; }
+step lsnotify_check { NOTIFY c1, 'should_not_receive'; }
+teardown { UNLISTEN *; }
+
# Trivial cases.
permutation listenc notify1 notify2 notify3 notifyf
@@ -60,6 +107,27 @@ permutation listenc notify1 notify2 notify3 notifyf
# Check simple and less-simple deduplication.
permutation listenc notifyd1 notifyd2 notifys1
+# Check simple NOTIFY reparenting when parent has no action.
+permutation listenc notifys_simple
+
+# Check LISTEN reparenting in subtransaction.
+permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN merge path when both outer and inner transactions have actions.
+permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions).
+permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+
+# Check UNLISTEN * cancels a LISTEN in the same transaction.
+permutation lunlisten_all notify1 lcheck
+
+# Check notification_match function (triggered by hash table duplicate detection).
+permutation listenc notify_many_with_dup
+
+# Check ChannelHashAddListener array growth.
+permutation listenc llisten l2listen l3listen lslisten
+
# Cross-backend notification delivery. We use a "select 1" to force the
# listener session to check for notifies. In principle we could just wait
# for delivery, but that would require extra support in isolationtester
@@ -73,6 +141,10 @@ permutation listenc llisten notify1 notify2 notify3 notifyf lcheck
# and notify queue is not empty
permutation l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
+# Check that notifications sent from a backend that has not done LISTEN
+# are properly delivered to a listener in another backend.
+permutation lch_listen nch_notify lch_check
+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
# after submitting notifications while another connection is listening for
# those notifications and waiting inside an active transaction. We have to
--
2.50.1
[application/octet-stream] 0002-optimize_listen_notify-v32.patch (54.0K, 3-0002-optimize_listen_notify-v32.patch)
From 67ea5434e40b88e996c7ce1c8f417801afababc1 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sat, 27 Dec 2025 08:07:03 +0100
Subject: [PATCH 2/2] Optimize LISTEN/NOTIFY with shared channel map and direct
advancement
This patch reworks the LISTEN/NOTIFY signaling path to avoid the
long-standing inefficiency where every commit wakes all listening
backends in the same database, even those that are listening on
completely different channels.
Problem
-------
At present, SignalBackends has no central knowledge of which backend
listens on which channel. When a backend commits a transaction that
issued NOTIFY, it simply iterates over all registered listeners in the
same database and sends each one a PROCSIG_NOTIFY_INTERRUPT signal.
That behavior is fine when all listeners are on the same channel, but
when many backends are listening on different channels, each NOTIFY
triggers a storm of unnecessary wakeups and context switches. As the
number of idle listeners grows, this often becomes the bottleneck and
throughput drops sharply.
Overview of the solution
------------------------
This patch introduces a lazily created dynamic shared hash (dshash)
backed by a dynamic shared memory area (DSA) that maps (dboid, channel) to
arrays of listening backends (ProcNumbers). This allows the sender to
target only those backends actually listening on the channels for which
it has queued notifications.
Two-phase staging pattern
-------------------------
To ensure transaction safety, LISTEN/UNLISTEN operations use a two-phase
staging pattern. Memory allocation and hash table modifications happen
in PreCommit_Notify (before committing to clog), where failures can
safely abort the transaction. After committing to clog, AtCommit_Notify
only looks up entries that were already added during PreCommit_Notify
and sets their boolean flags, so there is no OOM hazard.
Each listener entry in the shared hash uses a ListenerEntry struct
containing the backend's ProcNumber and two boolean flags: "staged" is
set during PreCommit_Notify, while "current" is copied from staged
during AtCommit_Notify and is what other backends read.
For LISTEN, PreCommit_Notify allocates memory and adds an entry with
staged=true and current=false, then AtCommit_Notify copies staged to
current. For UNLISTEN, PreCommit_Notify sets staged=false on the
existing entry, then AtCommit_Notify copies staged to current and
removes the entry if current is now false.
On abort, staged changes are reverted to match current, and entries
where current=false (never committed) are removed.
Signal arrays for sending notifications are also preallocated in
PreCommit_Notify to avoid allocation failures after committing to clog.
Direct advancement
------------------
A further optimization avoids signaling idle backends that are not
listening on any of the channels notified within the transaction.
While queuing notifications, PreCommit_Notify records the queue head
position both before and after writing its notifications. Because all
writers are serialized by the existing cluster-wide heavyweight lock on
"database 0", no backend (from any database) can insert entries between
those two points. This guarantees that the region [oldHead, newHead)
contains only the entries written by our commit.
SignalBackends uses this fact to directly advance any backend still
positioned at oldHead up to newHead, avoiding a needless wakeup for
listeners that would otherwise not find any notifies of interest.
To handle advancing backends correctly, each backend's entry tracks both
whether it is currently advancing (isAdvancing) and the target position
it is advancing to (advancingPos). This allows SignalBackends to signal
advancing backends only when their target position would leave them
behind the new queue head, while safely direct-advancing idle backends
that would not be interested in the newly written notifications.
Idle backends that are stationary at a position before the old queue
head are signaled, since they might be interested in the notifications
in between their current position and the old queue head.
Other notes
-----------
The patch maintains dual data structures: a shared channelHash for
determining which backends to signal, and a local per-backend
listenChannelsHash for fast lock-free lookups during notification
processing. This avoids contention on the shared hash during the
high-frequency IsListeningOn checks that occur for every notification
read from the queue. Backends remain registered in the global listener
list as long as listenChannelsHash is non-empty.
This patch adds LWLock tranche NOTIFY_CHANNEL_HASH and wait event
NotifyChannelHash for visibility.
There are no user-visible behavioral changes; this is an internal
optimization only.
---
src/backend/commands/async.c | 960 ++++++++++++++----
.../utils/activity/wait_event_names.txt | 1 +
src/include/storage/lwlocklist.h | 1 +
src/tools/pgindent/typedefs.list | 2 +
4 files changed, 787 insertions(+), 177 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index eb86402cae4..a9fbadc95b9 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,10 @@
* All notification messages are placed in the queue and later read out
* by listening backends.
*
- * There is no central knowledge of which backend listens on which channel;
- * every backend has its own list of interesting channels.
+ * We also maintain a dynamic shared hash table (dshash) that maps channel
+ * names to the set of backends listening on each channel. This table is
+ * created lazily on the first LISTEN command and grows dynamically as
+ * needed.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
@@ -64,20 +66,33 @@
* notifications, we can still call elog(ERROR, ...) and the transaction
* will roll back.
*
+ * PreCommit_Notify() also stages any pending LISTEN/UNLISTEN actions by
+ * adding entries to listenChannelsHash and the shared channelHash with
+ * staged=true (for LISTEN) or staged=false (for UNLISTEN). This is done
+ * before committing to clog so that failures can safely abort.
+ *
* Once we have put all of the notifications into the queue, we return to
* CommitTransaction() which will then do the actual transaction commit.
*
* After commit we are called another time (AtCommit_Notify()). Here we
- * make any actual updates to the effective listen state (listenChannels).
- * Then we signal any backends that may be interested in our messages
- * (including our own backend, if listening). This is done by
- * SignalBackends(), which scans the list of listening backends and sends a
- * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- * know which backend is listening on which channel so we must signal them
- * all). We can exclude backends that are already up to date, though, and
- * we can also exclude backends that are in other databases (unless they
- * are way behind and should be kicked to make them advance their
- * pointers).
+ * commit the staged listen/unlisten changes by copying staged to current,
+ * removing entries where current becomes false. Then we signal any backends
+ * that may be interested in our messages (including our own backend,
+ * if listening). This is done by SignalBackends(), which consults the
+ * shared channel hash table to identify listeners for the channels that
+ * have pending notifications in the current database. Each selected
+ * backend is marked as having a wakeup pending to avoid duplicate signals,
+ * and a PROCSIG_NOTIFY_INTERRUPT signal is sent to it.
+ *
+ * When writing notifications, PreCommit_Notify() records the queue head
+ * position both before and after the write. Because all writers serialize
+ * on a cluster-wide heavyweight lock, no backend can insert entries between
+ * these two points. SignalBackends() uses this fact to directly advance any
+ * backend that is still positioned at the old head, or within the range
+ * written, avoiding unnecessary wakeups for idle listeners that have
+ * nothing to read. Backends that cannot be directly advanced are signaled
+ * if they are stuck behind the old queue head, or advancing to a position
+ * before the new queue head, since otherwise notifications could be delayed.
*
* Finally, after we are out of the transaction altogether and about to go
* idle, we scan the queue for messages that need to be sent to our
@@ -137,14 +152,17 @@
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
+#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -162,6 +180,37 @@
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
+/*
+ * Channel hash table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ProcNumbers representing the backends listening on each channel.
+ */
+
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct ChannelHashKey
+{
+ Oid dboid;
+ char channel[NAMEDATALEN];
+} ChannelHashKey;
+
+
+typedef struct ListenerEntry
+{
+ ProcNumber procNo;
+ bool staged;
+ bool current;
+} ListenerEntry;
+
+typedef struct ChannelEntry
+{
+ ChannelHashKey key;
+ dsa_pointer listenersArray; /* DSA pointer to ListenerEntry array */
+ int numListeners; /* Number of listeners currently stored */
+ int allocatedListeners; /* Allocated size of array */
+} ChannelEntry;
+
/*
* Struct representing an entry in the global notify queue
*
@@ -224,11 +273,14 @@ typedef struct QueuePosition
(x).page != (y).page ? (x) : \
(x).offset > (y).offset ? (x) : (y))
+/* returns true if x comes before y in queue order */
+#define QUEUE_POS_PRECEDES(x,y) \
+ (asyncQueuePagePrecedes((x).page, (y).page) || \
+ ((x).page == (y).page && (x).offset < (y).offset))
+
/*
* Parameter determining how often we try to advance the tail pointer:
- * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
@@ -246,6 +298,9 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
+ bool wakeupPending; /* signal sent but not yet processed */
+ bool isAdvancing; /* backend is advancing its position */
+ QueuePosition advancingPos; /* target position backend is advancing to */
} QueueBackendStatus;
/*
@@ -260,9 +315,10 @@ typedef struct QueueBackendStatus
* (since no other backend will inspect it).
*
* When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the
- * entries of other backends and also change the head pointer. When holding
- * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
- * can change the tail pointers.
+ * entries of other backends and also change the head pointer. They can
+ * also advance the queue positions of other backends that are not
+ * currently advancing their own. When holding both NotifyQueueLock and
+ * NotifyQueueTailLock in EXCLUSIVE mode, backends can change the tail pointers.
*
* SLRU buffer pool is divided in banks and bank wise SLRU lock is used as
* the control lock for the pg_notify SLRU buffers.
@@ -288,11 +344,16 @@ typedef struct AsyncQueueControl
ProcNumber firstListener; /* id of first listener, or
* INVALID_PROC_NUMBER */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
+ dsa_handle channelHashDSA;
+ dshash_table_handle channelHashDSH;
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
+static dsa_area *channelDSA = NULL;
+static dshash_table *channelHash = NULL;
+
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
@@ -301,6 +362,9 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
+#define QUEUE_BACKEND_IS_ADVANCING(i) (asyncQueueControl->backend[i].isAdvancing)
+#define QUEUE_BACKEND_ADVANCING_POS(i) (asyncQueueControl->backend[i].advancingPos)
/*
* The SLRU buffer area through which we access the notification queue
@@ -313,16 +377,18 @@ static SlruCtlData NotifyCtlData;
#define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */
/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on). It is a simple list of channel names,
- * allocated in TopMemoryContext.
+ * listenChannelsHash identifies the channels we are listening to.
+ * Entries are added during PreCommit_Notify (before committing to clog) and
+ * removed on abort if the LISTEN was never committed. It is a hash table
+ * of channel names, allocated in TopMemoryContext.
*/
-static List *listenChannels = NIL; /* list of C strings */
+static HTAB *listenChannelsHash = NULL;
/*
* State for pending LISTEN/UNLISTEN actions consists of an ordered list of
- * all actions requested in the current transaction. As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * all actions requested in the current transaction. During PreCommit_Notify,
+ * we stage these changes in listenChannelsHash and the shared channelHash.
+ * On abort, AtAbort_Notify cleans up any staged-but-uncommitted entries.
*
* The list is kept in CurTransactionContext. In subtransactions, each
* subtransaction has its own list in its own CurTransactionContext, but
@@ -391,6 +457,7 @@ typedef struct NotificationList
int nestingLevel; /* current transaction nesting depth */
List *events; /* list of Notification structs */
HTAB *hashtab; /* hash of NotificationHash structs, or NULL */
+ HTAB *channelHashtab; /* hash of unique channel names, or NULL */
struct NotificationList *upper; /* details for upper transaction levels */
} NotificationList;
@@ -401,6 +468,11 @@ struct NotificationHash
Notification *event; /* => the actual Notification struct */
};
+struct ChannelHash
+{
+ char channel[NAMEDATALEN];
+};
+
static NotificationList *pendingNotifies = NULL;
/*
@@ -418,6 +490,36 @@ static bool unlistenExitRegistered = false;
/* True if we're currently registered as a listener in asyncQueueControl */
static bool amRegisteredListener = false;
+/*
+ * Queue head positions for direct advancement.
+ * These are captured during PreCommit_Notify while holding the heavyweight
+ * lock on database 0, ensuring no other backend can insert notifications
+ * between them. SignalBackends uses these to advance idle backends.
+ */
+static QueuePosition queueHeadBeforeWrite;
+static QueuePosition queueHeadAfterWrite;
+
+/*
+ * List of channels with pending notifications in the current transaction.
+ */
+static List *pendingNotifyChannels = NIL;
+
+/*
+ * List of channels with staged listen/unlisten changes in the current
+ * transaction. Populated during PreCommit_Notify and used by AtCommit_Notify
+ * to copy staged values to current.
+ */
+static List *pendingListenChannels = NIL;
+
+/*
+ * Preallocated arrays for SignalBackends to avoid memory allocation after
+ * committing to clog. Allocated in PreCommit_Notify when there are pending
+ * notifications.
+ */
+static int32 *signalPids = NULL;
+static ProcNumber *signalProcnos = NULL;
+
+
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
static bool tryAdvanceTail = false;
@@ -428,14 +530,14 @@ bool Trace_notify = false;
int max_notify_queue_pages = 1048576;
/* local function prototypes */
-static inline int64 asyncQueuePageDiff(int64 p, int64 q);
static inline bool asyncQueuePagePrecedes(int64 p, int64 q);
static void queue_listen(ListenActionKind action, const char *channel);
static void Async_UnlistenOnExit(int code, Datum arg);
static void Exec_ListenPreCommit(void);
-static void Exec_ListenCommit(const char *channel);
-static void Exec_UnlistenCommit(const char *channel);
-static void Exec_UnlistenAllCommit(void);
+static void Exec_ListenPreCommitStage(const char *channel);
+static void Exec_UnlistenPreCommitStage(const char *channel);
+static void Exec_UnlistenAllPreCommitStage(void);
+static void CleanupListenersOnExit(void);
static bool IsListeningOn(const char *channel);
static void asyncQueueUnregister(void);
static bool asyncQueueIsFull(void);
@@ -456,16 +558,9 @@ static void AddEventToPendingNotifies(Notification *n);
static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
-
-/*
- * Compute the difference between two queue page numbers.
- * Previously this function accounted for a wraparound.
- */
-static inline int64
-asyncQueuePageDiff(int64 p, int64 q)
-{
- return p - q;
-}
+static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel);
+static dshash_hash channelHashFunc(const void *key, size_t size, void *arg);
+static void initChannelHash(void);
/*
* Determines whether p precedes q.
@@ -477,6 +572,105 @@ asyncQueuePagePrecedes(int64 p, int64 q)
return p < q;
}
+/*
+ * channelHashFunc
+ * Hash function for channel keys.
+ */
+static dshash_hash
+channelHashFunc(const void *key, size_t size, void *arg)
+{
+ const ChannelHashKey *k = (const ChannelHashKey *) key;
+ dshash_hash h;
+
+ h = DatumGetUInt32(hash_uint32(k->dboid));
+ h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ strnlen(k->channel, NAMEDATALEN)));
+
+ return h;
+}
+
+/* parameters for the channel hash table */
+static const dshash_parameters channelDSHParams = {
+ sizeof(ChannelHashKey),
+ sizeof(ChannelEntry),
+ dshash_memcmp,
+ channelHashFunc,
+ dshash_memcpy,
+ LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * initChannelHash
+ * Lazy initialization of the channel hash table.
+ */
+static void
+initChannelHash(void)
+{
+ MemoryContext oldcontext;
+
+ /* Quick exit if we already did this */
+ if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID &&
+ channelHash != NULL)
+ return;
+
+ /* Otherwise, use a lock to ensure only one process creates the table */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /* Be sure any local memory allocated by DSA routines is persistent */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID)
+ {
+ /* Initialize dynamic shared hash table for channel hash */
+ channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+ dsa_pin(channelDSA);
+ dsa_pin_mapping(channelDSA);
+ channelHash = dshash_create(channelDSA, &channelDSHParams, NULL);
+
+ /* Store handles in shared memory for other backends to use */
+ asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA);
+ asyncQueueControl->channelHashDSH =
+ dshash_get_hash_table_handle(channelHash);
+ }
+ else if (!channelHash)
+ {
+ /* Attach to existing dynamic shared hash table */
+ channelDSA = dsa_attach(asyncQueueControl->channelHashDSA);
+ dsa_pin_mapping(channelDSA);
+ channelHash = dshash_attach(channelDSA, &channelDSHParams,
+ asyncQueueControl->channelHashDSH,
+ NULL);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ LWLockRelease(NotifyQueueLock);
+}
+
+/*
+ * initListenChannelsHash
+ * Lazy initialization of the local listen channels hash table.
+ */
+static void
+initListenChannelsHash(void)
+{
+ HASHCTL hash_ctl;
+
+ /* Quick exit if we already did this */
+ if (listenChannelsHash != NULL)
+ return;
+
+ /* Initialize local hash table for this backend's listened channels */
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(struct ChannelHash);
+
+ listenChannelsHash =
+ hash_create("Listen Channels",
+ 64,
+ &hash_ctl,
+ HASH_ELEM | HASH_STRINGS);
+}
+
/*
* Report space needed for our shared memory area
*/
@@ -520,12 +714,17 @@ AsyncShmemInit(void)
QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
asyncQueueControl->lastQueueFillWarn = 0;
+ asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
+ asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
for (int i = 0; i < MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVANCING_POS(i), 0, 0);
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
+ QUEUE_BACKEND_IS_ADVANCING(i) = false;
}
}
@@ -656,6 +855,7 @@ Async_Notify(const char *channel, const char *payload)
notifies->events = list_make1(n);
/* We certainly don't need a hashtable yet */
notifies->hashtab = NULL;
+ notifies->channelHashtab = NULL;
notifies->upper = pendingNotifies;
pendingNotifies = notifies;
}
@@ -682,8 +882,8 @@ Async_Notify(const char *channel, const char *payload)
* Common code for listen, unlisten, unlisten all commands.
*
* Adds the request to the list of pending actions.
- * Actual update of the listenChannels list happens during transaction
- * commit.
+ * Actual update of listenChannelsHash and channelHash happens during
+ * PreCommit_Notify, with staged changes committed in AtCommit_Notify.
*/
static void
queue_listen(ListenActionKind action, const char *channel)
@@ -782,30 +982,49 @@ Async_UnlistenAll(void)
* SQL function: return a set of the channel names this backend is actively
* listening to.
*
- * Note: this coding relies on the fact that the listenChannels list cannot
+ * Note: this coding relies on the fact that listenChannelsHash cannot
* change within a transaction.
*/
Datum
pg_listening_channels(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
+ HASH_SEQ_STATUS *status;
/* stuff done only on the first call of the function */
if (SRF_IS_FIRSTCALL())
{
+ MemoryContext oldcontext;
+
/* create a function context for cross-call persistence */
funcctx = SRF_FIRSTCALL_INIT();
+
+ /* Initialize hash table iteration if we have any channels */
+ if (listenChannelsHash != NULL)
+ {
+ oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+ status = (HASH_SEQ_STATUS *) palloc(sizeof(HASH_SEQ_STATUS));
+ hash_seq_init(status, listenChannelsHash);
+ funcctx->user_fctx = status;
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ funcctx->user_fctx = NULL;
+ }
}
/* stuff done on every call of the function */
funcctx = SRF_PERCALL_SETUP();
+ status = (HASH_SEQ_STATUS *) funcctx->user_fctx;
- if (funcctx->call_cntr < list_length(listenChannels))
+ if (status != NULL)
{
- char *channel = (char *) list_nth(listenChannels,
- funcctx->call_cntr);
+ struct ChannelHash *entry;
- SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel));
+ entry = (struct ChannelHash *) hash_seq_search(status);
+ if (entry != NULL)
+ SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(entry->channel));
}
SRF_RETURN_DONE(funcctx);
@@ -821,7 +1040,7 @@ pg_listening_channels(PG_FUNCTION_ARGS)
static void
Async_UnlistenOnExit(int code, Datum arg)
{
- Exec_UnlistenAllCommit();
+ CleanupListenersOnExit();
asyncQueueUnregister();
}
@@ -868,8 +1087,24 @@ PreCommit_Notify(void)
elog(DEBUG1, "PreCommit_Notify");
/* Preflight for any pending listen/unlisten actions */
+ if (pendingNotifies != NULL || pendingActions != NULL)
+ initChannelHash();
+
+ if (pendingNotifies != NULL)
+ {
+ if (signalPids == NULL)
+ signalPids = MemoryContextAlloc(TopMemoryContext,
+ MaxBackends * sizeof(int32));
+
+ if (signalProcnos == NULL)
+ signalProcnos = MemoryContextAlloc(TopMemoryContext,
+ MaxBackends * sizeof(ProcNumber));
+ }
+
if (pendingActions != NULL)
{
+ initListenChannelsHash();
+
foreach(p, pendingActions->actions)
{
ListenAction *actrec = (ListenAction *) lfirst(p);
@@ -878,12 +1113,13 @@ PreCommit_Notify(void)
{
case LISTEN_LISTEN:
Exec_ListenPreCommit();
+ Exec_ListenPreCommitStage(actrec->channel);
break;
case LISTEN_UNLISTEN:
- /* there is no Exec_UnlistenPreCommit() */
+ Exec_UnlistenPreCommitStage(actrec->channel);
break;
case LISTEN_UNLISTEN_ALL:
- /* there is no Exec_UnlistenAllPreCommit() */
+ Exec_UnlistenAllPreCommitStage();
break;
}
}
@@ -893,6 +1129,36 @@ PreCommit_Notify(void)
if (pendingNotifies)
{
ListCell *nextNotify;
+ bool firstIteration = true;
+
+ /*
+ * Build list of unique channels for SignalBackends().
+ *
+ * If we have a channelHashtab, use it to efficiently get the unique
+ * channels. Otherwise, fall back to the linear approach.
+ */
+ pendingNotifyChannels = NIL;
+ if (pendingNotifies->channelHashtab != NULL)
+ {
+ HASH_SEQ_STATUS status;
+ struct ChannelHash *channelEntry;
+
+ hash_seq_init(&status, pendingNotifies->channelHashtab);
+ while ((channelEntry = (struct ChannelHash *) hash_seq_search(&status)) != NULL)
+ pendingNotifyChannels = lappend(pendingNotifyChannels, channelEntry->channel);
+ }
+ else
+ {
+ /* Linear approach for a small number of notifications */
+ foreach_ptr(Notification, n, pendingNotifies->events)
+ {
+ char *channel = n->data;
+ bool seen = false;
+
+ /* Compare names, not pointers: each event has its own copy */
+ foreach_ptr(char, c, pendingNotifyChannels)
+ {
+ if (strcmp(c, channel) == 0)
+ {
+ seen = true;
+ break;
+ }
+ }
+ if (!seen)
+ pendingNotifyChannels = lappend(pendingNotifyChannels, channel);
+ }
+ }
/*
* Make sure that we have an XID assigned to the current transaction.
@@ -921,6 +1187,22 @@ PreCommit_Notify(void)
LockSharedObject(DatabaseRelationId, InvalidOid, 0,
AccessExclusiveLock);
+ /*
+ * For the direct advancement optimization in SignalBackends(), we
+ * need to ensure that no other backend can insert queue entries
+ * between queueHeadBeforeWrite and queueHeadAfterWrite. The
+ * heavyweight lock above provides this guarantee, since it serializes
+ * all writers.
+ *
+ * Note: if the heavyweight lock were ever removed for scalability
+ * reasons, we could achieve the same guarantee by holding
+ * NotifyQueueLock in EXCLUSIVE mode across all our insertions, rather
+ * than releasing and reacquiring it for each page as we do below.
+ */
+
+ /* Initialize queueHeadBeforeWrite to a safe default */
+ SET_QUEUE_POS(queueHeadBeforeWrite, 0, 0);
+
/* Now push the notifications into the queue */
nextNotify = list_head(pendingNotifies->events);
while (nextNotify != NULL)
@@ -938,12 +1220,20 @@ PreCommit_Notify(void)
* point in time we can still roll the transaction back.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ if (firstIteration)
+ {
+ queueHeadBeforeWrite = QUEUE_HEAD;
+ firstIteration = false;
+ }
+
asyncQueueFillWarning();
if (asyncQueueIsFull())
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg("too many notifications in the NOTIFY queue")));
nextNotify = asyncQueueAddEntries(nextNotify);
+ queueHeadAfterWrite = QUEUE_HEAD;
+
LWLockRelease(NotifyQueueLock);
}
@@ -956,7 +1246,7 @@ PreCommit_Notify(void)
*
* This is called at transaction commit, after committing to clog.
*
- * Update listenChannels and clear transaction-local state.
+ * Update listenChannelsHash and clear transaction-local state.
*
* If we issued any notifications in the transaction, send signals to
* listening backends (possibly including ourselves) to process them.
@@ -966,7 +1256,6 @@ PreCommit_Notify(void)
void
AtCommit_Notify(void)
{
- ListCell *p;
/*
* Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to
@@ -978,30 +1267,62 @@ AtCommit_Notify(void)
if (Trace_notify)
elog(DEBUG1, "AtCommit_Notify");
- /* Perform any pending listen/unlisten actions */
- if (pendingActions != NULL)
+ /* Commit staged listen/unlisten changes by copying staged to current */
+ if (pendingListenChannels != NIL)
{
- foreach(p, pendingActions->actions)
+ ListCell *lc;
+
+ foreach(lc, pendingListenChannels)
{
- ListenAction *actrec = (ListenAction *) lfirst(p);
+ char *channel = (char *) lfirst(lc);
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ ListenerEntry *listeners;
- switch (actrec->action)
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, true);
+ if (entry == NULL)
+ continue;
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA, entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
{
- case LISTEN_LISTEN:
- Exec_ListenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN:
- Exec_UnlistenCommit(actrec->channel);
- break;
- case LISTEN_UNLISTEN_ALL:
- Exec_UnlistenAllCommit();
+ if (listeners[i].procNo == MyProcNumber)
+ {
+ /* Commit staged value to current */
+ listeners[i].current = listeners[i].staged;
+
+ if (!listeners[i].current)
+ {
+ /* UNLISTEN committed: remove from local and shared */
+ (void) hash_search(listenChannelsHash, channel,
+ HASH_REMOVE, NULL);
+
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+
+ if (entry->numListeners == 0)
+ {
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_entry(channelHash, entry);
+ entry = NULL;
+ }
+ }
break;
+ }
}
+
+ if (entry != NULL)
+ dshash_release_lock(channelHash, entry);
}
}
/* If no longer listening to anything, get out of listener array */
- if (amRegisteredListener && listenChannels == NIL)
+ if (amRegisteredListener &&
+ (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0))
asyncQueueUnregister();
/*
@@ -1098,6 +1419,9 @@ Exec_ListenPreCommit(void)
QUEUE_BACKEND_POS(MyProcNumber) = max;
QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
+ QUEUE_BACKEND_ADVANCING_POS(MyProcNumber) = max;
/* Insert backend into list of listeners at correct position */
if (prevListener != INVALID_PROC_NUMBER)
{
@@ -1127,99 +1451,213 @@ Exec_ListenPreCommit(void)
}
/*
- * Exec_ListenCommit --- subroutine for AtCommit_Notify
+ * Exec_ListenPreCommitStage --- subroutine for PreCommit_Notify
*
- * Add the channel to the list of channels we are listening on.
+ * Stage a LISTEN: add the channel to listenChannelsHash and set staged=true
+ * on our entry in the shared channelHash (a new entry starts with
+ * current=false). AtCommit_Notify later copies staged to current.
*/
static void
-Exec_ListenCommit(const char *channel)
+Exec_ListenPreCommitStage(const char *channel)
{
- MemoryContext oldcontext;
-
- /* Do nothing if we are already listening on this channel */
- if (IsListeningOn(channel))
- return;
-
- /*
- * Add the new channel name to listenChannels.
- *
- * XXX It is theoretically possible to get an out-of-memory failure here,
- * which would be bad because we already committed. For the moment it
- * doesn't seem worth trying to guard against that, but maybe improve this
- * later.
- */
- oldcontext = MemoryContextSwitchTo(TopMemoryContext);
- listenChannels = lappend(listenChannels, pstrdup(channel));
- MemoryContextSwitchTo(oldcontext);
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ bool found;
+ ListenerEntry *listeners;
+
+ (void) hash_search(listenChannelsHash, channel, HASH_ENTER, NULL);
+
+ pendingListenChannels = lappend(pendingListenChannels, pstrdup(channel));
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find_or_insert(channelHash, &key, &found);
+
+ if (!found)
+ {
+ entry->listenersArray = InvalidDsaPointer;
+ entry->numListeners = 0;
+ entry->allocatedListeners = 0;
+ }
+
+ if (!DsaPointerIsValid(entry->listenersArray))
+ {
+ entry->listenersArray = dsa_allocate(channelDSA,
+ sizeof(ListenerEntry) * INITIAL_LISTENERS_ARRAY_SIZE);
+ entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+ }
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA, entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo == MyProcNumber)
+ {
+ listeners[i].staged = true;
+ dshash_release_lock(channelHash, entry);
+ return;
+ }
+ }
+
+ if (entry->numListeners >= entry->allocatedListeners)
+ {
+ int new_size = entry->allocatedListeners * 2;
+ dsa_pointer new_array = dsa_allocate(channelDSA,
+ sizeof(ListenerEntry) * new_size);
+ ListenerEntry *new_listeners = (ListenerEntry *) dsa_get_address(channelDSA, new_array);
+
+ memcpy(new_listeners, listeners, sizeof(ListenerEntry) * entry->numListeners);
+ dsa_free(channelDSA, entry->listenersArray);
+ entry->listenersArray = new_array;
+ entry->allocatedListeners = new_size;
+ listeners = new_listeners;
+ }
+
+ listeners[entry->numListeners].procNo = MyProcNumber;
+ listeners[entry->numListeners].staged = true;
+ listeners[entry->numListeners].current = false;
+ entry->numListeners++;
+
+ dshash_release_lock(channelHash, entry);
}
/*
- * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenPreCommitStage --- subroutine for PreCommit_Notify
*
- * Remove the specified channel name from listenChannels.
+ * Stage an UNLISTEN by setting staged=false on our entry in channelHash.
+ * The staged value is copied to current in AtCommit_Notify, and the entry
+ * is removed if current becomes false.
*/
static void
-Exec_UnlistenCommit(const char *channel)
+Exec_UnlistenPreCommitStage(const char *channel)
{
- ListCell *q;
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ ListenerEntry *listeners;
- if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, true);
+ if (entry == NULL)
+ return;
- foreach(q, listenChannels)
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA, entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
{
- char *lchan = (char *) lfirst(q);
-
- if (strcmp(lchan, channel) == 0)
+ if (listeners[i].procNo == MyProcNumber && listeners[i].staged)
{
- listenChannels = foreach_delete_current(listenChannels, q);
- pfree(lchan);
+ listeners[i].staged = false;
+
+ pendingListenChannels = lappend(pendingListenChannels, pstrdup(channel));
break;
}
}
- /*
- * We do not complain about unlistening something not being listened;
- * should we?
- */
+ dshash_release_lock(channelHash, entry);
}
/*
- * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify
+ * Exec_UnlistenAllPreCommitStage --- subroutine for PreCommit_Notify
*
- * Unlisten on all channels for this backend.
+ * Stage UNLISTEN * by setting staged=false on all our entries in channelHash.
*/
static void
-Exec_UnlistenAllCommit(void)
+Exec_UnlistenAllPreCommitStage(void)
{
+ dshash_seq_status status;
+ ChannelEntry *entry;
+
+ dshash_seq_init(&status, channelHash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
+ {
+ if (entry->key.dboid == MyDatabaseId)
+ {
+ ListenerEntry *listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo == MyProcNumber && listeners[i].staged)
+ {
+ listeners[i].staged = false;
+ pendingListenChannels = lappend(pendingListenChannels,
+ pstrdup(entry->key.channel));
+ }
+ }
+ }
+ }
+ dshash_seq_term(&status);
+}
+
+/*
+ * CleanupListenersOnExit --- called from Async_UnlistenOnExit
+ *
+ * Remove this backend from all channels in the shared hash.
+ */
+static void
+CleanupListenersOnExit(void)
+{
+ dshash_seq_status status;
+ ChannelEntry *entry;
+
if (Trace_notify)
- elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ elog(DEBUG1, "CleanupListenersOnExit(%d)", MyProcPid);
- list_free_deep(listenChannels);
- listenChannels = NIL;
+ /* Clear our local cache */
+ if (listenChannelsHash != NULL)
+ {
+ hash_destroy(listenChannelsHash);
+ listenChannelsHash = NULL;
+ }
+
+ /* Now remove from the shared channelHash */
+ if (channelHash == NULL)
+ return;
+
+ dshash_seq_init(&status, channelHash, true);
+ while ((entry = dshash_seq_next(&status)) != NULL)
+ {
+ if (entry->key.dboid == MyDatabaseId)
+ {
+ ListenerEntry *listeners;
+ int i;
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo == MyProcNumber)
+ {
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+
+ if (entry->numListeners == 0)
+ {
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_current(&status);
+ }
+ break;
+ }
+ }
+ }
+ }
+ dshash_seq_term(&status);
}
/*
* Test whether we are actively listening on the given channel name.
*
* Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it. In practice the list is likely to be
- * fairly short, though.
*/
static bool
IsListeningOn(const char *channel)
{
- ListCell *p;
+ if (listenChannelsHash == NULL)
+ return false;
- foreach(p, listenChannels)
- {
- char *lchan = (char *) lfirst(p);
-
- if (strcmp(lchan, channel) == 0)
- return true;
- }
- return false;
+ return (hash_search(listenChannelsHash, channel, HASH_FIND, NULL) != NULL);
}
/*
@@ -1229,7 +1667,7 @@ IsListeningOn(const char *channel)
static void
asyncQueueUnregister(void)
{
- Assert(listenChannels == NIL); /* else caller error */
+ Assert(listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0); /* else caller error */
if (!amRegisteredListener) /* nothing to do */
return;
@@ -1241,6 +1679,9 @@ asyncQueueUnregister(void)
/* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
+ SET_QUEUE_POS(QUEUE_BACKEND_ADVANCING_POS(MyProcNumber), 0, 0);
/* and remove it from the list */
if (QUEUE_FIRST_LISTENER == MyProcNumber)
QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1565,12 +2006,21 @@ asyncQueueFillWarning(void)
/*
* Send signals to listening backends.
*
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send. However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind. Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * Normally we signal only backends in our own database that are
+ * listening on the channels with pending notifies, since only those
+ * backends are interested in the notifies we send.
+ *
+ * A backend that is not interested in our notifies, is not currently
+ * advancing, and is positioned at the old queue head or anywhere within
+ * the queue region we just wrote, can safely be advanced directly to the
+ * new head, since that region contains only our own notifications. This
+ * avoids waking backends when there is nothing of interest to them.
+ *
+ * A backend that is not interested in our notifies must still be
+ * signaled if it is advancing toward a target position before the new
+ * queue head, or if it is not advancing and is stationary at a
+ * position before the old queue head, since notifications could
+ * otherwise be delayed.
*
* Since we know the ProcNumber and the Pid the signaling is quite cheap.
*
@@ -1580,60 +2030,106 @@ asyncQueueFillWarning(void)
static void
SignalBackends(void)
{
- int32 *pids;
- ProcNumber *procnos;
int count;
+ ListCell *lc;
- /*
- * Identify backends that we need to signal. We don't want to send
- * signals while holding the NotifyQueueLock, so this loop just builds a
- * list of target PIDs.
- *
- * XXX in principle these pallocs could fail, which would be bad. Maybe
- * preallocate the arrays? They're not that large, though.
- */
- pids = (int32 *) palloc(MaxBackends * sizeof(int32));
- procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+ Assert(signalPids != NULL && signalProcnos != NULL);
count = 0;
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ foreach(lc, pendingNotifyChannels)
{
- int32 pid = QUEUE_BACKEND_PID(i);
- QueuePosition pos;
+ char *channel = (char *) lfirst(lc);
+ ChannelEntry *entry = NULL;
+ ListenerEntry *listeners;
- Assert(pid != InvalidPid);
- pos = QUEUE_BACKEND_POS(i);
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ if (channelHash != NULL)
{
- /*
- * Always signal listeners in our own database, unless they're
- * already caught up (unlikely, but possible).
- */
+ ChannelHashKey key;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, false);
+ }
+
+ if (entry == NULL)
+ continue;
+
+ listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (int j = 0; j < entry->numListeners; j++)
+ {
+ ProcNumber i;
+ int32 pid;
+ QueuePosition pos;
+
+ if (!listeners[j].current)
+ continue;
+
+ i = listeners[j].procNo;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+ pid = QUEUE_BACKEND_PID(i);
+
+ /* Skip if caught up */
if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
continue;
+
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ signalPids[count] = pid;
+ signalProcnos[count] = i;
+ count++;
}
- else
+
+ dshash_release_lock(channelHash, entry);
+ }
+
+ if (pendingNotifies != NULL)
+ {
+ for (ProcNumber i = QUEUE_FIRST_LISTENER;
+ i != INVALID_PROC_NUMBER;
+ i = QUEUE_NEXT_LISTENER(i))
{
- /*
- * Listeners in other databases should be signaled only if they
- * are far behind.
- */
- if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
- QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+ QueuePosition pos;
+ int32 pid;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+ pid = QUEUE_BACKEND_PID(i);
+
+ if (QUEUE_BACKEND_IS_ADVANCING(i) ?
+ QUEUE_POS_PRECEDES(QUEUE_BACKEND_ADVANCING_POS(i), queueHeadAfterWrite) :
+ QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
+ {
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ signalPids[count] = pid;
+ signalProcnos[count] = i;
+ count++;
+ }
+ else if (!QUEUE_BACKEND_IS_ADVANCING(i) &&
+ QUEUE_POS_PRECEDES(pos, queueHeadAfterWrite))
+ {
+ Assert(!QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite));
+
+ QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+ }
}
- /* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
- count++;
}
LWLockRelease(NotifyQueueLock);
/* Now send signals */
for (int i = 0; i < count; i++)
{
- int32 pid = pids[i];
+ int32 pid = signalPids[i];
/*
* If we are signaling our own process, no need to involve the kernel;
@@ -1651,12 +2147,9 @@ SignalBackends(void)
* NotifyQueueLock; which is unlikely but certainly possible. So we
* just log a low-level debug message if it happens.
*/
- if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, signalProcnos[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
}
-
- pfree(pids);
- pfree(procnos);
}
/*
@@ -1664,18 +2157,71 @@ SignalBackends(void)
*
* This is called at transaction abort.
*
- * Gets rid of pending actions and outbound notifies that we would have
- * executed if the transaction got committed.
+ * Revert any staged listen/unlisten changes and clean up transaction state.
*/
void
AtAbort_Notify(void)
{
/*
- * If we LISTEN but then roll back the transaction after PreCommit_Notify,
- * we have registered as a listener but have not made any entry in
- * listenChannels. In that case, deregister again.
+ * Revert staged listen/unlisten changes. For new LISTENs (current=false),
+ * remove from both local and shared hash. For UNLISTENs (current=true),
+ * just revert staged back to current.
*/
- if (amRegisteredListener && listenChannels == NIL)
+ if (pendingListenChannels != NIL && channelHash != NULL)
+ {
+ ListCell *lc;
+
+ foreach(lc, pendingListenChannels)
+ {
+ char *channel = (char *) lfirst(lc);
+ ChannelHashKey key;
+ ChannelEntry *entry;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, true);
+ if (entry != NULL)
+ {
+ ListenerEntry *listeners = (ListenerEntry *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i].procNo == MyProcNumber)
+ {
+ /* Revert staged value to current */
+ listeners[i].staged = listeners[i].current;
+
+ if (!listeners[i].current)
+ {
+ /* New LISTEN being aborted: remove from local and shared */
+ if (listenChannelsHash != NULL)
+ (void) hash_search(listenChannelsHash, channel,
+ HASH_REMOVE, NULL);
+
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ListenerEntry) * (entry->numListeners - i));
+ }
+ break;
+ }
+ }
+
+ if (entry->numListeners == 0)
+ {
+ if (DsaPointerIsValid(entry->listenersArray))
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_entry(channelHash, entry);
+ }
+ else
+ dshash_release_lock(channelHash, entry);
+ }
+ }
+ }
+
+
+ if (amRegisteredListener &&
+ (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0))
asyncQueueUnregister();
/* And clean up */
@@ -1854,20 +2400,29 @@ asyncQueueReadAllNotifications(void)
QueuePosition head;
Snapshot snapshot;
- /* Fetch current state */
+ /*
+ * Fetch current state, and indicate to other backends that we have woken
+ * up and are about to advance our position.
+ */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
+ head = QUEUE_HEAD;
pos = QUEUE_BACKEND_POS(MyProcNumber);
- head = QUEUE_HEAD;
- LWLockRelease(NotifyQueueLock);
if (QUEUE_POS_EQUAL(pos, head))
{
/* Nothing to do, we have read all notifications already. */
+ LWLockRelease(NotifyQueueLock);
return;
}
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = true;
+ QUEUE_BACKEND_ADVANCING_POS(MyProcNumber) = head;
+
+ LWLockRelease(NotifyQueueLock);
+
/*----------
* Get snapshot we'll use to decide which xacts are still in progress.
* This is trickier than it might seem, because of race conditions.
@@ -1954,6 +2509,8 @@ asyncQueueReadAllNotifications(void)
/* Update shared state */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
+
+ QUEUE_BACKEND_IS_ADVANCING(MyProcNumber) = false;
QUEUE_BACKEND_POS(MyProcNumber) = pos;
LWLockRelease(NotifyQueueLock);
@@ -2051,7 +2608,7 @@ asyncQueueProcessPageEntries(QueuePosition *current,
* over it on the first LISTEN in a session, and not get stuck on
* it indefinitely.
*/
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)
continue;
if (TransactionIdDidCommit(qe->xid))
@@ -2306,7 +2863,7 @@ ProcessIncomingNotify(bool flush)
notifyInterruptPending = false;
/* Do nothing else if we aren't actively listening */
- if (listenChannels == NIL)
+ if (listenChannelsHash == NULL || hash_get_num_entries(listenChannelsHash) == 0)
return;
if (Trace_notify)
@@ -2410,13 +2967,15 @@ AddEventToPendingNotifies(Notification *n)
{
Assert(pendingNotifies->events != NIL);
- /* Create the hash table if it's time to */
+ /* Create the hash tables if it's time to */
if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES &&
pendingNotifies->hashtab == NULL)
{
HASHCTL hash_ctl;
ListCell *l;
+ Assert(pendingNotifies->channelHashtab == NULL);
+
/* Create the hash table */
hash_ctl.keysize = sizeof(Notification *);
hash_ctl.entrysize = sizeof(struct NotificationHash);
@@ -2429,10 +2988,22 @@ AddEventToPendingNotifies(Notification *n)
&hash_ctl,
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+ /* Create the channel hash table */
+ memset(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = NAMEDATALEN;
+ hash_ctl.entrysize = sizeof(struct ChannelHash);
+ hash_ctl.hcxt = CurTransactionContext;
+ pendingNotifies->channelHashtab =
+ hash_create("Pending Notify Channels",
+ 64L,
+ &hash_ctl,
+ HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
+
/* Insert all the already-existing events */
foreach(l, pendingNotifies->events)
{
Notification *oldn = (Notification *) lfirst(l);
+ char *channel = oldn->data;
bool found;
(void) hash_search(pendingNotifies->hashtab,
@@ -2440,22 +3011,42 @@ AddEventToPendingNotifies(Notification *n)
HASH_ENTER,
&found);
Assert(!found);
+
+ /* Insert channel into channelHashtab */
+ (void) hash_search(pendingNotifies->channelHashtab,
+ channel,
+ HASH_ENTER,
+ &found);
+ /* found may be true if several events share the same channel */
}
}
/* Add new event to the list, in order */
pendingNotifies->events = lappend(pendingNotifies->events, n);
- /* Add event to the hash table if needed */
+ /* Add event to the hash tables if needed */
if (pendingNotifies->hashtab != NULL)
{
bool found;
+ Assert(pendingNotifies->channelHashtab != NULL);
+
(void) hash_search(pendingNotifies->hashtab,
&n,
HASH_ENTER,
&found);
Assert(!found);
+
+ /* Add channel to channelHashtab */
+ {
+ char *channel = n->data;
+
+ (void) hash_search(pendingNotifies->channelHashtab,
+ channel,
+ HASH_ENTER,
+ &found);
+ /* found may be true if we already have an event on this channel */
+ }
}
}
@@ -2493,7 +3084,7 @@ notification_match(const void *key1, const void *key2, Size keysize)
return 1; /* not equal */
}
-/* Clear the pendingActions and pendingNotifies lists. */
+/*
+ * Clear the pendingActions, pendingNotifies, pendingNotifyChannels and
+ * pendingListenChannels state.
+ */
static void
ClearPendingActionsAndNotifies(void)
{
@@ -2505,6 +3096,8 @@ ClearPendingActionsAndNotifies(void)
*/
pendingActions = NULL;
pendingNotifies = NULL;
+ pendingNotifyChannels = NIL;
+ pendingListenChannels = NIL;
}
/*
@@ -2515,3 +3108,16 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("notify_buffers", newval);
}
+
+
+/*
+ * ChannelHashPrepareKey
+ * Prepare a channel key for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel)
+{
+ memset(key, 0, sizeof(ChannelHashKey));
+ key->dboid = dboid;
+ strlcpy(key->channel, channel, NAMEDATALEN);
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index dcfadbd5aae..32b0b21f184 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -371,6 +371,7 @@ SubtransBuffer "Waiting for I/O on a sub-transaction SLRU buffer."
MultiXactOffsetBuffer "Waiting for I/O on a multixact offset SLRU buffer."
MultiXactMemberBuffer "Waiting for I/O on a multixact member SLRU buffer."
NotifyBuffer "Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash "Waiting to access the <command>NOTIFY</command> channel hash table."
SerialBuffer "Waiting for I/O on a serializable transaction conflict SLRU buffer."
WALInsert "Waiting to insert WAL data into a memory buffer."
BufferContent "Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 533344509e9..277a78e7954 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -102,6 +102,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5c88fa92f4e..973d4a449fd 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -421,6 +421,8 @@ CatalogIdMapEntry
CatalogIndexState
ChangeVarNodes_callback
ChangeVarNodes_context
+ChannelEntry
+ChannelHashKey
CheckPoint
CheckPointStmt
CheckpointStatsData
--
2.50.1
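A note on why ChannelHashPrepareKey zero-fills the key before copying the channel name. This assumes the shared channel table hashes and compares its fixed-size ChannelHashKey as raw bytes, as dynahash does with HASH_BLOBS. Under that assumption, any bytes left after the terminating NUL must be identical for equal keys. The per-transaction channelHashtab does not depend on this, because it is created with HASH_STRINGS. The sketch below is standalone and uses hypothetical names (ChannelKey, channel_key_prepare, channel_key_hash, ChannelSet, channel_set_enter). It pairs a zero-filled (dboid, channel) key with an FNV-1a hash over the whole struct and a small linear-probing set compared with memcmp. It illustrates the technique only and is not the patch's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define NAMEDATALEN 64   /* PostgreSQL's default identifier buffer size */
#define SET_SLOTS 128    /* hypothetical fixed capacity; must be a power of two */

typedef uint32_t Oid;

/* Hypothetical stand-in for the patch's ChannelHashKey. */
typedef struct ChannelKey
{
    Oid  dboid;
    char channel[NAMEDATALEN];
} ChannelKey;

/*
 * Zero the whole key first, so the bytes after the NUL (and any padding)
 * are always identical. Then copy the name, truncating to NAMEDATALEN - 1
 * bytes the way strlcpy would.
 */
static void
channel_key_prepare(ChannelKey *key, Oid dboid, const char *channel)
{
    size_t len = strlen(channel);

    memset(key, 0, sizeof(ChannelKey));
    key->dboid = dboid;
    if (len > NAMEDATALEN - 1)
        len = NAMEDATALEN - 1;
    memcpy(key->channel, channel, len);
}

/* FNV-1a over every byte of the key, not just the string prefix. */
static uint32_t
channel_key_hash(const ChannelKey *key)
{
    const unsigned char *p = (const unsigned char *) key;
    uint32_t h = 2166136261u;

    for (size_t i = 0; i < sizeof(ChannelKey); i++)
    {
        h ^= p[i];
        h *= 16777619u;
    }
    return h;
}

/* Fixed-size open-addressing set; zero-initialize before use. */
typedef struct ChannelSet
{
    bool       used[SET_SLOTS];
    ChannelKey keys[SET_SLOTS];
    int        count;
} ChannelSet;

/*
 * Insert using linear probing. Returns true if an equal key was already
 * present, in the spirit of hash_search's "found" flag.
 */
static bool
channel_set_enter(ChannelSet *set, const ChannelKey *key)
{
    uint32_t slot = channel_key_hash(key) & (SET_SLOTS - 1);

    for (;;)
    {
        if (!set->used[slot])
        {
            /* Sketch only: no resizing; keep one slot free so probes end. */
            assert(set->count < SET_SLOTS - 1);
            set->used[slot] = true;
            set->keys[slot] = *key;
            set->count++;
            return false;
        }
        /* Byte-wise equality is only safe because keys were zero-filled. */
        if (memcmp(&set->keys[slot], key, sizeof(ChannelKey)) == 0)
            return true;
        slot = (slot + 1) & (SET_SLOTS - 1);
    }
}
```

Entering the same channel twice for one database reports it as already present. The same name under a different database OID is treated as a distinct key.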