From 61ab3b3a834192b0468d10ca5fe3824b1fec6065 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Sun, 13 Jul 2025 14:39:12 +0200 Subject: [PATCH 2/2] Partition channel hash to improve LISTEN/UNLISTEN The previous commit introduced a shared hash table to optimize NOTIFY for single-listener channels. However, all modifications to this hash table were serialized by the global NotifyQueueLock, creating a new contention point for concurrent LISTEN and UNLISTEN operations. This commit removes that bottleneck by partitioning the hash table's locking. For the channel hash table, NotifyQueueLock is replaced by an array of NUM_NOTIFY_PARTITIONS lightweight locks; NotifyQueueLock itself continues to protect the notification queue state. A custom hash function, which mixes the dboid and channel name, is used to map a channel to a specific partition lock. This allows operations on different channels to proceed in parallel, as they will contend on different locks. Furthermore, to handle high-concurrency workloads where many backends LISTEN on the same channel, the lock acquisition logic is optimized using a read-then-upgrade pattern: 1. An LW_SHARED lock is taken first to check the channel's state. If no write is needed (e.g., the channel is already marked as multi-listener), the function can return immediately. This is the fast path for the third and all subsequent listeners on a popular channel. 2. Only if a mutation is required is the shared lock released and an LW_EXCLUSIVE lock acquired. After acquiring the exclusive lock, the state is re-verified to guard against race conditions before the write is performed. This optimistic pattern is applied to both adding and removing listeners, ensuring that both the "many distinct channels" and "many backends on one channel" use cases are highly scalable. The SignalBackends logic is also updated to follow a strict lock ordering hierarchy (global NotifyQueueLock before any partition lock) to prevent deadlocks when checking the hash table. 
Finally, the backend exit logic in Exec_UnlistenAllCommit is refined to iterate over the backend's local listenChannels list, performing targeted, per-partition removals instead of a more expensive full table scan. With these changes, the LISTEN/UNLISTEN path is no longer serialized by a single global lock, directly addressing the scalability concerns of the previous implementation. --- src/backend/commands/async.c | 398 +++++++++++++++++++++-------------- 1 file changed, 241 insertions(+), 157 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index a0b7daaef7d..f81a30b53e2 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -134,6 +134,7 @@ #include #include #include +#include #include "access/parallel.h" #include "access/slru.h" @@ -169,6 +170,12 @@ */ #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) +/* + * Number of partitions for the channel hash table's locks. + * This must be a power of two. + */ +#define NUM_NOTIFY_PARTITIONS 128 + /* * Channel hash table definitions * @@ -176,6 +183,10 @@ * listening on each channel. Channels are identified by database OID and * channel name, making them database-specific. * + * To improve scalability of concurrent LISTEN/UNLISTEN operations, the hash + * table is partitioned, with each partition protected by its own LWLock. This + * avoids serializing all operations on a single global lock. + * * When exactly one backend listens on a channel, we signal that specific * backend, avoiding unnecessary signals to all listening backends. * @@ -328,6 +339,11 @@ typedef struct QueueBackendStatus * In order to avoid deadlocks, whenever we need multiple locks, we first get * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock. * + * The channel hash table is protected by a separate set of partitioned + * locks. 
To prevent deadlocks between these and NotifyQueueLock, the global + * lock-ordering rule is: always acquire NotifyQueueLock *before* acquiring + * any channel hash partition lock. + * * Each backend uses the backend[] array entry with index equal to its * ProcNumber. We rely on this to make SendProcSignal fast. * @@ -352,9 +368,16 @@ typedef struct AsyncQueueControl static AsyncQueueControl *asyncQueueControl; +/* Locks for partitioned channel hash table */ +static LWLock *channelHashLocks; +static int channelHashTrancheId = 0; + /* Channel hash table for single listening backend signalling */ static HTAB *channelHash = NULL; +/* Forward declaration needed by GetChannelHash */ +static uint32 channel_hash_func(const void *key, Size keysize); + /* * GetChannelHash * Get the channel hash table, initializing our backend's pointer if needed. @@ -370,16 +393,21 @@ GetChannelHash(void) { HASHCTL hash_ctl; - /* Set up to attach to the existing shared hash table */ + /* + * Set up to attach to the existing shared hash table. The hash + * control parameters must match those used in AsyncShmemInit. 
+ */ MemSet(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(ChannelHashKey); hash_ctl.entrysize = sizeof(ChannelEntry); + hash_ctl.hash = channel_hash_func; + hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS; channelHash = ShmemInitHash("Channel Hash", CHANNEL_HASH_INIT_SIZE, CHANNEL_HASH_MAX_SIZE, &hash_ctl, - HASH_ELEM | HASH_BLOBS); + HASH_ELEM | HASH_FUNCTION | HASH_PARTITION); } return channelHash; @@ -551,10 +579,10 @@ static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); /* Channel hash table management functions */ +static LWLock *GetChannelHashLock(const char *channel); static inline void ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel); static void ChannelHashAddListener(const char *channel, ProcNumber procno); static void ChannelHashRemoveListener(const char *channel, ProcNumber procno); -static void ChannelHashRemoveBackendFromAll(ProcNumber procno); static ChannelEntry * ChannelHashLookup(const char *channel); static List *GetPendingNotifyChannels(void); @@ -595,6 +623,8 @@ AsyncShmemSize(void) size = add_size(size, hash_estimate_size(CHANNEL_HASH_MAX_SIZE, sizeof(ChannelEntry))); + size = add_size(size, mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock))); + return size; } @@ -659,12 +689,26 @@ AsyncShmemInit(void) MemSet(&hash_ctl, 0, sizeof(hash_ctl)); hash_ctl.keysize = sizeof(ChannelHashKey); hash_ctl.entrysize = sizeof(ChannelEntry); + hash_ctl.hash = channel_hash_func; + hash_ctl.num_partitions = NUM_NOTIFY_PARTITIONS; channelHash = ShmemInitHash("Channel Hash", CHANNEL_HASH_INIT_SIZE, CHANNEL_HASH_MAX_SIZE, &hash_ctl, - HASH_ELEM | HASH_BLOBS); + HASH_ELEM | HASH_FUNCTION | HASH_PARTITION); + } + + /* Initialize locks for the partitioned hash table */ + channelHashLocks = (LWLock *) ShmemAlloc(mul_size(NUM_NOTIFY_PARTITIONS, sizeof(LWLock))); + if (!found) + { + channelHashTrancheId = LWLockNewTrancheId(); + 
LWLockRegisterTranche(channelHashTrancheId, "ChannelHashPartition"); + } + for (int i = 0; i < NUM_NOTIFY_PARTITIONS; i++) + { + LWLockInitialize(&channelHashLocks[i], channelHashTrancheId); } } @@ -1163,7 +1207,6 @@ Exec_ListenPreCommit(void) QueuePosition head; QueuePosition max; ProcNumber prevListener; - ListCell *p; /* * Nothing to do if we are already listening to something, nor if we @@ -1232,17 +1275,6 @@ Exec_ListenPreCommit(void) QUEUE_FIRST_LISTENER = MyProcNumber; } - /* - * Add all our channels to the channel hash table while we still hold - * exclusive lock on NotifyQueueLock. - */ - foreach(p, listenChannels) - { - char *channel = (char *) lfirst(p); - - ChannelHashAddListener(channel, MyProcNumber); - } - LWLockRelease(NotifyQueueLock); /* Now we are listed in the global array, so remember we're listening */ @@ -1286,9 +1318,7 @@ Exec_ListenCommit(const char *channel) listenChannels = lappend(listenChannels, pstrdup(channel)); MemoryContextSwitchTo(oldcontext); - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); ChannelHashAddListener(channel, MyProcNumber); - LWLockRelease(NotifyQueueLock); } /* @@ -1312,10 +1342,7 @@ Exec_UnlistenCommit(const char *channel) { listenChannels = foreach_delete_current(listenChannels, q); pfree(lchan); - - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); ChannelHashRemoveListener(channel, MyProcNumber); - LWLockRelease(NotifyQueueLock); break; } } @@ -1334,9 +1361,22 @@ Exec_UnlistenCommit(const char *channel) static void Exec_UnlistenAllCommit(void) { + ListCell *p; + if (Trace_notify) elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); + /* + * Before freeing the local list, iterate through it and perform a + * targeted removal for each of our channels from the shared hash table. 
+ */ + foreach(p, listenChannels) + { + char *channel = (char *) lfirst(p); + + ChannelHashRemoveListener(channel, MyProcNumber); + } + list_free_deep(listenChannels); listenChannels = NIL; } @@ -1381,8 +1421,6 @@ asyncQueueUnregister(void) */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - ChannelHashRemoveBackendFromAll(MyProcNumber); - /* Mark our entry as invalid */ QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid; QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; @@ -1755,16 +1793,26 @@ SignalBackends(void) /* Get list of channels that have pending notifications */ channels = GetPendingNotifyChannels(); + /* + * To prevent deadlocks, we must always acquire locks in the same order: + * global NotifyQueueLock first, then individual partition locks. + */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); /* - * Check if any channel has multiple listeners, in which case we would - * need to signal all backends anyway. + * Determine if we can use targeted signaling or must broadcast. This + * check must be done while holding NotifyQueueLock to prevent deadlocks + * against other backends that might be modifying the listener list and + * hash table simultaneously (e.g., asyncQueueUnregister). */ foreach(p, channels) { char *channel = (char *) lfirst(p); - ChannelEntry *entry = ChannelHashLookup(channel); + ChannelEntry *entry; + LWLock *lock = GetChannelHashLock(channel); + + LWLockAcquire(lock, LW_SHARED); + entry = ChannelHashLookup(channel); /* * If there is no entry, it could mean we ran out of shared memory @@ -1774,8 +1822,10 @@ SignalBackends(void) if (!entry || entry->has_multiple_listeners) { broadcast_mode = true; + LWLockRelease(lock); break; } + LWLockRelease(lock); } if (broadcast_mode) @@ -1814,41 +1864,53 @@ SignalBackends(void) else { /* - * Signal specific listening backends + * In targeted mode, signal specific listening backends. We must + * re-check the hash entries here inside the lock to avoid races. 
*/ foreach(p, channels) { char *channel = (char *) lfirst(p); - ChannelEntry *entry = ChannelHashLookup(channel); - - ProcNumber i = entry->listener; - int32 pid; - QueuePosition pos; - - Assert(entry && !entry->has_multiple_listeners); - - if (signaled[i]) - continue; - - pos = QUEUE_BACKEND_POS(i); - - /* - * Skip signaling listeners if they already caught up. - */ - if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) - continue; - - if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId) - continue; - - pid = QUEUE_BACKEND_PID(i); - Assert(pid != InvalidPid); - - /* OK, need to signal this one */ - pids[count] = pid; - procnos[count] = i; - signaled[i] = true; - count++; + ChannelEntry *entry; + LWLock *lock = GetChannelHashLock(channel); + + LWLockAcquire(lock, LW_SHARED); + entry = ChannelHashLookup(channel); + + if (entry && !entry->has_multiple_listeners) + { + ProcNumber i = entry->listener; + int32 pid; + QueuePosition pos; + + if (signaled[i]) + { + LWLockRelease(lock); + continue; + } + + pos = QUEUE_BACKEND_POS(i); + + if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) + { + LWLockRelease(lock); + continue; + } + + if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId) + { + LWLockRelease(lock); + continue; + } + + pid = QUEUE_BACKEND_PID(i); + Assert(pid != InvalidPid); + + pids[count] = pid; + procnos[count] = i; + signaled[i] = true; + count++; + } + LWLockRelease(lock); } } @@ -1879,12 +1941,10 @@ SignalBackends(void) /* * Wake only tail optimization: Signal the backend that is furthest * behind to help prevent backends from getting far behind in the - * first place. This creates a chain reaction where each backend - * eventually wakes up the next one as notifications are processed, - * avoiding thundering herd. - * - * Otherwise, only skip signaling listeners if they are not far - * behind. + * first place. This finds the backend(s) on the same page as the + * global tail, which are the ones holding up truncation. 
This creates + * a chain reaction where each backend eventually wakes up the next one + * as notifications are processed, avoiding thundering herd. */ if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL), QUEUE_POS_PAGE(pos)) == 0) @@ -1898,8 +1958,6 @@ SignalBackends(void) pids[count] = pid; procnos[count] = i; count++; - - } LWLockRelease(NotifyQueueLock); @@ -1921,9 +1979,9 @@ SignalBackends(void) /* * Note: assuming things aren't broken, a signal failure here could - * only occur if the target backend exited since we released - * NotifyQueueLock; which is unlikely but certainly possible. So we - * just log a low-level debug message if it happens. + * only occur if the target backend exited since we released the lock; + * which is unlikely but certainly possible. So we just log a + * low-level debug message if it happens. */ if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) elog(DEBUG3, "could not signal backend with PID %d: %m", pid); @@ -2675,6 +2733,47 @@ check_notify_buffers(int *newval, void **extra, GucSource source) * Channel hash table management functions */ +/* + * channel_hash_func + * Custom hash function for the channel hash table. This function ensures + * that the low-order bits of the hash are well-distributed, which is + * critical for partitioned hash tables. + */ +static uint32 +channel_hash_func(const void *key, Size keysize) +{ + const ChannelHashKey *k = (const ChannelHashKey *) key; + uint32 h; + + /* + * Mix the dboid and the channel name to produce a good hash. hash_any() + * is a high-quality portable hash function. This prevents channels with + * the same name in different databases from always mapping to the same + * partition. + */ + h = DatumGetUInt32(hash_uint32(k->dboid)); + h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel, + strnlen(k->channel, NAMEDATALEN))); + + return h; +} + +/* + * GetChannelHashLock + * Return the LWLock that protects the partition for the given channel name. 
+ */ +static LWLock * +GetChannelHashLock(const char *channel) +{ + ChannelHashKey key; + uint32 hash; + + ChannelHashPrepareKey(&key, MyDatabaseId, channel); + hash = get_hash_value(GetChannelHash(), &key); + + return &channelHashLocks[hash % NUM_NOTIFY_PARTITIONS]; +} + /* * ChannelHashPrepareKey * Prepare a channel key (database OID + channel name) for use as a hash key. @@ -2689,10 +2788,22 @@ ChannelHashPrepareKey(ChannelHashKey * key, Oid dboid, const char *channel) /* * ChannelHashAddListener - * Register the given backend as a listener for the specified channel - * in the shared channel hash table. + * Register the given backend as a listener for the specified channel. * - * Caller must hold exclusive NotifyQueueLock. + * This function uses an optimistic read-locking strategy to maximize + * concurrency when many backends listen on the same channel. + * + * 1. It first takes a shared lock and checks the channel's state. If the + * channel is already marked as having multiple listeners, no write is + * needed, and we can return immediately. This is the fast path for the + * 3rd, 4th, etc., listener on a given channel. + * + * 2. If a write is needed (either to create the entry or to mark it as + * multi-listener), it releases the shared lock and acquires an exclusive + * lock. + * + * 3. CRUCIALLY, after acquiring the exclusive lock, it must re-check the + * state, as another backend may have modified the entry in the interim. */ static void ChannelHashAddListener(const char *channel, ProcNumber procno) @@ -2700,135 +2811,108 @@ ChannelHashAddListener(const char *channel, ProcNumber procno) ChannelEntry *entry; bool found; ChannelHashKey key; + LWLock *lock = GetChannelHashLock(channel); ChannelHashPrepareKey(&key, MyDatabaseId, channel); - /* Look up or create the channel entry */ - entry = (ChannelEntry *) hash_search(GetChannelHash(), - &key, - HASH_ENTER_NULL, - &found); + /* + * FAST PATH: Optimistically take a shared lock. 
If the channel already + * has multiple listeners, we don't need to do anything. + */ + LWLockAcquire(lock, LW_SHARED); + entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL); + if (entry && entry->has_multiple_listeners) + { + LWLockRelease(lock); + return; + } + LWLockRelease(lock); + + /* + * SLOW PATH: We need to write. Acquire exclusive lock. + */ + LWLockAcquire(lock, LW_EXCLUSIVE); /* - * If hash_search returned NULL, we've run out of shared memory to - * allocate new hash entries. We gracefully degrade by not tracking this - * channel in the hash. The channel will use the fallback broadcast - * signalling. + * Re-check state after acquiring exclusive lock, as it may have changed. */ + entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_ENTER_NULL, &found); + if (entry == NULL) { - ereport(DEBUG1, - (errmsg("too many notification channels are already being tracked"))); + /* Out of memory in the hash partition. */ + ereport(DEBUG1, (errmsg("too many notification channels are already being tracked"))); + LWLockRelease(lock); return; } if (!found) { - /* New channel, initialize the entry */ - memcpy(&entry->key, &key, sizeof(ChannelHashKey)); + /* We are the first listener. */ entry->listener = procno; entry->has_multiple_listeners = false; } - else + else if (!entry->has_multiple_listeners) { - /* Channel already exists */ - if (!entry->has_multiple_listeners) + /* We are the second listener. */ + if (entry->listener != procno) { - if (entry->listener == procno) - return; /* Already listening */ - - /* - * Another backend is already listening on this channel. Mark it - * as having multiple listeners and fall back to broadcast - * signalling. - */ entry->has_multiple_listeners = true; entry->listener = INVALID_PROC_NUMBER; } - /* If already marked as having multiple listeners, nothing to do */ } + /* If entry->has_multiple_listeners is now true, do nothing. 
*/ + LWLockRelease(lock); } /* * ChannelHashRemoveListener * Update the channel hash when a backend stops listening on a channel. * - * If the channel entry currently tracks exactly one listener and that - * listener matches the supplied procno, remove the entry altogether. - * - * If the channel has already been flagged as having multiple listeners, - * we no longer track individual backends; therefore we cannot remove a - * single backend without additional bookkeeping. In that situation we - * simply leave the entry in place (still marked as having multiple - * listeners) and return. - * - * Caller must hold exclusive NotifyQueueLock. + * This function uses an optimistic read-lock strategy to maximize concurrency. + * An exclusive lock is only taken if we are the sole listener on a channel + * and need to remove the entry from the hash table. */ static void ChannelHashRemoveListener(const char *channel, ProcNumber procno) { ChannelEntry *entry; ChannelHashKey key; + LWLock *lock = GetChannelHashLock(channel); ChannelHashPrepareKey(&key, MyDatabaseId, channel); - /* Look up the channel entry */ - entry = (ChannelEntry *) hash_search(GetChannelHash(), - &key, - HASH_FIND, - NULL); - - if (!entry) - return; /* Channel not found */ - /* - * If this channel has multiple listeners, we can't track individual - * removals. Just leave it marked as having multiple listeners. + * Take a shared lock first to see if a removal is even necessary. If the + * entry doesn't exist, or it's a multi-listener entry, we have nothing to + * do. This is the fast path. 
*/ - if (entry->has_multiple_listeners) + LWLockAcquire(lock, LW_SHARED); + entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL); + if (!entry || entry->has_multiple_listeners || entry->listener != procno) + { + LWLockRelease(lock); return; - - /* If this backend is the single listener, remove the channel entry */ - if (entry->listener == procno) - { - hash_search(GetChannelHash(), - &key, - HASH_REMOVE, - NULL); } -} - -/* - * ChannelHashRemoveBackendFromAll - * Sweep the channel hash and delete any channel entries for which - * this backend is the only tracked listener in the current database. - * - * Caller must hold exclusive NotifyQueueLock. - */ -static void -ChannelHashRemoveBackendFromAll(ProcNumber procno) -{ - HASH_SEQ_STATUS status; - ChannelEntry *entry; + LWLockRelease(lock); - hash_seq_init(&status, GetChannelHash()); + /* + * A removal is likely needed. Acquire an exclusive lock. + */ + LWLockAcquire(lock, LW_EXCLUSIVE); - while ((entry = (ChannelEntry *) hash_seq_search(&status)) != NULL) + /* + * Re-check the state, as another backend might have changed it. The only + * state change we care about is if it became a multi-listener channel, in + * which case we should no longer remove it. + */ + entry = (ChannelEntry *) hash_search(GetChannelHash(), &key, HASH_FIND, NULL); + if (entry && !entry->has_multiple_listeners && entry->listener == procno) { - if (entry->key.dboid != MyDatabaseId) - continue; - - if (entry->has_multiple_listeners) - continue; - - if (entry->listener == procno) - { - hash_search(GetChannelHash(), - &entry->key, - HASH_REMOVE, - NULL); - } + /* Still a single-listener entry for us, so remove it. */ + (void) hash_search(GetChannelHash(), &key, HASH_REMOVE, NULL); } + LWLockRelease(lock); } /* @@ -2840,14 +2924,14 @@ ChannelHashRemoveBackendFromAll(ProcNumber procno) * fell back to broadcast mode because we ran out of shared memory when trying * to add entries to the hash table). 
* - * Caller must hold at least shared NotifyQueueLock. + * Caller must hold the appropriate partition lock (shared is sufficient). */ static ChannelEntry * ChannelHashLookup(const char *channel) { ChannelHashKey key; - Assert(LWLockHeldByMe(NotifyQueueLock)); + Assert(LWLockHeldByMe(GetChannelHashLock(channel))); ChannelHashPrepareKey(&key, MyDatabaseId, channel); -- 2.47.1