From: Joel Jacobson <[email protected]>
To: Chao Li <[email protected]>
Cc: Tom Lane <[email protected]>
Cc: Thomas Munro <[email protected]>
Cc: pgsql-hackers <[email protected]>
Cc: Heikki Linnakangas <[email protected]>
Cc: Rishu Bagga <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Thu, 25 Sep 2025 23:13:31 +0200
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
<[email protected]>
<[email protected]>
<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
On Thu, Sep 25, 2025, at 10:25, Chao Li wrote:
> Hi Joel,
>
> Thanks for the patch. After reviewing it, I got a few comments.
Thanks for reviewing!
>> On Sep 25, 2025, at 04:34, Joel Jacobson <[email protected]> wrote:
> 1.
...
> Can we define the new one after STARTUP_PROGRESS_TIMEOUT to try to
> preserve the existing enum value?
Fixed.
> 2.
...
> I think we should add one more tab so that the comment aligns with the
> last line’s comment.
Fixed.
> 3.
...
> I know the compiler will automatically initialize notify_latency_target
> to 0. But all other global and static variables around it are explicitly
> initialized, so it would look better to assign 0 to it, which keeps the
> coding style consistent.
Fixed.
> 4.
...
> Should we avoid enabling a duplicate timeout? Now, whenever a
> duplicate notification is avoided, a new timeout is enabled. I think we
> can add another variable to remember whether a timeout has been enabled.
Hmm, I don't see how a duplicate timeout could happen.
Once we decide to defer the wakeup, wakeup_pending_flag remains set,
which suppresses further signals from notifiers. So I don't see how we
could re-enter ProcessIncomingNotify(): notifyInterruptPending is reset
when ProcessIncomingNotify() is called, and it is only set again when a
signal is received (or directly, when the notifier is the same
process).
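To make the argument concrete, here is a minimal toy model of that
reasoning. It is only a sketch and not PostgreSQL code: the Listener
struct and the functions notifier_signal(), timer_fire(),
process_incoming_notify() and simulate_burst() are hypothetical names
made up for illustration, time is a plain millisecond counter, and the
timer is assumed to fire exactly one interval after it was armed. The
field names mirror the patch where possible.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy single-listener model; hypothetical, not the actual backend code. */
typedef struct
{
    bool wakeup_pending_flag;   /* shared flag: set by notifier, cleared by listener */
    bool interrupt_pending;     /* stands in for notifyInterruptPending */
    bool timer_armed;           /* stands in for NOTIFY_DEFERRED_WAKEUP_TIMEOUT */
    long last_wakeup_start_ms;  /* stands in for last_wakeup_start_time */
    int  signals_sent;          /* how many signals notifiers actually sent */
    int  timers_armed;          /* how many times the timeout was enabled */
    int  batches_processed;     /* how many times the queue was read */
} Listener;

/* Notifier side: skip the signal if a wakeup is already pending. */
static void
notifier_signal(Listener *l)
{
    if (l->wakeup_pending_flag)
        return;                 /* coalesced with the pending wakeup */
    l->wakeup_pending_flag = true;
    l->interrupt_pending = true;
    l->signals_sent++;
}

/* Timer expiry re-raises the interrupt, as the timeout handler would. */
static void
timer_fire(Listener *l)
{
    if (l->timer_armed)
    {
        l->timer_armed = false;
        l->interrupt_pending = true;
    }
}

/* Listener side: runs only if an interrupt is pending, and resets it. */
static void
process_incoming_notify(Listener *l, long now_ms, int target_ms)
{
    if (!l->interrupt_pending)
        return;
    l->interrupt_pending = false;

    if (target_ms > 0 && now_ms - l->last_wakeup_start_ms < target_ms)
    {
        /* Too soon: the flag stays set, so no notifier signals us again. */
        assert(l->wakeup_pending_flag);
        l->timer_armed = true;
        l->timers_armed++;
        return;
    }

    /* Start of a processing cycle: record the time and clear the flag. */
    l->last_wakeup_start_ms = now_ms;
    l->wakeup_pending_flag = false;
    l->batches_processed++;
}

/* One NOTIFY per millisecond for t = 0..5, then the deferred timer fires. */
static Listener
simulate_burst(int target_ms)
{
    Listener l = {0};

    l.last_wakeup_start_ms = -1000000;  /* listener was idle for a long time */

    for (long t = 0; t <= 5; t++)
    {
        notifier_signal(&l);
        process_incoming_notify(&l, t, target_ms);
    }

    /* Assumption: the timer armed at t = 1 expires at t = 1 + target_ms. */
    timer_fire(&l);
    process_incoming_notify(&l, 1 + target_ms, target_ms);
    return l;
}
```

In this model, simulate_burst(10) sends only two signals, arms the timer
once, and reads the queue twice, while simulate_burst(0) signals and
reads once per NOTIFY and never arms the timer. The timer is armed only
on the path where an interrupt was pending, and while the flag stays set
no further interrupt can arrive from a notifier.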
New patch attached with 1-3 fixed.
/Joel
Attachments:
[application/octet-stream] 0001-LISTEN-NOTIFY-make-the-latency-throughput-trade-off-v2.patch (11.1K, 2-0001-LISTEN-NOTIFY-make-the-latency-throughput-trade-off-v2.patch)
From 72a6252a504f0dc90aa1236a0bc8f560fb75a227 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sat, 16 Aug 2025 19:28:18 +0200
Subject: [PATCH] LISTEN/NOTIFY: make the latency/throughput trade-off tunable
Background: Currently, listeners are signaled on every NOTIFY as soon as
possible. That minimizes perceived latency, but under bursty traffic it
leads to many redundant wakeups, heavy context switching, and degraded
throughput.
This patch adds listener-side wakeup coalescing controlled by a new GUC,
notify_latency_target. The setting defines the maximum additional
latency that is acceptable, allowing redundant wakeups to be coalesced
within the specified interval.
Each listener has a shared "wakeup pending" flag. Senders that observe
the flag is already set do nothing, effectively coalescing their NOTIFY
with the pending wakeup. The listener records the start time of each
processing cycle; if it is awakened again too soon, it defers work and
arms a timeout to re-awaken after the configured delay. The flag is
cleared when entering asyncQueueReadAllNotifications(). A new timeout
reason, NOTIFY_DEFERRED_WAKEUP_TIMEOUT, is registered at backend
startup.
This makes the inherent latency/throughput trade-off explicit and
administrator-controlled. Larger delays increase batching and reduce
wakeup churn, improving throughput at the cost of additional per-notify
latency; a delay of 0 preserves the previous behavior. Queue ordering,
visibility, and cross-database semantics are unchanged.
User-visible change: new GUC notify_latency_target (ms, default 0).
---
doc/src/sgml/config.sgml | 29 ++++++++++++
src/backend/commands/async.c | 47 ++++++++++++++++++-
src/backend/utils/init/postinit.c | 2 +
src/backend/utils/misc/guc_parameters.dat | 10 ++++
src/backend/utils/misc/postgresql.conf.sample | 1 +
src/include/commands/async.h | 1 +
src/include/utils/timeout.h | 1 +
7 files changed, 90 insertions(+), 1 deletion(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index e9b420f3ddb..f0156b52a0c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -10267,6 +10267,35 @@ COPY postgres_log FROM '/full/path/to/logfile.csv' WITH csv;
</listitem>
</varlistentry>
+ <varlistentry id="guc-notify-min-wakeup-delay" xreflabel="notify_latency_target">
+ <term><varname>notify_latency_target</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>notify_latency_target</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Sets the maximum acceptable additional latency for delivering
+ <command>LISTEN</command>/<command>NOTIFY</command>
+ notifications. During bursty periods, notifications that arrive
+ within this interval are coalesced and delivered together,
+ trading bounded extra latency for fewer wakeups and higher
+ throughput.
+ </para>
+
+ <para>
+ After a listening backend has been idle, the first
+ <command>NOTIFY</command> causes an immediate wakeup.
+ If additional notifications happen before
+ <varname>notify_latency_target</varname> has elapsed since the
+ start of that processing cycle, wakeup is deferred by one full
+ <varname>notify_latency_target</varname> interval from the point
+ of deferral. When that interval expires, the listening backend
+ wakes and catches up in a single wakeup.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-bytea-output" xreflabel="bytea_output">
<term><varname>bytea_output</varname> (<type>enum</type>)
<indexterm>
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..c2d97f731a7 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -150,6 +150,7 @@
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
+#include "utils/timeout.h"
/*
@@ -246,6 +247,7 @@ typedef struct QueueBackendStatus
Oid dboid; /* backend's database OID, or InvalidOid */
ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */
QueuePosition pos; /* backend has read queue up to here */
+ bool wakeup_pending_flag; /* for listener wakeup throttling */
} QueueBackendStatus;
/*
@@ -293,6 +295,8 @@ typedef struct AsyncQueueControl
static AsyncQueueControl *asyncQueueControl;
+static TimestampTz last_wakeup_start_time = 0;
+
#define QUEUE_HEAD (asyncQueueControl->head)
#define QUEUE_TAIL (asyncQueueControl->tail)
#define QUEUE_STOP_PAGE (asyncQueueControl->stopPage)
@@ -301,6 +305,9 @@ static AsyncQueueControl *asyncQueueControl;
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
#define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener)
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) \
+ (asyncQueueControl->backend[i].wakeup_pending_flag)
+
/*
* The SLRU buffer area through which we access the notification queue
@@ -423,6 +430,7 @@ static bool tryAdvanceTail = false;
/* GUC parameters */
bool Trace_notify = false;
+int notify_latency_target = 0;
/* For 8 KB pages this gives 8 GB of disk space */
int max_notify_queue_pages = 1048576;
@@ -527,6 +535,7 @@ AsyncShmemInit(void)
QUEUE_BACKEND_DBOID(i) = InvalidOid;
QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+ QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) = false;
}
}
@@ -1603,7 +1612,18 @@ SignalBackends(void)
QueuePosition pos;
Assert(pid != InvalidPid);
+
+ /*
+ * If a wakeup is already pending for this listener, do nothing. The
+ * pending signal guarantees it will wake up and process all messages
+ * up to the current queue head, including the one we just wrote. This
+ * coalesces multiple wakeups into one.
+ */
+ if (QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i))
+ continue;
+
pos = QUEUE_BACKEND_POS(i);
+
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
{
/*
@@ -1624,6 +1644,7 @@ SignalBackends(void)
continue;
}
/* OK, need to signal this one */
+ QUEUE_BACKEND_WAKEUP_PENDING_FLAG(i) = true;
pids[count] = pid;
procnos[count] = i;
count++;
@@ -1861,10 +1882,13 @@ asyncQueueReadAllNotifications(void)
AsyncQueueEntry align;
} page_buffer;
- /* Fetch current state */
+ last_wakeup_start_time = GetCurrentTimestamp();
+
+ /* Fetch current state and clear wakeup-pending flag */
LWLockAcquire(NotifyQueueLock, LW_SHARED);
/* Assert checks that we have a valid state entry */
Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+ QUEUE_BACKEND_WAKEUP_PENDING_FLAG(MyProcNumber) = false;
pos = QUEUE_BACKEND_POS(MyProcNumber);
head = QUEUE_HEAD;
LWLockRelease(NotifyQueueLock);
@@ -2189,6 +2213,27 @@ ProcessIncomingNotify(bool flush)
if (listenChannels == NIL)
return;
+ /*
+ * Throttling check: if we were last active too recently, defer. This
+ * check is safe without a lock because it's based on a backend-local
+ * timestamp.
+ */
+ if (notify_latency_target > 0 &&
+ !TimestampDifferenceExceeds(last_wakeup_start_time,
+ GetCurrentTimestamp(),
+ notify_latency_target))
+ {
+ /*
+ * Too soon. We leave wakeup_pending_flag untouched (it must be true,
+ * or we wouldn't have been signaled) to tell senders we are
+ * intentionally delaying. Arm a timer to re-awaken and process the
+ * backlog later.
+ */
+ enable_timeout_after(NOTIFY_DEFERRED_WAKEUP_TIMEOUT,
+ notify_latency_target);
+ return;
+ }
+
if (Trace_notify)
elog(DEBUG1, "ProcessIncomingNotify");
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 641e535a73c..4afd6eb7441 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -33,6 +33,7 @@
#include "catalog/pg_database.h"
#include "catalog/pg_db_role_setting.h"
#include "catalog/pg_tablespace.h"
+#include "commands/async.h"
#include "libpq/auth.h"
#include "libpq/libpq-be.h"
#include "mb/pg_wchar.h"
@@ -764,6 +765,7 @@ InitPostgres(const char *in_dbname, Oid dboid,
RegisterTimeout(TRANSACTION_TIMEOUT, TransactionTimeoutHandler);
RegisterTimeout(IDLE_SESSION_TIMEOUT, IdleSessionTimeoutHandler);
RegisterTimeout(CLIENT_CONNECTION_CHECK_TIMEOUT, ClientCheckTimeoutHandler);
+ RegisterTimeout(NOTIFY_DEFERRED_WAKEUP_TIMEOUT, HandleNotifyInterrupt);
RegisterTimeout(IDLE_STATS_UPDATE_TIMEOUT,
IdleStatsUpdateTimeoutHandler);
}
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index 6bc6be13d2a..2b23a9520bf 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -1567,6 +1567,16 @@
max => 'INT_MAX',
},
+{ name => 'notify_latency_target', type => 'int', context => 'PGC_SUSET', group => 'CLIENT_CONN_OTHER',
+ short_desc => 'Latency target for waking listeners to process NOTIFY.',
+ long_desc => 'First notify after idle wakes immediately; arrivals within the interval defer the next wakeup by one full interval and are coalesced. 0 disables.',
+ flags => 'GUC_UNIT_MS',
+ variable => 'notify_latency_target',
+ boot_val => '0',
+ min => '0',
+ max => 'INT_MAX',
+},
+
{ name => 'wal_decode_buffer_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
short_desc => 'Buffer size for reading ahead in the WAL during recovery.',
long_desc => 'Maximum distance to read ahead in the WAL to prefetch referenced data blocks.',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c36fcb9ab61..fd2150b66f9 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -766,6 +766,7 @@ autovacuum_worker_slots = 16 # autovacuum worker slots to allocate
#lock_timeout = 0 # in milliseconds, 0 is disabled
#idle_in_transaction_session_timeout = 0 # in milliseconds, 0 is disabled
#idle_session_timeout = 0 # in milliseconds, 0 is disabled
+#notify_latency_target = 0 # in milliseconds, 0 is disabled
#bytea_output = 'hex' # hex, escape
#xmlbinary = 'base64'
#xmloption = 'content'
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..ed27456e487 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -16,6 +16,7 @@
#include <signal.h>
extern PGDLLIMPORT bool Trace_notify;
+extern PGDLLIMPORT int notify_latency_target;
extern PGDLLIMPORT int max_notify_queue_pages;
extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
diff --git a/src/include/utils/timeout.h b/src/include/utils/timeout.h
index 7b19beafdc9..ea720b05043 100644
--- a/src/include/utils/timeout.h
+++ b/src/include/utils/timeout.h
@@ -36,6 +36,7 @@ typedef enum TimeoutId
IDLE_STATS_UPDATE_TIMEOUT,
CLIENT_CONNECTION_CHECK_TIMEOUT,
STARTUP_PROGRESS_TIMEOUT,
+ NOTIFY_DEFERRED_WAKEUP_TIMEOUT,
/* First user-definable timeout reason */
USER_TIMEOUT,
/* Maximum number of timeout reasons */
--
2.50.1