From 72a6252a504f0dc90aa1236a0bc8f560fb75a227 Mon Sep 17 00:00:00 2001 From: Joel Jacobson Date: Sat, 16 Aug 2025 19:28:18 +0200 Subject: [PATCH] LISTEN/NOTIFY: make the latency/throughput trade-off tunable Background: Currently, listeners are signaled on every NOTIFY as soon as possible. That minimizes perceived latency, but under bursty traffic it leads to many redundant wakeups, heavy context switching, and degraded throughput. This patch adds listener-side wakeup coalescing controlled by a new GUC, notify_latency_target. The setting defines the maximum additional latency that is acceptable, allowing redundant wakeups to be coalesced within the specified interval. Each listener has a shared "wakeup pending" flag. Senders that observe the flag is already set do nothing, effectively coalescing their NOTIFY with the pending wakeup. The listener records the start time of each processing cycle; if it is awakened again too soon, it defers work and arms a timeout to re-awaken after the configured delay. The flag is cleared when entering asyncQueueReadAllNotifications(). A new timeout reason, NOTIFY_DEFERRED_WAKEUP_TIMEOUT, is registered at backend startup. This makes the inherent latency/throughput trade-off explicit and administrator-controlled. Larger delays increase batching and reduce wakeup churn, improving throughput at the cost of additional per-notify latency; a delay of 0 preserves the previous behavior. Queue ordering, visibility, and cross-database semantics are unchanged. User-visible change: new GUC notify_latency_target (ms, default 0). --- doc/src/sgml/config.sgml | 29 ++++++++++++ src/backend/commands/async.c | 47 ++++++++++++++++++- src/backend/utils/init/postinit.c | 2 + src/backend/utils/misc/guc_parameters.dat | 10 ++++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/commands/async.h | 1 + src/include/utils/timeout.h | 1 + 7 files changed, 90 insertions(+), 1 deletion(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index e9b420f3ddb..f0156b52a0c 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -10267,6 +10267,35 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv; + + notify_latency_target (integer) + + notify_latency_target configuration parameter + + + + + Sets the maximum acceptable additional latency for delivering + LISTEN/NOTIFY + notifications. During bursty periods, notifications that arrive + within this interval are coalesced and delivered together, + trading bounded extra latency for fewer wakeups and higher + throughput. + + + + After a listening backend has been idle, the first + NOTIFY causes an immediately wakeup. + If additional notifications happen before + notify_latency_target has elapsed since the + start of that processing cycle, wakeup is deferred by one full + notify_latency_target interval from the point + of deferral. When that interval expires, the listening backend + wakes and catches up in a single wakeup. + + + + bytea_output (enum) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..c2d97f731a7 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -150,6 +150,7 @@ #include "utils/ps_status.h" #include "utils/snapmgr.h" #include "utils/timestamp.h" +#include "utils/timeout.h" /* @@ -246,6 +247,7 @@ typedef struct QueueBackendStatus Oid dboid; /* backend's database OID, or InvalidOid */ ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ QueuePosition pos; /* backend has read queue up to here */ + bool wakeup_pending_flag; /* for listener wakeup throttling */ } QueueBackendStatus; /* @@ -293,6 +295,8 @@ typedef struct AsyncQueueControl static AsyncQueueControl *asyncQueueControl; +static TimestampTz last_wakeup_start_time = 0; + #define QUEUE_HEAD (asyncQueueControl->head) #define QUEUE_TAIL (asyncQueueControl->tail) #define QUEUE_STOP_PAGE (asyncQueueControl->stopPage) @@ -301,6 +305,9 @@ static AsyncQueueControl *asyncQueueControl; #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) +#define QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) \ + (asyncQueueControl->backend[i].wakeup_pending_flag) + /* * The SLRU buffer area through which we access the notification queue @@ -423,6 +430,7 @@ static bool tryAdvanceTail = false; /* GUC parameters */ bool Trace_notify = false; +int notify_latency_target = 0; /* For 8 KB pages this gives 8 GB of disk space */ int max_notify_queue_pages = 1048576; @@ -527,6 +535,7 @@ AsyncShmemInit(void) QUEUE_BACKEND_DBOID(i) = InvalidOid; QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); + QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) = false; } } @@ -1603,7 +1612,18 @@ SignalBackends(void) QueuePosition pos; Assert(pid != InvalidPid); + + /* + * If a wakeup is already pending for this listener, do nothing. The + * pending signal guarantees it will wake up and process all messages + * up to the current queue head, including the one we just wrote. This + * coalesces multiple wakeups into one. + */ + if (QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i)) + continue; + pos = QUEUE_BACKEND_POS(i); + if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) { /* @@ -1624,6 +1644,7 @@ SignalBackends(void) continue; } /* OK, need to signal this one */ + QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) = true; pids[count] = pid; procnos[count] = i; count++; @@ -1861,10 +1882,13 @@ asyncQueueReadAllNotifications(void) AsyncQueueEntry align; } page_buffer; - /* Fetch current state */ + last_wakeup_start_time = GetCurrentTimestamp(); + + /* Fetch current state and clear wakeup-pending flag */ LWLockAcquire(NotifyQueueLock, LW_SHARED); /* Assert checks that we have a valid state entry */ Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber)); + QUEUE_BACKEND_WAKEUP_PENDING_FLAG(MyProcNumber) = false; pos = QUEUE_BACKEND_POS(MyProcNumber); head = QUEUE_HEAD; LWLockRelease(NotifyQueueLock); @@ -2189,6 +2213,27 @@ ProcessIncomingNotify(bool flush) if (listenChannels == NIL) return; + /* + * Throttling check: if we were last active too recently, defer. This + * check is safe without a lock because it's based on a backend-local + * timestamp. + */ + if (notify_latency_target > 0 && + !TimestampDifferenceExceeds(last_wakeup_start_time, + GetCurrentTimestamp(), + notify_latency_target)) + { + /* + * Too soon. We leave wakeup_pending_flag untouched (it must be true, + * or we wouldn't have been signaled) to tell senders we are + * intentionally delaying. Arm a timer to re-awaken and process the + * backlog later. + */ + enable_timeout_after(NOTIFY_DEFERRED_WAKEUP_TIMEOUT, + notify_latency_target); + return; + } + if (Trace_notify) elog(DEBUG1, "ProcessIncomingNotify"); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 641e535a73c..4afd6eb7441 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -33,6 +33,7 @@ #include "catalog/pg_database.h" #include "catalog/pg_db_role_setting.h" #include "catalog/pg_tablespace.h" +#include "commands/async.h" #include "libpq/auth.h" #include "libpq/libpq-be.h" #include "mb/pg_wchar.h" @@ -764,6 +765,7 @@ InitPostgres(const char *in_dbname, Oid dboid, RegisterTimeout(TRANSACTION_TIMEOUT, TransactionTimeoutHandler); RegisterTimeout(IDLE_SESSION_TIMEOUT, IdleSessionTimeoutHandler); RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler); + RegisterTimeout(NOTIFY_DEFERRED_WAKEUP_TIMEOUT, HandleNotifyInterrupt); RegisterTimeout(IDLE_STATS_UPDATE_TIMEOUT, IdleStatsUpdateTimeoutHandler); } diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 6bc6be13d2a..2b23a9520bf 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1567,6 +1567,16 @@ max => 'INT_MAX', }, +{ name => 'notify_latency_target', type => 'int', context => 'PGC_SUSET', group => 'CLIENT_CONN_OTHER', + short_desc => 'Latency target for waking listeners to process NOTIFY.', + long_desc => 'First notify after idle wakes immediately; arrivals within the interval defer the next wakeup by one full interval and are coalesced. 0 disables.', + flags => 'GUC_UNIT_MS', + variable => 'notify_latency_target', + boot_val => '0', + min => '0', + max => 'INT_MAX', +}, + { name => 'wal_decode_buffer_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY', short_desc => 'Buffer size for reading ahead in the WAL during recovery.', long_desc => 'Maximum distance to read ahead in the WAL to prefetch referenced data blocks.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c36fcb9ab61..fd2150b66f9 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -766,6 +766,7 @@ autovacuum_worker_slots = 16 # autovacuum worker slots to allocate #lock_timeout = 0 # in milliseconds, 0 is disabled #idle_in_transaction_session_timeout = 0 # in milliseconds, 0 is disabled #idle_session_timeout = 0 # in milliseconds, 0 is disabled +#notify_latency_target = 0 # in milliseconds, 0 is disabled #bytea_output = 'hex' # hex, escape #xmlbinary = 'base64' #xmloption = 'content' diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..ed27456e487 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -16,6 +16,7 @@ #include extern PGDLLIMPORT bool Trace_notify; +extern PGDLLIMPORT int notify_latency_target; extern PGDLLIMPORT int max_notify_queue_pages; extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending; diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h index 7b19beafdc9..ea720b05043 100644 --- a/src/include/utils/timeout.h +++ b/src/include/utils/timeout.h @@ -36,6 +36,7 @@ typedef enum TimeoutId IDLE_STATS_UPDATE_TIMEOUT, CLIENT_CONNECTION_CHECK_TIMEOUT, STARTUP_PROGRESS_TIMEOUT, + NOTIFY_DEFERRED_WAKEUP_TIMEOUT, /* First user-definable timeout reason */ USER_TIMEOUT, /* Maximum number of timeout reasons */ -- 2.50.1