From: Joel Jacobson <[email protected]>
To: Tom Lane <[email protected]>
Cc: Matheus Alcantara <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Wed, 08 Oct 2025 16:31:24 +0200
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
<[email protected]>
<[email protected]>
<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
On Tue, Oct 7, 2025, at 22:15, Tom Lane wrote:
> "Joel Jacobson" <[email protected]> writes:
>> Ops, I see I got the list_member() code wrong. I've changed it to now
>> create String nodes, and then use strVal().
>
> Might be better to revert to the previous coding. Using String
> nodes is going to roughly double the space eaten for the list,
> and it seems like it's not buying you a lot.
>
>> I also changed back to dshash_find(..., false) in SignalBackends(),
>> since that makes more sense to me, since we're not modifying entry.
>
> Agreed.
>
> I did a code coverage run and it seems like things are in pretty
> good shape already. async.c is about 88% covered and a lot of the
> omissions are either Trace_notify or unreached error reports, which
> I'm not especially concerned about. The visible coverage gaps are:
>
> 1. asyncQueueFillWarning. This wasn't covered before either, because
> it doesn't seem very practical to exercise it in an everyday
> regression test. Since your patch doesn't touch that code nor the
> queue contents, I'm not concerned here.
I agree.
> 2. AtSubCommit_Notify's reparenting stanza. This is a pre-existing
> omission too, but maybe worth doing something about?
>
> 3. AtSubAbort_Notify's pendingActions cleanup loop; same comments.
>
> 4. notification_match is not called at all. Again, pre-existing
> coverage gap.
I've added test coverage for all three items above.
> 5. ChannelHashAddListener: "already registered" case is not reached,
> which surprises me a bit, and neither is the "grow the array" stanza.
> Since this is new code it might be worth adding coverage.
I've added a test for the "grow the array" stanza.
The "already registered" case seems impossible to reach, since the
caller, Exec_ListenCommit, returns early when IsListeningOn() reports
that we are already listening on the channel.
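To illustrate why the "grow the array" test needs five sessions listening
on the same channel, here is a rough standalone sketch of the doubling
logic. It is not the patch code: ListenerArray and listener_array_add are
hypothetical names, plain int stands in for ProcNumber, and malloc/free
stand in for dsa_allocate/dsa_free. Only INITIAL_LISTENERS_ARRAY_SIZE (4)
is taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define INITIAL_LISTENERS_ARRAY_SIZE 4	/* same value as in the patch */

/* Hypothetical stand-in for the listener array of one channel entry. */
typedef struct ListenerArray
{
	int		   *procs;		/* stand-in for the ProcNumber array */
	int			num;		/* listeners currently stored */
	int			allocated;	/* allocated slots */
} ListenerArray;

/*
 * Append one listener, doubling the array when it is full.
 * Returns true if the array had to grow.
 */
static bool
listener_array_add(ListenerArray *a, int procno)
{
	bool		grew = false;

	if (a->procs == NULL)
	{
		/* First listener: allocate the initial array */
		a->procs = malloc(sizeof(int) * INITIAL_LISTENERS_ARRAY_SIZE);
		assert(a->procs != NULL);
		a->num = 0;
		a->allocated = INITIAL_LISTENERS_ARRAY_SIZE;
	}

	if (a->num >= a->allocated)
	{
		/* Full: allocate a doubled array, copy, free the old one */
		int			new_size = a->allocated * 2;
		int		   *new_procs = malloc(sizeof(int) * new_size);

		assert(new_procs != NULL);
		memcpy(new_procs, a->procs, sizeof(int) * a->num);
		free(a->procs);
		a->procs = new_procs;
		a->allocated = new_size;
		grew = true;
	}

	a->procs[a->num++] = procno;
	return grew;
}
```

With an initial size of 4, the first four calls fit without growing and
the fifth call returns true. That is why the new permutation has five
sessions (notifier, listener, listener2, listener3, listen_subxact)
issue LISTEN c1.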
Patches:

0001-optimize_listen_notify-v12.patch:
    Improve LISTEN/NOTIFY test coverage

0002-optimize_listen_notify-v12.patch:
    Optimize LISTEN/NOTIFY with channel-specific listener tracking

I split this into two patches, to make it easier to verify that the
second patch doesn't affect the tests added by the first patch. The 0001
patch also includes the "grow the array" test, which is pointless
without the 0002 patch, but it felt better to add it first anyway.
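
For readers skimming the thread, the core idea of 0002 (signal only the
backends registered for a notified channel, and skip those that already
have a wakeup pending) can be sketched roughly as below. This is a
simplified standalone model, not the patch code: Channel, wakeup_pending,
collect_targets and MAX_BACKENDS are hypothetical names, a linear scan
replaces the dshash lookup, and the queue-position, database and
lagging-backend checks are left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_BACKENDS 8			/* hypothetical limit for this sketch */

/* Hypothetical model of one channel and its registered listeners. */
typedef struct Channel
{
	const char *name;
	int			listeners[MAX_BACKENDS];
	int			num;
} Channel;

/* Models the per-backend "signal sent but not yet processed" flag. */
static bool wakeup_pending[MAX_BACKENDS];

/*
 * Collect the backends to signal for one notified channel.
 * Backends that already have a wakeup pending are skipped.
 * Returns the number of backends written to 'out'.
 */
static int
collect_targets(const Channel *channels, int nchannels,
				const char *notified, int *out)
{
	int			count = 0;

	for (int c = 0; c < nchannels; c++)
	{
		if (strcmp(channels[c].name, notified) != 0)
			continue;			/* not the notified channel */

		for (int j = 0; j < channels[c].num; j++)
		{
			int			procno = channels[c].listeners[j];

			if (wakeup_pending[procno])
				continue;		/* already signaled, avoid a duplicate */

			wakeup_pending[procno] = true;
			out[count++] = procno;
		}
	}
	return count;
}
```

In this model, a listener on c2 is never selected by a notification on
c1, and a second notification on c1 selects nobody until a listener
clears its flag, which the patch does in asyncQueueReadAllNotifications().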
I've also made changes in v12 based on feedback from Chao Li, to which I
will reply shortly.
/Joel
Attachments:
[application/octet-stream] 0001-optimize_listen_notify-v12.patch (7.8K, 2-0001-optimize_listen_notify-v12.patch)
From 960f8aba7d76c35ba4049f6e94a11a4118e5a438 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Wed, 8 Oct 2025 09:30:54 +0200
Subject: [PATCH 1/2] Improve LISTEN/NOTIFY test coverage
This adds isolation tests to cover previously untested code paths:
* Check simple NOTIFY reparenting when parent has no action
* Check LISTEN reparenting in subtransaction
* Check LISTEN merge path when both outer and inner transactions have actions
* Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions)
* Check notification_match function (triggered by hash table duplicate detection)
This also adds a test to prepare for the next patch:
* Check ChannelHashAddListener array growth
---
src/test/isolation/expected/async-notify.out | 103 ++++++++++++++++++-
src/test/isolation/specs/async-notify.spec | 52 ++++++++++
2 files changed, 154 insertions(+), 1 deletion(-)
diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..9c19843d2d7 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 5 sessions
starting permutation: listenc notify1 notify2 notify3 notifyf
step listenc: LISTEN c1; LISTEN c2;
@@ -47,6 +47,105 @@ notifier: NOTIFY "c2" with payload "payload" from notifier
notifier: NOTIFY "c1" with payload "payloads" from notifier
notifier: NOTIFY "c2" with payload "payloads" from notifier
+starting permutation: listenc notifys_simple
+step listenc: LISTEN c1; LISTEN c2;
+step notifys_simple:
+ BEGIN;
+ SAVEPOINT s1;
+ NOTIFY c1, 'simple1';
+ NOTIFY c2, 'simple2';
+ RELEASE SAVEPOINT s1;
+ COMMIT;
+
+notifier: NOTIFY "c1" with payload "simple1" from notifier
+notifier: NOTIFY "c2" with payload "simple2" from notifier
+
+starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lslisten_outer: LISTEN c3;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrollback: ROLLBACK TO SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify_check: NOTIFY c1, 'should_not_receive';
+
+starting permutation: listenc notify_many_with_dup
+step listenc: LISTEN c1; LISTEN c2;
+step notify_many_with_dup:
+ BEGIN;
+ SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+ SELECT pg_notify('c1', 'msg1');
+ COMMIT;
+
+pg_notify
+---------
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+(17 rows)
+
+pg_notify
+---------
+
+(1 row)
+
+notifier: NOTIFY "c1" with payload "msg1" from notifier
+notifier: NOTIFY "c1" with payload "msg2" from notifier
+notifier: NOTIFY "c1" with payload "msg3" from notifier
+notifier: NOTIFY "c1" with payload "msg4" from notifier
+notifier: NOTIFY "c1" with payload "msg5" from notifier
+notifier: NOTIFY "c1" with payload "msg6" from notifier
+notifier: NOTIFY "c1" with payload "msg7" from notifier
+notifier: NOTIFY "c1" with payload "msg8" from notifier
+notifier: NOTIFY "c1" with payload "msg9" from notifier
+notifier: NOTIFY "c1" with payload "msg10" from notifier
+notifier: NOTIFY "c1" with payload "msg11" from notifier
+notifier: NOTIFY "c1" with payload "msg12" from notifier
+notifier: NOTIFY "c1" with payload "msg13" from notifier
+notifier: NOTIFY "c1" with payload "msg14" from notifier
+notifier: NOTIFY "c1" with payload "msg15" from notifier
+notifier: NOTIFY "c1" with payload "msg16" from notifier
+notifier: NOTIFY "c1" with payload "msg17" from notifier
+
+starting permutation: listenc llisten l2listen l3listen lslisten
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step l2listen: LISTEN c1;
+step l3listen: LISTEN c1;
+step lslisten: LISTEN c1; LISTEN c2;
+
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
step llisten: LISTEN c1; LISTEN c2;
step notify1: NOTIFY c1;
@@ -95,6 +194,8 @@ listener: NOTIFY "c2" with payload "" from notifier
starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
step l2listen: LISTEN c1;
+listener2: NOTIFY "c1" with payload "" from notifier
+listener2: NOTIFY "c1" with payload "" from notifier
step l2begin: BEGIN;
step notify1: NOTIFY c1;
step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..942b09d5735 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -31,6 +31,20 @@ step notifys1 {
ROLLBACK TO SAVEPOINT s2;
COMMIT;
}
+step notifys_simple {
+ BEGIN;
+ SAVEPOINT s1;
+ NOTIFY c1, 'simple1';
+ NOTIFY c2, 'simple2';
+ RELEASE SAVEPOINT s1;
+ COMMIT;
+}
+step notify_many_with_dup {
+ BEGIN;
+ SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+ SELECT pg_notify('c1', 'msg1');
+ COMMIT;
+}
step usage { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
step bignotify { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
teardown { UNLISTEN *; }
@@ -53,6 +67,26 @@ step l2begin { BEGIN; }
step l2commit { COMMIT; }
step l2stop { UNLISTEN *; }
+# Third listener session for testing array growth.
+
+session listener3
+step l3listen { LISTEN c1; }
+teardown { UNLISTEN *; }
+
+# Session for testing LISTEN in subtransaction with separate steps.
+
+session listen_subxact
+step lsbegin { BEGIN; }
+step lslisten_outer { LISTEN c3; }
+step lssavepoint { SAVEPOINT s1; }
+step lslisten { LISTEN c1; LISTEN c2; }
+step lsrelease { RELEASE SAVEPOINT s1; }
+step lsrollback { ROLLBACK TO SAVEPOINT s1; }
+step lscommit { COMMIT; }
+step lsnotify { NOTIFY c1, 'subxact_test'; }
+step lsnotify_check { NOTIFY c1, 'should_not_receive'; }
+teardown { UNLISTEN *; }
+
# Trivial cases.
permutation listenc notify1 notify2 notify3 notifyf
@@ -60,6 +94,24 @@ permutation listenc notify1 notify2 notify3 notifyf
# Check simple and less-simple deduplication.
permutation listenc notifyd1 notifyd2 notifys1
+# Check simple NOTIFY reparenting when parent has no action.
+permutation listenc notifys_simple
+
+# Check LISTEN reparenting in subtransaction.
+permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN merge path when both outer and inner transactions have actions.
+permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions).
+permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+
+# Check notification_match function (triggered by hash table duplicate detection).
+permutation listenc notify_many_with_dup
+
+# Check ChannelHashAddListener array growth.
+permutation listenc llisten l2listen l3listen lslisten
+
# Cross-backend notification delivery. We use a "select 1" to force the
# listener session to check for notifies. In principle we could just wait
# for delivery, but that would require extra support in isolationtester
--
2.50.1
[application/octet-stream] 0002-optimize_listen_notify-v12.patch (22.4K, 3-0002-optimize_listen_notify-v12.patch)
From cd483d06907b0879e96983f2663b3b5b75a79eb5 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Tue, 7 Oct 2025 20:56:47 +0200
Subject: [PATCH 2/2] Optimize LISTEN/NOTIFY with channel-specific listener
tracking
Currently, idle listening backends cause a dramatic slowdown due to
context switching when they are signaled and wake up. This is wasteful
when they are not listening to the channel being notified.
Just 10 extra idle listening connections cause a slowdown from 8700 TPS
to 6100 TPS, 100 extra cause it to drop to 2000 TPS, and at 1000 extra
it falls to 250 TPS.
This patch introduces targeted signaling for LISTEN/NOTIFY, improving
scalability in workloads with many idle listeners.
A dynamic shared hash table now tracks which backends listen on each
(database, channel) pair, which SignalBackends() uses to perform
targeted signaling. In addition, it staggers wakeups by signaling one
backend at the global tail to help it advance gradually, and forces any
excessively lagging backends to catch up. A per-backend wakeup_pending
flag avoids redundant signals.
---
src/backend/commands/async.c | 470 ++++++++++++++++--
.../utils/activity/wait_event_names.txt | 1 +
src/include/storage/lwlocklist.h | 1 +
src/tools/pgindent/typedefs.list | 2 +
4 files changed, 436 insertions(+), 38 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..efa25740c9c 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,10 @@
* All notification messages are placed in the queue and later read out
* by listening backends.
*
- * There is no central knowledge of which backend listens on which channel;
- * every backend has its own list of interesting channels.
+ * We also maintain a dynamic shared hash table (dshash) that maps channel
+ * names to the set of backends listening on each channel. This table is
+ * created lazily on the first LISTEN command and grows dynamically as
+ * needed.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
@@ -71,13 +73,17 @@
* make any actual updates to the effective listen state (listenChannels).
* Then we signal any backends that may be interested in our messages
* (including our own backend, if listening). This is done by
- * SignalBackends(), which scans the list of listening backends and sends a
- * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- * know which backend is listening on which channel so we must signal them
- * all). We can exclude backends that are already up to date, though, and
- * we can also exclude backends that are in other databases (unless they
- * are way behind and should be kicked to make them advance their
- * pointers).
+ * SignalBackends(), which consults the shared channel hash table to
+ * identify listeners for the channels that have pending notifications
+ * in the current database. Each selected backend is marked as having a
+ * wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ * signal is sent to it.
+ *
+ * To maintain queue health, SignalBackends() also wakes one backend
+ * positioned at the global queue tail to help advance it, and signals
+ * any backend that has fallen too far behind to catch up. These measures
+ * prevent the notification queue from growing indefinitely, while mostly
+ * limiting wakeups to the backends that actually need them.
*
* Finally, after we are out of the transaction altogether and about to go
* idle, we scan the queue for messages that need to be sent to our
@@ -128,6 +134,7 @@
#include <limits.h>
#include <unistd.h>
#include <signal.h>
+#include <string.h>
#include "access/parallel.h"
#include "access/slru.h"
@@ -137,14 +144,17 @@
#include "commands/async.h"
#include "common/hashfn.h"
#include "funcapi.h"
+#include "lib/dshash.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "miscadmin.h"
+#include "storage/dsm_registry.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procsignal.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
+#include "utils/dsa.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -162,6 +172,29 @@
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
+/*
+ * Channel hash table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ProcNumbers representing the backends listening on each channel.
+ */
+
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct ChannelHashKey
+{
+ Oid dboid;
+ char channel[NAMEDATALEN];
+} ChannelHashKey;
+
+typedef struct ChannelEntry
+{
+ ChannelHashKey key;
+ dsa_pointer listenersArray; /* DSA pointer to ProcNumber array */
+ int numListeners; /* Number of listeners currently stored */
+ int allocatedListeners; /* Allocated size of array */
+} ChannelEntry;
+
/*
* Struct representing an entry in the global notify queue
*
@@ -227,8 +260,8 @@ typedef struct QueuePosition
/*
* Parameter determining how often we try to advance the tail pointer:
* we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * also the distance by which a backend needs to be behind before we'll
+ * decide we need to wake it up to advance its pointer.
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
@@ -246,6 +279,7 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
+ bool wakeupPending; /* signal sent but not yet processed */
} QueueBackendStatus;
/*
@@ -288,11 +322,91 @@ typedef struct AsyncQueueControl
ProcNumber firstListener; /* id of first listener, or
* INVALID_PROC_NUMBER */
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
+ dsa_handle channelHashDSA;
+ dshash_table_handle channelHashDSH;
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
static AsyncQueueControl *asyncQueueControl;
+static dsa_area *channelDSA = NULL;
+static dshash_table *channelHash = NULL;
+static dshash_hash channelHashFunc(const void *key, size_t size, void *arg);
+
+/* parameters for the channel hash table */
+static const dshash_parameters channelDSHParams = {
+ sizeof(ChannelHashKey),
+ sizeof(ChannelEntry),
+ dshash_memcmp,
+ channelHashFunc,
+ dshash_memcpy,
+ LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * channelHashFunc
+ * Hash function for channel keys.
+ */
+static dshash_hash
+channelHashFunc(const void *key, size_t size, void *arg)
+{
+ const ChannelHashKey *k = (const ChannelHashKey *) key;
+ dshash_hash h;
+
+ h = DatumGetUInt32(hash_uint32(k->dboid));
+ h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ strnlen(k->channel, NAMEDATALEN)));
+
+ return h;
+}
+
+/*
+ * initChannelHash
+ * Lazy initialization of the channel hash table.
+ */
+static void
+initChannelHash(void)
+{
+ MemoryContext oldcontext;
+
+ /* Quick exit if we already did this */
+ if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID &&
+ channelHash != NULL)
+ return;
+
+ /* Otherwise, use a lock to ensure only one process creates the table */
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /* Be sure any local memory allocated by DSA routines is persistent */
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+ if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID)
+ {
+ /* Initialize dynamic shared hash table for channel hash */
+ channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+ dsa_pin(channelDSA);
+ dsa_pin_mapping(channelDSA);
+ channelHash = dshash_create(channelDSA, &channelDSHParams, NULL);
+
+ /* Store handles in shared memory for other backends to use */
+ asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA);
+ asyncQueueControl->channelHashDSH =
+ dshash_get_hash_table_handle(channelHash);
+ }
+ else if (!channelHash)
+ {
+ /* Attach to existing dynamic shared hash table */
+ channelDSA = dsa_attach(asyncQueueControl->channelHashDSA);
+ dsa_pin_mapping(channelDSA);
+ channelHash = dshash_attach(channelDSA, &channelDSHParams,
+ asyncQueueControl->channelHashDSH,
+ NULL);
+ }
+
+ MemoryContextSwitchTo(oldcontext);
+ LWLockRelease(NotifyQueueLock);
+}
+
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
@@ -301,6 +415,7 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i) (asyncQueueControl->backend[i].wakeupPending)
/*
* The SLRU buffer area through which we access the notification queue
@@ -457,6 +572,10 @@ static void AddEventToPendingNotifies(Notification *n);
static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel);
+static void ChannelHashAddListener(const char *channel);
+static void ChannelHashRemoveListener(const char *channel);
+static List *GetPendingNotifyChannels(void);
/*
* Compute the difference between two queue page numbers.
@@ -521,12 +640,16 @@ AsyncShmemInit(void)
QUEUE_STOP_PAGE = 0;
QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
asyncQueueControl->lastQueueFillWarn = 0;
+ asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
+ asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+
for (int i = 0; i < MaxBackends; i++)
{
QUEUE_BACKEND_PID(i) = InvalidPid;
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
}
}
@@ -1152,6 +1275,7 @@ Exec_ListenCommit(const char *channel)
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
MemoryContextSwitchTo(oldcontext);
+ ChannelHashAddListener(channel);
}
/*
@@ -1175,6 +1299,7 @@ Exec_UnlistenCommit(const char *channel)
{
listenChannels = foreach_delete_current(listenChannels, q);
pfree(lchan);
+ ChannelHashRemoveListener(channel);
break;
}
}
@@ -1193,9 +1318,18 @@ Exec_UnlistenCommit(const char *channel)
static void
Exec_UnlistenAllCommit(void)
{
+ ListCell *lc;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ foreach(lc, listenChannels)
+ {
+ char *channel = (char *) lfirst(lc);
+
+ ChannelHashRemoveListener(channel);
+ }
+
list_free_deep(listenChannels);
listenChannels = NIL;
}
@@ -1565,12 +1699,16 @@ asyncQueueFillWarning(void)
/*
* Send signals to listening backends.
*
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send. However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind. Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * Normally we signal only backends registered as listeners for channels
+ * with pending notifications. However, when there is no traffic on some
+ * channels, listeners on such channels will fall further and further
+ * behind. Waken them if they are too far behind, so that they'll
+ * advance their queue position pointers, allowing the global tail to
+ * advance.
+ *
+ * To stagger wakeups of lagging backends, wake the backend furthest
+ * behind (at the tail), amortizing the context-switching cost across
+ * successive notifications instead of paying it all at once.
*
* Since we know the ProcNumber and the Pid the signaling is quite cheap.
*
@@ -1583,6 +1721,9 @@ SignalBackends(void)
int32 *pids;
ProcNumber *procnos;
int count;
+ List *channels;
+ ListCell *lc;
+ int64 queue_length;
/*
* Identify backends that we need to signal. We don't want to send
@@ -1596,37 +1737,109 @@ SignalBackends(void)
procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
count = 0;
+ channels = GetPendingNotifyChannels();
+
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ foreach(lc, channels)
{
- int32 pid = QUEUE_BACKEND_PID(i);
- QueuePosition pos;
+ char *channel = (char *) lfirst(lc);
+ ChannelEntry *entry = NULL;
+ ProcNumber *listeners;
- Assert(pid != InvalidPid);
- pos = QUEUE_BACKEND_POS(i);
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+ if (channelHash != NULL)
{
- /*
- * Always signal listeners in our own database, unless they're
- * already caught up (unlikely, but possible).
- */
+ ChannelHashKey key;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ entry = dshash_find(channelHash, &key, false);
+ }
+
+ if (entry == NULL)
+ continue; /* No listeners registered for this channel */
+
+ listeners = (ProcNumber *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (int j = 0; j < entry->numListeners; j++)
+ {
+ ProcNumber i = listeners[j];
+ int32 pid;
+ QueuePosition pos;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+ continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+ pid = QUEUE_BACKEND_PID(i);
+
+ /* Skip if caught up or wrong database */
if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
continue;
+ if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+ continue;
+
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ pids[count] = pid;
+ procnos[count] = i;
+ count++;
}
- else
+
+ dshash_release_lock(channelHash, entry);
+ }
+
+ queue_length = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+ QUEUE_POS_PAGE(QUEUE_TAIL));
+
+ /* Check for lagging backends when the queue spans multiple pages */
+ if (queue_length > 0)
+ {
+ bool tail_woken = false;
+
+ for (ProcNumber i = QUEUE_FIRST_LISTENER;
+ i != INVALID_PROC_NUMBER;
+ i = QUEUE_NEXT_LISTENER(i))
{
- /*
- * Listeners in other databases should be signaled only if they
- * are far behind.
- */
- if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
- QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+ QueuePosition pos;
+ int64 lag;
+ int32 pid;
+
+ if (QUEUE_BACKEND_WAKEUP_PENDING(i))
continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+
+ /* Signal one backend positioned at the global tail */
+ if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
+ QUEUE_POS_PAGE(pos)) == 0)
+ {
+ pid = QUEUE_BACKEND_PID(i);
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ pids[count] = pid;
+ procnos[count] = i;
+ count++;
+ tail_woken = true;
+ continue;
+ }
+
+ lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+ QUEUE_POS_PAGE(pos));
+
+ /* Need to signal if a backend has fallen too far behind */
+ if (lag >= QUEUE_CLEANUP_DELAY)
+ {
+ pid = QUEUE_BACKEND_PID(i);
+ Assert(pid != InvalidPid);
+
+ QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+ pids[count] = pid;
+ procnos[count] = i;
+ count++;
+ }
}
- /* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
- count++;
}
LWLockRelease(NotifyQueueLock);
@@ -1865,6 +2078,7 @@ asyncQueueReadAllNotifications(void)
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+ QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
LWLockRelease(NotifyQueueLock);
@@ -2395,3 +2609,183 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("notify_buffers", newval);
}
+
+
+/*
+ * ChannelHashPrepareKey
+ * Prepare a channel key for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel)
+{
+ memset(key, 0, sizeof(ChannelHashKey));
+ key->dboid = dboid;
+ strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * ChannelHashAddListener
+ * Register as a listener for the specified channel.
+ */
+static void
+ChannelHashAddListener(const char *channel)
+{
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ bool found;
+ ProcNumber *listeners;
+
+ initChannelHash();
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+ /*
+ * For new entries, we initialize listenersArray to InvalidDsaPointer as
+ * a marker. This handles both the initial creation and potential retry
+ * after OOM.
+ */
+ entry = dshash_find_or_insert(channelHash, &key, &found);
+
+ if (!found)
+ entry->listenersArray = InvalidDsaPointer;
+
+ if (!DsaPointerIsValid(entry->listenersArray))
+ {
+ /* First listener for this channel */
+ entry->listenersArray = dsa_allocate(channelDSA,
+ sizeof(ProcNumber) * INITIAL_LISTENERS_ARRAY_SIZE);
+ entry->numListeners = 0;
+ entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+ }
+
+ listeners = (ProcNumber *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (int i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i] == MyProcNumber)
+ {
+ dshash_release_lock(channelHash, entry);
+ return; /* Already registered */
+ }
+ }
+
+ /* Need to add this listener */
+ if (entry->numListeners >= entry->allocatedListeners)
+ {
+ /* Grow the array (double the size) */
+ int new_size = entry->allocatedListeners * 2;
+ dsa_pointer new_array = dsa_allocate(channelDSA,
+ sizeof(ProcNumber) * new_size);
+ ProcNumber *new_listeners = (ProcNumber *) dsa_get_address(channelDSA,
+ new_array);
+
+ /* Copy existing listeners */
+ memcpy(new_listeners, listeners,
+ sizeof(ProcNumber) * entry->numListeners);
+
+ /* Free old array and update entry */
+ dsa_free(channelDSA, entry->listenersArray);
+ entry->listenersArray = new_array;
+ entry->allocatedListeners = new_size;
+ listeners = new_listeners;
+ }
+
+ /* Add the new listener */
+ listeners[entry->numListeners] = MyProcNumber;
+ entry->numListeners++;
+
+ dshash_release_lock(channelHash, entry);
+}
+
+/*
+ * ChannelHashRemoveListener
+ * Unregister as a listener for the specified channel.
+ */
+static void
+ChannelHashRemoveListener(const char *channel)
+{
+ ChannelHashKey key;
+ ChannelEntry *entry;
+ ProcNumber *listeners;
+ int i;
+
+ if (channelHash == NULL)
+ return;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+ entry = dshash_find(channelHash, &key, true);
+ if (entry == NULL)
+ return;
+
+ listeners = (ProcNumber *) dsa_get_address(channelDSA,
+ entry->listenersArray);
+
+ for (i = 0; i < entry->numListeners; i++)
+ {
+ if (listeners[i] == MyProcNumber)
+ {
+ /* Found it, remove by shifting remaining elements */
+ entry->numListeners--;
+ if (i < entry->numListeners)
+ {
+ memmove(&listeners[i], &listeners[i + 1],
+ sizeof(ProcNumber) * (entry->numListeners - i));
+ }
+
+ if (entry->numListeners == 0)
+ {
+ dsa_free(channelDSA, entry->listenersArray);
+ dshash_delete_entry(channelHash, entry);
+ }
+ else
+ {
+ dshash_release_lock(channelHash, entry);
+ }
+ return;
+ }
+ }
+
+ /* Not found in list */
+ dshash_release_lock(channelHash, entry);
+}
+
+/*
+ * GetPendingNotifyChannels
+ * Get list of unique channel names from pending notifications.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+ List *channels = NIL;
+ ListCell *p;
+ ListCell *q;
+ bool found;
+
+ if (!pendingNotifies)
+ return NIL;
+
+ foreach(p, pendingNotifies->events)
+ {
+ Notification *n = (Notification *) lfirst(p);
+ char *channel = n->data;
+
+ found = false;
+ foreach(q, channels)
+ {
+ char *existing = (char *) lfirst(q);
+
+ if (strcmp(existing, channel) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ channels = lappend(channels, channel);
+ }
+
+ return channels;
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..a4fadbd0767 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -366,6 +366,7 @@ SubtransBuffer "Waiting for I/O on a sub-transaction SLRU buffer."
MultiXactOffsetBuffer "Waiting for I/O on a multixact offset SLRU buffer."
MultiXactMemberBuffer "Waiting for I/O on a multixact member SLRU buffer."
NotifyBuffer "Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash "Waiting to access the <command>NOTIFY</command> channel hash table."
SerialBuffer "Waiting for I/O on a serializable transaction conflict SLRU buffer."
WALInsert "Waiting to insert WAL data into a memory buffer."
BufferContent "Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..2768ddf4414 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -100,6 +100,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 37f26f6c6b7..2d9e2ae2b02 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -411,6 +411,8 @@ CatalogIdMapEntry
CatalogIndexState
ChangeVarNodes_callback
ChangeVarNodes_context
+ChannelEntry
+ChannelHashKey
CheckPoint
CheckPointStmt
CheckpointStatsData
--
2.50.1