From: Tatsuo Ishii <[email protected]>
To: [email protected]
Subject: Feature: reduce sync messages
Date: Thu, 25 Dec 2025 16:26:00 +0900 (JST)
Message-ID: <[email protected]>
Currently pgpool forwards sync messages to all configured backend nodes
regardless of backend_weight or load_balance_mode. This not only
wastes CPU cycles but also degrades performance, since the message
round trip time grows as the number of backend nodes increases. The
effect is especially noticeable when backend nodes are in a distant
location. We should send sync messages only to the backend nodes that
need them.
I have created a patch to do this.
Suppose there is a pgpool cluster with 3 backend nodes. Before patch:
test=# select 1 \bind \g
NOTICE: DB node id: 0 statement: Parse: select 1
NOTICE: DB node id: 0 statement: Bind: select 1
NOTICE: DB node id: 0 statement: D message
NOTICE: DB node id: 0 statement: Execute: select 1
NOTICE: DB node id: 0 statement: Sync
NOTICE: DB node id: 1 statement: Sync
NOTICE: DB node id: 2 statement: Sync
?column?
----------
1
(1 row)
As you can see, sync messages are sent to backend nodes 0, 1 and 2,
although only node 0 is involved in the query "select 1". Sending
sync messages to nodes 1 and 2 is just a waste of time.
After patch:
test=# select 1 \bind \g
NOTICE: DB node id: 2 statement: Parse: select 1
NOTICE: DB node id: 2 statement: Bind: select 1
NOTICE: DB node id: 2 statement: D message
NOTICE: DB node id: 2 statement: Execute: select 1
NOTICE: DB node id: 2 statement: Sync
?column?
----------
1
(1 row)
The sync message is only sent to node 2.
Now the implementation details.
The idea is: if pgpool has not sent any query message to a backend
since the last ReadyForQuery message, we don't need to send a sync
message to that backend node.
To decide which nodes need the sync message, a new struct member
"sync_map" is added to the session context. sync_map is an array of
bool, with one member per backend node. When pgpool forwards a
message to a backend, the corresponding sync_map member is set to
true.
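To illustrate the bookkeeping, here is a minimal standalone sketch. It
is not the patch code: SketchSession, the sketch_* functions and the
value of MAX_NUM_BACKENDS are stand-ins I made up so that the example
compiles without the pgpool headers.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Assumption: stand-in value; the real constant lives in pgpool's headers. */
#define MAX_NUM_BACKENDS 128

/* Hypothetical, stripped-down session context holding only the sync map. */
typedef struct
{
	bool		sync_map[MAX_NUM_BACKENDS];
} SketchSession;

/* Forget all recorded destinations (done after ReadyForQuery). */
static void
sketch_clear_sync_map(SketchSession *sc)
{
	memset(sc->sync_map, 0, sizeof(sc->sync_map));
}

/* Record that a message was forwarded to backend node_id. */
static void
sketch_set_sync_map(SketchSession *sc, int node_id)
{
	assert(node_id >= 0 && node_id < MAX_NUM_BACKENDS);
	sc->sync_map[node_id] = true;
}

/* Has backend node_id received anything since the last clear? */
static bool
sketch_get_sync_map(const SketchSession *sc, int node_id)
{
	assert(node_id >= 0 && node_id < MAX_NUM_BACKENDS);
	return sc->sync_map[node_id];
}

/* Stand-in for forwarding an extended protocol message to one backend. */
static void
sketch_forward_message(SketchSession *sc, int node_id, char kind)
{
	(void) kind;				/* real code would write to the backend socket */
	sketch_set_sync_map(sc, node_id);
}
```

After clearing the map and forwarding a Parse ('P') to node 2, only
sync_map[2] is true, which matches the "After patch" log above.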
When a sync message is received from the frontend, a sync pending
message is added, as we already do. The difference is that previously
no query context was attached to the sync pending message. Now we
attach a query context whose where_to_send map is copied from the
sync_map. Then we send the sync message to the backends, consulting
the where_to_send map. This way, we don't send a sync message to a
backend node to which pgpool has not sent any query since the last
ReadyForQuery message.
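The translation step can be sketched roughly as follows. This is only
an illustration under simplifying assumptions: SketchQueryContext, the
sketch_* functions and the fixed three-node cluster are hypothetical,
and the real code also has to deal with the pending message queue and
the virtual main node.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define SKETCH_NUM_BACKENDS 3	/* assumption: a 3-node cluster as in the example */

/* Hypothetical, stripped-down query context. */
typedef struct
{
	bool		where_to_send[SKETCH_NUM_BACKENDS];
	bool		sync_msg;		/* true if created for a sync message */
} SketchQueryContext;

/* Build the query context attached to the sync pending message. */
static SketchQueryContext
sketch_make_sync_context(const bool sync_map[SKETCH_NUM_BACKENDS])
{
	SketchQueryContext qc;

	memset(&qc, 0, sizeof(qc));
	qc.sync_msg = true;
	memcpy(qc.where_to_send, sync_map, sizeof(qc.where_to_send));
	return qc;
}

/*
 * Forward the sync message, consulting where_to_send. sent_to[] records
 * the chosen backends; the return value is how many were chosen.
 */
static int
sketch_forward_sync(const SketchQueryContext *qc,
					bool sent_to[SKETCH_NUM_BACKENDS])
{
	int			i;
	int			nsent = 0;

	assert(qc->sync_msg);
	for (i = 0; i < SKETCH_NUM_BACKENDS; i++)
	{
		sent_to[i] = qc->where_to_send[i];	/* real code would send 'S' here */
		if (sent_to[i])
			nsent++;
	}
	return nsent;
}
```

With a sync_map of {false, false, true}, the sketch sends exactly one
sync message, to node 2.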
When a ReadyForQuery message arrives, we decide which backends to
read from according to the where_to_send map, which differs from what
we do today: read from all available backend nodes. After receiving
the ReadyForQuery message, all sync_map members are set to false. The
query context for the sync message is also destroyed at this point.
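A rough sketch of the ReadyForQuery side, again with made-up names
(SketchSession, SketchQueryContext, sketch_ready_for_query) and plain
malloc/free in place of pgpool's memory contexts:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_NUM_BACKENDS 3	/* assumption: a 3-node cluster */

/* Hypothetical, stripped-down query context. */
typedef struct
{
	bool		where_to_send[SKETCH_NUM_BACKENDS];
	bool		sync_msg;		/* true if created for a sync message */
} SketchQueryContext;

/* Hypothetical, stripped-down session context. */
typedef struct
{
	bool		sync_map[SKETCH_NUM_BACKENDS];
	SketchQueryContext *query_context;
} SketchSession;

/*
 * Handle ReadyForQuery: read only from the backends in where_to_send,
 * reset the sync map, and destroy the sync query context.
 * Returns the number of backends that were read from.
 */
static int
sketch_ready_for_query(SketchSession *sc)
{
	int			i;
	int			nread = 0;

	assert(sc->query_context != NULL);
	for (i = 0; i < SKETCH_NUM_BACKENDS; i++)
	{
		if (sc->query_context->where_to_send[i])
			nread++;			/* real code would read 'Z' from backend i */
	}

	/* nothing has been sent to any backend since this ReadyForQuery */
	memset(sc->sync_map, 0, sizeof(sc->sync_map));

	/* the context existed only for the sync message, so drop it */
	if (sc->query_context->sync_msg)
	{
		free(sc->query_context);
		sc->query_context = NULL;
	}
	return nread;
}
```

Clearing the map here is what makes "since the last ReadyForQuery
message" the unit of bookkeeping.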
There are a few other edge cases:
(1) When an ErrorResponse is received,
pool_discard_except_sync_and_ready_for_query() is called to remove any
pending messages (and backend buffer data) except the sync pending
message and the ReadyForQuery message. If no sync message is found in
the queue and a sync message then arrives from the frontend, we add a
new sync pending message, consulting the sync_map as described above,
and forward the sync message to the backends.
(2) If no query has been sent and a sync message is received, the
sync_map is all false. Such a sync message is pointless, but the
protocol does not prohibit it. In this case we set all sync_map
members to true, which means we send sync messages to all backends.
This may not be the best choice in terms of performance, but it keeps
things simpler (and compatible with what we are doing today).
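The fallback in case (2) could look roughly like the following.
sketch_fill_if_empty is a hypothetical helper, not a function in the
patch. It sets the members in a loop rather than with memset so that
it does not depend on sizeof(bool) being 1.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * If no member of sync_map is set, set them all so that the sync
 * message goes to every backend, as it does today.
 * Returns true if the fallback was applied.
 */
static bool
sketch_fill_if_empty(bool sync_map[], int num_backends)
{
	int			i;

	assert(num_backends > 0);
	for (i = 0; i < num_backends; i++)
	{
		if (sync_map[i])
			return false;		/* at least one backend received a query */
	}
	for (i = 0; i < num_backends; i++)
		sync_map[i] = true;
	return true;
}
```

An all-false map becomes all true, while a map with any member already
set is left untouched.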
Best regards,
--
Tatsuo Ishii
SRA OSS K.K.
English: http://www.sraoss.co.jp/index_en/
Japanese:http://www.sraoss.co.jp
Attachments:
[application/octet-stream] v1-0001-Send-sync-message-only-to-necessary-backends.patch (18.6K, 2-v1-0001-Send-sync-message-only-to-necessary-backends.patch)
From 4a28e7c93f5fe037a99547998cd36fc7cf70219f Mon Sep 17 00:00:00 2001
From: Tatsuo Ishii <[email protected]>
Date: Thu, 25 Dec 2025 15:57:25 +0900
Subject: [PATCH v1] Send sync message only to necessary backends.
Currently pgpool forwards sync messages to all configured backend
nodes. This is not only a waste of CPU cycles, but degrades
performance since it takes more message round trip time as the number
of backend nodes increases. This is conspicuous if backend nodes are
in a distant location. We should send sync messages only to necessary
backend nodes.
The idea is, if pgpool does not send any query message to a backend
until now (more precisely since the last ReadyForQuery message), we
don't need to send a sync message to the backend node.
To decide which node we need to send the sync message, add a new
struct member "sync_map" to the session context. A sync_map is an
array of bool. Each array member corresponds to each backend
node. When pgpool forwards a message to backend, corresponding
sync_map member is set to true.
When a sync message is received from frontend, a sync pending message
is added as we already do. The difference is, previously no query
context is added to the sync pending message. Now we add a query
context with the sync map translated into where_to_send map in the
query context. Then we send a sync message to backend, consulting the
where_to_send map. This way, we don't need to send a sync message to
backend node which pgpool has never sent a query since the last
ReadyForQuery message.
When a Ready for query message arrives, we decide which backend to
read according to the where_to_send map, which is different from what
we do today: read from all available backend nodes. After receiving
the ReadyForQuery message, sync_map members are all set to false. Also
the sync message query context is destroyed at this timing.
There are a few other edge cases:
(1) When an ErrorResponse is received,
pool_discard_except_sync_and_ready_for_query() is called to remove any
pending messages (and backend buffer data) except sync pending message
and ReadyForQuery message. If the sync message is not found in the
queue and receives a sync message from frontend, add a new sync
pending message, consulting the sync_map as described above and
forward the sync message to backends.
(2) If no query is sent and a sync message is received, the sync_map
is all false. This doesn't make sense since there's no point to send a
sync message, but the protocol does not prohibit this. In this case we
set sync_map all true, which means we send sync messages to all
backend. This may not be the best way in terms of performance, but
this makes things simpler (and compatible with what we are doing today).
Author: Tatsuo Ishii <[email protected]>
---
src/context/pool_query_context.c | 20 ++--
src/context/pool_session_context.c | 40 ++++++-
src/include/context/pool_query_context.h | 3 +
src/include/context/pool_session_context.h | 11 +-
src/protocol/pool_process_query.c | 16 ++-
src/protocol/pool_proto_modules.c | 110 ++++++++++++++++--
.../regression/tests/006.memqcache/test.sh | 6 +
.../tests/039.log_backend_messages/expected.s | 1 -
src/test/regression/tests/130.declare/test.sh | 1 +
9 files changed, 187 insertions(+), 21 deletions(-)
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 1a13168c6..9445a7c01 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -63,7 +63,6 @@ static void where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *
static void where_to_send_native_replication(POOL_QUERY_CONTEXT *query_context, char *query, Node *node);
static char *remove_read_write(int len, const char *contents, int *rewritten_len);
-static void set_virtual_main_node(POOL_QUERY_CONTEXT *query_context);
static void set_load_balance_info(POOL_QUERY_CONTEXT *query_context);
static bool is_in_list(char *name, List *list);
@@ -404,22 +403,26 @@ pool_virtual_main_db_node_id(void)
* Make sure that virtual_main_node_id is either primary node id
* or load balance node id. If not, it is likely that
* virtual_main_node_id is not set up yet. Let's use the primary
- * node id. except for the special case where we need to send the
+ * node id except for the special case where we need to send the
* query to the node which is not primary nor the load balance
* node. Currently there is only one special such case that is
* handling of pg_terminate_backend() function, which may refer to
* the backend connection that is neither hosted by the primary or
* load balance node for current child process, but the query must
* be forwarded to that node. Since only that backend node can
- * handle that pg_terminate_backend query
- *
+ * handle that pg_terminate_backend query. Another exception is,
+ * processing "Sync" message. We create a special query context
+ * marked "sync_msg == true". If so, it is possible that the main
+ * node id could not be either ordinarily main node (the first
+ * alive node) nor the load balance node. So we check the flag.
*/
ereport(DEBUG5,
- (errmsg("pool_virtual_main_db_node_id: virtual_main_node_id:%d load_balance_node_id:%d PRIMARY_NODE_ID:%d",
- node_id, sc->load_balance_node_id, PRIMARY_NODE_ID)));
+ (errmsg("pool_virtual_main_db_node_id: virtual_main_node_id:%d load_balance_node_id:%d PRIMARY_NODE_ID:%d sync_msg:%d",
+ node_id, sc->load_balance_node_id, PRIMARY_NODE_ID, sc->query_context->sync_msg)));
- if (node_id != sc->query_context->load_balance_node_id && node_id != PRIMARY_NODE_ID)
+ if (!sc->query_context->sync_msg && node_id != sc->query_context->load_balance_node_id &&
+ node_id != PRIMARY_NODE_ID)
{
/*
* Only return the primary node id if we are not processing
@@ -825,6 +828,7 @@ pool_extended_send_and_wait(POOL_QUERY_CONTEXT *query_context,
stat_count_up(i, query_context->parse_tree);
}
+ set_sync_map(session_context, i);
send_extended_protocol_message(backend, i, kind, str_len, str);
if ((*kind == 'P' || *kind == 'E' || *kind == 'C') && STREAM)
@@ -1758,7 +1762,7 @@ pool_is_transaction_read_only(Node *node)
* multiple sending requests are in the map, the first node id is set to the
* virtual_main_node_id.
*/
-static void
+void
set_virtual_main_node(POOL_QUERY_CONTEXT *query_context)
{
int i;
diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c
index 6ed64b2de..416336fb0 100644
--- a/src/context/pool_session_context.c
+++ b/src/context/pool_session_context.c
@@ -4,7 +4,7 @@
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2024 PgPool Global Development Group
+ * Copyright (c) 2003-2025 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
@@ -2118,3 +2118,41 @@ query_cache_disabled_tx(void)
return session_context->query_cache_disabled_tx;
}
+
+/*
+ * clear sync map
+ */
+void
+clear_sync_map(POOL_SESSION_CONTEXT *session_context)
+{
+ memset(session_context->sync_map, 0, sizeof(session_context->sync_map));
+}
+
+/*
+ * get sync map
+ */
+bool
+get_sync_map(POOL_SESSION_CONTEXT *session_context, int node_id)
+{
+ return session_context->sync_map[node_id];
+}
+
+/*
+ * set sync map
+ */
+void
+set_sync_map(POOL_SESSION_CONTEXT *session_context, int node_id)
+{
+ elog(DEBUG5, "set sync map %d", node_id);
+ session_context->sync_map[node_id] = true;
+}
+
+/*
+ * unset sync map
+ */
+void
+unset_sync_map(POOL_SESSION_CONTEXT *session_context, int node_id)
+{
+ elog(DEBUG5, "unset sync map %d", node_id);
+ session_context->sync_map[node_id] = false;
+}
diff --git a/src/include/context/pool_query_context.h b/src/include/context/pool_query_context.h
index 1d1ef13eb..c9fa2f439 100644
--- a/src/include/context/pool_query_context.h
+++ b/src/include/context/pool_query_context.h
@@ -96,6 +96,8 @@ typedef struct
bool partial_fetch; /* if true some rows have been fetched by an
* execute with non 0 row option */
+ bool sync_msg; /* true if this is a special query context for
+ * sync message */
MemoryContext memory_context; /* memory context for query context */
} POOL_QUERY_CONTEXT;
@@ -134,5 +136,6 @@ extern bool pool_is_transaction_read_only(Node *node);
extern void pool_force_query_node_to_backend(POOL_QUERY_CONTEXT *query_context, int backend_id);
extern void check_object_relationship_list(char *name, bool is_func_name);
extern int wait_for_failover_to_finish(void);
+extern void set_virtual_main_node(POOL_QUERY_CONTEXT *query_context);
#endif /* POOL_QUERY_CONTEXT_H */
diff --git a/src/include/context/pool_session_context.h b/src/include/context/pool_session_context.h
index e86966e2e..377bd97ca 100644
--- a/src/include/context/pool_session_context.h
+++ b/src/include/context/pool_session_context.h
@@ -6,7 +6,7 @@
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
- * Copyright (c) 2003-2024 PgPool Global Development Group
+ * Copyright (c) 2003-2025 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
@@ -343,6 +343,10 @@ typedef struct
*/
bool query_cache_disabled_tx;
+ /*
+ * A map to send sync message. Each entry represents backend node.
+ */
+ bool sync_map[MAX_NUM_BACKENDS];
} POOL_SESSION_CONTEXT;
extern void pool_init_session_context(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
@@ -438,4 +442,9 @@ extern void set_query_cache_disabled_tx(void);
extern void unset_query_cache_disabled_tx(void);
extern bool query_cache_disabled_tx(void);
+extern void clear_sync_map(POOL_SESSION_CONTEXT *session_context);
+extern bool get_sync_map(POOL_SESSION_CONTEXT *session_context, int node_id);
+extern void set_sync_map(POOL_SESSION_CONTEXT *session_context, int node_id);
+extern void unset_sync_map(POOL_SESSION_CONTEXT *session_context, int node_id);
+
#endif /* POOL_SESSION_CONTEXT_H */
diff --git a/src/protocol/pool_process_query.c b/src/protocol/pool_process_query.c
index 0dacbcbc4..76bd3c856 100644
--- a/src/protocol/pool_process_query.c
+++ b/src/protocol/pool_process_query.c
@@ -3355,11 +3355,23 @@ read_kind_from_backend(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
{
if (msg->type == POOL_SYNC)
{
+ StringInfoData buf;
+
ereport(DEBUG5,
(errmsg("read_kind_from_backend: sync pending message exists")));
- session_context->query_context = NULL;
pool_unset_ignore_till_sync();
- pool_unset_query_in_progress();
+ pool_pending_message_query_context_dest_set(msg, msg->query_context);
+ session_context->query_context = msg->query_context;
+ session_context->flush_pending = msg->flush_pending;
+ pool_set_query_in_progress();
+
+ /* emit debug log */
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "read_kind_from_backend sync_map: ");
+ for (i = 0; i < NUM_BACKENDS; i++)
+ appendStringInfo(&buf, "%d ", pool_get_session_context(false)->sync_map[i]);
+ elog(DEBUG5, "%s", buf.data);
+ pfree(buf.data);
}
else
{
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 9f31442b7..2b573ec18 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -114,6 +114,8 @@ static void check_prepare(List *parse_tree_list, int len, char *contents);
static POOL_QUERY_CONTEXT *create_dummy_query_context(void);
+static void add_sync_pending_message(void);
+
/*
* This is the workhorse of processing the pg_terminate_backend function to
* make sure that the use of function should not trigger the backend node failover.
@@ -2489,6 +2491,19 @@ ReadyForQuery(POOL_CONNECTION *frontend,
*/
pool_ps_idle_display(backend);
+ clear_sync_map(session_context); /* reset sync map */
+
+ /* destroy query context for sync message */
+ if (session_context->query_context && session_context->query_context->sync_msg)
+ {
+ MemoryContext memory_context = session_context->query_context->memory_context;
+
+ elog(DEBUG5, "destroy query context for sync message");
+ pfree(session_context->query_context);
+ MemoryContextDelete(memory_context);
+ session_context->query_context = NULL;
+ }
+
return POOL_CONTINUE;
}
@@ -2991,11 +3006,14 @@ ProcessFrontendResponse(POOL_CONNECTION *frontend,
if (SL_MODE)
{
- POOL_PENDING_MESSAGE *msg;
-
- pool_unset_query_in_progress();
- msg = pool_pending_message_create('S', 0, NULL);
- pool_pending_message_add(msg);
+ /*
+ * If we are doing extended query in streaming replication or
+ * logical replication mode, add a sync pending message
+ * reflecting sync_map so that we send the sync message to
+ * only necessary backend. Previously we sent to all backends
+ * and it degrades the performance.
+ */
+ add_sync_pending_message();
}
else if (!pool_is_query_in_progress())
pool_set_query_in_progress();
@@ -4580,7 +4598,6 @@ pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
{
char kind;
int len;
- POOL_PENDING_MESSAGE *msg;
char *contents = NULL;
for (;;)
@@ -4592,8 +4609,11 @@ pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
contents = pool_read2(frontend, len);
if (kind == 'S')
{
- msg = pool_pending_message_create('S', 0, NULL);
- pool_pending_message_add(msg);
+ /*
+ * Sync message from frontend arrives. Add a sync pending
+ * message and forward to backend.
+ */
+ add_sync_pending_message();
SimpleForwardToBackend(kind, frontend, backend, len, contents);
break;
}
@@ -5286,3 +5306,77 @@ create_dummy_query_context(void)
return query_context;
}
+
+/*
+ * Previously we forwarded sync messages to all backends. This is not only a
+ * waste of CPU cycle, but degradation of performance because it adds round
+ * trip messages between pgpool and backend, which is not necessary to be sent
+ * a sync message at all if the backend received no query since the last
+ * ReadyForQuery. To fix this, we create a dummy query context which holds
+ * necessary destination backends using session_context->sync_map. The
+ * destinations are accumulation of backend node ids since the last
+ * ReadyForQuery. (upon receiving ReadyForQuery, sync_map is zero cleared.)
+ */
+static void
+add_sync_pending_message(void)
+{
+ POOL_PENDING_MESSAGE *msg;
+ POOL_SESSION_CONTEXT *session_context;
+ POOL_QUERY_CONTEXT *query_context;
+ StringInfoData buf;
+ int i;
+
+ /*
+ * This function should be called only when doing extended query in
+ * streaming replication or logical replication mode. If not, do nothing.
+ */
+ if (!pool_is_doing_extended_query_message() || !SL_MODE)
+ return;
+
+ msg = pool_pending_message_create('S', 0, NULL);
+ session_context = pool_get_session_context(false);
+ query_context = pool_init_query_context();
+
+ /* mark this query context for sync messages */
+ query_context->sync_msg = true;
+
+ /*
+ * Check if sync map is all false. (this could happen if sync message
+ * arrives without any query being sent since last ReadyForQuery). If so,
+ * we set all map members to true to avoid subsequent error in
+ * VALID_BACKEND.
+ */
+ for (i = 0; i < NUM_BACKENDS; i++)
+ {
+ if (session_context->sync_map[i] == true)
+ break;
+ }
+ if (i == NUM_BACKENDS)
+ memset(session_context->sync_map, true, NUM_BACKENDS);
+
+ /* copy sync map to query context's where_to_send map */
+ memcpy(query_context->where_to_send, session_context->sync_map,
+ sizeof(query_context->where_to_send));
+ /* set virtual main node id in the query context */
+ set_virtual_main_node(query_context);
+ /* copy query_context's where_to_send map to msg->node_ids */
+ pool_pending_message_dest_set(msg, query_context);
+ /* add to the pending message queue */
+ pool_pending_message_add(msg);
+
+ /*
+ * Set query in progress and set the query context to session context.
+ * This is necessary for subsequent SimpleForwardToBackend() to send the
+ * sync message to proper backend .
+ */
+ pool_set_query_in_progress();
+ session_context->query_context = query_context;
+
+ /* emit debug log */
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "ProcessBackendResponse sync_map: ");
+ for (i = 0; i < NUM_BACKENDS; i++)
+ appendStringInfo(&buf, "%d ", pool_get_session_context(false)->sync_map[i]);
+ elog(DEBUG5, "%s", buf.data);
+ pfree(buf.data);
+}
diff --git a/src/test/regression/tests/006.memqcache/test.sh b/src/test/regression/tests/006.memqcache/test.sh
index d00bdd084..f2371744d 100755
--- a/src/test/regression/tests/006.memqcache/test.sh
+++ b/src/test/regression/tests/006.memqcache/test.sh
@@ -34,6 +34,9 @@ do
echo "memory_cache_enabled = on" >> etc/pgpool.conf
echo "cache_safe_memqcache_table_list = 'cache_safe_v'" >> etc/pgpool.conf
echo "cache_unsafe_memqcache_table_list = 'cache_unsafe_t'" >> etc/pgpool.conf
+ echo "log_per_node_statement = on" >> etc/pgpool.conf
+ echo "log_client_messages = on" >> etc/pgpool.conf
+ echo "log_min_messages = debug5" >> etc/pgpool.conf
source ./bashrc.ports
@@ -538,6 +541,9 @@ $PGPOOL_SETUP -m s -n 2 || exit 1
echo "done."
echo "memory_cache_enabled = on" >> etc/pgpool.conf
+echo "log_per_node_statement = on" >> etc/pgpool.conf
+echo "log_client_messages = on" >> etc/pgpool.conf
+echo "log_min_messages = debug5" >> etc/pgpool.conf
cd ..
for i in 1 2 3 4 4 5 6 7
diff --git a/src/test/regression/tests/039.log_backend_messages/expected.s b/src/test/regression/tests/039.log_backend_messages/expected.s
index 5877fe361..a5ffeebec 100644
--- a/src/test/regression/tests/039.log_backend_messages/expected.s
+++ b/src/test/regression/tests/039.log_backend_messages/expected.s
@@ -88,7 +88,6 @@ FE=> Sync
<= BE DataRow
<= BE NoticeResponse(S LOG C XX000 M CommandComplete message from backend 1
<= BE CommandComplete(SELECT 3)
-<= BE NoticeResponse(S LOG C XX000 M ReadyForQuery message from backend 0
<= BE NoticeResponse(S LOG C XX000 M ReadyForQuery message from backend 1
<= BE ReadyForQuery(I)
FE=> Terminate
diff --git a/src/test/regression/tests/130.declare/test.sh b/src/test/regression/tests/130.declare/test.sh
index 65e00d571..8bcbb3e16 100755
--- a/src/test/regression/tests/130.declare/test.sh
+++ b/src/test/regression/tests/130.declare/test.sh
@@ -20,6 +20,7 @@ do
echo -n "creating test environment..."
$PGPOOL_SETUP -m $mode || exit 1
echo "done."
+ echo "log_min_messages = debug5" >> etc/pgpool.conf
source ./bashrc.ports
./startall
wait_for_pgpool_startup
--
2.43.0