From: Joel Jacobson <[email protected]>
To: Tom Lane <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Tue, 15 Jul 2025 09:20:58 +0200
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
On Sun, Jul 13, 2025, at 01:18, Tom Lane wrote:
> "Joel Jacobson" <[email protected]> writes:
>> The attached proof-of-concept patch proposes a straightforward
>> optimization for the single-listener case. It introduces a shared-memory
>> hash table mapping (dboid, channelname) to the ProcNumber of a single
>> listener.
>
> What does that do to the cost and parallelizability of LISTEN/UNLISTEN?
Good point. The previous patch would effectively force all LISTEN/UNLISTEN
to be serialized, which would at least hurt parallelizability.
New benchmarks confirm this hypothesis.
The new patch attached combines two complementary approaches that together
seem to scale well for both common-channel and unique-channel scenarios:
1. Partitioned Hash Locking
The Channel Hash now uses HASH_PARTITION, with an array of NUM_NOTIFY_PARTITIONS
lightweight locks. A given channel is mapped to a partition lock using
a custom hash function on (dboid, channelname).
This allows LISTEN/UNLISTEN operations on different channels to proceed
concurrently without fighting over a single global lock, addressing the
"many distinct channels" use-case.
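As a rough illustration of the partition mapping described above, here is a minimal standalone sketch. The FNV-1a hash is only a stand-in (the hash function actually used in the patch may differ), and the helper names channel_hash and channel_partition are hypothetical. NAMEDATALEN is set to PostgreSQL's default of 64.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define NAMEDATALEN 64              /* PostgreSQL's default identifier length */
#define NUM_NOTIFY_PARTITIONS 128   /* must be a power of two */

typedef uint32_t Oid;

/*
 * Illustrative FNV-1a hash over (dboid, channel name).
 * This is an assumption for the sketch, not the patch's hash function.
 */
static uint32_t
channel_hash(Oid dboid, const char *channel)
{
    uint32_t    h = 2166136261u;
    const unsigned char *p = (const unsigned char *) channel;
    size_t      i;

    /* Mix in the database OID, one byte at a time. */
    for (i = 0; i < sizeof(dboid); i++)
    {
        h ^= (dboid >> (8 * i)) & 0xFF;
        h *= 16777619u;
    }

    /* Mix in the channel name, at most NAMEDATALEN - 1 bytes. */
    for (i = 0; p[i] != '\0' && i < NAMEDATALEN - 1; i++)
    {
        h ^= p[i];
        h *= 16777619u;
    }

    return h;
}

/* Map a channel to the index of the partition lock that protects it. */
static int
channel_partition(Oid dboid, const char *channel)
{
    /* The bit mask below is only valid for a power-of-two partition count. */
    assert((NUM_NOTIFY_PARTITIONS & (NUM_NOTIFY_PARTITIONS - 1)) == 0);

    return (int) (channel_hash(dboid, channel) & (NUM_NOTIFY_PARTITIONS - 1));
}
```

The same (dboid, channelname) pair always maps to the same partition, so all operations on one channel serialize on one lock. Operations on different channels usually land on different locks and can proceed concurrently.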
2. Optimistic Read-Locking
For the "many backends on one channel" use-case, lock acquisition now follows
a read-then-upgrade pattern. We first acquire an LW_SHARED lock to check the
channel's state. If the channel is already marked as has_multiple_listeners,
we can return immediately without any need for a write.
Only if we are the first or second listener on a channel do we release
the shared lock and acquire an LW_EXCLUSIVE lock to modify the hash entry.
After getting the exclusive lock, we re-verify the state to guard against
race conditions. This avoids serializing the third and all subsequent
listeners for a popular channel.
BENCHMARK
https://raw.githubusercontent.com/joelonsql/pg-bench-listen-notify/refs/heads/master/performance_ove...
https://raw.githubusercontent.com/joelonsql/pg-bench-listen-notify/refs/heads/master/performance_ove...
I didn't want to attach the images to this email because they are quite large,
due to the level of detail in them.
However, since it's important this mailing list contains all relevant data discussed,
I've also included all data in the graphs formatted in ASCII/Markdown:
performance_overview.md
I've also included the raw parsed data from the pgbench output,
which has been used as input to create performance_overview.md
as well as the images:
pgbench_results_combined.csv
I've benchmarked five times per measurement, in random order.
All raw measurements have been included in the Markdown document
within { curly braces } sorted, next to the average values, to get an idea
of the variance. Stddev felt possibly misleading since I'm not sure the
data points are normally distributed, since it's benchmarking data.
I've run the benchmarks on my MacBook Pro Apple M3 Max,
using `caffeinate -dims pgbench ...`.
>> The patch also includes a "wake only tail" optimization (contributed by
>> Marko Tikkaja) to help prevent backends from falling too far behind.
>
> Coulda sworn we dealt with that case some years ago. In any case,
> if it's independent of the other idea it should probably get its
> own thread.
Maybe it's been dealt with by some other part of the system, but I can't
find any such code anywhere; only async.c currently sends
PROCSIG_NOTIFY_INTERRUPT.
The wake only tail mechanism seems almost perfect, but I can think of at least
one edge-case where we could still get a problem situation:
With lots of idle backends, the rate of this one-by-one catch-up may not be fast
enough to outpace the queue's advancement, causing other idle backends
to eventually lag by more than the QUEUE_CLEANUP_DELAY threshold.
To ensure all backends are eventually processed without re-introducing
the thundering herd problem, an additional mechanism seems necessary.
I see two main options:
1. Extend the chain reaction
Once woken, a backend could signal the next backend at the queue tail,
propagating the catch-up process. This would need to be managed carefully,
perhaps with some kind of global advisory lock, to prevent multiple
cascades from running at once.
2. Centralize the work
We already have the autovacuum daemon, maybe it could also be made responsible
for kicking lagging backends?
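To make option 1 a bit more concrete, here is a toy, single-process model of the cascade. It is only a sketch under strong assumptions: queue positions are plain integers, "signaling" is a counter increment, and the cascade_active flag is a hypothetical stand-in for whatever guard would prevent multiple cascades. None of these names exist in async.c.

```c
#include <assert.h>
#include <stdbool.h>

#define NBACKENDS 8

/* Toy model of the notify queue: one head and one position per backend. */
typedef struct NotifySim
{
    int         head;               /* current queue head */
    int         pos[NBACKENDS];     /* read position of each backend */
    bool        cascade_active;     /* hypothetical guard: one cascade at a time */
    int         signals_sent;       /* number of wake-ups issued */
} NotifySim;

/* Return the backend furthest behind the head, or -1 if all are caught up. */
static int
sim_find_tail(const NotifySim *sim)
{
    int         tail = -1;

    for (int i = 0; i < NBACKENDS; i++)
    {
        if (sim->pos[i] < sim->head &&
            (tail < 0 || sim->pos[i] < sim->pos[tail]))
            tail = i;
    }
    return tail;
}

/*
 * Run one cascade: wake the tail backend, let it catch up, then let it
 * wake whichever backend is now at the tail, until nobody lags.
 * Returns false without doing anything if a cascade is already running.
 */
static bool
sim_run_cascade(NotifySim *sim)
{
    int         next;

    if (sim->cascade_active)
        return false;

    sim->cascade_active = true;
    while ((next = sim_find_tail(sim)) >= 0)
    {
        sim->signals_sent++;            /* one signal wakes one backend */
        sim->pos[next] = sim->head;     /* the woken backend catches up */
    }
    sim->cascade_active = false;

    return true;
}
```

In this model each lagging backend receives exactly one signal and the wake-ups happen one after another rather than all at once. A real implementation would also have to cope with the head advancing while the cascade is in progress.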
Other ideas?
/Joel
Attached:
* pgbench-scripts.tar.gz
pgbench scripts to reproduce the results, report and images.
* performance_overview.md
Same results as in the images, but in ASCII/Markdown format.
* pgbench_results_combined.csv
Parsed output from pgbench runs, used to create performance_overview.md as well as the linked images.
* 0001-Optimize-LISTEN-NOTIFY-signaling-for-single-listener-v2.patch
Old patch just renamed to -v2
* 0002-Partition-channel-hash-to-improve-LISTEN-UNLISTEN-v2.patch
New patch with the approach explained above.
Attachments:
[text/csv] pgbench_results_combined.csv (122.0K, 2-pgbench_results_combined.csv)
[application/x-gzip] pgbench-scripts.tar.gz (8.6K, 3-pgbench-scripts.tar.gz)
[application/octet-stream] performance_overview.md (21.0K, 4-performance_overview.md)
[application/octet-stream] 0001-Optimize-LISTEN-NOTIFY-signaling-for-single-listener-v2.patch (24.2K, 5-0001-Optimize-LISTEN-NOTIFY-signaling-for-single-listener-v2.patch)
download | inline diff:
From aba0ffb2a9e1c5d77393a92c0ce43a968c23cbb5 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 15 Jun 2025 00:09:43 +0200
Subject: [PATCH 1/2] Optimize LISTEN/NOTIFY signaling for single-listener
channels
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Previously, the implementation would signal every backend process that was
listening on any channel in the same database. This signaling is performed via
SendProcSignal(), which ultimately issues a kill(pid, SIGUSR1) syscall for each
listening backend.
This broadcast approach is well-suited for use cases like cache invalidation but
limits the scalability of application patterns where backends listen on distinct
channels. For example, a system of worker processes might use unique channel
names to direct work to a specific worker. In these scenarios, a NOTIFY intended
for a single listener unnecessarily triggers a syscall for every other listening
backend.
This commit improves scalability for such workloads by optimizing for
the single-listener case. By making this pattern more performant, we enable it
to be used more effectively in high-throughput systems, pushing PostgreSQL's
scalability limits for this class of applications. A new shared memory hash
table is introduced to track which backend process is listening on each channel.
When a NOTIFY is issued, if a channel has exactly one registered listener, we
can signal that specific backend directly.
The system gracefully falls back to broadcast behavior under two conditions:
1. When a channel has multiple backends listening to it.
2. If the shared hash table runs out of memory and cannot create a new entry.
To support this, the LISTEN and UNLISTEN commands, as well as the backend exit
cleanup logic in asyncQueueUnregister, are updated to manage entries in the new
channel hash table. The main signaling logic in SignalBackends has been reworked
to implement the targeted-vs-broadcast decision.
To ensure the global queue tail can always advance, this change also includes a
"wake only tail" optimization, contributed by Marko Tikkaja (johto). Instead
of waking all backends that are lagging far behind, this logic specifically
signals only the backend that is currently at the queue tail. This targeted
wake-up prevents a "thundering herd" of signals and relies on a chain
reaction—where each backend wakes the next—to process the queue efficiently.
This mechanism works in conjunction with both the new targeted signaling and
the broadcast fallback.
CAVEAT: This patch should be considered a first-step, proof-of-concept
optimization. It uses a simple boolean flag to distinguish single-listener
channels from multi-listener ones and does not track the full list of backends
for a multi-listener channel. As a result, it cannot remove a hash entry for
a channel once it has been marked as having multiple listeners, causing such
entries to persist even after all listeners have departed. A more complete
solution would likely involve reference counting to track all listening backends
for each channel. This would not only prevent stuck hash entries but could also
enable targeted signaling to all listeners of a specific multi-user channel,
further refining the optimization and avoiding the fallback to a full
database-wide broadcast.
---
src/backend/commands/async.c | 572 ++++++++++++++++++++++++++++++++---
1 file changed, 537 insertions(+), 35 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..a0b7daaef7d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,11 @@
* All notification messages are placed in the queue and later read out
* by listening backends.
*
- * There is no central knowledge of which backend listens on which channel;
- * every backend has its own list of interesting channels.
+ * In addition to each backend maintaining its own list of channels, we also
+ * maintain a central hash table that tracks channels with single listeners.
+ * When a channel has exactly one listening backend, we can signal just that
+ * backend. For channels with multiple listeners, we signal all listening
+ * backends.
*
* Although there is only one queue, notifications are treated as being
* database-local; this is done by including the sender's database OID
@@ -71,13 +74,16 @@
* make any actual updates to the effective listen state (listenChannels).
* Then we signal any backends that may be interested in our messages
* (including our own backend, if listening). This is done by
- * SignalBackends(), which scans the list of listening backends and sends a
- * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- * know which backend is listening on which channel so we must signal them
- * all). We can exclude backends that are already up to date, though, and
- * we can also exclude backends that are in other databases (unless they
- * are way behind and should be kicked to make them advance their
- * pointers).
+ * SignalBackends(), which has two modes of operation, depending on
+ * if any of our channels have multiple listening backends or not:
+ * a) If there are multiple listening backends, a PROCSIG_NOTIFY_INTERRUPT
+ * signal is sent to every listening backend.
+ * b) Otherwise, such signals are only sent to each single listening backend
+ * per channel.
+ * Additionally, we use a "wake only tail" optimization: we always signal
+ * the backend furthest behind in the queue to help prevent backends from
+ * getting far behind and create a chain reaction of wake-ups.
+ * We can exclude backends that are already up to date, though.
*
* Finally, after we are out of the transaction altogether and about to go
* idle, we scan the queue for messages that need to be sent to our
@@ -146,6 +152,7 @@
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
+#include "utils/hsearch.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
@@ -162,6 +169,58 @@
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
+/*
+ * Channel hash table definitions
+ *
+ * This hash table provides an optimization by tracking which backend is
+ * listening on each channel. Channels are identified by database OID and
+ * channel name, making them database-specific.
+ *
+ * When exactly one backend listens on a channel, we signal that specific
+ * backend, avoiding unnecessary signals to all listening backends.
+ *
+ * We fall back to broadcast mode and signal all listening backends when:
+ * 1) Multiple backends listen on the same channel, OR
+ * 2) The hash table runs out of shared memory for new entries
+ *
+ * Note that CHANNEL_HASH_MAX_SIZE is not a hard limit - the hash table can
+ * store more entries than this, but performance will degrade due to bucket
+ * overflow. The actual fallback to broadcast mode occurs only when shared
+ * memory is exhausted and we cannot allocate new hash entries.
+ *
+ * The maximum size (CHANNEL_HASH_MAX_SIZE) is based on the typical OS port
+ * range. This provides a reasonable upper bound for systems that use
+ * per-connection channels.
+ *
+ */
+#define CHANNEL_HASH_INIT_SIZE 256
+#define CHANNEL_HASH_MAX_SIZE 65535
+
+/*
+ * Key structure for the channel hash table.
+ * Channels are database-specific, so we need both the database OID
+ * and the channel name to uniquely identify a channel.
+ */
+typedef struct ChannelHashKey
+{
+ Oid dboid;
+ char channel[NAMEDATALEN];
+} ChannelHashKey;
+
+/*
+ * Each entry contains a channel key (database OID + channel name) and a
+ * single backend ProcNumber that is listening on that channel. If multiple
+ * backends try to listen on the same channel, we mark it as having multiple
+ * listeners and fall back to broadcast behavior.
+ */
+typedef struct ChannelEntry
+{
+ ChannelHashKey key;
+ ProcNumber listener; /* single backend ID, or INVALID_PROC_NUMBER
+ * if multiple */
+ bool has_multiple_listeners;
+} ChannelEntry;
+
/*
* Struct representing an entry in the global notify queue
*
@@ -293,6 +352,39 @@ typedef struct AsyncQueueControl
static AsyncQueueControl *asyncQueueControl;
+/* Channel hash table for single listening backend signalling */
+static HTAB *channelHash = NULL;
+
+/*
+ * GetChannelHash
+ * Get the channel hash table, initializing our backend's pointer if needed.
+ *
+ * This must be called before any access to the channel hash table.
+ * The hash table itself is created in shared memory during AsyncShmemInit,
+ * but each backend needs to get its own pointer to it.
+ */
+static HTAB *
+GetChannelHash(void)
+{
+ if (channelHash == NULL)
+ {
+ HASHCTL hash_ctl;
+
+ /* Set up to attach to the existing shared hash table */
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(ChannelHashKey);
+ hash_ctl.entrysize = sizeof(ChannelEntry);
+
+ channelHash = ShmemInitHash("Channel Hash",
+ CHANNEL_HASH_INIT_SIZE,
+ CHANNEL_HASH_MAX_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ }
+
+ return channelHash;
+}
+
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
@@ -458,6 +550,14 @@ static uint32 notification_hash(const void *key, Size keysize);
static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
+/* Channel hash table management functions */
+static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel);
+static void ChannelHashAddListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveListener(const char *channel, ProcNumber procno);
+static void ChannelHashRemoveBackendFromAll(ProcNumber procno);
+static ChannelEntry * ChannelHashLookup(const char *channel);
+static List *GetPendingNotifyChannels(void);
+
/*
* Compute the difference between two queue page numbers.
* Previously this function accounted for a wraparound.
@@ -492,6 +592,9 @@ AsyncShmemSize(void)
size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
+ size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE,
+ sizeof(ChannelEntry)));
+
return size;
}
@@ -546,6 +649,23 @@ AsyncShmemInit(void)
*/
(void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
}
+
+ /*
+ * Create or attach to the channel hash table.
+ */
+ {
+ HASHCTL hash_ctl;
+
+ MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+ hash_ctl.keysize = sizeof(ChannelHashKey);
+ hash_ctl.entrysize = sizeof(ChannelEntry);
+
+ channelHash = ShmemInitHash("Channel Hash",
+ CHANNEL_HASH_INIT_SIZE,
+ CHANNEL_HASH_MAX_SIZE,
+ &hash_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ }
}
@@ -1043,6 +1163,7 @@ Exec_ListenPreCommit(void)
QueuePosition head;
QueuePosition max;
ProcNumber prevListener;
+ ListCell *p;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -1110,6 +1231,18 @@ Exec_ListenPreCommit(void)
QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER;
QUEUE_FIRST_LISTENER = MyProcNumber;
}
+
+ /*
+ * Add all our channels to the channel hash table while we still hold
+ * exclusive lock on NotifyQueueLock.
+ */
+ foreach(p, listenChannels)
+ {
+ char *channel = (char *) lfirst(p);
+
+ ChannelHashAddListener(channel, MyProcNumber);
+ }
+
LWLockRelease(NotifyQueueLock);
/* Now we are listed in the global array, so remember we're listening */
@@ -1152,6 +1285,10 @@ Exec_ListenCommit(const char *channel)
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
listenChannels = lappend(listenChannels, pstrdup(channel));
MemoryContextSwitchTo(oldcontext);
+
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ ChannelHashAddListener(channel, MyProcNumber);
+ LWLockRelease(NotifyQueueLock);
}
/*
@@ -1175,6 +1312,10 @@ Exec_UnlistenCommit(const char *channel)
{
listenChannels = foreach_delete_current(listenChannels, q);
pfree(lchan);
+
+ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+ ChannelHashRemoveListener(channel, MyProcNumber);
+ LWLockRelease(NotifyQueueLock);
break;
}
}
@@ -1239,6 +1380,9 @@ asyncQueueUnregister(void)
* Need exclusive lock here to manipulate list links.
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ ChannelHashRemoveBackendFromAll(MyProcNumber);
+
/* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
@@ -1565,12 +1709,18 @@ asyncQueueFillWarning(void)
/*
* Send signals to listening backends.
*
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send. However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind. Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * This function operates in two modes:
+ * 1. Selective mode: When all pending notification channels have exactly one
+ * listener each, we signal only those specific backends that are listening
+ * on the channels with pending notifications.
+ * 2. Broadcast mode: When any channel has multiple listeners (or we ran out
+ * of shared memory for the channel hash table), we signal all listening
+ * backends in our database.
+ *
+ * In addition to the channel-specific signaling, we also implement a "wake
+ * only tail" optimization: we signal the backend that is furthest behind
+ * in the queue to help prevent backends from getting far behind and create
+ * a chain reaction of wake-ups. This avoids thundering herd problems.
*
* Since we know the ProcNumber and the Pid the signaling is quite cheap.
*
@@ -1583,6 +1733,11 @@ SignalBackends(void)
int32 *pids;
ProcNumber *procnos;
int count;
+ List *channels;
+ ListCell *p;
+ bool *signaled;
+ bool broadcast_mode = false;
+ bool tail_woken = false;
/*
* Identify backends that we need to signal. We don't want to send
@@ -1594,40 +1749,159 @@ SignalBackends(void)
*/
pids = (int32 *) palloc(MaxBackends * sizeof(int32));
procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
+ signaled = (bool *) palloc0(MaxBackends * sizeof(bool));
count = 0;
+ /* Get list of channels that have pending notifications */
+ channels = GetPendingNotifyChannels();
+
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+ /*
+ * Check if any channel has multiple listeners, in which case we would
+ * need to signal all backends anyway.
+ */
+ foreach(p, channels)
+ {
+ char *channel = (char *) lfirst(p);
+ ChannelEntry *entry = ChannelHashLookup(channel);
+
+ /*
+ * If there is no entry, it could mean we ran out of shared memory
+ * when trying to add this channel to the hash table, so we need to
+ * broadcast in that case as well.
+ */
+ if (!entry || entry->has_multiple_listeners)
+ {
+ broadcast_mode = true;
+ break;
+ }
+ }
+
+ if (broadcast_mode)
+ {
+ /*
+ * In broadcast mode, we iterate over all listening backends and
+ * signal the ones in our database that are not already caught up.
+ */
+ for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+ {
+ int32 pid;
+ QueuePosition pos;
+
+ if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+ continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+
+ /*
+ * Always signal listeners in our own database, unless they're
+ * already caught up.
+ */
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ continue;
+
+ pid = QUEUE_BACKEND_PID(i);
+ Assert(pid != InvalidPid);
+
+ /* OK, need to signal this one */
+ pids[count] = pid;
+ procnos[count] = i;
+ signaled[i] = true;
+ count++;
+ }
+ }
+ else
+ {
+ /*
+ * Signal specific listening backends
+ */
+ foreach(p, channels)
+ {
+ char *channel = (char *) lfirst(p);
+ ChannelEntry *entry = ChannelHashLookup(channel);
+
+ ProcNumber i = entry->listener;
+ int32 pid;
+ QueuePosition pos;
+
+ Assert(entry && !entry->has_multiple_listeners);
+
+ if (signaled[i])
+ continue;
+
+ pos = QUEUE_BACKEND_POS(i);
+
+ /*
+ * Skip signaling listeners if they already caught up.
+ */
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ continue;
+
+ if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+ continue;
+
+ pid = QUEUE_BACKEND_PID(i);
+ Assert(pid != InvalidPid);
+
+ /* OK, need to signal this one */
+ pids[count] = pid;
+ procnos[count] = i;
+ signaled[i] = true;
+ count++;
+ }
+ }
+
+ /*
+ * Also check for any backends that are far behind. This ensures the
+ * global tail can advance even if they're not actively receiving
+ * notifications on their channels.
+ */
for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
{
- int32 pid = QUEUE_BACKEND_PID(i);
+ int32 pid;
QueuePosition pos;
- Assert(pid != InvalidPid);
+ /*
+ * Skip if we've already decided to signal this one.
+ */
+ if (signaled[i])
+ continue;
+
pos = QUEUE_BACKEND_POS(i);
- if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
- {
- /*
- * Always signal listeners in our own database, unless they're
- * already caught up (unlikely, but possible).
- */
- if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
- continue;
- }
+
+ /*
+ * Skip signaling listeners if they already caught up.
+ */
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ continue;
+
+ /*
+ * Wake only tail optimization: Signal the backend that is furthest
+ * behind to help prevent backends from getting far behind in the
+ * first place. This creates a chain reaction where each backend
+ * eventually wakes up the next one as notifications are processed,
+ * avoiding thundering herd.
+ *
+ * Otherwise, only skip signaling listeners if they are not far
+ * behind.
+ */
+ if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
+ QUEUE_POS_PAGE(pos)) == 0)
+ tail_woken = true;
else
- {
- /*
- * Listeners in other databases should be signaled only if they
- * are far behind.
- */
- if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
- QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
- continue;
- }
+ continue;
+
+ pid = QUEUE_BACKEND_PID(i);
+ Assert(pid != InvalidPid);
/* OK, need to signal this one */
pids[count] = pid;
procnos[count] = i;
count++;
+
+
}
+
LWLockRelease(NotifyQueueLock);
/* Now send signals */
@@ -1657,6 +1931,7 @@ SignalBackends(void)
pfree(pids);
pfree(procnos);
+ pfree(signaled);
}
/*
@@ -2395,3 +2670,230 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("notify_buffers", newval);
}
+
+/*
+ * Channel hash table management functions
+ */
+
+/*
+ * ChannelHashPrepareKey
+ * Prepare a channel key (database OID + channel name) for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel)
+{
+ memset(key, 0, sizeof(ChannelHashKey));
+ key->dboid = dboid;
+ strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * ChannelHashAddListener
+ * Register the given backend as a listener for the specified channel
+ * in the shared channel hash table.
+ *
+ * Caller must hold exclusive NotifyQueueLock.
+ */
+static void
+ChannelHashAddListener(const char *channel, ProcNumber procno)
+{
+ ChannelEntry *entry;
+ bool found;
+ ChannelHashKey key;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+ /* Look up or create the channel entry */
+ entry = (ChannelEntry *) hash_search(GetChannelHash(),
+ &key,
+ HASH_ENTER_NULL,
+ &found);
+
+ /*
+ * If hash_search returned NULL, we've run out of shared memory to
+ * allocate new hash entries. We gracefully degrade by not tracking this
+ * channel in the hash. The channel will use the fallback broadcast
+ * signalling.
+ */
+ if (entry == NULL)
+ {
+ ereport(DEBUG1,
+ (errmsg("too many notification channels are already being tracked")));
+ return;
+ }
+
+ if (!found)
+ {
+ /* New channel, initialize the entry */
+ memcpy(&entry->key, &key, sizeof(ChannelHashKey));
+ entry->listener = procno;
+ entry->has_multiple_listeners = false;
+ }
+ else
+ {
+ /* Channel already exists */
+ if (!entry->has_multiple_listeners)
+ {
+ if (entry->listener == procno)
+ return; /* Already listening */
+
+ /*
+ * Another backend is already listening on this channel. Mark it
+ * as having multiple listeners and fall back to broadcast
+ * signalling.
+ */
+ entry->has_multiple_listeners = true;
+ entry->listener = INVALID_PROC_NUMBER;
+ }
+ /* If already marked as having multiple listeners, nothing to do */
+ }
+}
+
+/*
+ * ChannelHashRemoveListener
+ * Update the channel hash when a backend stops listening on a channel.
+ *
+ * If the channel entry currently tracks exactly one listener and that
+ * listener matches the supplied procno, remove the entry altogether.
+ *
+ * If the channel has already been flagged as having multiple listeners,
+ * we no longer track individual backends; therefore we cannot remove a
+ * single backend without additional bookkeeping. In that situation we
+ * simply leave the entry in place (still marked as having multiple
+ * listeners) and return.
+ *
+ * Caller must hold exclusive NotifyQueueLock.
+ */
+static void
+ChannelHashRemoveListener(const char *channel, ProcNumber procno)
+{
+ ChannelEntry *entry;
+ ChannelHashKey key;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+ /* Look up the channel entry */
+ entry = (ChannelEntry *) hash_search(GetChannelHash(),
+ &key,
+ HASH_FIND,
+ NULL);
+
+ if (!entry)
+ return; /* Channel not found */
+
+ /*
+ * If this channel has multiple listeners, we can't track individual
+ * removals. Just leave it marked as having multiple listeners.
+ */
+ if (entry->has_multiple_listeners)
+ return;
+
+ /* If this backend is the single listener, remove the channel entry */
+ if (entry->listener == procno)
+ {
+ hash_search(GetChannelHash(),
+ &key,
+ HASH_REMOVE,
+ NULL);
+ }
+}
+
+/*
+ * ChannelHashRemoveBackendFromAll
+ * Sweep the channel hash and delete any channel entries for which
+ * this backend is the only tracked listener in the current database.
+ *
+ * Caller must hold exclusive NotifyQueueLock.
+ */
+static void
+ChannelHashRemoveBackendFromAll(ProcNumber procno)
+{
+ HASH_SEQ_STATUS status;
+ ChannelEntry *entry;
+
+ hash_seq_init(&status, GetChannelHash());
+
+ while ((entry = (ChannelEntry *) hash_seq_search(&status)) != NULL)
+ {
+ if (entry->key.dboid != MyDatabaseId)
+ continue;
+
+ if (entry->has_multiple_listeners)
+ continue;
+
+ if (entry->listener == procno)
+ {
+ hash_search(GetChannelHash(),
+ &entry->key,
+ HASH_REMOVE,
+ NULL);
+ }
+ }
+}
+
+/*
+ * ChannelHashLookup
+ * Look up the channel hash entry for the given channel name in the
+ * current database.
+ *
+ * Returns NULL if the channel is not being tracked (no listeners, or channel
+ * fell back to broadcast mode because we ran out of shared memory when trying
+ * to add entries to the hash table).
+ *
+ * Caller must hold at least shared NotifyQueueLock.
+ */
+static ChannelEntry *
+ChannelHashLookup(const char *channel)
+{
+ ChannelHashKey key;
+
+ Assert(LWLockHeldByMe(NotifyQueueLock));
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+ return (ChannelEntry *) hash_search(GetChannelHash(),
+ &key,
+ HASH_FIND,
+ NULL);
+}
+
+/*
+ * GetPendingNotifyChannels
+ * Get list of unique channel names from pending notifications.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+ List *channels = NIL;
+ ListCell *p;
+ ListCell *q;
+ bool found;
+
+ if (!pendingNotifies)
+ return NIL;
+
+ /* Collect unique channel names from pending notifications */
+ foreach(p, pendingNotifies->events)
+ {
+ Notification *n = (Notification *) lfirst(p);
+ char *channel = n->data;
+
+ /* Check if we already have this channel in our list */
+ found = false;
+ foreach(q, channels)
+ {
+ char *existing = (char *) lfirst(q);
+
+ if (strcmp(existing, channel) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found)
+ channels = lappend(channels, channel);
+ }
+
+ return channels;
+}
--
2.47.1
[application/octet-stream] 0002-Partition-channel-hash-to-improve-LISTEN-UNLISTEN-v2.patch (22.9K, 6-0002-Partition-channel-hash-to-improve-LISTEN-UNLISTEN-v2.patch)
download | inline diff:
From 61ab3b3a834192b0468d10ca5fe3824b1fec6065 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sun, 13 Jul 2025 14:39:12 +0200
Subject: [PATCH 2/2] Partition channel hash to improve LISTEN/UNLISTEN
The previous commit introduced a shared hash table to optimize NOTIFY for
single-listener channels. However, all modifications to this hash table
were serialized by the global NotifyQueueLock, creating a new contention
point for concurrent LISTEN and UNLISTEN operations. This commit
removes that bottleneck by partitioning the hash table's locking.
The single NotifyQueueLock is replaced by an array of
NUM_NOTIFY_PARTITIONS lightweight locks. A custom hash function, which
mixes the dboid and channel name, is used to map a channel to a
specific partition lock. This allows operations on different channels to
proceed in parallel, as they will contend on different locks.
Furthermore, to handle high-concurrency workloads where many backends
LISTEN on the same channel, the lock acquisition logic is optimized
using a read-then-upgrade pattern:
1. A LW_SHARED lock is taken first to check the channel's state. If no
write is needed (e.g., the channel is already marked as multi-listener),
the function can return immediately. This is the fast path for the
third and all subsequent listeners on a popular channel.
2. Only if a mutation is required is the shared lock released and a
LW_EXCLUSIVE lock acquired. After acquiring the exclusive lock, the
state is re-verified to guard against race conditions before the write
is performed.
This optimistic pattern is applied to both adding and removing listeners,
ensuring that both the "many distinct channels" and "many backends on
one channel" use-cases are highly scalable.
The SignalBackends logic is also updated to follow a strict lock
ordering hierarchy (global NotifyQueueLock before any partition lock) to
prevent deadlocks when checking the hash table.

Finally, the backend exit logic in Exec_UnlistenAllCommit is refined to
iterate over the backend's local listenChannels list, performing
targeted, per-partition removals instead of a more expensive full table scan.

With these changes, the LISTEN/UNLISTEN path is no longer serialized
by a single global lock, directly addressing the scalability concerns of
the previous implementation.
---
src/backend/commands/async.c | 398 +++++++++++++++++++++--------------
1 file changed, 241 insertions(+), 157 deletions(-)
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index a0b7daaef7d..f81a30b53e2 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -134,6 +134,7 @@
#include <limits.h>
#include <unistd.h>
#include <signal.h>
+#include <string.h>
#include "access/parallel.h"
#include "access/slru.h"
@@ -169,6 +170,12 @@
*/
#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128)
+/*
+ * Number of partitions for the channel hash table's locks.
+ * This must be a power of two.
+ */
+#define NUM_NOTIFY_PARTITIONS 128
+
/*
* Channel hash table definitions
*
@@ -176,6 +183,10 @@
* listening on each channel. Channels are identified by database OID and
* channel name, making them database-specific.
*
+ * To improve scalability of concurrent LISTEN/UNLISTEN operations, the hash
+ * table is partitioned, with each partition protected by its own LWLock. This
+ * avoids serializing all operations on a single global lock.
+ *
* When exactly one backend listens on a channel, we signal that specific
* backend, avoiding unnecessary signals to all listening backends.
*
@@ -328,6 +339,11 @@ typedef struct QueueBackendStatus
* In order to avoid deadlocks, whenever we need multiple locks, we first get
* NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock.
*
+ * The channel hash table is protected by a separate set of partitioned
+ * locks. To prevent deadlocks between these and NotifyQueueLock, the global
+ * lock-ordering rule is: always acquire NotifyQueueLock *before* acquiring
+ * any channel hash partition lock.
+ *
* Each backend uses the backend[] array entry with index equal to its
* ProcNumber. We rely on this to make SendProcSignal fast.
*
@@ -352,9 +368,16 @@ typedef struct AsyncQueueControl
static AsyncQueueControl *asyncQueueControl;
+/* Locks for partitioned channel hash table */
+static LWLock *channelHashLocks;
+static int channelHashTrancheId = 0;
+
/* Channel hash table for single listening backend signalling */
static HTAB *channelHash = NULL;
+/* Forward declaration needed by GetChannelHash */
+static uint32 channel_hash_func(const void *key, Size keysize);
+
/*
* GetChannelHash
* Get the channel hash table, initializing our backend's pointer if needed.
@@ -370,16 +393,21 @@ GetChannelHash(void)
{
HASHCTL hash_ctl;
- /* Set up to attach to the existing shared hash table */
+ /*
+ * Set up to attach to the existing shared hash table. The hash
+ * control parameters must match those used in AsyncShmemInit.
+ */
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(ChannelHashKey);
hash_ctl.entrysize = sizeof(ChannelEntry);
+ hash_ctl.hash = channel_hash_func;
+ hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
channelHash = ShmemInitHash("Channel Hash",
CHANNEL_HASH_INIT_SIZE,
CHANNEL_HASH_MAX_SIZE,
&hash_ctl,
- HASH_ELEM | HASH_BLOBS);
+ HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
}
return channelHash;
@@ -551,10 +579,10 @@ static int notification_match(const void *key1, const void *key2, Size keysize);
static void ClearPendingActionsAndNotifies(void);
/* Channel hash table management functions */
+static LWLock *GetChannelHashLock(const char *channel);
static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel);
static void ChannelHashAddListener(const char *channel, ProcNumber procno);
static void ChannelHashRemoveListener(const char *channel, ProcNumber procno);
-static void ChannelHashRemoveBackendFromAll(ProcNumber procno);
static ChannelEntry * ChannelHashLookup(const char *channel);
static List *GetPendingNotifyChannels(void);
@@ -595,6 +623,8 @@ AsyncShmemSize(void)
size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE,
sizeof(ChannelEntry)));
+ size = add_size(size, mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock)));
+
return size;
}
@@ -659,12 +689,26 @@ AsyncShmemInit(void)
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
hash_ctl.keysize = sizeof(ChannelHashKey);
hash_ctl.entrysize = sizeof(ChannelEntry);
+ hash_ctl.hash = channel_hash_func;
+ hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS;
channelHash = ShmemInitHash("Channel Hash",
CHANNEL_HASH_INIT_SIZE,
CHANNEL_HASH_MAX_SIZE,
&hash_ctl,
- HASH_ELEM | HASH_BLOBS);
+ HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
+ }
+
+ /* Initialize locks for the partitioned hash table */
+ channelHashLocks = (LWLock *) ShmemAlloc(mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock)));
+ if (!found)
+ {
+ channelHashTrancheId = LWLockNewTrancheId();
+ LWLockRegisterTranche(channelHashTrancheId, "ChannelHashPartition");
+ }
+ for (int i = 0; i < NUM_NOTIFY_PARTITIONS; i++)
+ {
+ LWLockInitialize(&channelHashLocks[i], channelHashTrancheId);
}
}
@@ -1163,7 +1207,6 @@ Exec_ListenPreCommit(void)
QueuePosition head;
QueuePosition max;
ProcNumber prevListener;
- ListCell *p;
/*
* Nothing to do if we are already listening to something, nor if we
@@ -1232,17 +1275,6 @@ Exec_ListenPreCommit(void)
QUEUE_FIRST_LISTENER = MyProcNumber;
}
- /*
- * Add all our channels to the channel hash table while we still hold
- * exclusive lock on NotifyQueueLock.
- */
- foreach(p, listenChannels)
- {
- char *channel = (char *) lfirst(p);
-
- ChannelHashAddListener(channel, MyProcNumber);
- }
-
LWLockRelease(NotifyQueueLock);
/* Now we are listed in the global array, so remember we're listening */
@@ -1286,9 +1318,7 @@ Exec_ListenCommit(const char *channel)
listenChannels = lappend(listenChannels, pstrdup(channel));
MemoryContextSwitchTo(oldcontext);
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
ChannelHashAddListener(channel, MyProcNumber);
- LWLockRelease(NotifyQueueLock);
}
/*
@@ -1312,10 +1342,7 @@ Exec_UnlistenCommit(const char *channel)
{
listenChannels = foreach_delete_current(listenChannels, q);
pfree(lchan);
-
- LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
ChannelHashRemoveListener(channel, MyProcNumber);
- LWLockRelease(NotifyQueueLock);
break;
}
}
@@ -1334,9 +1361,22 @@ Exec_UnlistenCommit(const char *channel)
static void
Exec_UnlistenAllCommit(void)
{
+ ListCell *p;
+
if (Trace_notify)
elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
+ /*
+ * Before freeing the local list, iterate through it and perform a
+ * targeted removal for each of our channels from the shared hash table.
+ */
+ foreach(p, listenChannels)
+ {
+ char *channel = (char *) lfirst(p);
+
+ ChannelHashRemoveListener(channel, MyProcNumber);
+ }
+
list_free_deep(listenChannels);
listenChannels = NIL;
}
@@ -1381,8 +1421,6 @@ asyncQueueUnregister(void)
*/
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
- ChannelHashRemoveBackendFromAll(MyProcNumber);
-
/* Mark our entry as invalid */
QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
@@ -1755,16 +1793,26 @@ SignalBackends(void)
/* Get list of channels that have pending notifications */
channels = GetPendingNotifyChannels();
+ /*
+ * To prevent deadlocks, we must always acquire locks in the same order:
+ * global NotifyQueueLock first, then individual partition locks.
+ */
LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
/*
- * Check if any channel has multiple listeners, in which case we would
- * need to signal all backends anyway.
+ * Determine if we can use targeted signaling or must broadcast. This
+ * check must be done while holding NotifyQueueLock to prevent deadlocks
+ * against other backends that might be modifying the listener list and
+ * hash table simultaneously (e.g., asyncQueueUnregister).
*/
foreach(p, channels)
{
char *channel = (char *) lfirst(p);
- ChannelEntry *entry = ChannelHashLookup(channel);
+ ChannelEntry *entry;
+ LWLock *lock = GetChannelHashLock(channel);
+
+ LWLockAcquire(lock, LW_SHARED);
+ entry = ChannelHashLookup(channel);
/*
* If there is no entry, it could mean we ran out of shared memory
@@ -1774,8 +1822,10 @@ SignalBackends(void)
if (!entry || entry->has_multiple_listeners)
{
broadcast_mode = true;
+ LWLockRelease(lock);
break;
}
+ LWLockRelease(lock);
}
if (broadcast_mode)
@@ -1814,41 +1864,53 @@ SignalBackends(void)
else
{
/*
- * Signal specific listening backends
+ * In targeted mode, signal specific listening backends. We must
+ * re-check the hash entries here inside the lock to avoid races.
*/
foreach(p, channels)
{
char *channel = (char *) lfirst(p);
- ChannelEntry *entry = ChannelHashLookup(channel);
-
- ProcNumber i = entry->listener;
- int32 pid;
- QueuePosition pos;
-
- Assert(entry && !entry->has_multiple_listeners);
-
- if (signaled[i])
- continue;
-
- pos = QUEUE_BACKEND_POS(i);
-
- /*
- * Skip signaling listeners if they already caught up.
- */
- if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
- continue;
-
- if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
- continue;
-
- pid = QUEUE_BACKEND_PID(i);
- Assert(pid != InvalidPid);
-
- /* OK, need to signal this one */
- pids[count] = pid;
- procnos[count] = i;
- signaled[i] = true;
- count++;
+ ChannelEntry *entry;
+ LWLock *lock = GetChannelHashLock(channel);
+
+ LWLockAcquire(lock, LW_SHARED);
+ entry = ChannelHashLookup(channel);
+
+ if (entry && !entry->has_multiple_listeners)
+ {
+ ProcNumber i = entry->listener;
+ int32 pid;
+ QueuePosition pos;
+
+ if (signaled[i])
+ {
+ LWLockRelease(lock);
+ continue;
+ }
+
+ pos = QUEUE_BACKEND_POS(i);
+
+ if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
+ {
+ LWLockRelease(lock);
+ continue;
+ }
+
+ if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+ {
+ LWLockRelease(lock);
+ continue;
+ }
+
+ pid = QUEUE_BACKEND_PID(i);
+ Assert(pid != InvalidPid);
+
+ pids[count] = pid;
+ procnos[count] = i;
+ signaled[i] = true;
+ count++;
+ }
+ LWLockRelease(lock);
}
}
@@ -1879,12 +1941,10 @@ SignalBackends(void)
/*
* Wake only tail optimization: Signal the backend that is furthest
* behind to help prevent backends from getting far behind in the
- * first place. This creates a chain reaction where each backend
- * eventually wakes up the next one as notifications are processed,
- * avoiding thundering herd.
- *
- * Otherwise, only skip signaling listeners if they are not far
- * behind.
+ * first place. This finds the backend(s) on the same page as the
+ * global tail, which are the ones holding up truncation. This creates
+ * a chain reaction where each backend eventually wakes up the next one
+ * as notifications are processed, avoiding thundering herd.
*/
if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
QUEUE_POS_PAGE(pos)) == 0)
@@ -1898,8 +1958,6 @@ SignalBackends(void)
pids[count] = pid;
procnos[count] = i;
count++;
-
-
}
LWLockRelease(NotifyQueueLock);
@@ -1921,9 +1979,9 @@ SignalBackends(void)
/*
* Note: assuming things aren't broken, a signal failure here could
- * only occur if the target backend exited since we released
- * NotifyQueueLock; which is unlikely but certainly possible. So we
- * just log a low-level debug message if it happens.
+ * only occur if the target backend exited since we released the lock;
+ * which is unlikely but certainly possible. So we just log a
+ * low-level debug message if it happens.
*/
if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
@@ -2675,6 +2733,47 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
* Channel hash table management functions
*/
+/*
+ * channel_hash_func
+ * Custom hash function for the channel hash table. This function ensures
+ * that the low-order bits of the hash are well-distributed, which is
+ * critical for partitioned hash tables.
+ */
+static uint32
+channel_hash_func(const void *key, Size keysize)
+{
+ const ChannelHashKey *k = (const ChannelHashKey *) key;
+ uint32 h;
+
+ /*
+ * Mix the dboid and the channel name to produce a good hash. hash_any()
+ * is a high-quality portable hash function. This prevents channels with
+ * the same name in different databases from always mapping to the same
+ * partition.
+ */
+ h = DatumGetUInt32(hash_uint32(k->dboid));
+ h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+ strnlen(k->channel, NAMEDATALEN)));
+
+ return h;
+}
+
+/*
+ * GetChannelHashLock
+ * Return the LWLock that protects the partition for the given channel name.
+ */
+static LWLock *
+GetChannelHashLock(const char *channel)
+{
+ ChannelHashKey key;
+ uint32 hash;
+
+ ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+ hash = get_hash_value(GetChannelHash(), &key);
+
+ return &channelHashLocks[hash % NUM_NOTIFY_PARTITIONS];
+}
+
/*
* ChannelHashPrepareKey
* Prepare a channel key (database OID + channel name) for use as a hash key.
@@ -2689,10 +2788,22 @@ ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel)
/*
* ChannelHashAddListener
- * Register the given backend as a listener for the specified channel
- * in the shared channel hash table.
+ * Register the given backend as a listener for the specified channel.
*
- * Caller must hold exclusive NotifyQueueLock.
+ * This function uses an optimistic read-locking strategy to maximize
+ * concurrency when many backends listen on the same channel.
+ *
+ * 1. It first takes a shared lock and checks the channel's state. If the
+ * channel is already marked as having multiple listeners, no write is
+ * needed, and we can return immediately. This is the fast path for the
+ * 3rd, 4th, etc., listener on a given channel.
+ *
+ * 2. If a write is needed (either to create the entry or to mark it as
+ * multi-listener), it releases the shared lock and acquires an exclusive
+ * lock.
+ *
+ * 3. CRUCIALLY, after acquiring the exclusive lock, it must re-check the
+ * state, as another backend may have modified the entry in the interim.
*/
static void
ChannelHashAddListener(const char *channel, ProcNumber procno)
@@ -2700,135 +2811,108 @@ ChannelHashAddListener(const char *channel, ProcNumber procno)
ChannelEntry *entry;
bool found;
ChannelHashKey key;
+ LWLock *lock = GetChannelHashLock(channel);
ChannelHashPrepareKey(&key, MyDatabaseId, channel);
- /* Look up or create the channel entry */
- entry = (ChannelEntry *) hash_search(GetChannelHash(),
- &key,
- HASH_ENTER_NULL,
- &found);
+ /*
+ * FAST PATH: Optimistically take a shared lock. If the channel already
+ * has multiple listeners, we don't need to do anything.
+ */
+ LWLockAcquire(lock, LW_SHARED);
+ entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+ if (entry && entry->has_multiple_listeners)
+ {
+ LWLockRelease(lock);
+ return;
+ }
+ LWLockRelease(lock);
+
+ /*
+ * SLOW PATH: We need to write. Acquire exclusive lock.
+ */
+ LWLockAcquire(lock, LW_EXCLUSIVE);
/*
- * If hash_search returned NULL, we've run out of shared memory to
- * allocate new hash entries. We gracefully degrade by not tracking this
- * channel in the hash. The channel will use the fallback broadcast
- * signalling.
+ * Re-check state after acquiring exclusive lock, as it may have changed.
*/
+ entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_ENTER_NULL, &found);
+
if (entry == NULL)
{
- ereport(DEBUG1,
- (errmsg("too many notification channels are already being tracked")));
+ /* Out of memory in the hash partition. */
+ ereport(DEBUG1, (errmsg("too many notification channels are already being tracked")));
+ LWLockRelease(lock);
return;
}
if (!found)
{
- /* New channel, initialize the entry */
- memcpy(&entry->key, &key, sizeof(ChannelHashKey));
+ /* We are the first listener. */
entry->listener = procno;
entry->has_multiple_listeners = false;
}
- else
+ else if (!entry->has_multiple_listeners)
{
- /* Channel already exists */
- if (!entry->has_multiple_listeners)
+ /* We are the second listener. */
+ if (entry->listener != procno)
{
- if (entry->listener == procno)
- return; /* Already listening */
-
- /*
- * Another backend is already listening on this channel. Mark it
- * as having multiple listeners and fall back to broadcast
- * signalling.
- */
entry->has_multiple_listeners = true;
entry->listener = INVALID_PROC_NUMBER;
}
- /* If already marked as having multiple listeners, nothing to do */
}
+ /* If entry->has_multiple_listeners is now true, do nothing. */
+ LWLockRelease(lock);
}
/*
* ChannelHashRemoveListener
* Update the channel hash when a backend stops listening on a channel.
*
- * If the channel entry currently tracks exactly one listener and that
- * listener matches the supplied procno, remove the entry altogether.
- *
- * If the channel has already been flagged as having multiple listeners,
- * we no longer track individual backends; therefore we cannot remove a
- * single backend without additional bookkeeping. In that situation we
- * simply leave the entry in place (still marked as having multiple
- * listeners) and return.
- *
- * Caller must hold exclusive NotifyQueueLock.
+ * This function uses an optimistic read-lock strategy to maximize concurrency.
+ * An exclusive lock is only taken if we are the sole listener on a channel
+ * and need to remove the entry from the hash table.
*/
static void
ChannelHashRemoveListener(const char *channel, ProcNumber procno)
{
ChannelEntry *entry;
ChannelHashKey key;
+ LWLock *lock = GetChannelHashLock(channel);
ChannelHashPrepareKey(&key, MyDatabaseId, channel);
- /* Look up the channel entry */
- entry = (ChannelEntry *) hash_search(GetChannelHash(),
- &key,
- HASH_FIND,
- NULL);
-
- if (!entry)
- return; /* Channel not found */
-
/*
- * If this channel has multiple listeners, we can't track individual
- * removals. Just leave it marked as having multiple listeners.
+ * Take a shared lock first to see if a removal is even necessary. If the
+ * entry doesn't exist, or it's a multi-listener entry, we have nothing to
+ * do. This is the fast path.
*/
- if (entry->has_multiple_listeners)
+ LWLockAcquire(lock, LW_SHARED);
+ entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+ if (!entry || entry->has_multiple_listeners || entry->listener != procno)
+ {
+ LWLockRelease(lock);
return;
-
- /* If this backend is the single listener, remove the channel entry */
- if (entry->listener == procno)
- {
- hash_search(GetChannelHash(),
- &key,
- HASH_REMOVE,
- NULL);
}
-}
-
-/*
- * ChannelHashRemoveBackendFromAll
- * Sweep the channel hash and delete any channel entries for which
- * this backend is the only tracked listener in the current database.
- *
- * Caller must hold exclusive NotifyQueueLock.
- */
-static void
-ChannelHashRemoveBackendFromAll(ProcNumber procno)
-{
- HASH_SEQ_STATUS status;
- ChannelEntry *entry;
+ LWLockRelease(lock);
- hash_seq_init(&status, GetChannelHash());
+ /*
+ * A removal is likely needed. Acquire an exclusive lock.
+ */
+ LWLockAcquire(lock, LW_EXCLUSIVE);
- while ((entry = (ChannelEntry *) hash_seq_search(&status)) != NULL)
+ /*
+ * Re-check the state, as another backend might have changed it. The only
+ * state change we care about is if it became a multi-listener channel, in
+ * which case we should no longer remove it.
+ */
+ entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL);
+ if (entry && !entry->has_multiple_listeners && entry->listener == procno)
{
- if (entry->key.dboid != MyDatabaseId)
- continue;
-
- if (entry->has_multiple_listeners)
- continue;
-
- if (entry->listener == procno)
- {
- hash_search(GetChannelHash(),
- &entry->key,
- HASH_REMOVE,
- NULL);
- }
+ /* Still a single-listener entry for us, so remove it. */
+ (void) hash_search(GetChannelHash(), &key, HASH_REMOVE, NULL);
}
+ LWLockRelease(lock);
}
/*
@@ -2840,14 +2924,14 @@ ChannelHashRemoveBackendFromAll(ProcNumber procno)
* fell back to broadcast mode because we ran out of shared memory when trying
* to add entries to the hash table).
*
- * Caller must hold at least shared NotifyQueueLock.
+ * Caller must hold the appropriate partition lock (shared is sufficient).
*/
static ChannelEntry *
ChannelHashLookup(const char *channel)
{
ChannelHashKey key;
- Assert(LWLockHeldByMe(NotifyQueueLock));
+ Assert(LWLockHeldByMe(GetChannelHashLock(channel)));
ChannelHashPrepareKey(&key, MyDatabaseId, channel);
--
2.47.1