From e8dcd9e1d035d8d711312a83daa791da6d8906a9 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Sat, 11 Oct 2025 07:28:57 +0200 Subject: [PATCH 3/3] Optimize LISTEN/NOTIFY by advancing idle backends directly Building on the previous channel-specific listener tracking optimization, this patch further reduces context switching by detecting idle listening backends that don't listen to any of the channels being notified and advancing their queue positions directly without waking them up. When a backend commits notifications, it now saves both the queue head position before and after writing. In SignalBackends(), backends that are at the old queue head and weren't marked for wakeup (meaning they don't listen to any of the notified channels) are advanced directly to the new queue head. This eliminates unnecessary wakeups for these backends, which would otherwise wake up, scan through all the notifications, skip each one, and advance to the same position anyway. The implementation carefully handles the race condition where other backends may write notifications after the heavyweight lock is released but before SignalBackends() is called. By saving queueHeadAfterWrite immediately after writing (before releasing the lock), we ensure backends are only advanced over the exact notifications we wrote, not notifications from other concurrent backends. --- src/backend/commands/async.c | 62 ++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index bb5ebfab26d..a4cc7395c08 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -500,6 +500,8 @@ typedef struct NotificationList int nestingLevel; /* current transaction nesting depth */ List *events; /* list of Notification structs */ HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ + QueuePosition queueHeadBeforeWrite; /* QUEUE_HEAD before writing notifies */ + QueuePosition queueHeadAfterWrite; /* QUEUE_HEAD after writing notifies */ struct NotificationList *upper; /* details for upper transaction levels */ } NotificationList; @@ -1048,6 +1050,7 @@ PreCommit_Notify(void) if (pendingNotifies) { ListCell *nextNotify; + bool firstIteration = true; /* * Make sure that we have an XID assigned to the current transaction. @@ -1076,6 +1079,9 @@ PreCommit_Notify(void) LockSharedObject(DatabaseRelationId, InvalidOid, 0, AccessExclusiveLock); + /* Initialize queueHeadBeforeWrite to a safe default */ + SET_QUEUE_POS(pendingNotifies->queueHeadBeforeWrite, 0, 0); + /* Now push the notifications into the queue */ nextNotify = list_head(pendingNotifies->events); while (nextNotify != NULL) @@ -1093,6 +1099,19 @@ PreCommit_Notify(void) * point in time we can still roll the transaction back. */ LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* + * On the first iteration, save the queue head position before we + * write any notifications. This is used by SignalBackends() to + * identify backends that can be advanced directly without waking + * them up. + */ + if (firstIteration) + { + pendingNotifies->queueHeadBeforeWrite = QUEUE_HEAD; + firstIteration = false; + } + asyncQueueFillWarning(); if (asyncQueueIsFull()) ereport(ERROR, @@ -1102,6 +1121,18 @@ PreCommit_Notify(void) LWLockRelease(NotifyQueueLock); } + /* + * Save the queue head after writing all our notifications. This is + * used by SignalBackends() to know where to advance idle backends to. + * We must save this now because other backends may write their own + * notifications after we release the heavyweight lock but before we + * call SignalBackends(), and we must not advance backends over those + * other notifications. + */ + LWLockAcquire(NotifyQueueLock, LW_SHARED); + pendingNotifies->queueHeadAfterWrite = QUEUE_HEAD; + LWLockRelease(NotifyQueueLock); + /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ } } @@ -1934,6 +1965,37 @@ SignalBackends(void) dshash_release_lock(channelHash, entry); } + /* + * Avoid needing to wake listening backends that are at the old queue head + * (before we wrote our notifications) that we know are not interested in + * our notifications, since otherwise they would have been marked for + * wakeup by now. Do this by advancing them directly to the new queue + * head. + */ + if (pendingNotifies != NULL) + { + QueuePosition oldHead = pendingNotifies->queueHeadBeforeWrite; + QueuePosition newHead = pendingNotifies->queueHeadAfterWrite; + + for (ProcNumber i = QUEUE_FIRST_LISTENER; + i != INVALID_PROC_NUMBER; + i = QUEUE_NEXT_LISTENER(i)) + { + QueuePosition pos; + + if (QUEUE_BACKEND_WAKEUP_PENDING(i)) + continue; + + pos = QUEUE_BACKEND_POS(i); + + if (QUEUE_POS_EQUAL(pos, oldHead) && + QUEUE_BACKEND_DBOID(i) == MyDatabaseId) + { + QUEUE_BACKEND_POS(i) = newHead; + } + } + } + queue_length = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), QUEUE_POS_PAGE(QUEUE_TAIL)); -- 2.50.1