From: Joel Jacobson <[email protected]>
To: Chao Li <[email protected]>
To: Arseniy Mukhin <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Thu, 06 Nov 2025 09:33:18 +0100
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
On Thu, Nov 6, 2025, at 00:21, Chao Li wrote:
> That’s what we don’t know. We now lack a performance test for
> evaluating how “direct advancement” efficiently helps if it only
> handles sleeping listeners. So what I was suggesting is that we should
> first create some tests, maybe also add a few more statistics, so that
> we can evaluate different solutions. If a simple implementation that
> only handles sleeping listeners would have performed good enough, of
> course we can take it; otherwise we may need to either pursue a better
> solution.
Just for the sake of evaluating this patch, I've added instrumentation
to async.c that increments counters for the different branches in
asyncQueueReadAllNotifications and SignalBackends. (I'm just using
atomics without any locking, but that should be fine since this is just
statistics.)
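To illustrate the "atomics without any locking" approach, here is a minimal standalone sketch, assuming C11 <stdatomic.h> and pthreads as stand-ins for PostgreSQL's pg_atomic_* API and backend processes. The WakeupStats struct, worker() and count_concurrently() are hypothetical names for this illustration and are not part of the patch.

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_WORKERS 16

/* Hypothetical stand-in for two of the counters added to AsyncQueueControl. */
typedef struct WakeupStats
{
    _Atomic uint32_t necessary_wakeups;
    _Atomic uint32_t unnecessary_wakeups;
} WakeupStats;

static WakeupStats stats;

/* Each worker bumps one of the two counters per iteration, with no lock. */
static void *
worker(void *arg)
{
    uint32_t iterations = *(const uint32_t *) arg;

    for (uint32_t i = 0; i < iterations; i++)
    {
        /* Relaxed ordering is enough here: we only need the increment itself
         * to be atomic, not to order it against other memory accesses. */
        if (i % 2 == 0)
            atomic_fetch_add_explicit(&stats.necessary_wakeups, 1,
                                      memory_order_relaxed);
        else
            atomic_fetch_add_explicit(&stats.unnecessary_wakeups, 1,
                                      memory_order_relaxed);
    }
    return NULL;
}

/* Reset the counters, run the workers, and return the sum of both counters. */
static uint64_t
count_concurrently(int nthreads, uint32_t iterations)
{
    pthread_t threads[MAX_WORKERS];
    int       started = 0;

    if (nthreads > MAX_WORKERS)
        nthreads = MAX_WORKERS;

    atomic_store(&stats.necessary_wakeups, 0);
    atomic_store(&stats.unnecessary_wakeups, 0);

    for (int i = 0; i < nthreads; i++)
    {
        if (pthread_create(&threads[started], NULL, worker, &iterations) != 0)
            break;
        started++;
    }
    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    /* The two loads are separate operations, so while writers are still
     * running they would not form a consistent snapshot. That is acceptable
     * for statistics; here all writers have already been joined. */
    return (uint64_t) atomic_load(&stats.necessary_wakeups) +
           (uint64_t) atomic_load(&stats.unnecessary_wakeups);
}
```

No increments are lost even though nothing is locked; the only thing given up is a mutually consistent view of several counters at one instant.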
pg_get_async_wakeup_stats-patch.txt adds the SQL-callable
catalog functions pg_reset_async_wakeup_stats() and
pg_get_async_wakeup_stats(). They are only meant for evaluation and
should not be included in the patch. The file can be applied on top of
the v23 patch.

Below is just an example of how to compile, along with an arbitrary mix
of command line options. I've tried a lot of combinations, and we seem
to be holding up fine in all of them.

async-notify-test-5.c will detect whether the pg_*_async_wakeup_stats()
functions exist, and only show the extra histograms if so.
% gcc -Wall -Wextra -O2 -pthread -I/Users/joel/pg19/include/postgresql/server -I/Users/joel/pg19/include -o async-notify-test-5 async-notify-test-5.c -L/Users/joel/pg19/lib -lpq -pthread -lm
% ./async-notify-test-5 --listeners 10 --notifiers 10 --channels 10 --sleep 0.1 --sleep-exp 2.0 --batch 10
10 s: 38100 sent (3690/s), 381000 received (36900/s)

Notification Latency Distribution:
  0.00-0.01ms                    0 (0.0%)   avg: 0.000ms
  0.01-0.10ms     #             23 (0.0%)   avg: 0.092ms
  0.10-1.00ms     #######   298002 (78.2%)  avg: 0.563ms
  1.00-10.00ms    ##         82975 (21.8%)  avg: 1.506ms
  10.00-100.00ms                 0 (0.0%)   avg: 0.000ms
  >100.00ms                      0 (0.0%)   avg: 0.000ms

asyncQueueReadAllNotifications Statistics:
  necessary_wakeups    ########   35469 (88.2%)
  unnecessary_wakeups  #           4762 (11.8%)

SignalBackends Statistics:
  signaled_needed      #          34983 (9.5%)
  avoided_wakeups      ########  325874 (88.9%)
  already_advancing    #              3 (0.0%)
  signaled_uncertain   #           5347 (1.5%)
  already_ahead        #            375 (0.1%)

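For readers unfamiliar with this kind of histogram, the following is a minimal sketch of decade-based latency bucketing that would produce rows like the ones above. It is not taken from async-notify-test-5.c: the names latency_bucket(), histogram_add() and histogram_avg() are hypothetical, and treating each upper bound as exclusive is an assumption.

```c
#include <stddef.h>

#define NUM_BUCKETS 6

/* Upper bounds (in ms) of the first five buckets; the sixth is ">100.00ms".
 * Assumption: a sample equal to a bound falls into the next bucket. */
static const double bucket_upper_ms[NUM_BUCKETS - 1] =
    {0.01, 0.10, 1.00, 10.00, 100.00};

typedef struct LatencyHistogram
{
    unsigned long count[NUM_BUCKETS];   /* samples per bucket */
    double        sum_ms[NUM_BUCKETS];  /* summed latency per bucket */
} LatencyHistogram;

/* Return the index of the bucket a latency sample belongs to. */
static size_t
latency_bucket(double latency_ms)
{
    for (size_t i = 0; i < NUM_BUCKETS - 1; i++)
    {
        if (latency_ms < bucket_upper_ms[i])
            return i;
    }
    return NUM_BUCKETS - 1;
}

/* Record one sample in the histogram. */
static void
histogram_add(LatencyHistogram *h, double latency_ms)
{
    size_t b = latency_bucket(latency_ms);

    h->count[b]++;
    h->sum_ms[b] += latency_ms;
}

/* Average latency within one bucket, or 0.0 if the bucket is empty. */
static double
histogram_avg(const LatencyHistogram *h, size_t bucket)
{
    if (h->count[bucket] == 0)
        return 0.0;
    return h->sum_ms[bucket] / (double) h->count[bucket];
}
```

Keeping a per-bucket sum alongside the count is what allows an "avg:" column per row, and an empty bucket naturally reports 0.000ms.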
Thoughts on how to interpret results:
- Is the notification latency distribution good enough for the given
workload? Naturally, if the workload is too high, we cannot expect to
ever achieve sub-millisecond latency anyway, so it's a judgement call.
- Even if "unnecessary_wakeups" is high relative to
"necessary_wakeups", it's not necessarily a problem if the latency
distribution is still good enough. We should also think about the
ratio between "unnecessary_wakeups" and "avoided_wakeups": even
if "unnecessary_wakeups" is high in absolute numbers, if
"avoided_wakeups" is orders of magnitude larger, the cost of
context switching has already been dramatically reduced. I think there
is always a risk when optimizing of forgetting what problem one was
trying to solve initially, usually a bottleneck. When the bottleneck is
gone and has moved somewhere else, the effort should IMO usually be
spent elsewhere, especially if further optimizations would need a
significant increase in code complexity.
- It's "signaled_uncertain" that primarily contributes to
"unnecessary_wakeups".
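As a worked example of the ratios discussed above, here is a small sketch that computes them from the counters in the sample run. The WakeupCounts struct and the helper names are hypothetical; only the numbers come from the output shown earlier. For that run, about 11.8% of wakeups were unnecessary, and there were roughly 68 avoided wakeups for every unnecessary one.

```c
#include <stdint.h>

/* Hypothetical container mirroring the columns of pg_get_async_wakeup_stats(). */
typedef struct WakeupCounts
{
    uint64_t signaled_needed;
    uint64_t avoided_wakeups;
    uint64_t already_advancing;
    uint64_t signaled_uncertain;
    uint64_t already_ahead;
    uint64_t necessary_wakeups;
    uint64_t unnecessary_wakeups;
} WakeupCounts;

/* Counters copied from the sample run in this message. */
static const WakeupCounts sample_run = {
    .signaled_needed = 34983,
    .avoided_wakeups = 325874,
    .already_advancing = 3,
    .signaled_uncertain = 5347,
    .already_ahead = 375,
    .necessary_wakeups = 35469,
    .unnecessary_wakeups = 4762,
};

/* Fraction of all wakeups that delivered no interesting message. */
static double
unnecessary_wakeup_share(const WakeupCounts *c)
{
    uint64_t total = c->necessary_wakeups + c->unnecessary_wakeups;

    return total == 0 ? 0.0 : (double) c->unnecessary_wakeups / (double) total;
}

/* How many wakeups were avoided for each unnecessary wakeup that remained. */
static double
avoided_per_unnecessary(const WakeupCounts *c)
{
    if (c->unnecessary_wakeups == 0)
        return 0.0;
    return (double) c->avoided_wakeups / (double) c->unnecessary_wakeups;
}
```

Calling unnecessary_wakeup_share(&sample_run) and avoided_per_unnecessary(&sample_run) reproduces the two figures quoted above; the same helpers can be pointed at counters from other option mixes to compare runs.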
/Joel
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dac12f8124..7e8e0b14f42 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,6 +137,7 @@
#include <signal.h>
#include <string.h>
+#include "access/htup_details.h"
#include "access/parallel.h"
#include "access/slru.h"
#include "access/transam.h"
@@ -332,6 +333,13 @@ typedef struct AsyncQueueControl
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
dsa_handle channelHashDSA;
dshash_table_handle channelHashDSH;
+ pg_atomic_uint32 signaledNeeded; /* listening to some of the channels; signal needed */
+ pg_atomic_uint32 avoidedWakeups; /* directly advanced */
+ pg_atomic_uint32 alreadyAdvancing; /* already advancing its position */
+ pg_atomic_uint32 signaledUncertain; /* signaled due to uncertain need */
+ pg_atomic_uint32 alreadyAhead; /* already ahead, no action needed */
+ pg_atomic_uint32 necessaryWakeups; /* wakeups where at least one message was interesting */
+ pg_atomic_uint32 unnecessaryWakeups; /* wakeups where no messages were interesting */
QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
} AsyncQueueControl;
@@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void);
static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
- Snapshot snapshot);
+ Snapshot snapshot,
+ bool *interested);
static void asyncQueueAdvanceTail(void);
static void ProcessIncomingNotify(bool flush);
static bool AsyncExistsPendingNotify(Notification *n);
@@ -683,6 +692,13 @@ AsyncShmemInit(void)
asyncQueueControl->lastQueueFillWarn = 0;
asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+ pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0);
+ pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+ pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0);
+ pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0);
+ pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+ pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+ pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
for (int i = 0; i < MaxBackends; i++)
{
@@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS)
SRF_RETURN_DONE(funcctx);
}
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * This function returns a single row with:
+ * - necessary_wakeups: wakeups where at least one message was interesting
+ * - unnecessary_wakeups: wakeups where no messages were interesting
+ * - direct_advancements_success: directly advanced
+ * - already_advancing: already advancing its position
+ * - signaled_uncertain: signaled due to uncertain need
+ * - already_ahead: already ahead, no action needed
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+ TupleDesc tupdesc;
+ Datum values[7];
+ bool nulls[7];
+ HeapTuple tuple;
+ uint32 signaled_needed;
+ uint32 direct_advancements_success;
+ uint32 already_advancing;
+ uint32 signaled_uncertain;
+ uint32 already_ahead;
+ uint32 necessary_wakeups;
+ uint32 unnecessary_wakeups;
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("function returning record called in context that cannot accept type record")));
+
+ /* Read the atomic counters */
+ signaled_needed = pg_atomic_read_u32(&asyncQueueControl->signaledNeeded);
+ direct_advancements_success = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups);
+ already_advancing = pg_atomic_read_u32(&asyncQueueControl->alreadyAdvancing);
+ signaled_uncertain = pg_atomic_read_u32(&asyncQueueControl->signaledUncertain);
+ already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead);
+ necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups);
+ unnecessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups);
+
+ /* Fill in the values */
+ memset(nulls, 0, sizeof(nulls));
+ values[0] = Int64GetDatum((int64) signaled_needed);
+ values[1] = Int64GetDatum((int64) direct_advancements_success);
+ values[2] = Int64GetDatum((int64) already_advancing);
+ values[3] = Int64GetDatum((int64) signaled_uncertain);
+ values[4] = Int64GetDatum((int64) already_ahead);
+ values[5] = Int64GetDatum((int64) necessary_wakeups);
+ values[6] = Int64GetDatum((int64) unnecessary_wakeups);
+
+ tuple = heap_form_tuple(tupdesc, values, nulls);
+ PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: reset NOTIFY wakeup statistics
+ *
+ * This function resets all the async wakeup counters to zero.
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+ /* Reset all the atomic counters to zero */
+ pg_atomic_write_u32(&asyncQueueControl->signaledNeeded, 0);
+ pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0);
+ pg_atomic_write_u32(&asyncQueueControl->alreadyAdvancing, 0);
+ pg_atomic_write_u32(&asyncQueueControl->signaledUncertain, 0);
+ pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0);
+ pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0);
+ pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
+ PG_RETURN_VOID();
+}
+
/*
* Async_UnlistenOnExit
*
@@ -2014,6 +2105,7 @@ SignalBackends(void)
Assert(pid != InvalidPid);
+ pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1);
QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
pids[count] = pid;
procnos[count] = i;
@@ -2049,7 +2141,14 @@ SignalBackends(void)
* currently advancing its position.
*/
if (!QUEUE_BACKEND_ADVANCING_POS(i))
+ {
QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+ pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+ }
+ else
+ {
+ pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1);
+ }
}
else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
{
@@ -2060,6 +2159,7 @@ SignalBackends(void)
*/
Assert(pid != InvalidPid);
+ pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1);
QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
pids[count] = pid;
procnos[count] = i;
@@ -2071,6 +2171,7 @@ SignalBackends(void)
* The backend is already ahead of the notifications we wrote.
* No need to do anything.
*/
+ pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos));
}
}
@@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void)
volatile QueuePosition pos;
QueuePosition head;
Snapshot snapshot;
+ bool interested = false;
/* page_buffer must be adequately aligned, so use a union */
union
@@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void)
*/
reachedStop = asyncQueueProcessPageEntries(&pos, head,
page_buffer.buf,
- snapshot);
+ snapshot,
+ &interested);
} while (!reachedStop);
}
PG_FINALLY();
@@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void)
}
PG_END_TRY();
+ if (interested)
+ pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+ else
+ pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
/* Done with snapshot */
UnregisterSnapshot(snapshot);
}
@@ -2474,7 +2582,8 @@ static bool
asyncQueueProcessPageEntries(volatile QueuePosition *current,
QueuePosition stop,
char *page_buffer,
- Snapshot snapshot)
+ Snapshot snapshot,
+ bool *interested)
{
bool reachedStop = false;
bool reachedEndOfPage;
@@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
char *payload = qe->data + strlen(channel) + 1;
NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+ /* Mark that we were interested in at least one message */
+ *interested = true;
}
}
else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..0bbd7db39c7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
proname => 'pg_notification_queue_usage', provolatile => 'v',
proparallel => 'r', prorettype => 'float8', proargtypes => '',
prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315',
+ descr => 'get statistics about NOTIFY wakeups',
+ proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+ proparallel => 'r', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o}',
+ proargnames => '{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+ prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316',
+ descr => 'reset statistics about NOTIFY wakeups',
+ proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+ proparallel => 'r', prorettype => 'void', proargtypes => '',
+ prosrc => 'pg_reset_async_wakeup_stats' },
# shared memory usage
{ oid => '5052', descr => 'allocations from the main shared memory segment',
proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',
Attachments:
[application/octet-stream] async-notify-test-5.c (24.9K, 2-async-notify-test-5.c)
[text/plain] pg_get_async_wakeup_stats-patch.txt (9.2K, 3-pg_get_async_wakeup_stats-patch.txt)