From: Joel Jacobson <[email protected]>
To: Tom Lane <[email protected]>
Cc: Matheus Alcantara <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Sat, 11 Oct 2025 09:43:22 +0200
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAFY6G8dap-bCnAnMG-2Gzew8yv2Vbi9gsx9+yszKMmd57ygfvA@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>

On Sat, Oct 11, 2025, at 08:43, Joel Jacobson wrote:
> In addition to the previously suggested optimization, there is another
> major one that seems doable and would be a great improvement for
> workloads with large traffic differences between channels, i.e. some
> low-traffic and some high-traffic channels.
>
> I'm not entirely sure this approach is correct, though; I might have
> misunderstood the guarantees of the heavyweight lock. My assumption is
> that only one backend at a time can be running the code in
> PreCommit_Notify after having acquired the heavyweight lock. If this
> is not true, then it doesn't work. What worries me is the exclusive
> lock we also take inside the same function. I don't see the point of
> it since we're already holding the heavyweight lock, but maybe this is
> just to "allows deadlocks to be detected" like the comment says?
...
> * 0003-optimize_listen_notify-v14.patch:
>
> Optimize LISTEN/NOTIFY by advancing idle backends directly
>
> Building on the previous channel-specific listener tracking
> optimization, this patch further reduces context switching by detecting
> idle listening backends that don't listen to any of the channels being
> notified and advancing their queue positions directly without waking
> them up.
...
> 0003-optimize_listen_notify-v14.patch:
>
> idle_listeners  round_trips_per_sec     max_latency_usec
>              0              33236.8                 1090
>             10              34681.0                 1338
>             20              34530.4                 1372
>             30              34061.6                 1339
>             40              33084.5                  913
>             50              33847.5                  955
>             60              33675.8                 1239
>             70              28857.4                20443
>             80              33324.9                  786
>             90              33612.3                  758
>            100              31259.2                 7706

I noticed the strange data point at idle_listeners=70.
This made me think about the "wake tail only" trick,
and I realized it is now unnecessary with the new 0003 idea.

New version attached that removes that part from the 0003 patch.

This also of course improved the stability of max_latency_usec,
since in this specific benchmark all other listeners are always idle,
so they don't need to be woken up ever:

idle_listeners  round_trips_per_sec     max_latency_usec
             0              33631.8                  546
            10              34318.0                  586
            20              34813.0                  596
            30              35073.4                  574
            40              34646.1                  569
            50              33755.5                  634
            60              33991.6                  561
            70              34049.0                  550
            80              33886.0                  541
            90              33545.0                  540
           100              33163.1                  660
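
For readers who have not looked at 0003 yet, here is a minimal standalone
sketch of the "advance idle backends directly" idea. It is not the patch
code: the Listener struct, listens_on() and signal_or_advance() are
hypothetical names made up for illustration, queue positions are plain
integers, and "idle" is assumed to mean "already caught up to the old
queue head". A listener on the notified channel is marked for a wakeup;
an idle listener on other channels just has its position moved to the
new head, so it never needs to wake up.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_CHANNELS 4

/* Hypothetical, simplified listener state (not QueueBackendStatus). */
typedef struct Listener
{
	long		pos;			/* queue position read up to so far */
	bool		signaled;		/* true if we would send it a wakeup */
	int			nchannels;		/* number of entries in channels[] */
	const char *channels[MAX_CHANNELS];
} Listener;

/* Does this listener listen on the given channel? */
static bool
listens_on(const Listener *l, const char *channel)
{
	for (int i = 0; i < l->nchannels; i++)
	{
		if (strcmp(l->channels[i], channel) == 0)
			return true;
	}
	return false;
}

/*
 * A notifier has appended entries for "channel", moving the queue head
 * from old_head to new_head.  Returns the number of listeners that would
 * be signaled.
 */
static int
signal_or_advance(Listener *listeners, int n, const char *channel,
				  long old_head, long new_head)
{
	int			nsignaled = 0;

	assert(old_head <= new_head);

	for (int i = 0; i < n; i++)
	{
		Listener   *l = &listeners[i];

		if (listens_on(l, channel))
		{
			/* Interested: it must wake up and read the queue itself. */
			l->signaled = true;
			nsignaled++;
		}
		else if (l->pos == old_head)
		{
			/* Idle and caught up (assumption): skip the new entries. */
			l->pos = new_head;
		}
		/* Otherwise it is lagging; this sketch leaves it untouched. */
	}
	return nsignaled;
}
```

With three listeners on channels "a", "b" and "c", where the first two
are at the old head and the third is behind, a NOTIFY on "a" would signal
only the first, advance the second without a wakeup, and leave the third
alone.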

/Joel
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..b462dcc8348 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 
 #include <ctype.h>
 #include <float.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <math.h>
 #include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", "range", "hash"};
 /* random seed used to initialize base_random_sequence */
 static int64 random_seed = -1;
 
+/* LISTEN/NOTIFY benchmark mode parameters */
+static bool listen_notify_mode = false;	/* enable LISTEN/NOTIFY benchmark */
+static int	notify_round_trips = 100;	/* number of round-trips per iteration */
+static int	notify_idle_step = 10;		/* idle listeners to add per iteration */
+
 /*
  * end of configurable parameters
  *********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
 		   "                           (same as \"-b simple-update\")\n"
 		   "  -S, --select-only        perform SELECT-only transactions\n"
 		   "                           (same as \"-b select-only\")\n"
+		   "  --listen-notify-benchmark\n"
+		   "                           run LISTEN/NOTIFY round-trip benchmark\n"
+		   "  --notify-round-trips=NUM number of round-trips per iteration (default: 100)\n"
+		   "  --notify-idle-step=NUM   idle listeners to add per iteration (default: 10)\n"
 		   "\nBenchmarking options:\n"
 		   "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
 		   "  -C, --connect            establish new connection for each transaction\n"
@@ -6689,6 +6699,216 @@ set_random_seed(const char *seed)
 	return true;
 }
 
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * This benchmark measures the round-trip time between two processes that
+ * ping-pong NOTIFY messages while adding idle listening connections.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+	PGconn	   *conn1 = NULL;
+	PGconn	   *conn2 = NULL;
+	PGconn	  **idle_conns = NULL;
+	int			num_idle = 0;
+	int			max_idle = 10000;	/* reasonable upper limit */
+	PGresult   *res;
+	char		channel1[] = "pgbench_channel_1";
+	char		channel2[] = "pgbench_channel_2";
+	char		notify_cmd[256];
+	bool		first_failure = false;
+
+	pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+	pg_log_info("round-trips per iteration: %d", notify_round_trips);
+	pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+	printf("\n%14s  %19s  %19s\n", "idle_listeners", "round_trips_per_sec", "max_latency_usec");
+
+	/* Allocate array for idle connections */
+	idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+	/* Create two active connections for ping-pong */
+	conn1 = doConnect();
+	if (conn1 == NULL)
+		pg_fatal("failed to create connection 1");
+
+	conn2 = doConnect();
+	if (conn2 == NULL)
+		pg_fatal("failed to create connection 2");
+
+	/* Set up LISTEN on both connections */
+	snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1);
+	res = PQexec(conn1, notify_cmd);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1));
+	PQclear(res);
+
+	snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2);
+	res = PQexec(conn2, notify_cmd);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2));
+	PQclear(res);
+
+	/* Main benchmark loop: measure round-trips then add idle connections */
+	while (num_idle < max_idle)
+	{
+		int			i;
+		int64		total_latency = 0;
+		int64		max_latency = 0;
+
+		/* Perform round-trip measurements */
+		for (i = 0; i < notify_round_trips; i++)
+		{
+			pg_time_usec_t start_time,
+						end_time;
+			int64		latency;
+			PGnotify   *notify;
+			int			sock;
+			fd_set		input_mask;
+			struct timeval tv;
+
+			/* Clear any pending notifications */
+			PQconsumeInput(conn1);
+			while ((notify = PQnotifies(conn1)) != NULL)
+				PQfreemem(notify);
+			PQconsumeInput(conn2);
+			while ((notify = PQnotifies(conn2)) != NULL)
+				PQfreemem(notify);
+
+			/* Start timer and send notification from conn1 */
+			start_time = pg_time_now();
+			snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel2);
+			res = PQexec(conn1, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1));
+			PQclear(res);
+
+			/* Wait for notification on conn2 */
+			sock = PQsocket(conn2);
+			notify = NULL;
+			while (notify == NULL)
+			{
+				PQconsumeInput(conn2);
+				notify = PQnotifies(conn2);
+				if (notify == NULL)
+				{
+					/* Wait for data on socket */
+					FD_ZERO(&input_mask);
+					FD_SET(sock, &input_mask);
+					tv.tv_sec = 10;	/* 10 second timeout */
+					tv.tv_usec = 0;
+					if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+						pg_fatal("select() failed: %m");
+				}
+			}
+			PQfreemem(notify);
+
+			/* Send notification back from conn2 */
+			snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel1);
+			res = PQexec(conn2, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2));
+			PQclear(res);
+
+			/* Wait for notification on conn1 */
+			sock = PQsocket(conn1);
+			notify = NULL;
+			while (notify == NULL)
+			{
+				PQconsumeInput(conn1);
+				notify = PQnotifies(conn1);
+				if (notify == NULL)
+				{
+					/* Wait for data on socket */
+					FD_ZERO(&input_mask);
+					FD_SET(sock, &input_mask);
+					tv.tv_sec = 10;	/* 10 second timeout */
+					tv.tv_usec = 0;
+					if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+						pg_fatal("select() failed: %m");
+				}
+			}
+			PQfreemem(notify);
+
+			/* End timer */
+			end_time = pg_time_now();
+
+			/* Calculate individual round-trip latency */
+			latency = end_time - start_time;
+
+			/* Accumulate total latency and track maximum */
+			total_latency += latency;
+			if (latency > max_latency)
+				max_latency = latency;
+		}
+
+		/* Calculate and report round-trips per second and max latency */
+		fprintf(stdout, "%14d  %19.1f  %19" PRId64 "\n",
+				num_idle,
+				1000000.0 * notify_round_trips / total_latency,
+				max_latency);
+		fflush(stdout);
+
+		/* Stop if we hit connection limit */
+		if (first_failure)
+			break;
+
+		/* Add idle listening connections */
+		for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+		{
+			PGconn	   *idle_conn;
+			char		idle_channel[256];
+
+			idle_conn = doConnect();
+			if (idle_conn == NULL)
+			{
+				if (!first_failure)
+				{
+					pg_log_info("reached max_connections at %d idle listeners", num_idle);
+					first_failure = true;
+				}
+				break;
+			}
+
+			/* Each idle connection listens on a unique channel */
+			snprintf(idle_channel, sizeof(idle_channel), "idle_%d", num_idle);
+			snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", idle_channel);
+
+			res = PQexec(idle_conn, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			{
+				pg_log_warning("LISTEN failed on idle connection %d: %s",
+							   num_idle, PQerrorMessage(idle_conn));
+				PQfinish(idle_conn);
+				PQclear(res);
+				first_failure = true;
+				break;
+			}
+			PQclear(res);
+
+			idle_conns[num_idle] = idle_conn;
+			num_idle++;
+		}
+
+		/* Stop if we couldn't add any connections */
+		if (first_failure && i == 0)
+			break;
+	}
+
+	/* Clean up */
+	pg_log_info("cleaning up connections");
+	PQfinish(conn1);
+	PQfinish(conn2);
+	for (int i = 0; i < num_idle; i++)
+	{
+		if (idle_conns[i])
+			PQfinish(idle_conns[i]);
+	}
+	pg_free(idle_conns);
+
+	pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
 int
 main(int argc, char **argv)
 {
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
 		{"verbose-errors", no_argument, NULL, 15},
 		{"exit-on-abort", no_argument, NULL, 16},
 		{"debug", no_argument, NULL, 17},
+		{"listen-notify-benchmark", no_argument, NULL, 18},
+		{"notify-round-trips", required_argument, NULL, 19},
+		{"notify-idle-step", required_argument, NULL, 20},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
 			case 17:			/* debug */
 				pg_logging_increase_verbosity();
 				break;
+			case 18:			/* listen-notify-benchmark */
+				listen_notify_mode = true;
+				benchmarking_option_set = true;
+				break;
+			case 19:			/* notify-round-trips */
+				benchmarking_option_set = true;
+				if (!option_parse_int(optarg, "--notify-round-trips", 1, INT_MAX,
+									  &notify_round_trips))
+					exit(1);
+				break;
+			case 20:			/* notify-idle-step */
+				benchmarking_option_set = true;
+				if (!option_parse_int(optarg, "--notify-idle-step", 1, INT_MAX,
+									  &notify_idle_step))
+					exit(1);
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
 			pg_fatal("some of the specified options cannot be used in benchmarking mode");
 	}
 
+	/* Handle LISTEN/NOTIFY benchmark mode */
+	if (listen_notify_mode)
+	{
+		/* Establish a database connection for setup */
+		if ((con = doConnect()) == NULL)
+			pg_fatal("could not connect to database");
+
+		/* Run the LISTEN/NOTIFY benchmark */
+		runListenNotifyBenchmark();
+
+		PQfinish(con);
+		exit(0);
+	}
+
 	if (nxacts > 0 && duration > 0)
 		pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
 


Attachments:

  [application/octet-stream] 0001-optimize_listen_notify-v15.patch (7.8K, 2-0001-optimize_listen_notify-v15.patch)
From 183c8a106705a6391cd68f406019253d36680da4 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Wed, 8 Oct 2025 09:30:54 +0200
Subject: [PATCH 1/3] Improve LISTEN/NOTIFY test coverage

This adds isolation tests to cover previously untested code paths:

* Check simple NOTIFY reparenting when parent has no action
* Check LISTEN reparenting in subtransaction
* Check LISTEN merge path when both outer and inner transactions have actions
* Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions)
* Check notification_match function (triggered by hash table duplicate detection)

This also adds a test to prepare for the next patch:

* Check ChannelHashAddListener array growth
---
 src/test/isolation/expected/async-notify.out | 103 ++++++++++++++++++-
 src/test/isolation/specs/async-notify.spec   |  52 ++++++++++
 2 files changed, 154 insertions(+), 1 deletion(-)

diff --git a/src/test/isolation/expected/async-notify.out b/src/test/isolation/expected/async-notify.out
index 556e1805893..9c19843d2d7 100644
--- a/src/test/isolation/expected/async-notify.out
+++ b/src/test/isolation/expected/async-notify.out
@@ -1,4 +1,4 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 5 sessions
 
 starting permutation: listenc notify1 notify2 notify3 notifyf
 step listenc: LISTEN c1; LISTEN c2;
@@ -47,6 +47,105 @@ notifier: NOTIFY "c2" with payload "payload" from notifier
 notifier: NOTIFY "c1" with payload "payloads" from notifier
 notifier: NOTIFY "c2" with payload "payloads" from notifier
 
+starting permutation: listenc notifys_simple
+step listenc: LISTEN c1; LISTEN c2;
+step notifys_simple: 
+	BEGIN;
+	SAVEPOINT s1;
+	NOTIFY c1, 'simple1';
+	NOTIFY c2, 'simple2';
+	RELEASE SAVEPOINT s1;
+	COMMIT;
+
+notifier: NOTIFY "c1" with payload "simple1" from notifier
+notifier: NOTIFY "c2" with payload "simple2" from notifier
+
+starting permutation: lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+step lsbegin: BEGIN;
+step lslisten_outer: LISTEN c3;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrelease: RELEASE SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify: NOTIFY c1, 'subxact_test';
+listen_subxact: NOTIFY "c1" with payload "subxact_test" from listen_subxact
+
+starting permutation: lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+step lsbegin: BEGIN;
+step lssavepoint: SAVEPOINT s1;
+step lslisten: LISTEN c1; LISTEN c2;
+step lsrollback: ROLLBACK TO SAVEPOINT s1;
+step lscommit: COMMIT;
+step lsnotify_check: NOTIFY c1, 'should_not_receive';
+
+starting permutation: listenc notify_many_with_dup
+step listenc: LISTEN c1; LISTEN c2;
+step notify_many_with_dup: 
+	BEGIN;
+	SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+	SELECT pg_notify('c1', 'msg1');
+	COMMIT;
+
+pg_notify
+---------
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+         
+(17 rows)
+
+pg_notify
+---------
+         
+(1 row)
+
+notifier: NOTIFY "c1" with payload "msg1" from notifier
+notifier: NOTIFY "c1" with payload "msg2" from notifier
+notifier: NOTIFY "c1" with payload "msg3" from notifier
+notifier: NOTIFY "c1" with payload "msg4" from notifier
+notifier: NOTIFY "c1" with payload "msg5" from notifier
+notifier: NOTIFY "c1" with payload "msg6" from notifier
+notifier: NOTIFY "c1" with payload "msg7" from notifier
+notifier: NOTIFY "c1" with payload "msg8" from notifier
+notifier: NOTIFY "c1" with payload "msg9" from notifier
+notifier: NOTIFY "c1" with payload "msg10" from notifier
+notifier: NOTIFY "c1" with payload "msg11" from notifier
+notifier: NOTIFY "c1" with payload "msg12" from notifier
+notifier: NOTIFY "c1" with payload "msg13" from notifier
+notifier: NOTIFY "c1" with payload "msg14" from notifier
+notifier: NOTIFY "c1" with payload "msg15" from notifier
+notifier: NOTIFY "c1" with payload "msg16" from notifier
+notifier: NOTIFY "c1" with payload "msg17" from notifier
+
+starting permutation: listenc llisten l2listen l3listen lslisten
+step listenc: LISTEN c1; LISTEN c2;
+step llisten: LISTEN c1; LISTEN c2;
+step l2listen: LISTEN c1;
+step l3listen: LISTEN c1;
+step lslisten: LISTEN c1; LISTEN c2;
+
 starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
 step llisten: LISTEN c1; LISTEN c2;
 step notify1: NOTIFY c1;
@@ -95,6 +194,8 @@ listener: NOTIFY "c2" with payload "" from notifier
 
 starting permutation: l2listen l2begin notify1 lbegins llisten lcommit l2commit l2stop
 step l2listen: LISTEN c1;
+listener2: NOTIFY "c1" with payload "" from notifier
+listener2: NOTIFY "c1" with payload "" from notifier
 step l2begin: BEGIN;
 step notify1: NOTIFY c1;
 step lbegins: BEGIN ISOLATION LEVEL SERIALIZABLE;
diff --git a/src/test/isolation/specs/async-notify.spec b/src/test/isolation/specs/async-notify.spec
index 0b8cfd91083..942b09d5735 100644
--- a/src/test/isolation/specs/async-notify.spec
+++ b/src/test/isolation/specs/async-notify.spec
@@ -31,6 +31,20 @@ step notifys1	{
 	ROLLBACK TO SAVEPOINT s2;
 	COMMIT;
 }
+step notifys_simple	{
+	BEGIN;
+	SAVEPOINT s1;
+	NOTIFY c1, 'simple1';
+	NOTIFY c2, 'simple2';
+	RELEASE SAVEPOINT s1;
+	COMMIT;
+}
+step notify_many_with_dup	{
+	BEGIN;
+	SELECT pg_notify('c1', 'msg' || s::text) FROM generate_series(1, 17) s;
+	SELECT pg_notify('c1', 'msg1');
+	COMMIT;
+}
 step usage		{ SELECT pg_notification_queue_usage() > 0 AS nonzero; }
 step bignotify	{ SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
 teardown		{ UNLISTEN *; }
@@ -53,6 +67,26 @@ step l2begin	{ BEGIN; }
 step l2commit	{ COMMIT; }
 step l2stop		{ UNLISTEN *; }
 
+# Third listener session for testing array growth.
+
+session listener3
+step l3listen	{ LISTEN c1; }
+teardown		{ UNLISTEN *; }
+
+# Session for testing LISTEN in subtransaction with separate steps.
+
+session listen_subxact
+step lsbegin	{ BEGIN; }
+step lslisten_outer	{ LISTEN c3; }
+step lssavepoint	{ SAVEPOINT s1; }
+step lslisten	{ LISTEN c1; LISTEN c2; }
+step lsrelease	{ RELEASE SAVEPOINT s1; }
+step lsrollback	{ ROLLBACK TO SAVEPOINT s1; }
+step lscommit	{ COMMIT; }
+step lsnotify	{ NOTIFY c1, 'subxact_test'; }
+step lsnotify_check	{ NOTIFY c1, 'should_not_receive'; }
+teardown		{ UNLISTEN *; }
+
 
 # Trivial cases.
 permutation listenc notify1 notify2 notify3 notifyf
@@ -60,6 +94,24 @@ permutation listenc notify1 notify2 notify3 notifyf
 # Check simple and less-simple deduplication.
 permutation listenc notifyd1 notifyd2 notifys1
 
+# Check simple NOTIFY reparenting when parent has no action.
+permutation listenc notifys_simple
+
+# Check LISTEN reparenting in subtransaction.
+permutation lsbegin lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN merge path when both outer and inner transactions have actions.
+permutation lsbegin lslisten_outer lssavepoint lslisten lsrelease lscommit lsnotify
+
+# Check LISTEN abort path (ROLLBACK TO SAVEPOINT discards pending actions).
+permutation lsbegin lssavepoint lslisten lsrollback lscommit lsnotify_check
+
+# Check notification_match function (triggered by hash table duplicate detection).
+permutation listenc notify_many_with_dup
+
+# Check ChannelHashAddListener array growth.
+permutation listenc llisten l2listen l3listen lslisten
+
 # Cross-backend notification delivery.  We use a "select 1" to force the
 # listener session to check for notifies.  In principle we could just wait
 # for delivery, but that would require extra support in isolationtester
-- 
2.50.1



  [application/octet-stream] 0002-optimize_listen_notify-v15.patch (29.8K, 3-0002-optimize_listen_notify-v15.patch)
From b39c5b71f6d6b219ab06c6b731e2317f480edf5d Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Tue, 7 Oct 2025 20:56:47 +0200
Subject: [PATCH 2/3] Optimize LISTEN/NOTIFY with channel-specific listener
 tracking

Currently, idle listening backends cause a dramatic slowdown due to
context switching when they are signaled and wake up. This is wasteful
when they are not listening to the channel being notified.

Just 10 extra idle listening connections cause a slowdown from 8700 TPS
to 6100 TPS, 100 extra cause it to drop to 2000 TPS, and at 1000 extra
it falls to 250 TPS.

This patch introduces targeted signaling for LISTEN/NOTIFY, improving
scalability in workloads with many idle listeners.

A dynamic shared hash table now tracks which backends listen on each
(database, channel) pair, which SignalBackends() uses to perform
targeted signaling.  In addition, it staggers wakeups by signaling one
backend at the global tail to help it advance gradually, and forces any
excessively lagging backends to catch up.  A per-backend wakeup_pending
flag avoids redundant signals.
---
 src/backend/commands/async.c                  | 591 +++++++++++++++---
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/storage/lwlocklist.h              |   1 +
 src/tools/pgindent/typedefs.list              |   2 +
 4 files changed, 505 insertions(+), 90 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..bb5ebfab26d 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -24,8 +24,10 @@
  *	  All notification messages are placed in the queue and later read out
  *	  by listening backends.
  *
- *	  There is no central knowledge of which backend listens on which channel;
- *	  every backend has its own list of interesting channels.
+ *	  We also maintain a dynamic shared hash table (dshash) that maps channel
+ *	  names to the set of backends listening on each channel. This table is
+ *	  created lazily on the first LISTEN command and grows dynamically as
+ *	  needed.
  *
  *	  Although there is only one queue, notifications are treated as being
  *	  database-local; this is done by including the sender's database OID
@@ -68,16 +70,20 @@
  *	  CommitTransaction() which will then do the actual transaction commit.
  *
  *	  After commit we are called another time (AtCommit_Notify()). Here we
- *	  make any actual updates to the effective listen state (listenChannels).
+ *	  make any actual updates to the effective listen state (channelHash).
  *	  Then we signal any backends that may be interested in our messages
  *	  (including our own backend, if listening).  This is done by
- *	  SignalBackends(), which scans the list of listening backends and sends a
- *	  PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
- *	  know which backend is listening on which channel so we must signal them
- *	  all).  We can exclude backends that are already up to date, though, and
- *	  we can also exclude backends that are in other databases (unless they
- *	  are way behind and should be kicked to make them advance their
- *	  pointers).
+ *	  SignalBackends(), which consults the shared channel hash table to
+ *	  identify listeners for the channels that have pending notifications
+ *	  in the current database.  Each selected backend is marked as having a
+ *	  wakeup pending to avoid duplicate signals, and a PROCSIG_NOTIFY_INTERRUPT
+ *	  signal is sent to it.
+ *
+ *	  To maintain queue health, SignalBackends() also wakes one backend
+ *	  positioned at the global queue tail to help advance it, and signals
+ *	  any backend that has fallen too far behind to catch up.  These measures
+ *	  prevent the notification queue from growing indefinitely, while mostly
+ *	  limiting wakeups to the backends that actually need them.
  *
  *	  Finally, after we are out of the transaction altogether and about to go
  *	  idle, we scan the queue for messages that need to be sent to our
@@ -128,6 +134,7 @@
 #include <limits.h>
 #include <unistd.h>
 #include <signal.h>
+#include <string.h>
 
 #include "access/parallel.h"
 #include "access/slru.h"
@@ -137,14 +144,17 @@
 #include "commands/async.h"
 #include "common/hashfn.h"
 #include "funcapi.h"
+#include "lib/dshash.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
+#include "storage/dsm_registry.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/procsignal.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/dsa.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -162,6 +172,29 @@
  */
 #define NOTIFY_PAYLOAD_MAX_LENGTH	(BLCKSZ - NAMEDATALEN - 128)
 
+/*
+ * Channel hash table definitions
+ *
+ * This hash table maps (database OID, channel name) keys to arrays of
+ * ProcNumbers representing the backends listening on each channel.
+ */
+
+#define INITIAL_LISTENERS_ARRAY_SIZE 4
+
+typedef struct ChannelHashKey
+{
+	Oid			dboid;
+	char		channel[NAMEDATALEN];
+} ChannelHashKey;
+
+typedef struct ChannelEntry
+{
+	ChannelHashKey key;
+	dsa_pointer listenersArray; /* DSA pointer to ProcNumber array */
+	int			numListeners;	/* Number of listeners currently stored */
+	int			allocatedListeners; /* Allocated size of array */
+} ChannelEntry;
+
 /*
  * Struct representing an entry in the global notify queue
  *
@@ -227,8 +260,8 @@ typedef struct QueuePosition
 /*
  * Parameter determining how often we try to advance the tail pointer:
  * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data.  This is
- * also the distance by which a backend in another database needs to be
- * behind before we'll decide we need to wake it up to advance its pointer.
+ * also the distance by which a backend needs to be behind before we'll
+ * decide we need to wake it up to advance its pointer.
  *
  * Resist the temptation to make this really large.  While that would save
  * work in some places, it would add cost in others.  In particular, this
@@ -246,6 +279,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	bool		wakeupPending;	/* signal sent but not yet processed */
 } QueueBackendStatus;
 
 /*
@@ -288,11 +322,91 @@ typedef struct AsyncQueueControl
 	ProcNumber	firstListener;	/* id of first listener, or
 								 * INVALID_PROC_NUMBER */
 	TimestampTz lastQueueFillWarn;	/* time of last queue-full msg */
+	dsa_handle	channelHashDSA;
+	dshash_table_handle channelHashDSH;
 	QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
 static AsyncQueueControl *asyncQueueControl;
 
+static dsa_area *channelDSA = NULL;
+static dshash_table *channelHash = NULL;
+static dshash_hash channelHashFunc(const void *key, size_t size, void *arg);
+
+/* parameters for the channel hash table */
+static const dshash_parameters channelDSHParams = {
+	sizeof(ChannelHashKey),
+	sizeof(ChannelEntry),
+	dshash_memcmp,
+	channelHashFunc,
+	dshash_memcpy,
+	LWTRANCHE_NOTIFY_CHANNEL_HASH
+};
+
+/*
+ * channelHashFunc
+ *		Hash function for channel keys.
+ */
+static dshash_hash
+channelHashFunc(const void *key, size_t size, void *arg)
+{
+	const ChannelHashKey *k = (const ChannelHashKey *) key;
+	dshash_hash h;
+
+	h = DatumGetUInt32(hash_uint32(k->dboid));
+	h ^= DatumGetUInt32(hash_any((const unsigned char *) k->channel,
+								 strnlen(k->channel, NAMEDATALEN)));
+
+	return h;
+}
+
+/*
+ * initChannelHash
+ *		Lazy initialization of the channel hash table.
+ */
+static void
+initChannelHash(void)
+{
+	MemoryContext oldcontext;
+
+	/* Quick exit if we already did this */
+	if (asyncQueueControl->channelHashDSH != DSHASH_HANDLE_INVALID &&
+		channelHash != NULL)
+		return;
+
+	/* Otherwise, use a lock to ensure only one process creates the table */
+	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+	/* Be sure any local memory allocated by DSA routines is persistent */
+	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+
+	if (asyncQueueControl->channelHashDSH == DSHASH_HANDLE_INVALID)
+	{
+		/* Initialize dynamic shared hash table for channel hash */
+		channelDSA = dsa_create(LWTRANCHE_NOTIFY_CHANNEL_HASH);
+		dsa_pin(channelDSA);
+		dsa_pin_mapping(channelDSA);
+		channelHash = dshash_create(channelDSA, &channelDSHParams, NULL);
+
+		/* Store handles in shared memory for other backends to use */
+		asyncQueueControl->channelHashDSA = dsa_get_handle(channelDSA);
+		asyncQueueControl->channelHashDSH =
+			dshash_get_hash_table_handle(channelHash);
+	}
+	else if (!channelHash)
+	{
+		/* Attach to existing dynamic shared hash table */
+		channelDSA = dsa_attach(asyncQueueControl->channelHashDSA);
+		dsa_pin_mapping(channelDSA);
+		channelHash = dshash_attach(channelDSA, &channelDSHParams,
+									asyncQueueControl->channelHashDSH,
+									NULL);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+	LWLockRelease(NotifyQueueLock);
+}
+
 #define QUEUE_HEAD					(asyncQueueControl->head)
 #define QUEUE_TAIL					(asyncQueueControl->tail)
 #define QUEUE_STOP_PAGE				(asyncQueueControl->stopPage)
@@ -301,6 +415,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_WAKEUP_PENDING(i)	(asyncQueueControl->backend[i].wakeupPending)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -312,17 +427,11 @@ static SlruCtlData NotifyCtlData;
 
 #define QUEUE_FULL_WARN_INTERVAL	5000	/* warn at most once every 5s */
 
-/*
- * listenChannels identifies the channels we are actually listening to
- * (ie, have committed a LISTEN on).  It is a simple list of channel names,
- * allocated in TopMemoryContext.
- */
-static List *listenChannels = NIL;	/* list of C strings */
-
 /*
  * State for pending LISTEN/UNLISTEN actions consists of an ordered list of
  * all actions requested in the current transaction.  As explained above,
- * we don't actually change listenChannels until we reach transaction commit.
+ * we don't actually change the shared channelHash until we reach transaction
+ * commit.
  *
  * The list is kept in CurTransactionContext.  In subtransactions, each
  * subtransaction has its own list in its own CurTransactionContext, but
@@ -418,6 +527,9 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;
 
+/* Count of channels we're currently listening on */
+static int	numChannelsListeningOn = 0;
+
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
 static bool tryAdvanceTail = false;
 
@@ -457,6 +569,8 @@ static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
 static int	notification_match(const void *key1, const void *key2, Size keysize);
 static void ClearPendingActionsAndNotifies(void);
+static inline void ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel);
+static List *GetPendingNotifyChannels(void);
 
 /*
  * Compute the difference between two queue page numbers.
@@ -521,12 +635,16 @@ AsyncShmemInit(void)
 		QUEUE_STOP_PAGE = 0;
 		QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER;
 		asyncQueueControl->lastQueueFillWarn = 0;
+		asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
+		asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+
 		for (int i = 0; i < MaxBackends; i++)
 		{
 			QUEUE_BACKEND_PID(i) = InvalidPid;
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = false;
 		}
 	}
 
@@ -683,7 +801,7 @@ Async_Notify(const char *channel, const char *payload)
  *		Common code for listen, unlisten, unlisten all commands.
  *
  *		Adds the request to the list of pending actions.
- *		Actual update of the listenChannels list happens during transaction
+ *		Actual update of the shared channelHash happens during transaction
  *		commit.
  */
 static void
@@ -782,24 +900,60 @@ Async_UnlistenAll(void)
 /*
  * SQL function: return a set of the channel names this backend is actively
  * listening to.
- *
- * Note: this coding relies on the fact that the listenChannels list cannot
- * change within a transaction.
  */
 Datum
 pg_listening_channels(PG_FUNCTION_ARGS)
 {
 	FuncCallContext *funcctx;
+	List	   *listenChannels;
 
 	/* stuff done only on the first call of the function */
 	if (SRF_IS_FIRSTCALL())
 	{
+		MemoryContext oldcontext;
+		dshash_seq_status status;
+		ChannelEntry *entry;
+
 		/* create a function context for cross-call persistence */
 		funcctx = SRF_FIRSTCALL_INIT();
+
+		/* get channels from channelHash and store in function context */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		listenChannels = NIL;
+
+		if (channelHash != NULL)
+		{
+			dshash_seq_init(&status, channelHash, false);
+			while ((entry = dshash_seq_next(&status)) != NULL)
+			{
+				if (entry->key.dboid == MyDatabaseId)
+				{
+					ProcNumber *listeners;
+
+					listeners = (ProcNumber *) dsa_get_address(channelDSA,
+															   entry->listenersArray);
+
+					for (int i = 0; i < entry->numListeners; i++)
+					{
+						if (listeners[i] == MyProcNumber)
+						{
+							listenChannels = lappend(listenChannels, pstrdup(entry->key.channel));
+							break;
+						}
+					}
+				}
+			}
+			dshash_seq_term(&status);
+		}
+
+		funcctx->user_fctx = listenChannels;
+		MemoryContextSwitchTo(oldcontext);
 	}
 
 	/* stuff done on every call of the function */
 	funcctx = SRF_PERCALL_SETUP();
+	listenChannels = (List *) funcctx->user_fctx;
 
 	if (funcctx->call_cntr < list_length(listenChannels))
 	{
@@ -957,7 +1111,7 @@ PreCommit_Notify(void)
  *
  *		This is called at transaction commit, after committing to clog.
  *
- *		Update listenChannels and clear transaction-local state.
+ *		Update channelHash and clear transaction-local state.
  *
  *		If we issued any notifications in the transaction, send signals to
  *		listening backends (possibly including ourselves) to process them.
@@ -1002,7 +1156,7 @@ AtCommit_Notify(void)
 	}
 
 	/* If no longer listening to anything, get out of listener array */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && numChannelsListeningOn == 0)
 		asyncQueueUnregister();
 
 	/*
@@ -1130,55 +1284,131 @@ Exec_ListenPreCommit(void)
 /*
  * Exec_ListenCommit --- subroutine for AtCommit_Notify
  *
- * Add the channel to the list of channels we are listening on.
+ * Add the channel to the shared channelHash.
  */
 static void
 Exec_ListenCommit(const char *channel)
 {
-	MemoryContext oldcontext;
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	bool		found;
+	ProcNumber *listeners;
 
-	/* Do nothing if we are already listening on this channel */
-	if (IsListeningOn(channel))
-		return;
+	initChannelHash();
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
 
 	/*
-	 * Add the new channel name to listenChannels.
-	 *
-	 * XXX It is theoretically possible to get an out-of-memory failure here,
-	 * which would be bad because we already committed.  For the moment it
-	 * doesn't seem worth trying to guard against that, but maybe improve this
-	 * later.
+	 * For new entries, we initialize listenersArray to InvalidDsaPointer as a
+	 * marker. This handles both the initial creation and potential retry
+	 * after OOM.
 	 */
-	oldcontext = MemoryContextSwitchTo(TopMemoryContext);
-	listenChannels = lappend(listenChannels, pstrdup(channel));
-	MemoryContextSwitchTo(oldcontext);
+	entry = dshash_find_or_insert(channelHash, &key, &found);
+
+	if (!found)
+		entry->listenersArray = InvalidDsaPointer;
+
+	if (!DsaPointerIsValid(entry->listenersArray))
+	{
+		/* First listener for this channel */
+		entry->listenersArray = dsa_allocate(channelDSA,
+											 sizeof(ProcNumber) * INITIAL_LISTENERS_ARRAY_SIZE);
+		entry->numListeners = 0;
+		entry->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
+	}
+
+	listeners = (ProcNumber *) dsa_get_address(channelDSA,
+											   entry->listenersArray);
+
+	for (int i = 0; i < entry->numListeners; i++)
+	{
+		if (listeners[i] == MyProcNumber)
+		{
+			dshash_release_lock(channelHash, entry);
+			return;				/* Already registered */
+		}
+	}
+
+	if (entry->numListeners >= entry->allocatedListeners)
+	{
+		int			new_size = entry->allocatedListeners * 2;
+		dsa_pointer new_array = dsa_allocate(channelDSA,
+											 sizeof(ProcNumber) * new_size);
+		ProcNumber *new_listeners = (ProcNumber *) dsa_get_address(channelDSA,
+																   new_array);
+
+		memcpy(new_listeners, listeners,
+			   sizeof(ProcNumber) * entry->numListeners);
+
+		dsa_free(channelDSA, entry->listenersArray);
+		entry->listenersArray = new_array;
+		entry->allocatedListeners = new_size;
+		listeners = new_listeners;
+	}
+
+	listeners[entry->numListeners] = MyProcNumber;
+	entry->numListeners++;
+	numChannelsListeningOn++;
+
+	dshash_release_lock(channelHash, entry);
 }
 
 /*
  * Exec_UnlistenCommit --- subroutine for AtCommit_Notify
  *
- * Remove the specified channel name from listenChannels.
+ * Remove the specified channel from channelHash.
  */
 static void
 Exec_UnlistenCommit(const char *channel)
 {
-	ListCell   *q;
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	ProcNumber *listeners;
+	int			i;
 
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid);
 
-	foreach(q, listenChannels)
+	if (channelHash == NULL)
+		return;
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	/* Look up the channel with exclusive lock so we can modify it */
+	entry = dshash_find(channelHash, &key, true);
+	if (entry == NULL)
+		return;
+
+	listeners = (ProcNumber *) dsa_get_address(channelDSA,
+											   entry->listenersArray);
+
+	for (i = 0; i < entry->numListeners; i++)
 	{
-		char	   *lchan = (char *) lfirst(q);
-
-		if (strcmp(lchan, channel) == 0)
+		if (listeners[i] == MyProcNumber)
 		{
-			listenChannels = foreach_delete_current(listenChannels, q);
-			pfree(lchan);
-			break;
+			entry->numListeners--;
+			if (i < entry->numListeners)
+				memmove(&listeners[i], &listeners[i + 1],
+						sizeof(ProcNumber) * (entry->numListeners - i));
+
+			if (entry->numListeners == 0)
+			{
+				/* Last listener for this channel */
+				dsa_free(channelDSA, entry->listenersArray);
+				dshash_delete_entry(channelHash, entry);
+			}
+			else
+			{
+				dshash_release_lock(channelHash, entry);
+			}
+
+			numChannelsListeningOn--;
+			return;
 		}
 	}
 
+	dshash_release_lock(channelHash, entry);
+
 	/*
 	 * We do not complain about unlistening something not being listened;
 	 * should we?
@@ -1193,33 +1423,82 @@ Exec_UnlistenCommit(const char *channel)
 static void
 Exec_UnlistenAllCommit(void)
 {
+	dshash_seq_status status;
+	ChannelEntry *entry;
+
 	if (Trace_notify)
 		elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid);
 
-	list_free_deep(listenChannels);
-	listenChannels = NIL;
+	if (channelHash == NULL)
+		return;
+
+	dshash_seq_init(&status, channelHash, true);
+	while ((entry = dshash_seq_next(&status)) != NULL)
+	{
+		if (entry->key.dboid == MyDatabaseId)
+		{
+			ProcNumber *listeners;
+			int			i;
+
+			listeners = (ProcNumber *) dsa_get_address(channelDSA,
+													   entry->listenersArray);
+
+			for (i = 0; i < entry->numListeners; i++)
+			{
+				if (listeners[i] == MyProcNumber)
+				{
+					entry->numListeners--;
+					if (i < entry->numListeners)
+						memmove(&listeners[i], &listeners[i + 1],
+								sizeof(ProcNumber) * (entry->numListeners - i));
+
+					if (entry->numListeners == 0)
+					{
+						dsa_free(channelDSA, entry->listenersArray);
+						dshash_delete_current(&status);
+					}
+					break;
+				}
+			}
+		}
+	}
+	dshash_seq_term(&status);
+	numChannelsListeningOn = 0;
 }
 
 /*
  * Test whether we are actively listening on the given channel name.
  *
  * Note: this function is executed for every notification found in the queue.
- * Perhaps it is worth further optimization, eg convert the list to a sorted
- * array so we can binary-search it.  In practice the list is likely to be
- * fairly short, though.
  */
 static bool
 IsListeningOn(const char *channel)
 {
-	ListCell   *p;
+	ChannelHashKey key;
+	ChannelEntry *entry;
+	ProcNumber *listeners;
 
-	foreach(p, listenChannels)
+	initChannelHash();
+
+	ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+
+	entry = dshash_find(channelHash, &key, false);
+	if (entry == NULL)
+		return false;			/* No listeners registered for this channel */
+
+	listeners = (ProcNumber *) dsa_get_address(channelDSA,
+											   entry->listenersArray);
+
+	for (int i = 0; i < entry->numListeners; i++)
 	{
-		char	   *lchan = (char *) lfirst(p);
-
-		if (strcmp(lchan, channel) == 0)
+		if (listeners[i] == MyProcNumber)
+		{
+			dshash_release_lock(channelHash, entry);
 			return true;
+		}
 	}
+
+	dshash_release_lock(channelHash, entry);
 	return false;
 }
 
@@ -1230,7 +1509,7 @@ IsListeningOn(const char *channel)
 static void
 asyncQueueUnregister(void)
 {
-	Assert(listenChannels == NIL);	/* else caller error */
+	Assert(numChannelsListeningOn == 0);	/* else caller error */
 
 	if (!amRegisteredListener)	/* nothing to do */
 		return;
@@ -1565,12 +1844,16 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * Normally we signal only backends in our own database, since only those
- * backends could be interested in notifies we send.  However, if there's
- * notify traffic in our database but no traffic in another database that
- * does have listener(s), those listeners will fall further and further
- * behind.  Waken them anyway if they're far enough behind, so that they'll
- * advance their queue position pointers, allowing the global tail to advance.
+ * Normally we signal only backends registered as listeners for channels
+ * with pending notifications.  However, when there is no traffic on some
+ * channels, listeners on such channels will fall further and further
+ * behind.  Waken them if they are too far behind, so that they'll
+ * advance their queue position pointers, allowing the global tail to
+ * advance.
+ *
+ * To stagger wakeups of lagging backends, wake at most one backend
+ * positioned at the global tail per call, amortizing the context-switching
+ * cost across successive notifications instead of paying it all at once.
  *
  * Since we know the ProcNumber and the Pid the signaling is quite cheap.
  *
@@ -1583,6 +1866,9 @@ SignalBackends(void)
 	int32	   *pids;
 	ProcNumber *procnos;
 	int			count;
+	List	   *channels;
+	ListCell   *lc;
+	int64		queue_length;
 
 	/*
 	 * Identify backends that we need to signal.  We don't want to send
@@ -1596,37 +1882,109 @@ SignalBackends(void)
 	procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber));
 	count = 0;
 
+	channels = GetPendingNotifyChannels();
+
 	LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
-	for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i))
+	foreach(lc, channels)
 	{
-		int32		pid = QUEUE_BACKEND_PID(i);
-		QueuePosition pos;
+		char	   *channel = (char *) lfirst(lc);
+		ChannelEntry *entry = NULL;
+		ProcNumber *listeners;
 
-		Assert(pid != InvalidPid);
-		pos = QUEUE_BACKEND_POS(i);
-		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+		if (channelHash != NULL)
 		{
-			/*
-			 * Always signal listeners in our own database, unless they're
-			 * already caught up (unlikely, but possible).
-			 */
+			ChannelHashKey key;
+
+			ChannelHashPrepareKey(&key, MyDatabaseId, channel);
+			entry = dshash_find(channelHash, &key, false);
+		}
+
+		if (entry == NULL)
+			continue;			/* No listeners registered for this channel */
+
+		listeners = (ProcNumber *) dsa_get_address(channelDSA,
+												   entry->listenersArray);
+
+		for (int j = 0; j < entry->numListeners; j++)
+		{
+			ProcNumber	i = listeners[j];
+			int32		pid;
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+			pid = QUEUE_BACKEND_PID(i);
+
+			/* Skip if caught up or wrong database */
 			if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD))
 				continue;
+			if (QUEUE_BACKEND_DBOID(i) != MyDatabaseId)
+				continue;
+
+			Assert(pid != InvalidPid);
+
+			QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+			pids[count] = pid;
+			procnos[count] = i;
+			count++;
 		}
-		else
+
+		dshash_release_lock(channelHash, entry);
+	}
+
+	queue_length = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+									  QUEUE_POS_PAGE(QUEUE_TAIL));
+
+	/* Check for lagging backends when the queue spans multiple pages */
+	if (queue_length > 0)
+	{
+		bool		tail_woken = false;
+
+		for (ProcNumber i = QUEUE_FIRST_LISTENER;
+			 i != INVALID_PROC_NUMBER;
+			 i = QUEUE_NEXT_LISTENER(i))
 		{
-			/*
-			 * Listeners in other databases should be signaled only if they
-			 * are far behind.
-			 */
-			if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
-								   QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY)
+			QueuePosition pos;
+			int64		lag;
+			int32		pid;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
 				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
+			/* Signal one backend positioned at the global tail */
+			if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
+												  QUEUE_POS_PAGE(pos)) == 0)
+			{
+				pid = QUEUE_BACKEND_PID(i);
+				Assert(pid != InvalidPid);
+
+				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+				pids[count] = pid;
+				procnos[count] = i;
+				count++;
+				tail_woken = true;
+				continue;
+			}
+
+			lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
+									 QUEUE_POS_PAGE(pos));
+
+			/* Need to signal if a backend has fallen too far behind */
+			if (lag >= QUEUE_CLEANUP_DELAY)
+			{
+				pid = QUEUE_BACKEND_PID(i);
+				Assert(pid != InvalidPid);
+
+				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
+				pids[count] = pid;
+				procnos[count] = i;
+				count++;
+			}
 		}
-		/* OK, need to signal this one */
-		pids[count] = pid;
-		procnos[count] = i;
-		count++;
 	}
 	LWLockRelease(NotifyQueueLock);
 
@@ -1673,9 +2031,9 @@ AtAbort_Notify(void)
 	/*
 	 * If we LISTEN but then roll back the transaction after PreCommit_Notify,
 	 * we have registered as a listener but have not made any entry in
-	 * listenChannels.  In that case, deregister again.
+	 * channelHash.  In that case, deregister again.
 	 */
-	if (amRegisteredListener && listenChannels == NIL)
+	if (amRegisteredListener && numChannelsListeningOn == 0)
 		asyncQueueUnregister();
 
 	/* And clean up */
@@ -1865,6 +2223,7 @@ asyncQueueReadAllNotifications(void)
 	LWLockAcquire(NotifyQueueLock, LW_SHARED);
 	/* Assert checks that we have a valid state entry */
 	Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber));
+	QUEUE_BACKEND_WAKEUP_PENDING(MyProcNumber) = false;
 	pos = QUEUE_BACKEND_POS(MyProcNumber);
 	head = QUEUE_HEAD;
 	LWLockRelease(NotifyQueueLock);
@@ -2186,7 +2545,7 @@ ProcessIncomingNotify(bool flush)
 	notifyInterruptPending = false;
 
 	/* Do nothing else if we aren't actively listening */
-	if (listenChannels == NIL)
+	if (numChannelsListeningOn == 0)
 		return;
 
 	if (Trace_notify)
@@ -2395,3 +2754,55 @@ check_notify_buffers(int *newval, void **extra, GucSource source)
 {
 	return check_slru_buffers("notify_buffers", newval);
 }
+
+
+/*
+ * ChannelHashPrepareKey
+ *		Prepare a channel key for use as a hash key.
+ */
+static inline void
+ChannelHashPrepareKey(ChannelHashKey *key, Oid dboid, const char *channel)
+{
+	memset(key, 0, sizeof(ChannelHashKey));
+	key->dboid = dboid;
+	strlcpy(key->channel, channel, NAMEDATALEN);
+}
+
+/*
+ * GetPendingNotifyChannels
+ *		Get list of unique channel names from pending notifications.
+ */
+static List *
+GetPendingNotifyChannels(void)
+{
+	List	   *channels = NIL;
+	ListCell   *p;
+	ListCell   *q;
+	bool		found;
+
+	if (!pendingNotifies)
+		return NIL;
+
+	foreach(p, pendingNotifies->events)
+	{
+		Notification *n = (Notification *) lfirst(p);
+		char	   *channel = n->data;
+
+		found = false;
+		foreach(q, channels)
+		{
+			char	   *existing = (char *) lfirst(q);
+
+			if (strcmp(existing, channel) == 0)
+			{
+				found = true;
+				break;
+			}
+		}
+
+		if (!found)
+			channels = lappend(channels, channel);
+	}
+
+	return channels;
+}
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 7553f6eacef..a4fadbd0767 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -366,6 +366,7 @@ SubtransBuffer	"Waiting for I/O on a sub-transaction SLRU buffer."
 MultiXactOffsetBuffer	"Waiting for I/O on a multixact offset SLRU buffer."
 MultiXactMemberBuffer	"Waiting for I/O on a multixact member SLRU buffer."
 NotifyBuffer	"Waiting for I/O on a <command>NOTIFY</command> message SLRU buffer."
+NotifyChannelHash	"Waiting to access the <command>NOTIFY</command> channel hash table."
 SerialBuffer	"Waiting for I/O on a serializable transaction conflict SLRU buffer."
 WALInsert	"Waiting to insert WAL data into a memory buffer."
 BufferContent	"Waiting to access a data page in memory."
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..2768ddf4414 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -100,6 +100,7 @@ PG_LWLOCKTRANCHE(SUBTRANS_BUFFER, SubtransBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTOFFSET_BUFFER, MultiXactOffsetBuffer)
 PG_LWLOCKTRANCHE(MULTIXACTMEMBER_BUFFER, MultiXactMemberBuffer)
 PG_LWLOCKTRANCHE(NOTIFY_BUFFER, NotifyBuffer)
+PG_LWLOCKTRANCHE(NOTIFY_CHANNEL_HASH, NotifyChannelHash)
 PG_LWLOCKTRANCHE(SERIAL_BUFFER, SerialBuffer)
 PG_LWLOCKTRANCHE(WAL_INSERT, WALInsert)
 PG_LWLOCKTRANCHE(BUFFER_CONTENT, BufferContent)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 5290b91e83e..5ccdd4043e8 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -412,6 +412,8 @@ CatalogIdMapEntry
 CatalogIndexState
 ChangeVarNodes_callback
 ChangeVarNodes_context
+ChannelEntry
+ChannelHashKey
 CheckPoint
 CheckPointStmt
 CheckpointStatsData
-- 
2.50.1
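
For readers following the 0002 patch above: the per-channel bookkeeping in Exec_ListenCommit() and Exec_UnlistenCommit() comes down to a growable array of ProcNumbers that doubles when full and is compacted with memmove() on removal. Below is a minimal standalone sketch of that idea, not the patch's code. It assumes plain malloc()/free() in place of dsa_allocate()/dsa_free(), takes no locks, and picks an arbitrary initial size. `ListenerArray`, `listener_add` and `listener_remove` are hypothetical names used only for illustration.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define INITIAL_LISTENERS_ARRAY_SIZE 4	/* assumed value, for illustration */

typedef int ProcNumber;			/* stand-in for the PostgreSQL typedef */

/* Hypothetical stand-in for the listener fields of ChannelEntry */
typedef struct ListenerArray
{
	ProcNumber *listeners;		/* NULL plays the role of InvalidDsaPointer */
	int			numListeners;
	int			allocatedListeners;
} ListenerArray;

/* Add procno unless already present; returns true if it was added. */
static bool
listener_add(ListenerArray *a, ProcNumber procno)
{
	if (a->listeners == NULL)
	{
		/* First listener for this channel */
		a->listeners = malloc(sizeof(ProcNumber) * INITIAL_LISTENERS_ARRAY_SIZE);
		assert(a->listeners != NULL);
		a->numListeners = 0;
		a->allocatedListeners = INITIAL_LISTENERS_ARRAY_SIZE;
	}

	for (int i = 0; i < a->numListeners; i++)
		if (a->listeners[i] == procno)
			return false;		/* already registered */

	if (a->numListeners >= a->allocatedListeners)
	{
		/* Full: allocate a doubled array, copy, then free the old one */
		int			new_size = a->allocatedListeners * 2;
		ProcNumber *new_listeners = malloc(sizeof(ProcNumber) * new_size);

		assert(new_listeners != NULL);
		memcpy(new_listeners, a->listeners,
			   sizeof(ProcNumber) * a->numListeners);
		free(a->listeners);
		a->listeners = new_listeners;
		a->allocatedListeners = new_size;
	}

	a->listeners[a->numListeners++] = procno;
	return true;
}

/* Remove procno; returns true if the array became empty and was freed. */
static bool
listener_remove(ListenerArray *a, ProcNumber procno)
{
	for (int i = 0; i < a->numListeners; i++)
	{
		if (a->listeners[i] != procno)
			continue;

		/* Close the gap left by the removed element */
		a->numListeners--;
		if (i < a->numListeners)
			memmove(&a->listeners[i], &a->listeners[i + 1],
					sizeof(ProcNumber) * (a->numListeners - i));

		if (a->numListeners == 0)
		{
			/* Last listener: the patch also deletes the hash entry here */
			free(a->listeners);
			a->listeners = NULL;
			a->allocatedListeners = 0;
			return true;
		}
		break;
	}
	return false;
}
```

In the patch itself the same steps run while holding the dshash entry lock, and the caller additionally maintains the backend-local numChannelsListeningOn counter.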



  [application/octet-stream] 0003-optimize_listen_notify-v15.patch (5.9K, 4-0003-optimize_listen_notify-v15.patch)
From 3fe37ec554905d69f71a05e9dec26d5b3ac7fd23 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Sat, 11 Oct 2025 07:28:57 +0200
Subject: [PATCH 3/3] Optimize LISTEN/NOTIFY by advancing idle backends
 directly

Building on the previous channel-specific listener tracking
optimization, this patch further reduces context switching by detecting
idle listening backends that don't listen to any of the channels being
notified and advancing their queue positions directly without waking
them up.

When a backend commits notifications, it now saves the queue head
position both before and after writing. In SignalBackends(), backends that
are at the old queue head and weren't marked for wakeup (meaning they
don't listen to any of the notified channels) are advanced directly to
the new queue head. This eliminates unnecessary wakeups for these
backends, which would otherwise wake up, scan through all the
notifications, skip each one, and advance to the same position anyway.

The implementation carefully handles the race condition where other
backends may write notifications after the heavyweight lock is released
but before SignalBackends() is called. By saving queueHeadAfterWrite
immediately after writing (before releasing the lock), we ensure
backends are only advanced over the exact notifications we wrote, not
notifications from other concurrent backends.
---
 src/backend/commands/async.c | 79 ++++++++++++++++++++++++++++--------
 1 file changed, 62 insertions(+), 17 deletions(-)
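
A minimal standalone sketch of the direct-advance step described in the commit message, under simplifying assumptions: listeners live in a plain array rather than the shared-memory linked list, no locks are taken, and QueuePosition is a simplified stand-in. `Listener` and `advance_idle_listeners` are hypothetical names, not part of the patch. The point it illustrates is that only backends sitting exactly at the pre-write head, in the notifier's database and not already marked for wakeup, are moved, and that they are moved to the saved post-write head rather than to whatever the queue head is at signal time.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-in for the QueuePosition struct in async.c */
typedef struct QueuePosition
{
	long		page;
	int			offset;
} QueuePosition;

#define QUEUE_POS_EQUAL(x, y) \
	((x).page == (y).page && (x).offset == (y).offset)

/* Hypothetical flattened view of one listener's shared state */
typedef struct Listener
{
	unsigned int dboid;			/* database the backend is connected to */
	QueuePosition pos;			/* how far it has read in the queue */
	bool		wakeupPending;	/* already chosen for signaling? */
} Listener;

/*
 * Advance every idle listener that was caught up before our write.
 * Returns the number of listeners whose position was changed.
 */
static int
advance_idle_listeners(Listener *listeners, int n, unsigned int mydb,
					   QueuePosition oldHead, QueuePosition newHead)
{
	int			advanced = 0;

	assert(n >= 0);
	for (int i = 0; i < n; i++)
	{
		/* Marked for wakeup: it listens on a notified channel, must read */
		if (listeners[i].wakeupPending)
			continue;
		/* Other databases are left alone, as in the patch */
		if (listeners[i].dboid != mydb)
			continue;
		/* Not exactly at the old head: it still has older entries to read */
		if (!QUEUE_POS_EQUAL(listeners[i].pos, oldHead))
			continue;

		/* Skip only the entries we wrote ourselves */
		listeners[i].pos = newHead;
		advanced++;
	}
	return advanced;
}
```

In the patch the equivalent loop runs inside SignalBackends() while NotifyQueueLock is held exclusively, after the per-channel pass has set the wakeupPending flags.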

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index bb5ebfab26d..5570f73dd13 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -500,6 +500,8 @@ typedef struct NotificationList
 	int			nestingLevel;	/* current transaction nesting depth */
 	List	   *events;			/* list of Notification structs */
 	HTAB	   *hashtab;		/* hash of NotificationHash structs, or NULL */
+	QueuePosition queueHeadBeforeWrite; /* QUEUE_HEAD before writing notifies */
+	QueuePosition queueHeadAfterWrite;	/* QUEUE_HEAD after writing notifies */
 	struct NotificationList *upper; /* details for upper transaction levels */
 } NotificationList;
 
@@ -1048,6 +1050,7 @@ PreCommit_Notify(void)
 	if (pendingNotifies)
 	{
 		ListCell   *nextNotify;
+		bool		firstIteration = true;
 
 		/*
 		 * Make sure that we have an XID assigned to the current transaction.
@@ -1076,6 +1079,9 @@ PreCommit_Notify(void)
 		LockSharedObject(DatabaseRelationId, InvalidOid, 0,
 						 AccessExclusiveLock);
 
+		/* Initialize queueHeadBeforeWrite to a safe default */
+		SET_QUEUE_POS(pendingNotifies->queueHeadBeforeWrite, 0, 0);
+
 		/* Now push the notifications into the queue */
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
@@ -1093,6 +1099,19 @@ PreCommit_Notify(void)
 			 * point in time we can still roll the transaction back.
 			 */
 			LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
+
+			/*
+			 * On the first iteration, save the queue head position before we
+			 * write any notifications.  This is used by SignalBackends() to
+			 * identify backends that can be advanced directly without waking
+			 * them up.
+			 */
+			if (firstIteration)
+			{
+				pendingNotifies->queueHeadBeforeWrite = QUEUE_HEAD;
+				firstIteration = false;
+			}
+
 			asyncQueueFillWarning();
 			if (asyncQueueIsFull())
 				ereport(ERROR,
@@ -1102,6 +1121,18 @@ PreCommit_Notify(void)
 			LWLockRelease(NotifyQueueLock);
 		}
 
+		/*
+		 * Save the queue head after writing all our notifications.  This is
+		 * used by SignalBackends() to know where to advance idle backends to.
+		 * We must save this now because other backends may write their own
+		 * notifications after we release the heavyweight lock but before we
+		 * call SignalBackends(), and we must not advance backends over those
+		 * other notifications.
+		 */
+		LWLockAcquire(NotifyQueueLock, LW_SHARED);
+		pendingNotifies->queueHeadAfterWrite = QUEUE_HEAD;
+		LWLockRelease(NotifyQueueLock);
+
 		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
 	}
 }
@@ -1934,14 +1965,43 @@ SignalBackends(void)
 		dshash_release_lock(channelHash, entry);
 	}
 
+	/*
+	 * Listening backends that are still at the old queue head (the head
+	 * before we wrote our notifications) and were not marked for wakeup
+	 * above cannot be interested in our notifications; otherwise they would
+	 * have been marked by now.  Rather than waking them, advance them
+	 * directly to the new queue head.
+	 */
+	if (pendingNotifies != NULL)
+	{
+		QueuePosition oldHead = pendingNotifies->queueHeadBeforeWrite;
+		QueuePosition newHead = pendingNotifies->queueHeadAfterWrite;
+
+		for (ProcNumber i = QUEUE_FIRST_LISTENER;
+			 i != INVALID_PROC_NUMBER;
+			 i = QUEUE_NEXT_LISTENER(i))
+		{
+			QueuePosition pos;
+
+			if (QUEUE_BACKEND_WAKEUP_PENDING(i))
+				continue;
+
+			pos = QUEUE_BACKEND_POS(i);
+
+			if (QUEUE_POS_EQUAL(pos, oldHead) &&
+				QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
+			{
+				QUEUE_BACKEND_POS(i) = newHead;
+			}
+		}
+	}
+
 	queue_length = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 									  QUEUE_POS_PAGE(QUEUE_TAIL));
 
 	/* Check for lagging backends when the queue spans multiple pages */
 	if (queue_length > 0)
 	{
-		bool		tail_woken = false;
-
 		for (ProcNumber i = QUEUE_FIRST_LISTENER;
 			 i != INVALID_PROC_NUMBER;
 			 i = QUEUE_NEXT_LISTENER(i))
@@ -1955,21 +2015,6 @@ SignalBackends(void)
 
 			pos = QUEUE_BACKEND_POS(i);
 
-			/* Signal one backend positioned at the global tail */
-			if (!tail_woken && asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_TAIL),
-												  QUEUE_POS_PAGE(pos)) == 0)
-			{
-				pid = QUEUE_BACKEND_PID(i);
-				Assert(pid != InvalidPid);
-
-				QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
-				pids[count] = pid;
-				procnos[count] = i;
-				count++;
-				tail_woken = true;
-				continue;
-			}
-
 			lag = asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD),
 									 QUEUE_POS_PAGE(pos));
 
-- 
2.50.1
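
With the tail-wakeup branch removed by 0003, what remains of the lagging-backend pass is a threshold check guarded by the wakeupPending flag. The sketch below is a standalone illustration, not the patch's code. It assumes the page difference is a plain 64-bit subtraction and that QUEUE_CLEANUP_DELAY is 4; both are assumptions of this sketch. `LagListener` and `collect_lagging` are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_CLEANUP_DELAY 4	/* value assumed for this sketch */

/* Hypothetical per-listener state: current queue page and wakeup flag */
typedef struct LagListener
{
	int64_t		page;
	bool		wakeupPending;
} LagListener;

/*
 * Collect indexes of listeners that are at least QUEUE_CLEANUP_DELAY pages
 * behind the head and not already marked for wakeup.  Marks each collected
 * listener so that a later call does not select it again.  Returns the
 * number of indexes stored in out[], which must have room for n entries.
 */
static int
collect_lagging(LagListener *l, int n, int64_t headPage, int64_t tailPage,
				int *out)
{
	int			count = 0;

	assert(n >= 0);

	/* Nothing can be lagging if the queue fits on a single page */
	if (headPage - tailPage <= 0)
		return 0;

	for (int i = 0; i < n; i++)
	{
		if (l[i].wakeupPending)
			continue;			/* a signal is already on its way */

		if (headPage - l[i].page >= QUEUE_CLEANUP_DELAY)
		{
			l[i].wakeupPending = true;
			out[count++] = i;
		}
	}
	return count;
}
```

The flag is what keeps repeated notifiers from signaling the same sleeping backend over and over; in the patch it is cleared by the listener itself at the start of asyncQueueReadAllNotifications().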



  [text/plain] pgbench-listen-notify-benchmark-patch.txt (9.3K, 5-pgbench-listen-notify-benchmark-patch.txt)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1515ed405ba..b462dcc8348 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -35,6 +35,7 @@
 
 #include <ctype.h>
 #include <float.h>
+#include <inttypes.h>
 #include <limits.h>
 #include <math.h>
 #include <signal.h>
@@ -237,6 +238,11 @@ static const char *const PARTITION_METHOD[] = {"none", "range", "hash"};
 /* random seed used to initialize base_random_sequence */
 static int64 random_seed = -1;
 
+/* LISTEN/NOTIFY benchmark mode parameters */
+static bool listen_notify_mode = false;	/* enable LISTEN/NOTIFY benchmark */
+static int	notify_round_trips = 100;	/* number of round-trips per iteration */
+static int	notify_idle_step = 10;		/* idle listeners to add per iteration */
+
 /*
  * end of configurable parameters
  *********************************************************************/
@@ -930,6 +936,10 @@ usage(void)
 		   "                           (same as \"-b simple-update\")\n"
 		   "  -S, --select-only        perform SELECT-only transactions\n"
 		   "                           (same as \"-b select-only\")\n"
+		   "  --listen-notify-benchmark\n"
+		   "                           run LISTEN/NOTIFY round-trip benchmark\n"
+		   "  --notify-round-trips=NUM number of round-trips per iteration (default: 100)\n"
+		   "  --notify-idle-step=NUM   idle listeners to add per iteration (default: 10)\n"
 		   "\nBenchmarking options:\n"
 		   "  -c, --client=NUM         number of concurrent database clients (default: 1)\n"
 		   "  -C, --connect            establish new connection for each transaction\n"
@@ -6689,6 +6699,216 @@ set_random_seed(const char *seed)
 	return true;
 }
 
+/*
+ * Run LISTEN/NOTIFY round-trip benchmark
+ *
+ * This benchmark measures the round-trip time between two connections that
+ * ping-pong NOTIFY messages while idle listening connections are added.
+ */
+static void
+runListenNotifyBenchmark(void)
+{
+	PGconn	   *conn1 = NULL;
+	PGconn	   *conn2 = NULL;
+	PGconn	  **idle_conns = NULL;
+	int			num_idle = 0;
+	int			max_idle = 10000;	/* reasonable upper limit */
+	PGresult   *res;
+	char		channel1[] = "pgbench_channel_1";
+	char		channel2[] = "pgbench_channel_2";
+	char		notify_cmd[256];
+	bool		first_failure = false;
+
+	pg_log_info("starting LISTEN/NOTIFY round-trip benchmark");
+	pg_log_info("round-trips per iteration: %d", notify_round_trips);
+	pg_log_info("idle listeners added per iteration: %d", notify_idle_step);
+	printf("\n%14s  %19s  %19s\n", "idle_listeners", "round_trips_per_sec", "max_latency_usec");
+
+	/* Allocate array for idle connections */
+	idle_conns = (PGconn **) pg_malloc0(max_idle * sizeof(PGconn *));
+
+	/* Create two active connections for ping-pong */
+	conn1 = doConnect();
+	if (conn1 == NULL)
+		pg_fatal("failed to create connection 1");
+
+	conn2 = doConnect();
+	if (conn2 == NULL)
+		pg_fatal("failed to create connection 2");
+
+	/* Set up LISTEN on both connections */
+	snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel1);
+	res = PQexec(conn1, notify_cmd);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("LISTEN failed on connection 1: %s", PQerrorMessage(conn1));
+	PQclear(res);
+
+	snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", channel2);
+	res = PQexec(conn2, notify_cmd);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("LISTEN failed on connection 2: %s", PQerrorMessage(conn2));
+	PQclear(res);
+
+	/* Main benchmark loop: measure round-trips then add idle connections */
+	while (num_idle < max_idle)
+	{
+		int			i;
+		int64		total_latency = 0;
+		int64		max_latency = 0;
+
+		/* Perform round-trip measurements */
+		for (i = 0; i < notify_round_trips; i++)
+		{
+			pg_time_usec_t start_time,
+						end_time;
+			int64		latency;
+			PGnotify   *notify;
+			int			sock;
+			fd_set		input_mask;
+			struct timeval tv;
+
+			/* Clear any pending notifications */
+			PQconsumeInput(conn1);
+			while ((notify = PQnotifies(conn1)) != NULL)
+				PQfreemem(notify);
+			PQconsumeInput(conn2);
+			while ((notify = PQnotifies(conn2)) != NULL)
+				PQfreemem(notify);
+
+			/* Start timer and send notification from conn1 */
+			start_time = pg_time_now();
+			snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel2);
+			res = PQexec(conn1, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn1));
+			PQclear(res);
+
+			/* Wait for notification on conn2 */
+			sock = PQsocket(conn2);
+			notify = NULL;
+			while (notify == NULL)
+			{
+				PQconsumeInput(conn2);
+				notify = PQnotifies(conn2);
+				if (notify == NULL)
+				{
+					/* Wait for data on socket */
+					FD_ZERO(&input_mask);
+					FD_SET(sock, &input_mask);
+					tv.tv_sec = 10;	/* 10 second timeout */
+					tv.tv_usec = 0;
+					if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+						pg_fatal("select() failed: %m");
+				}
+			}
+			PQfreemem(notify);
+
+			/* Send notification back from conn2 */
+			snprintf(notify_cmd, sizeof(notify_cmd), "NOTIFY %s", channel1);
+			res = PQexec(conn2, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+				pg_fatal("NOTIFY failed: %s", PQerrorMessage(conn2));
+			PQclear(res);
+
+			/* Wait for notification on conn1 */
+			sock = PQsocket(conn1);
+			notify = NULL;
+			while (notify == NULL)
+			{
+				PQconsumeInput(conn1);
+				notify = PQnotifies(conn1);
+				if (notify == NULL)
+				{
+					/* Wait for data on socket */
+					FD_ZERO(&input_mask);
+					FD_SET(sock, &input_mask);
+					tv.tv_sec = 10;	/* 10 second timeout */
+					tv.tv_usec = 0;
+					if (select(sock + 1, &input_mask, NULL, NULL, &tv) < 0)
+						pg_fatal("select() failed: %m");
+				}
+			}
+			PQfreemem(notify);
+
+			/* End timer */
+			end_time = pg_time_now();
+
+			/* Calculate individual round-trip latency */
+			latency = end_time - start_time;
+
+			/* Accumulate total latency and track maximum */
+			total_latency += latency;
+			if (latency > max_latency)
+				max_latency = latency;
+		}
+
+		/* Calculate and report round-trips per second and max latency */
+		fprintf(stdout, "%14d  %19.1f  %19" PRId64 "\n",
+				num_idle,
+				1000000.0 * notify_round_trips / total_latency,
+				max_latency);
+		fflush(stdout);
+
+		/* Stop if we hit connection limit */
+		if (first_failure)
+			break;
+
+		/* Add idle listening connections */
+		for (i = 0; i < notify_idle_step && num_idle < max_idle; i++)
+		{
+			PGconn	   *idle_conn;
+			char		idle_channel[256];
+
+			idle_conn = doConnect();
+			if (idle_conn == NULL)
+			{
+				if (!first_failure)
+				{
+					pg_log_info("reached max_connections at %d idle listeners", num_idle);
+					first_failure = true;
+				}
+				break;
+			}
+
+			/* Each idle connection listens on a unique channel */
+			snprintf(idle_channel, sizeof(idle_channel), "idle_%d", num_idle);
+			snprintf(notify_cmd, sizeof(notify_cmd), "LISTEN %s", idle_channel);
+
+			res = PQexec(idle_conn, notify_cmd);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			{
+				pg_log_warning("LISTEN failed on idle connection %d: %s",
+							   num_idle, PQerrorMessage(idle_conn));
+				PQfinish(idle_conn);
+				PQclear(res);
+				first_failure = true;
+				break;
+			}
+			PQclear(res);
+
+			idle_conns[num_idle] = idle_conn;
+			num_idle++;
+		}
+
+		/* Stop if we couldn't add any connections */
+		if (first_failure && i == 0)
+			break;
+	}
+
+	/* Clean up */
+	pg_log_info("cleaning up connections");
+	PQfinish(conn1);
+	PQfinish(conn2);
+	for (int i = 0; i < num_idle; i++)
+	{
+		if (idle_conns[i])
+			PQfinish(idle_conns[i]);
+	}
+	pg_free(idle_conns);
+
+	pg_log_info("LISTEN/NOTIFY benchmark completed");
+}
+
 int
 main(int argc, char **argv)
 {
@@ -6739,6 +6959,9 @@ main(int argc, char **argv)
 		{"verbose-errors", no_argument, NULL, 15},
 		{"exit-on-abort", no_argument, NULL, 16},
 		{"debug", no_argument, NULL, 17},
+		{"listen-notify-benchmark", no_argument, NULL, 18},
+		{"notify-round-trips", required_argument, NULL, 19},
+		{"notify-idle-step", required_argument, NULL, 20},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -7092,6 +7315,22 @@ main(int argc, char **argv)
 			case 17:			/* debug */
 				pg_logging_increase_verbosity();
 				break;
+			case 18:			/* listen-notify-benchmark */
+				listen_notify_mode = true;
+				benchmarking_option_set = true;
+				break;
+			case 19:			/* notify-round-trips */
+				benchmarking_option_set = true;
+				if (!option_parse_int(optarg, "--notify-round-trips", 1, INT_MAX,
+									  &notify_round_trips))
+					exit(1);
+				break;
+			case 20:			/* notify-idle-step */
+				benchmarking_option_set = true;
+				if (!option_parse_int(optarg, "--notify-idle-step", 1, INT_MAX,
+									  &notify_idle_step))
+					exit(1);
+				break;
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -7210,6 +7449,20 @@ main(int argc, char **argv)
 			pg_fatal("some of the specified options cannot be used in benchmarking mode");
 	}
 
+	/* Handle LISTEN/NOTIFY benchmark mode */
+	if (listen_notify_mode)
+	{
+		/* Establish a database connection for setup */
+		if ((con = doConnect()) == NULL)
+			pg_fatal("could not connect to database");
+
+		/* Run the LISTEN/NOTIFY benchmark */
+		runListenNotifyBenchmark();
+
+		PQfinish(con);
+		exit(0);
+	}
+
 	if (nxacts > 0 && duration > 0)
 		pg_fatal("specify either a number of transactions (-t) or a duration (-T), not both");
 

