From 17777283bda5fa41b430e4f71a7246d3f04a94bf Mon Sep 17 00:00:00 2001
From: Joel Jacobson
Date: Tue, 22 Jul 2025 10:32:34 +0200
Subject: [PATCH 1/2] Optimize LISTEN/NOTIFY signaling with a lock-free atomic state machine

This commit refactors the signaling part of the LISTEN/NOTIFY subsystem to
use a lock-free finite state machine (FSM), kept in shared memory and
updated with PostgreSQL's existing atomic primitives (port/atomics.h).

Previously, if multiple transactions sent notifications concurrently, each
would unconditionally attempt to signal all listening backends. This
resulted in a storm of superfluous signals to listeners that already had a
wakeup pending, causing unnecessary system call overhead.

This commit introduces a per-backend atomic state (IDLE, SIGNALLED,
PROCESSING) in shared memory, manipulated via compare-and-swap (CAS). A
notifier atomically transitions a listener's state from IDLE to SIGNALLED,
so only the first notifier for a given idle listener sends a wakeup.
Notifications that arrive while a listener is PROCESSING move it back to
SIGNALLED, so the listener re-scans the queue instead of missing them.

Modeling inter-process interactions as explicit state transitions may be a
useful pattern for other PostgreSQL subsystems as well. This commit applies
it within async.c; with the state management handled first, a subsequent
patch can simplify the wakeup mechanism itself.
---
 src/backend/commands/async.c | 181 +++++++++++++++++++++++++++++++----
 src/backend/tcop/postgres.c  |   4 +-
 src/include/commands/async.h |   4 +-
 3 files changed, 165 insertions(+), 24 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..ae20017af9b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -150,8 +150,32 @@
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "port/atomics.h"
 
+/*
+ * Per-listener notification state machine.  Each listener's state lives in
+ * QueueBackendStatus.state and is accessed only with atomic operations.
+ *
+ * Transitions made by notifiers (SignalBackends):
+ *		IDLE       -> SIGNALLED		then wake the listener
+ *		PROCESSING -> SIGNALLED		then wake the listener so it re-scans
+ *
+ * Transitions made by the listener itself (ProcessIncomingNotify):
+ *		SIGNALLED  -> PROCESSING	before reading the queue
+ *		PROCESSING -> IDLE			after reading, unless re-signalled meanwhile
+ *		SIGNALLED  -> IDLE			if it has no channels to read
+ *
+ * A listener also resets its own state to IDLE when it registers or
+ * unregisters.
+ */
+typedef enum AsyncListenerState
+{
+	ASYNC_STATE_IDLE = 0,		/* Backend is idle, waiting for signal */
+	ASYNC_STATE_SIGNALLED = 1,	/* Backend has been signaled, will process soon */
+	ASYNC_STATE_PROCESSING = 2	/* Backend is actively processing notifications */
+} AsyncListenerState;
 
+
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL. This
  * must be kept small enough so that a notification message fits on one
@@ -246,6 +270,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	pg_atomic_uint32 state;		/* an AsyncListenerState value */
 } QueueBackendStatus;
 
 /*
@@ -301,6 +326,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_STATE(i)		(asyncQueueControl->backend[i].state)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -405,12 +431,10 @@ static NotificationList *pendingNotifies = NULL;
 
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
- * called from inside a signal handler. That just sets the
- * notifyInterruptPending flag and sets the process
+ * called from inside a signal handler. That just sets the process
  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
  * actually deal with the interrupt.
  */
-volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
@@ -527,6 +551,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			pg_atomic_init_u32(&QUEUE_BACKEND_STATE(i), ASYNC_STATE_IDLE);
 		}
 	}
 
@@ -1099,6 +1124,8 @@ Exec_ListenPreCommit(void)
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+	/* Initialize the atomic state to IDLE */
+	pg_atomic_write_u32(&QUEUE_BACKEND_STATE(MyProcNumber), ASYNC_STATE_IDLE);
 	/* Insert backend into list of listeners at correct position */
 	if (prevListener != INVALID_PROC_NUMBER)
 	{
@@ -1242,6 +1269,8 @@ asyncQueueUnregister(void)
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+	/* Drop any pending wakeup; re-registering also resets this to IDLE */
+	pg_atomic_write_u32(&QUEUE_BACKEND_STATE(MyProcNumber), ASYNC_STATE_IDLE);
 	/* and remove it from the list */
 	if (QUEUE_FIRST_LISTENER == MyProcNumber)
 		QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1634,25 +1663,62 @@ SignalBackends(void)
 	for (int i = 0; i < count; i++)
 	{
 		int32		pid = pids[i];
+		ProcNumber	procno = procnos[i];
+		uint32		state;
+		bool		signal_needed = false;
 
 		/*
-		 * If we are signaling our own process, no need to involve the kernel;
-		 * just set the flag directly.
+		 * Move this listener to SIGNALLED, and wake it only if we are the
+		 * ones who made that transition.  If it is already SIGNALLED, a
+		 * wakeup is already pending and there is nothing to do.  If it is
+		 * PROCESSING, it may have finished scanning the queue without seeing
+		 * our notification, so it must be made to scan again.
+		 *
+		 * A failed compare-and-swap stores the current value into "state",
+		 * so we simply retry.  That matters when the listener moves from
+		 * PROCESSING to IDLE between our read and our CAS: giving up there
+		 * would leave our notification unread until some later NOTIFY.
 		 */
-		if (pid == MyProcPid)
-		{
-			notifyInterruptPending = true;
-			continue;
-		}
+		state = pg_atomic_read_membarrier_u32(&QUEUE_BACKEND_STATE(procno));
+		while (state != ASYNC_STATE_SIGNALLED)
+		{
+			Assert(state == ASYNC_STATE_IDLE ||
+				   state == ASYNC_STATE_PROCESSING);
+			if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(procno),
+											   &state,
+											   ASYNC_STATE_SIGNALLED))
+			{
+				signal_needed = true;
+				break;
+			}
+		}
+
+		if (!signal_needed)
+		{
+			if (Trace_notify)
+				elog(DEBUG1, "SignalBackends: backend %d already in SIGNALLED state, skipping", pid);
+			continue;
+		}
+
+		if (Trace_notify)
+			elog(DEBUG1, "SignalBackends: transitioned backend %d from %s to SIGNALLED",
+				 pid, state == ASYNC_STATE_IDLE ? "IDLE" : "PROCESSING");
+
+		/* For our own process, no need to involve the kernel */
+		if (pid == MyProcPid)
+		{
+			SetLatch(MyLatch);
+			continue;
+		}
 
 		/*
 		 * Note: assuming things aren't broken, a signal failure here could
 		 * only occur if the target backend exited since we released
 		 * NotifyQueueLock; which is unlikely but certainly possible. So we
 		 * just log a low-level debug message if it happens.
 		 */
-		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
+		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procno) < 0)
 			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
 	}
 
 	pfree(pids);
@@ -1805,20 +1871,44 @@ HandleNotifyInterrupt(void)
 {
 	/*
 	 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
-	 * you do here.
+	 * you do here. The actual state transition has already been done by
+	 * the notifier before sending the signal, so we only need to set the
+	 * latch to ensure the backend wakes up and processes the notification.
 	 */
 
-	/* signal that work needs to be done */
-	notifyInterruptPending = true;
-
 	/* make sure the event is processed in due course */
 	SetLatch(MyLatch);
 }
 
+/*
+ * IsNotifyInterruptPending
+ *
+ * Return true if this backend is a registered listener whose state is
+ * SIGNALLED, i.e. ProcessNotifyInterrupt() has a wakeup to handle.
+ */
+bool
+IsNotifyInterruptPending(void)
+{
+	uint32		state;
+
+	/* If not registered as a listener, no notifications are pending */
+	if (!amRegisteredListener)
+		return false;
+
+	/*
+	 * Read with full barrier semantics, so that this check is not reordered
+	 * with respect to the surrounding shared-memory accesses.
+	 */
+	state = pg_atomic_read_membarrier_u32(&QUEUE_BACKEND_STATE(MyProcNumber));
+
+	/* Notification is pending if state is SIGNALLED */
+	return (state == ASYNC_STATE_SIGNALLED);
+}
+
 /*
  * ProcessNotifyInterrupt
  *
- * This is called if we see notifyInterruptPending set, just before
+ * This is called if IsNotifyInterruptPending() returns true, just before
  * transmitting ReadyForQuery at the end of a frontend command, and
  * also if a notify signal occurs while reading from the frontend.
  * HandleNotifyInterrupt() will cause the read to be interrupted
@@ -1837,7 +1927,7 @@ ProcessNotifyInterrupt(bool flush)
 		return;					/* not really idle */
 
 	/* Loop in case another signal arrives while sending messages */
-	while (notifyInterruptPending)
+	while (IsNotifyInterruptPending())
 		ProcessIncomingNotify(flush);
 }
 
@@ -2182,28 +2272,77 @@ asyncQueueAdvanceTail(void)
 static void
 ProcessIncomingNotify(bool flush)
 {
-	/* We *must* reset the flag */
-	notifyInterruptPending = false;
+	uint32		expected;
 
-	/* Do nothing else if we aren't actively listening */
+	/*
+	 * If we aren't actively listening there is nothing to read, but we must
+	 * still consume the wakeup (SIGNALLED -> IDLE).  Otherwise
+	 * ProcessNotifyInterrupt(), which loops while IsNotifyInterruptPending()
+	 * returns true, would call us again forever.  The CAS result is ignored:
+	 * if the state was not SIGNALLED, there is no wakeup to consume.
+	 */
 	if (listenChannels == NIL)
+	{
+		expected = ASYNC_STATE_SIGNALLED;
+		(void) pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+											  &expected,
+											  ASYNC_STATE_IDLE);
 		return;
+	}
 
+	/*
+	 * Claim the wakeup: SIGNALLED -> PROCESSING.  Only this backend moves its
+	 * own state out of SIGNALLED, and ProcessNotifyInterrupt() calls us only
+	 * after seeing SIGNALLED, so this CAS should never fail.  It is also a
+	 * full barrier, so the queue reads below cannot be reordered before it.
+	 */
+	expected = ASYNC_STATE_SIGNALLED;
+	if (!pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+										&expected,
+										ASYNC_STATE_PROCESSING))
+		elog(ERROR, "unexpected async state %u in ProcessIncomingNotify, expected SIGNALLED",
+			 expected);
+
 	if (Trace_notify)
-		elog(DEBUG1, "ProcessIncomingNotify");
+		elog(DEBUG1, "ProcessIncomingNotify: transitioned to PROCESSING");
 
 	set_ps_display("notify interrupt");
 
 	/*
 	 * We must run asyncQueueReadAllNotifications inside a transaction, else
 	 * bad things happen if it gets an error.
 	 */
 	StartTransactionCommand();
 
 	asyncQueueReadAllNotifications();
 
 	CommitTransactionCommand();
 
+	/*
+	 * Release the wakeup: PROCESSING -> IDLE.  If the CAS fails, a notifier
+	 * moved us to SIGNALLED while we were reading, so there may be
+	 * notifications we have not seen yet.  Leave the state SIGNALLED, so
+	 * that ProcessNotifyInterrupt()'s loop calls us again, and also set our
+	 * latch as a backstop.
+	 */
+	expected = ASYNC_STATE_PROCESSING;
+	if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+									   &expected,
+									   ASYNC_STATE_IDLE))
+	{
+		if (Trace_notify)
+			elog(DEBUG1, "ProcessIncomingNotify: transitioned to IDLE");
+	}
+	else if (expected == ASYNC_STATE_SIGNALLED)
+	{
+		if (Trace_notify)
+			elog(DEBUG1, "ProcessIncomingNotify: signalled while processing");
+		SetLatch(MyLatch);
+	}
+	else
+		elog(ERROR, "unexpected async state %u when trying to return to IDLE",
+			 expected);
+
 	/*
 	 * If this isn't an end-of-command case, we must flush the notify messages
 	 * to ensure frontend gets them promptly.
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a297606cdd7..e1d80cbefea 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -512,7 +512,7 @@ ProcessClientReadInterrupt(bool blocked)
 		ProcessCatchupInterrupt();
 
 		/* Process notify interrupts, if any */
-		if (notifyInterruptPending)
+		if (IsNotifyInterruptPending())
 			ProcessNotifyInterrupt(true);
 	}
 	else if (ProcDiePending)
@@ -4604,7 +4604,7 @@ PostgresMain(const char *dbname, const char *username)
 			 * were received during the just-finished transaction, they'll
 			 * be seen by the client before ReadyForQuery is.
 			 */
-			if (notifyInterruptPending)
+			if (IsNotifyInterruptPending())
 				ProcessNotifyInterrupt(false);
 
 			/*
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7f2e0ac0b9f 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -17,7 +17,6 @@
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
-extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
@@ -46,4 +45,7 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* true if ProcessNotifyInterrupt() has a pending notify wakeup to handle */
+extern bool IsNotifyInterruptPending(void);
+
 #endif							/* ASYNC_H */
-- 
2.47.1