public inbox for [email protected]
From: Nadav Shatz <[email protected]>
To: Tatsuo Ishii <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Recent mutated table tracking in memory
Date: Mon, 23 Mar 2026 14:07:33 +0100
Message-ID: <CACeKOO2fMX9f9hFVHCYEF9BUWV0xD21K2JPQFYzZSWVdC6ZrXQ@mail.gmail.com>
In-Reply-To: <[email protected]>
References: <CACeKOO1SCQLwx4=4gp6+swH5NW_GLJCd0uOSL0cU2U7VFR_o8g@mail.gmail.com>
<[email protected]>
<CACeKOO0ifCtNP6b=h-24EWnw_NJy-KhOc9mWOLZKAxkF_iEogw@mail.gmail.com>
<[email protected]>
Hi Tatsuo,
Thank you for the thorough review and for the fix to the tests! Here's the
updated patch addressing all your comments.
re - replication_delay_source_cmd requirement
Good catch — the feature now also works when `delay_threshold_by_time > 0`.
I've added the TTL update call to `check_replication_time_lag()` (the
pg_stat_replication path), not just
`check_replication_time_lag_with_cmd()`. The docs now state
that either `replication_delay_source_cmd` or `delay_threshold_by_time` can
provide the time-based delay.
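The following is a minimal sketch, intended only as a review aid, of how the documented rules fit together. The delay can be measured by `replication_delay_source_cmd` or by `pg_stat_replication.replay_lag` when `delay_threshold_by_time > 0`. In either case the TTL is `replication_delay * track_table_mutation_ttl_factor`. Entries are scoped by database OID, and `track_table_mutation_max_staleness` caps how long a single entry can force primary routing. This is not the code in the patch. `TableEntry`, `compute_ttl_ms()`, `entry_expired()`, `mark_written()` and `is_stale()` are hypothetical names, and the single-entry layout is a deliberate simplification.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical entry layout, for illustration only; it does not
 * reproduce the structures in pool_track_table_mutation.{c,h}.
 */
typedef struct
{
	bool		in_use;
	uint32_t	table_oid;
	uint32_t	db_oid;			/* staleness is scoped per database */
	int64_t		first_marked_ms;	/* start of the current tracking entry */
	int64_t		expires_ms;		/* last committed write + TTL */
} TableEntry;

/*
 * TTL = replication_delay * track_table_mutation_ttl_factor.  The same
 * computation applies whichever monitoring path measured the delay.
 */
static int64_t
compute_ttl_ms(int64_t replication_delay_ms, double ttl_factor)
{
	assert(ttl_factor >= 1.0 && ttl_factor <= 100.0);	/* documented GUC range */
	if (replication_delay_ms < 0)
		replication_delay_ms = 0;
	return (int64_t) (replication_delay_ms * ttl_factor);
}

/* True once the TTL has passed or the max_staleness cap is reached. */
static bool
entry_expired(const TableEntry *e, int64_t now_ms, int max_staleness_ms)
{
	if (now_ms >= e->expires_ms)
		return true;
	return max_staleness_ms > 0 &&
		now_ms - e->first_marked_ms >= max_staleness_ms;
}

/* Record a committed write to (table_oid, db_oid). */
static void
mark_written(TableEntry *e, uint32_t table_oid, uint32_t db_oid,
			 int64_t now_ms, int64_t ttl_ms, int max_staleness_ms)
{
	if (!e->in_use || e->table_oid != table_oid || e->db_oid != db_oid ||
		entry_expired(e, now_ms, max_staleness_ms))
	{
		/* Fresh entry: the max_staleness clock restarts here. */
		e->in_use = true;
		e->table_oid = table_oid;
		e->db_oid = db_oid;
		e->first_marked_ms = now_ms;
	}
	e->expires_ms = now_ms + ttl_ms;	/* each committed write extends the TTL */
}

/* Should a SELECT on (table_oid, db_oid) go to the primary? */
static bool
is_stale(TableEntry *e, uint32_t table_oid, uint32_t db_oid,
		 int64_t now_ms, int max_staleness_ms)
{
	/* A different table or a different database is never affected. */
	if (!e->in_use || e->table_oid != table_oid || e->db_oid != db_oid)
		return false;
	if (entry_expired(e, now_ms, max_staleness_ms))
	{
		e->in_use = false;		/* TTL elapsed or cap reached */
		return false;
	}
	return true;
}
```

In this model, a continuous stream of writes can pin a table to the primary for at most `track_table_mutation_max_staleness`. After that, the next committed write starts a fresh entry. The sketch holds a single entry for brevity. The patch instead keeps a shared-memory hash table sized by `track_table_mutation_table_buckets` and `track_table_mutation_table_size`.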
re - Documentation compile error
Fixed — the xref was pointing to `runtime-config-track-table-mutation` but
the actual section ID is `runtime-config-table-mutation-map`.
Thanks again and looking forward to hearing back from you.
On Mon, Mar 23, 2026 at 6:13 AM Tatsuo Ishii <[email protected]> wrote:
>
> > Thank you for looking into this, fixing it and getting back to me.
> >
> > Looking forward to your update.
>
> It seems my commit fixed the issue.
>
> https://git.postgresql.org/gitweb/?p=pgpool2.git;a=commit;h=18f7f632de982d8fb5d0da2f2fdc48e26ac467e7
>
> So, I continue the review.
>
> + <para>
> + This feature requires <xref linkend="guc-replication-delay-source-cmd"> to be configured
> + for monitoring replication delay from replicas.
> + </para>
>
> Why does this feature require replication_delay_source_cmd to be set? Why
> can't we enable the feature as well when delay_threshold_by_time > 0?
> Both replication_delay_source_cmd and delay_threshold_by_time should
> provide the standby delay in time, which provides enough information to
> run the feature.
>
> 1. documentation
>
> - I get a compile error.
>
> openjade -wall -wno-unused-param -wno-empty -wfully-tagged -c /usr/share/sgml/docbook/stylesheet/dsssl/modular/catalog -d stylesheet.dsl -t sgml -i output-html -V html-index pgpool.sgml
> openjade:loadbalance.sgml:1122:21:X: reference to non-existent ID "RUNTIME-CONFIG-TRACK-TABLE-MUTATION"
>
> Best regards,
> --
> Tatsuo Ishii
> SRA OSS K.K.
> English: http://www.sraoss.co.jp/index_en/
> Japanese: http://www.sraoss.co.jp
>
--
Nadav Shatz
Tailor Brands | CTO
Attachments:
[application/octet-stream] table_track.patch (103.6K, 3-table_track.patch)
From 0a42bca011460e83156e5181ca7e2c4895b689c6 Mon Sep 17 00:00:00 2001
From: Nadav Shatz <[email protected]>
Date: Tue, 6 Jan 2026 12:41:50 +0200
Subject: [PATCH] feat(load_balance): add in-memory table mutation tracking
Introduces 'dml_adaptive_global' as a new value for disable_load_balance_on_write.
This mode is a superset of dml_adaptive: it performs per-transaction local tracking
AND cross-session shared-memory tracking of recently written tables, routing reads
to primary until a TTL (based on measured replication delay) expires.
Sub-parameters (track_table_mutation_*) control the TTL factor, maximum staleness,
cold start duration, hash table sizing, and query parse cache sizing.
---
doc/src/sgml/loadbalance.sgml | 334 ++++
src/Makefile.am | 1 +
src/config/pool_config_variables.c | 89 +
src/context/pool_query_context.c | 227 ++-
src/context/pool_session_context.c | 15 +-
src/include/pool.h | 4 +-
src/include/pool_config.h | 24 +-
src/include/utils/pool_track_table_mutation.h | 245 +++
src/main/pgpool_main.c | 29 +-
src/protocol/CommandComplete.c | 29 +
src/protocol/child.c | 8 +
src/protocol/pool_proto_modules.c | 6 +-
src/query_cache/pool_memqcache.c | 6 +
src/sample/pgpool.conf.sample-stream | 56 +
src/streaming_replication/pool_worker_child.c | 24 +
src/test/regression/libs.sh | 2 +
.../tests/042.track_table_mutation/test.sh | 354 ++++
.../043.track_table_mutation_watchdog/test.sh | 184 +++
src/utils/pool_track_table_mutation.c | 1453 +++++++++++++++++
19 files changed, 3075 insertions(+), 15 deletions(-)
create mode 100644 src/include/utils/pool_track_table_mutation.h
create mode 100755 src/test/regression/tests/042.track_table_mutation/test.sh
create mode 100755 src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
create mode 100644 src/utils/pool_track_table_mutation.c
diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
index 9e1e7b39b..7384ce81a 100644
--- a/doc/src/sgml/loadbalance.sgml
+++ b/doc/src/sgml/loadbalance.sgml
@@ -1110,6 +1110,18 @@ app_name_redirect_preference_list > database_redirect_preference_list > us
Dependent functions, triggers, and views on the tables can be configured
using <xref linkend="guc-dml-adaptive-object-relationship-list">
</para>
+
+ <para>
+ If this parameter is set to <varname>dml_adaptive_global</varname>,
+ <productname>Pgpool-II</> behaves like <varname>dml_adaptive</varname>
+ (per-transaction write tracking) and additionally uses shared memory to track
+ recently written tables across all sessions cluster-wide. When a table is
+ written in any session, subsequent reads of that table from any session are
+ routed to primary until a TTL (based on measured replication delay) expires.
+ This prevents stale reads after writes even across different connections.
+ See <xref linkend="runtime-config-table-mutation-map"> for the sub-parameters
+ that control the shared-memory tracking behavior.
+ </para>
</listitem>
</varlistentry>
@@ -1195,4 +1207,326 @@ dml_adaptive_object_relationship_list = 'table_1:table_2'
</variablelist>
</sect2>
+
+ <sect2 id="runtime-config-table-mutation-map">
+ <title>Table Mutation Map Configuration (Lagless Replica Reads)</title>
+
+ <para>
+ These parameters configure the track table mutation feature, which is activated by setting
+ <xref linkend="guc-disable-load-balance-on-write"> to <literal>dml_adaptive_global</literal>.
+ The feature tracks recently written tables to prevent stale reads from replica nodes during
+ replication lag, implementing the "lagless" architecture pattern for distributed systems
+ with read replicas.
+ </para>
+
+ <para>
+ When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period
+ (<literal>replication_delay * track_table_mutation_ttl_factor</literal>). Any SELECT queries on stale tables are routed
+ to the primary node instead of replicas, ensuring read-after-write consistency.
+ </para>
+
+ <para>
+ This feature requires time-based replication delay monitoring. This can be provided by either
+ <xref linkend="guc-replication-delay-source-cmd"> (external command mode) or by setting
+ <xref linkend="guc-delay-threshold-by-time"> (which uses <literal>pg_stat_replication.replay_lag</literal>
+ from PostgreSQL 10+). At least one of these must be configured for the TTL calculation to work.
+ </para>
+
+ <warning>
+ <para>
+ Enabling <literal>dml_adaptive_global</literal> increases shared memory consumption. With default settings,
+ the feature requires approximately 6.5 MB of shared memory (about 0.1 MB for table tracking + 6.4 MB for query cache).
+ Memory usage scales with configuration parameters:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Table tracking: <literal>track_table_mutation_table_size * 40 bytes</literal> (default: 2048 * 40 = ~80 KB)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Query cache: <literal>track_table_mutation_query_parse_cache_size * 640 bytes</literal> (default: 10000 * 640 = ~6.4 MB)
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ For high-traffic systems with large cache sizes (e.g., <literal>track_table_mutation_query_parse_cache_size = 100000</literal>),
+ memory usage can reach 64 MB or more. Consider your system's available shared memory when using <literal>dml_adaptive_global</literal>.
+ </para>
+ </warning>
+
+ <variablelist>
+
+ <varlistentry id="guc-track-table-mutation-ttl-factor" xreflabel="track_table_mutation_ttl_factor">
+ <term><varname>track_table_mutation_ttl_factor</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_ttl_factor</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Multiplier for calculating the TTL: <literal>TTL = replication_delay * track_table_mutation_ttl_factor</literal>.
+ Higher values provide more safety margin but may reduce read replica utilization.
+ </para>
+ <para>
+ Valid range: 1.0-100.0. Default is <literal>5.0</literal>.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-max-staleness" xreflabel="track_table_mutation_max_staleness">
+ <term><varname>track_table_mutation_max_staleness</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_max_staleness</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum duration in milliseconds that a single table entry can continuously force queries to primary,
+ measured from when the table was first marked stale. When this cap is reached, the entry is expired
+ regardless of recent writes. If the table is written to again after expiry, a fresh tracking entry
+ is created.
+ </para>
+ <para>
+ This parameter bounds the cross-session impact of table mutation tracking. Even if a table is written
+ to in a tight loop, its effect on other sessions' load balancing is limited to this duration. For
+ legitimately busy tables, the gap between forced expiry and the next write re-marking the table is
+ negligible (typically milliseconds).
+ </para>
+ <para>
+ Set to 0 to disable the cap (not recommended for production).
+ Valid range: 0-3600000 ms. Default is <literal>60000</literal> (60 seconds).
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-cold-start-duration" xreflabel="track_table_mutation_cold_start_duration">
+ <term><varname>track_table_mutation_cold_start_duration</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_cold_start_duration</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Duration in milliseconds to route all queries to primary after a child process starts.
+ This prevents stale reads when a new connection is established before the table mutation map
+ is populated with recent write history.
+ </para>
+ <para>
+ When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a
+ global cold start for this duration to avoid stale reads after leadership changes.
+ </para>
+ <para>
+ Valid range: 0-60000 ms. Default is <literal>2000</literal> (2 seconds).
+ Set to 0 to disable cold start behavior.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-buckets" xreflabel="track_table_mutation_table_buckets">
+ <term><varname>track_table_mutation_table_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the table mutation hash table.
+ Higher values reduce hash collisions and improve lookup performance.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>1024</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-size" xreflabel="track_table_mutation_table_size">
+ <term><varname>track_table_mutation_table_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of tables that can be tracked simultaneously in the table mutation map.
+ When the map is full, the oldest entries are evicted.
+ </para>
+ <para>
+ Valid range: 128-131072. Default is <literal>2048</literal>.
+ Memory usage: approximately 40 bytes per entry.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-query-buckets" xreflabel="track_table_mutation_query_buckets">
+ <term><varname>track_table_mutation_query_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_query_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the query parse cache. The cache stores normalized
+ query strings mapped to their table dependencies to avoid repeated parsing.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>2048</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-query-parse-cache-size" xreflabel="track_table_mutation_query_parse_cache_size">
+ <term><varname>track_table_mutation_query_parse_cache_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_query_parse_cache_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of query parse results to cache. Uses LRU eviction when full.
+ Larger caches reduce parsing overhead but consume more shared memory.
+ </para>
+ <para>
+ Valid range: 100-1000000. Default is <literal>10000</literal>.
+ Memory usage: approximately 640 bytes per entry (~6.4 MB for default, ~64 MB for 100000 entries).
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ <sect3 id="runtime-config-track-table-mutation-example">
+ <title>Track Table Mutation Configuration Example</title>
+ <para>
+ To enable table mutation tracking together with replication delay monitoring:
+ </para>
+ <programlisting>
+# Enable dml_adaptive_global mode (includes track table mutation)
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_ttl_factor = 5.0
+track_table_mutation_max_staleness = 60000
+track_table_mutation_cold_start_duration = 2000
+
+# Option A: Use external command for replication delay
+replication_delay_source_cmd = '/path/to/get-replication-delay.sh'
+replication_delay_source_timeout = 10
+
+# Option B: Use pg_stat_replication replay_lag (PG 10+)
+# delay_threshold_by_time = 1000
+
+# Adjust cache sizes based on workload (increases memory usage)
+track_table_mutation_table_size = 4096
+track_table_mutation_query_parse_cache_size = 50000
+ </programlisting>
+ <para>
+ Total shared memory required for the above configuration: approximately 32.2 MB (32 MB query cache + 0.2 MB table map), plus overhead.
+ The default configuration (10000 query cache entries, 2048 tables) requires approximately 6.5 MB.
+ </para>
+ </sect3>
+
+ <sect3 id="runtime-config-track-table-mutation-limitations">
+ <title>Limitations</title>
+ <para>
+ The track table mutation feature has the following limitation:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>PREPARE</literal> statements are not tracked. When a prepared statement
+ containing data modification is executed, the table mutation is not recorded.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ If your application uses prepared statements and requires read-after-write consistency,
+ consider using explicit transaction routing or the <literal>/*NO LOAD BALANCE*/</literal>
+ comment directive for affected queries.
+ </para>
+ <para>
+ The following statement types <emphasis>are</emphasis> tracked and will mark tables as stale:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>INSERT</literal>, <literal>UPDATE</literal>, <literal>DELETE</literal>
+ statements (including those with <literal>RETURNING</literal> clauses).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>TRUNCATE</literal> statements (including those that name multiple tables).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>MERGE</literal> statements (PostgreSQL 15+).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>WITH</literal> clauses containing data modifications (Common Table Expressions
+ with <literal>INSERT</literal>, <literal>UPDATE</literal>, or <literal>DELETE</literal>).
+ For example, <literal>WITH deleted AS (DELETE FROM t1 RETURNING *) SELECT * FROM deleted</literal>
+ will properly mark table <literal>t1</literal> as stale.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ <emphasis>Transaction Rollback Behavior:</emphasis> Within explicit transactions, tables
+ are only marked as stale in shared memory when the transaction is committed. If the
+ transaction is rolled back, no tables are marked, since no actual data modification
+ occurred on replicas. This prevents rolled-back transactions from unnecessarily
+ disabling load balancing. For autocommit statements (outside explicit transactions),
+ tables are marked immediately upon command completion.
+ </para>
+
+ <para>
+ <emphasis>Cross-Session Impact and Safety Bounds:</emphasis>
+ Unlike <literal>dml_adaptive</literal> (which only affects the session that issued the write),
+ <literal>dml_adaptive_global</literal> affects all sessions reading the same table in the same database.
+ The following safety mechanisms bound this cross-session impact:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <emphasis>Maximum staleness cap:</emphasis> The <xref linkend="guc-track-table-mutation-max-staleness">
+ parameter (default: 60 seconds) limits how long any single table entry can continuously force primary
+ routing. Even under sustained writes, the entry expires after this period and is only renewed by
+ subsequent committed writes.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <emphasis>Database isolation:</emphasis> Table staleness tracking is scoped by database OID. Writes
+ in one database never affect load balancing decisions for sessions connected to a different database.
+ In multi-tenant deployments where tenants use separate databases, one tenant's write activity cannot
+ influence another tenant's query routing.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <emphasis>Committed writes only:</emphasis> Only committed transactions mark tables as stale.
+ Rolled-back transactions have no effect on the shared tracking state.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <emphasis>Bounded table map size:</emphasis> The shared memory table map has a fixed maximum size
+ (<xref linkend="guc-track-table-mutation-table-size">). At most this many tables can be marked stale
+ simultaneously, providing a natural ceiling on the feature's impact.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </sect3>
+
+ </sect2>
+
</sect1>
diff --git a/src/Makefile.am b/src/Makefile.am
index 4678ab530..39588af58 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \
rewrite/pool_timestamp.c \
rewrite/pool_lobj.c \
utils/pool_select_walker.c \
+ utils/pool_track_table_mutation.c \
utils/strlcpy.c \
utils/psprintf.c \
utils/pool_params.c \
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index ce13c42f6..a6b909d42 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -290,6 +290,7 @@ static const struct config_enum_entry disable_load_balance_on_write_options[] =
{"trans_transaction", DLBOW_TRANS_TRANSACTION, false},
{"always", DLBOW_ALWAYS, false},
{"dml_adaptive", DLBOW_DML_ADAPTIVE, false},
+ {"dml_adaptive_global", DLBOW_DML_ADAPTIVE_GLOBAL, false},
{NULL, 0, false}
};
@@ -1777,6 +1778,19 @@ static struct config_int_array ConfigureNamesIntArray[] =
static struct config_double ConfigureNamesDouble[] =
{
+ {
+ {"track_table_mutation_ttl_factor",
+ CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "TTL multiplier for track table mutation "
+ "(TTL = replication_delay * factor)",
+ CONFIG_VAR_TYPE_DOUBLE, false, 0
+ },
+ &g_pool_config.track_table_mutation_ttl_factor,
+ 5.0, /* boot value: 5x replication delay */
+ 1.0, 100.0, /* min, max */
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_DOUBLE
};
@@ -2397,6 +2411,81 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"track_table_mutation_max_staleness",
+ CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Maximum duration in milliseconds that a "
+ "table can be marked stale from its first "
+ "write. 0 disables the cap.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.track_table_mutation_max_staleness,
+ 60000, /* 60 seconds */
+ 0, 3600000, /* 0 to 1 hour */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_cold_start_duration",
+ CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Duration in milliseconds to force queries "
+ "to primary after child process starts.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.track_table_mutation_cold_start_duration,
+ 2000, /* 2 seconds */
+ 0, 60000, /* 0 to 60 seconds */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_buckets",
+ CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_buckets,
+ 1024,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_size",
+ CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_size,
+ 2048,
+ 128, 131072,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_query_buckets",
+ CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_query_buckets,
+ 2048,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_query_parse_cache_size",
+ CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_query_parse_cache_size,
+ 10000,
+ 100, 1000000,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_INT
};
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 7cf9813eb..683b0ec66 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -29,6 +29,7 @@
#include "utils/statistics.h"
#include "utils/pool_select_walker.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_session_context.h"
#include "context/pool_query_context.h"
#include "parser/nodes.h"
@@ -1828,15 +1829,23 @@ is_in_list(char *name, List *list)
static bool
is_select_object_in_temp_write_list(Node *node, void *context)
{
- if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (node == NULL ||
+ !DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write))
return false;
if (IsA(node, RangeVar))
{
RangeVar *rgv = (RangeVar *) node;
- POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
+ POOL_SESSION_CONTEXT *session_context;
+ bool is_adaptive;
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction)
+ session_context = pool_get_session_context(false);
+ is_adaptive = DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write);
+
+ if (is_adaptive &&
+ session_context->is_in_transaction)
{
ereport(DEBUG1,
(errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname)));
@@ -1880,7 +1889,13 @@ static char *get_associated_object_from_dml_adaptive_relations
void
check_object_relationship_list(char *name, bool is_func_name)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list)
+ bool is_adaptive;
+
+ is_adaptive = DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write);
+
+ if (is_adaptive &&
+ pool_config->parsed_dml_adaptive_object_relationship_list)
{
POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
@@ -1944,7 +1959,7 @@ add_object_into_temp_write_list(Node *node, void *context)
static void
dml_adaptive(Node *node, char *query)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
/* Set/Unset transaction status flags */
if (IsA(node, TransactionStmt))
@@ -1963,6 +1978,46 @@ dml_adaptive(Node *node, char *query)
}
else if (is_commit_or_rollback_query(node))
{
+ /*
+ * For dml_adaptive_global: on COMMIT, flush
+ * the accumulated table writes to shared
+ * memory. On ROLLBACK, skip -- the writes
+ * never committed so no stale-read risk
+ * exists. This prevents polluting the table
+ * map with rolled-back transactions.
+ */
+ int dlbow =
+ pool_config->disable_load_balance_on_write;
+ List *wlist =
+ session_context->transaction_temp_write_list;
+
+ if (dlbow == DLBOW_DML_ADAPTIVE_GLOBAL &&
+ is_commit_query(node) &&
+ wlist != NIL)
+ {
+ ListCell *cell;
+ int dboid;
+
+ dboid =
+ pool_track_table_mutation_get_database_oid();
+ if (dboid > 0)
+ {
+ foreach(cell, wlist)
+ {
+ char *tname;
+ int toid;
+
+ tname = (char *) lfirst(cell);
+ toid =
+ pool_table_name_to_oid(tname);
+
+ if (toid > 0)
+ pool_track_table_mutation_mark_table_written(
+ toid, dboid);
+ }
+ }
+ }
+
session_context->is_in_transaction = false;
if (session_context->transaction_temp_write_list != NIL)
@@ -2010,6 +2065,20 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
if (dest == POOL_PRIMARY)
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+
+ /*
+ * Resolve table and database OIDs now to populate relcache.
+ * This avoids potential hangs in CommandComplete where we shouldn't
+ * be running new queries against the backend.
+ */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ int *oids;
+
+ pool_extract_table_oids(node, &oids);
+ pool_track_table_mutation_get_database_oid();
+ }
}
/* Should be sent to both primary and standby? */
else if (dest == POOL_BOTH)
@@ -2139,6 +2208,154 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
+ /*
+ * Check track table mutation for recently
+ * written tables. If in cold start or any
+ * table was recently written, route to
+ * primary to avoid stale reads.
+ */
+ else if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ bool force_primary = false;
+ int lb_node;
+ POOL_QUERY_CONTEXT *qctx =
+ session_context->query_context;
+
+ if (pool_track_table_mutation_in_cold_start())
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance"
+ " because of track table"
+ " mutation cold start"),
+ errdetail("destination = PRIMARY"
+ " for query= \"%s\"",
+ query)));
+ force_primary = true;
+ }
+ else
+ {
+ SelectContext ctx;
+ int dboid;
+ int num_oids;
+ int i;
+
+ memset(&ctx, 0, sizeof(ctx));
+ num_oids =
+ pool_extract_table_oids_from_select_stmt(
+ node, &ctx);
+ if (num_oids > 0)
+ {
+ dboid =
+ pool_track_table_mutation_get_database_oid();
+
+ if (dboid <= 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load"
+ " balance because"
+ " database oid was"
+ " unavailable"),
+ errdetail("destination"
+ " = PRIMARY for"
+ " query= \"%s\"",
+ query)));
+ force_primary = true;
+ }
+ else
+ {
+ for (i = 0; i < num_oids; i++)
+ {
+ bool stale;
+
+ stale =
+ pool_track_table_mutation_table_is_stale(
+ ctx.table_oids[i],
+ dboid);
+ if (stale)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load"
+ " balance because"
+ " table \"%s\" was"
+ " recently written",
+ ctx.table_names[i]),
+ errdetail("destination"
+ " = PRIMARY for"
+ " query= \"%s\"",
+ query)));
+ force_primary = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (force_primary)
+ {
+ pool_set_node_to_be_sent(
+ query_context,
+ PRIMARY_NODE_ID);
+ }
+ else
+ {
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id =
+ select_load_balancing_node();
+ }
+
+ /*
+ * If replication delay is too much,
+ * and prefer_lower_delay_standby is
+ * true then elect the lowest-delayed
+ * node, otherwise send to primary.
+ */
+ lb_node =
+ session_context->load_balance_node_id;
+ if (STREAM &&
+ check_replication_delay(lb_node))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load"
+ " balance because of"
+ " too much replication"
+ " delay"),
+ errdetail("destination"
+ " = %d for"
+ " query= \"%s\"",
+ dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ lb_node =
+ select_load_balancing_node();
+ session_context->load_balance_node_id =
+ lb_node;
+ qctx->load_balance_node_id =
+ lb_node;
+ pool_set_node_to_be_sent(
+ query_context,
+ lb_node);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(
+ query_context,
+ PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ qctx->load_balance_node_id =
+ session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(
+ query_context,
+ qctx->load_balance_node_id);
+ }
+ }
+ }
else
{
if (pool_config->statement_level_load_balance)
diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c
index ded41c7fc..3ebd68e10 100644
--- a/src/context/pool_session_context.c
+++ b/src/context/pool_session_context.c
@@ -532,7 +532,7 @@ dump_sent_message(char *caller, POOL_SENT_MESSAGE *m)
static void
dml_adaptive_init(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
session_context->is_in_transaction = false;
session_context->transaction_temp_write_list = NIL;
@@ -542,7 +542,9 @@ dml_adaptive_init(void)
static void
dml_adaptive_destroy(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context)
+ if (DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write) &&
+ session_context)
{
if (session_context->transaction_temp_write_list != NIL)
list_free_deep(session_context->transaction_temp_write_list);
@@ -738,10 +740,13 @@ void
pool_set_writing_transaction(void)
{
/*
- * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never
- * turn on writing transaction flag.
+ * If disable_load_balance_on_write is 'off' or 'dml_adaptive' or
+ * 'dml_adaptive_global', then never turn on writing transaction flag.
*/
- if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (pool_config->disable_load_balance_on_write !=
+ DLBOW_OFF &&
+ !DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write))
{
pool_get_session_context(false)->writing_transaction = true;
ereport(DEBUG5,
diff --git a/src/include/pool.h b/src/include/pool.h
index 65907dcf1..0e901691a 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -424,7 +424,7 @@ typedef enum
#define Min(x, y) ((x) < (y) ? (x) : (y))
-#define MAX_NUM_SEMAPHORES 8
+#define MAX_NUM_SEMAPHORES 10
#define CONN_COUNTER_SEM 0
#define REQUEST_INFO_SEM 1
#define QUERY_CACHE_STATS_SEM 2
@@ -434,6 +434,8 @@ typedef enum
#define FOLLOW_PRIMARY_SEM 6
#define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main
* process */
+#define TRACK_TABLE_MUTATION_TABLE_SEM 8
+#define TRACK_TABLE_MUTATION_QUERY_SEM 9
#define MAX_REQUEST_QUEUE_SIZE 10
#define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index 9a397d166..c1e6ecc6f 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -105,9 +105,13 @@ typedef enum DLBOW_OPTION
DLBOW_TRANSACTION,
DLBOW_TRANS_TRANSACTION,
DLBOW_ALWAYS,
- DLBOW_DML_ADAPTIVE
+ DLBOW_DML_ADAPTIVE,
+ DLBOW_DML_ADAPTIVE_GLOBAL
} DLBOW_OPTION;
+#define DLBOW_IS_DML_ADAPTIVE(opt) \
+ ((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL)
+
typedef enum RELQTARGET_OPTION
{
RELQTARGET_PRIMARY = 1,
@@ -365,6 +369,24 @@ typedef struct
* replication check */
char *replication_delay_source_cmd; /* external command for replication delay */
int replication_delay_source_timeout; /* timeout for external command in seconds */
+
+ /* Track table mutation configuration */
+ double track_table_mutation_ttl_factor; /* TTL multiplier for
+ * replication delay */
+ int track_table_mutation_max_staleness; /* max staleness
+ * duration ms */
+ int track_table_mutation_cold_start_duration; /* cold start
+ * duration ms */
+ int track_table_mutation_table_buckets; /* hash buckets for
+ * table map */
+ int track_table_mutation_table_size; /* max table map
+ * entries */
+ int track_table_mutation_query_buckets; /* hash buckets for
+ * query cache */
+ int track_table_mutation_query_parse_cache_size; /* max query
+ * cache
+ * entries */
+
char *failover_command; /* execute command when failover happens */
char *follow_primary_command; /* execute command when failover is
* ended */
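DLBOW_IS_DML_ADAPTIVE() covers both adaptive modes, so every former `== DLBOW_DML_ADAPTIVE` call site also picks up dml_adaptive_global. The standalone sketch below reproduces the macro and the gate used in pool_set_writing_transaction(). The enumerator values and the helper sets_writing_transaction() are illustrative assumptions, not pgpool-II code.

```c
#include <assert.h>
#include <stdbool.h>

/* Subset of DLBOW_OPTION; the numeric values are assumed for illustration. */
typedef enum
{
	DLBOW_OFF = 1,
	DLBOW_TRANSACTION,
	DLBOW_TRANS_TRANSACTION,
	DLBOW_ALWAYS,
	DLBOW_DML_ADAPTIVE,
	DLBOW_DML_ADAPTIVE_GLOBAL
} DLBOW_OPTION;

/* Same shape as the macro in the patch; evaluates its argument twice. */
#define DLBOW_IS_DML_ADAPTIVE(opt) \
	((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL)

/*
 * Hypothetical helper mirroring the condition in pool_set_writing_transaction():
 * the session-wide writing_transaction flag is set only for the
 * non-off, non-adaptive modes.
 */
static bool
sets_writing_transaction(DLBOW_OPTION opt)
{
	return opt != DLBOW_OFF && !DLBOW_IS_DML_ADAPTIVE(opt);
}
```

The macro evaluates its argument twice, so callers should pass a plain field such as pool_config->disable_load_balance_on_write. Every call site in this patch does that.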
diff --git a/src/include/utils/pool_track_table_mutation.h b/src/include/utils/pool_track_table_mutation.h
new file mode 100644
index 000000000..b0de2d809
--- /dev/null
+++ b/src/include/utils/pool_track_table_mutation.h
@@ -0,0 +1,245 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.h: In-memory tracking of
+ * recently written tables to prevent stale reads.
+ */
+
+#ifndef POOL_TRACK_TABLE_MUTATION_H
+#define POOL_TRACK_TABLE_MUTATION_H
+
+#include "pool.h"
+#include <sys/time.h>
+
+/*
+ * Maximum table name length including schema: "schema"."table".
+ * 2 * (NAMEDATALEN - 1) name bytes + 4 quotes + 1 dot + 1 NUL = NAMEDATALEN * 2 + 4
+ */
+#define TRACK_TABLE_MUTATION_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
+
+/*
+ * Maximum number of tables we track per query
+ */
+#define TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY 8
+
+/*
+ * Invalid index marker for linked lists
+ */
+#define TRACK_TABLE_MUTATION_INVALID_INDEX (-1)
+
+/*
+ * Default TTL in microseconds (100ms) used when replication delay is unknown
+ */
+#define TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 * 1000)
+
+/*
+ * Entry in the table mutation hash table (keyed by table/database oids)
+ */
+typedef struct TrackTableMutationEntry
+{
+ int table_oid; /* Table oid */
+ int dboid; /* Database oid */
+ struct timeval first_write_time; /* When the entry was first created */
+ struct timeval last_write_time; /* When the table was last written */
+ uint32 hash; /* Pre-computed hash value */
+ int next; /* Next in collision chain */
+ bool in_use; /* Is this entry in use? */
+} TrackTableMutationEntry;
+
+/*
+ * Header for the table mutation hash table in shared memory
+ */
+typedef struct TrackTableMutationHashTable
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * TrackTableMutationEntry entries[max_entries];
+ */
+} TrackTableMutationHashTable;
+
+/*
+ * Entry in the query parse cache
+ */
+typedef struct QueryParseEntry
+{
+ uint64 query_hash; /* Hash of normalized query */
+ bool is_write; /* True if INSERT/UPDATE/DELETE */
+ int num_tables; /* Number of tables in query */
+ char table_names
+ [TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY]
+ [TRACK_TABLE_MUTATION_TABLE_NAME_LEN];
+ int next; /* Next entry in collision chain */
+ int lru_prev; /* Previous in LRU list */
+ int lru_next; /* Next in LRU list */
+ bool in_use; /* Is this entry in use? */
+} QueryParseEntry;
+
+/*
+ * Header for the query parse cache in shared memory
+ */
+typedef struct QueryParseCache
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ int lru_head; /* Most recently used */
+ int lru_tail; /* Least recently used */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * QueryParseEntry entries[max_entries];
+ */
+} QueryParseCache;
+
+/*
+ * Global state for track table mutation feature
+ */
+typedef struct TrackTableMutationState
+{
+ bool initialized; /* Shmem initialized? */
+ uint64 current_ttl_us; /* Current TTL in microseconds */
+ struct timeval ttl_last_updated; /* When TTL was last updated */
+ struct timeval last_cleanup_time; /* When last expired cleanup ran */
+ struct timeval global_cold_start_until; /* Global cold start end time */
+ uint32 stats_queries_checked; /* Queries checked */
+ uint32 stats_forced_primary; /* Forced to primary */
+ uint32 stats_allowed_replica; /* Allowed to replica */
+} TrackTableMutationState;
+
+/*
+ * Main shared memory structure containing all components
+ */
+typedef struct TrackTableMutationShmem
+{
+ TrackTableMutationState state;
+ TrackTableMutationHashTable *table_map;
+ QueryParseCache *query_cache;
+} TrackTableMutationShmem;
+
+/* ----------------
+ * Public API functions
+ * ----------------
+ */
+
+/*
+ * Initialize shared memory structures for track table mutation.
+ * Called from pgpool_main.c after pool_init_pool_info().
+ */
+extern void pool_track_table_mutation_init(void);
+
+/*
+ * Initialize per-child process state for track table mutation.
+ * Called from child.c when a new child process starts.
+ * Sets up cold start tracking.
+ */
+extern void pool_track_table_mutation_child_init(void);
+
+/*
+ * Check if the child process is in cold start period.
+ * During cold start, all queries are routed to primary.
+ * Returns true if in cold start, false otherwise.
+ */
+extern bool pool_track_table_mutation_in_cold_start(void);
+
+/*
+ * Trigger a global cold start period for all processes.
+ * Used after watchdog leader change to avoid stale reads.
+ */
+extern void pool_track_table_mutation_trigger_global_cold_start(void);
+
+/*
+ * Get oid of current database.
+ */
+extern int pool_track_table_mutation_get_database_oid(void);
+
+/*
+ * Check if a table was recently written to (is "stale").
+ * If stale, reads from this table should go to primary.
+ * Returns true if table is stale (recently written), false otherwise.
+ */
+extern bool pool_track_table_mutation_table_is_stale(
+ int table_oid, int dboid);
+
+/*
+ * Mark tables as recently written.
+ * Called after INSERT/UPDATE/DELETE queries complete.
+ * table_oids: array of table oids
+ * num_tables: number of tables in array
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_tables_written(
+ const int *table_oids, int num_tables, int dboid);
+
+/*
+ * Convenience function to mark a single table as written.
+ * table_oid: table oid
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_table_written(
+ int table_oid, int dboid);
+
+/*
+ * Update the TTL based on current replication delay.
+ * Called from pool_worker_child.c when replication delay is updated.
+ * delay_us: replication delay in microseconds
+ */
+extern void pool_track_table_mutation_update_ttl(uint64 delay_us);
+
+/*
+ * Look up cached parse result for a query.
+ * hash: hash of normalized query
+ * is_write: output - true if query is a write
+ * table_names: output - array to fill with table names
+ * num_tables: output - number of tables found
+ * Returns true if found in cache, false otherwise.
+ */
+extern bool pool_track_table_mutation_get_cached_parse(
+ uint64 hash, bool *is_write,
+ char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int *num_tables);
+
+/*
+ * Cache a parse result for a query.
+ * hash: hash of normalized query
+ * is_write: true if query is a write
+ * table_names: array of table names
+ * num_tables: number of tables
+ */
+extern void pool_track_table_mutation_cache_parse(
+ uint64 hash, bool is_write,
+ const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int num_tables);
+
+/*
+ * Normalize a query and compute its hash.
+ * Strips comments, normalizes whitespace and literals.
+ * query: input SQL query string
+ * Returns: 64-bit hash of normalized query
+ */
+extern uint64 pool_track_table_mutation_normalize_and_hash(const char *query);
+
+/*
+ * Calculate required shared memory size for track table mutation.
+ */
+extern Size pool_track_table_mutation_shmem_size(void);
+
+#endif /* POOL_TRACK_TABLE_MUTATION_H */
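TrackTableMutationHashTable keeps its buckets and entries as flat arrays and links them by integer index. A free list hands out unused slots. Below is a minimal single-process sketch of that scheme. It assumes a toy hash function, a single last-write timestamp in microseconds, and that the `next` field doubles as the free-list link. Locking (TRACK_TABLE_MUTATION_TABLE_SEM in the patch) and eviction of expired entries are left out. All names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define NBUCKETS 8
#define MAX_ENTRIES 4
#define INVALID_INDEX (-1)

typedef struct
{
	int			table_oid;
	int			dboid;
	uint64_t	last_write_us;	/* simplified: one timestamp in microseconds */
	int			next;			/* collision chain, or free-list link when unused */
	bool		in_use;
} Entry;

typedef struct
{
	int			num_entries;
	int			free_list_head;
	int			buckets[NBUCKETS];
	Entry		entries[MAX_ENTRIES];
} TableMap;

/* Toy hash mixing table and database oids (the patch's hash is not shown). */
static uint32_t
hash_key(int table_oid, int dboid)
{
	uint32_t	h = (uint32_t) table_oid * 2654435761u;

	h ^= (uint32_t) dboid + 0x9e3779b9u + (h << 6) + (h >> 2);
	return h;
}

static void
map_init(TableMap *m)
{
	m->num_entries = 0;
	for (int b = 0; b < NBUCKETS; b++)
		m->buckets[b] = INVALID_INDEX;
	/* Thread every entry onto the free list. */
	for (int i = 0; i < MAX_ENTRIES; i++)
	{
		m->entries[i].in_use = false;
		m->entries[i].next = (i + 1 < MAX_ENTRIES) ? i + 1 : INVALID_INDEX;
	}
	m->free_list_head = 0;
}

static int
map_find(const TableMap *m, int table_oid, int dboid)
{
	int			i = m->buckets[hash_key(table_oid, dboid) % NBUCKETS];

	while (i != INVALID_INDEX)
	{
		const Entry *e = &m->entries[i];

		if (e->table_oid == table_oid && e->dboid == dboid)
			return i;
		i = e->next;
	}
	return INVALID_INDEX;
}

/* Returns false when the map is full; a real version would evict expired entries. */
static bool
map_mark_written(TableMap *m, int table_oid, int dboid, uint64_t now_us)
{
	int			i = map_find(m, table_oid, dboid);

	if (i == INVALID_INDEX)
	{
		uint32_t	b;

		if (m->free_list_head == INVALID_INDEX)
			return false;
		i = m->free_list_head;					/* pop a free slot */
		m->free_list_head = m->entries[i].next;
		b = hash_key(table_oid, dboid) % NBUCKETS;
		m->entries[i].table_oid = table_oid;
		m->entries[i].dboid = dboid;
		m->entries[i].in_use = true;
		m->entries[i].next = m->buckets[b];		/* push onto bucket chain */
		m->buckets[b] = i;
		m->num_entries++;
	}
	m->entries[i].last_write_us = now_us;
	return true;
}

/* Stale means: written less than ttl_us ago. */
static bool
map_is_stale(const TableMap *m, int table_oid, int dboid,
			 uint64_t now_us, uint64_t ttl_us)
{
	int			i = map_find(m, table_oid, dboid);

	return i != INVALID_INDEX && now_us - m->entries[i].last_write_us < ttl_us;
}
```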
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index bf7c452e2..d4e274f02 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -57,6 +57,7 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
#include "query_cache/pool_memqcache.h"
+#include "utils/pool_track_table_mutation.h"
#include "watchdog/wd_internal_commands.h"
#include "watchdog/wd_lifecheck.h"
#include "watchdog/watchdog.h"
@@ -1500,11 +1501,14 @@ sigusr1_interrupt_processor(void)
if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
{
+ WD_STATES wd_state;
+
ereport(LOG,
(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
- if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY)
+ wd_state = wd_internal_get_watchdog_local_node_state();
+ if (wd_state == WD_STANDBY)
{
ereport(LOG,
(errmsg("we have joined the watchdog cluster as STANDBY node"),
@@ -1518,6 +1522,12 @@ sigusr1_interrupt_processor(void)
*/
pool_release_follow_primary_lock(true);
}
+ else if (wd_state == WD_COORDINATOR &&
+ pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_trigger_global_cold_start();
+ }
}
if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
{
@@ -3083,6 +3093,16 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size()));
}
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ size += MAXALIGN(pool_track_table_mutation_shmem_size());
+ elog(DEBUG1,
+ "track_table_mutation: %zu bytes requested"
+ " for shared memory",
+ MAXALIGN(pool_track_table_mutation_shmem_size()));
+ }
+
initialize_shared_memory_main_segment(size);
/* Move the backend descriptors to shared memory */
@@ -3199,6 +3219,13 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
wd_ipc_initialize_data();
}
+ /* Initialize track table mutation for recently written tables */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_init();
+ }
+
}
/*
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index a3b8f0ea1..a4ec83f93 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -38,6 +38,8 @@
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
+#include "query_cache/pool_memqcache.h"
static int extract_ntuples(char *message);
static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
@@ -304,6 +306,33 @@ handle_query_context(POOL_CONNECTION_POOL *backend)
node = session_context->query_context->parse_tree;
+ /*
+ * Track table writes for dml_adaptive_global feature.
+ * For autocommit statements (not in explicit transaction), mark tables
+ * immediately. For explicit transactions, marking is deferred to COMMIT
+ * in dml_adaptive() so that ROLLBACKed writes don't pollute the shared
+ * memory table map.
+ */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL &&
+ node != NULL &&
+ !session_context->is_in_transaction)
+ {
+ int *oids;
+ int num_oids;
+
+ num_oids = pool_extract_table_oids(node, &oids);
+ if (num_oids > 0)
+ {
+ int dboid;
+
+ dboid = pool_track_table_mutation_get_database_oid();
+ if (dboid > 0)
+ pool_track_table_mutation_mark_tables_written(
+ oids, num_oids, dboid);
+ }
+ }
+
if (IsA(node, PrepareStmt))
{
if (session_context->uncompleted_message)
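The comment in the hunk above describes a deferral rule. Autocommit writes are published at once. Writes inside an explicit transaction are held until COMMIT and dropped on ROLLBACK. That rule can be sketched as a small state machine. Session, SharedMap, publish() and the two callbacks are hypothetical names. The fixed-size pending buffer is a simplification: a real implementation should grow it rather than drop oids, because a dropped oid becomes a missed stale read.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_PENDING 16

/* Hypothetical per-session buffer of oids written inside an explicit transaction. */
typedef struct
{
	bool		in_transaction;
	int			num_pending;
	int			pending[MAX_PENDING];
} Session;

/* Stand-in for the shared table map: records which oids were published. */
typedef struct
{
	int			num_marked;
	int			marked[64];
} SharedMap;

static void
publish(SharedMap *map, const int *oids, int n)
{
	for (int i = 0; i < n && map->num_marked < 64; i++)	/* capped in this toy */
		map->marked[map->num_marked++] = oids[i];
}

/* Called when a write's CommandComplete arrives. */
static void
on_write_complete(Session *s, SharedMap *map, const int *oids, int n)
{
	if (!s->in_transaction)
	{
		publish(map, oids, n);	/* autocommit: visible to other sessions now */
		return;
	}
	for (int i = 0; i < n && s->num_pending < MAX_PENDING; i++)
		s->pending[s->num_pending++] = oids[i];
}

/* Called on COMMIT (committed = true) or ROLLBACK (committed = false). */
static void
on_transaction_end(Session *s, SharedMap *map, bool committed)
{
	if (committed)
		publish(map, s->pending, s->num_pending);
	s->num_pending = 0;			/* ROLLBACK: discard without touching shared state */
	s->in_transaction = false;
}
```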
diff --git a/src/protocol/child.c b/src/protocol/child.c
index c34f05728..316b76239 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -57,6 +57,7 @@
#include "utils/elog.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -213,6 +214,13 @@ do_child(int *fds)
/* Initialize per process context */
pool_init_process_context();
+ /* Initialize track table mutation child state for cold start tracking */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_child_init();
+ }
+
/* initialize connection pool */
if (pool_init_cp())
{
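Cold start can be triggered in two ways. Either the child itself started less than track_table_mutation_cold_start_duration ago, or a watchdog leader change has pushed the shared global_cold_start_until deadline into the future. The sketch below implements that check with struct timeval arithmetic in microseconds. in_cold_start() and trigger_global_cold_start() are hypothetical stand-ins. Reusing the same duration for the global window is an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/time.h>

/* Microseconds from 'from' to 'to'; negative when 'to' is earlier. */
static long long
tv_diff_us(const struct timeval *from, const struct timeval *to)
{
	return (long long) (to->tv_sec - from->tv_sec) * 1000000LL +
		(to->tv_usec - from->tv_usec);
}

static bool
in_cold_start(const struct timeval *child_start,
			  const struct timeval *global_until,
			  const struct timeval *now, int duration_ms)
{
	if (duration_ms <= 0)
		return false;			/* cold_start_duration = 0 disables cold start */
	if (tv_diff_us(child_start, now) < (long long) duration_ms * 1000)
		return true;			/* this child started recently */
	/* a watchdog leader change moved the cluster-wide deadline forward */
	return tv_diff_us(now, global_until) > 0;
}

/* Set global_until = now + duration_ms, normalizing tv_usec. */
static void
trigger_global_cold_start(struct timeval *global_until,
						  const struct timeval *now, int duration_ms)
{
	long long	usec = now->tv_usec + (long long) (duration_ms % 1000) * 1000;

	global_until->tv_sec = now->tv_sec + duration_ms / 1000 + usec / 1000000;
	global_until->tv_usec = usec % 1000000;
}
```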
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index f9458bb55..706abff5b 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -1461,7 +1461,9 @@ Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0)
+ if (DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write)
+ && strlen(name) != 0)
pool_setall_node_to_be_sent(query_context);
if (REPLICATION)
@@ -1804,7 +1806,7 @@ Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
return POOL_END;
}
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) &&
TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
{
pool_where_to_send(query_context, query_context->original_query,
diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c
index f38f71146..dca93334e 100644
--- a/src/query_cache/pool_memqcache.c
+++ b/src/query_cache/pool_memqcache.c
@@ -1305,6 +1305,12 @@ pool_extract_table_oids(Node *node, int **oidsp)
}
return num_oids;
}
+ else if (IsA(node, MergeStmt))
+ {
+ MergeStmt *stmt = (MergeStmt *) node;
+
+ table = make_table_name_from_rangevar(stmt->relation);
+ }
else if (IsA(node, ExplainStmt))
{
ListCell *cell;
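The parse cache is keyed by pool_track_table_mutation_normalize_and_hash(). Its documented job is to strip comments and normalize whitespace and literals. One single-pass approach feeds the normalized bytes straight into a 64-bit FNV-1a hash, without building an intermediate string. The sketch below is built on that assumption and is not the patch's normalizer. It handles `--` and `/* */` comments, single-quoted strings with '' escapes, and bare numeric literals. It does not case-fold keywords and does not handle dollar quoting or E'' strings.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stdint.h>

#define FNV64_OFFSET 0xcbf29ce484222325ULL
#define FNV64_PRIME  0x100000001b3ULL

typedef struct
{
	uint64_t	hash;
	bool		pending_space;	/* whitespace or comment seen since last byte */
	bool		emitted_any;	/* suppresses leading whitespace */
	bool		prev_ident;		/* last emitted byte was [A-Za-z0-9_] */
} Norm;

/* Hash one normalized byte, preceded by a single space if one is pending. */
static void
emit(Norm *n, unsigned char c)
{
	if (n->pending_space && n->emitted_any)
	{
		n->hash ^= ' ';
		n->hash *= FNV64_PRIME;
	}
	n->pending_space = false;
	n->emitted_any = true;
	n->hash ^= c;
	n->hash *= FNV64_PRIME;
	n->prev_ident = isalnum(c) || c == '_';
}

static uint64_t
normalize_and_hash(const char *query)
{
	Norm		n = {FNV64_OFFSET, false, false, false};
	const unsigned char *p = (const unsigned char *) query;

	while (*p)
	{
		if (p[0] == '-' && p[1] == '-')
		{
			while (*p && *p != '\n')	/* line comment */
				p++;
			n.pending_space = true;
		}
		else if (p[0] == '/' && p[1] == '*')
		{
			p += 2;						/* block comment (not nested) */
			while (*p && !(p[0] == '*' && p[1] == '/'))
				p++;
			if (*p)
				p += 2;
			n.pending_space = true;
		}
		else if (isspace(*p))
		{
			n.pending_space = true;		/* collapse whitespace runs */
			p++;
		}
		else if (*p == '\'')
		{
			p++;						/* string literal; '' is an escaped quote */
			while (*p)
			{
				if (p[0] == '\'' && p[1] == '\'')
					p += 2;
				else if (*p++ == '\'')
					break;
			}
			emit(&n, '?');
		}
		else if (isdigit(*p) && !(n.prev_ident && !n.pending_space))
		{
			while (isdigit(*p) || *p == '.')	/* numeric literal, not "t1" */
				p++;
			emit(&n, '?');
		}
		else
			emit(&n, *p++);
	}
	return n.hash;
}
```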
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 1ac982907..00132d534 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -478,6 +478,14 @@ backend_clustering_mode = streaming_replication
# modified within the current explicit transaction will
# not be load balanced until the end of the transaction.
#
+ # dml_adaptive_global:
+ # Superset of dml_adaptive. In addition to per-transaction
+ # tracking, uses shared memory to track recently written
+ # tables across all sessions. Reads from recently written
+ # tables are routed to primary until a TTL (based on
+ # replication delay) expires. Requires additional shared
+ # memory. See track_table_mutation_* parameters below.
+ #
# always:
# if a write query is issued, read queries will
# not be load balanced until the session ends.
@@ -499,6 +507,54 @@ backend_clustering_mode = streaming_replication
#statement_level_load_balance = off
# Enables statement level load balancing
+# - Track Table Mutation (used by dml_adaptive_global) -
+ # WARNING: dml_adaptive_global increases shared memory usage
+ # Default settings require ~11 MB shared memory
+ # (~0.1 MB table tracking + ~10.9 MB query cache)
+
+#track_table_mutation_ttl_factor = 5.0
+ # TTL multiplier: TTL = replication_delay * factor
+ # Higher values provide more safety margin
+ # Range: 1.0-100.0 (default: 5.0)
+ # (change requires reload)
+
+#track_table_mutation_max_staleness = 60000
+ # Maximum duration (ms) a table can be marked stale
+ # from its first write. Bounds cross-session impact:
+ # even under continuous writes, staleness expires
+ # after this period and is only renewed by new writes.
+ # 0 disables the cap. Range: 0-3600000 (default: 60000 = 60s)
+ # (change requires reload)
+
+#track_table_mutation_cold_start_duration = 2000
+ # Duration in milliseconds to route all queries to primary
+ # after child process starts (cold start period)
+ # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+ # Set to 0 to disable cold start behavior
+ # (change requires reload)
+
+#track_table_mutation_table_buckets = 1024
+ # Number of hash buckets for track table mutation
+ # Higher values reduce hash collisions
+ # Range: 64-65536 (default: 1024)
+ # (change requires restart)
+
+#track_table_mutation_table_size = 2048
+ # Maximum number of tables to track simultaneously
+ # Range: 128-131072 (default: 2048)
+ # (change requires restart)
+
+#track_table_mutation_query_buckets = 2048
+ # Number of hash buckets for query parse cache
+ # Range: 64-65536 (default: 2048)
+ # (change requires restart)
+
+#track_table_mutation_query_parse_cache_size = 10000
+ # Maximum number of query parse results to cache
+ # Range: 100-1000000 (default: 10000)
+ # Memory usage: ~1.1 KB per entry (~10.9 MB default, ~109 MB for 100000)
+ # (change requires restart)
+
#------------------------------------------------------------------------------
# STREAMING REPLICATION MODE
#------------------------------------------------------------------------------
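The shared memory figures follow from the struct layouts in pool_track_table_mutation.h. Each QueryParseEntry carries eight fixed TRACK_TABLE_MUTATION_TABLE_NAME_LEN name slots, and those slots dominate its size. The sketch below recomputes the default footprint under four assumptions: NAMEDATALEN is 64, the platform is LP64, pool_track_table_mutation_shmem_size() adds header + buckets + entries with no extra terms, and MAXALIGN padding can be ignored.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/time.h>

#define NAMEDATALEN 64			/* assumed value */
#define TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
#define MAX_TABLES_PER_QUERY 8

/* Copy of TrackTableMutationEntry's layout. */
typedef struct
{
	int			table_oid;
	int			dboid;
	struct timeval first_write_time;
	struct timeval last_write_time;
	uint32_t	hash;
	int			next;
	bool		in_use;
} TableEntry;

/* Copy of QueryParseEntry's layout. */
typedef struct
{
	uint64_t	query_hash;
	bool		is_write;
	int			num_tables;
	char		table_names[MAX_TABLES_PER_QUERY][TABLE_NAME_LEN];
	int			next;
	int			lru_prev;
	int			lru_next;
	bool		in_use;
} QueryEntry;

/* Header of 4 ints, then int buckets[], then entries[] (padding ignored). */
static size_t
table_map_bytes(int buckets, int entries)
{
	return 4 * sizeof(int) + (size_t) buckets * sizeof(int) +
		(size_t) entries * sizeof(TableEntry);
}

/* Header of 6 ints, then int buckets[], then entries[] (padding ignored). */
static size_t
query_cache_bytes(int buckets, int entries)
{
	return 6 * sizeof(int) + (size_t) buckets * sizeof(int) +
		(size_t) entries * sizeof(QueryEntry);
}
```

On a typical 64-bit Linux build, a QueryParseEntry is 1088 bytes and a TrackTableMutationEntry is 56 bytes. At the default settings that comes to roughly 0.12 MB for the table map and 10.9 MB for the parse cache.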
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 311b63865..3ad806e3e 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
#include "utils/pool_ip.h"
#include "utils/ps_status.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -419,6 +420,7 @@ check_replication_time_lag(void)
BackendInfo *bkinfo;
uint64 lag;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0;
ErrorContextCallback callback;
int active_standby_node;
bool replication_delay_by_time;
@@ -643,6 +645,10 @@ check_replication_time_lag(void)
* seconds to micro
* seconds */
+ /* Track max delay for mutation TTL */
+ if (lag > max_delay_us)
+ max_delay_us = lag;
+
/* Log delay if necessary */
if ((pool_config->log_standby_delay == LSD_ALWAYS && lag > 0) ||
(pool_config->log_standby_delay == LSD_OVER_THRESHOLD &&
@@ -668,6 +674,13 @@ check_replication_time_lag(void)
}
}
+ /* Update track table mutation TTL from the max time-based delay */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL &&
+ replication_delay_by_time &&
+ max_delay_us > 0)
+ pool_track_table_mutation_update_ttl(max_delay_us);
+
error_context_stack = callback.previous;
}
@@ -695,6 +708,7 @@ check_replication_time_lag_with_cmd(void)
double delay_ms;
uint64 delay;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0; /* Track max delay for mutation map */
int token_count = 0;
int primary_node_id;
int save_errno;
@@ -1003,6 +1017,10 @@ check_replication_time_lag_with_cmd(void)
bkinfo->standby_delay = delay;
bkinfo->standby_delay_by_time = true;
+ /* Track maximum delay for table mutation map TTL calculation */
+ if (delay > max_delay_us)
+ max_delay_us = delay;
+
/*
* Log delay if necessary. threshold is in milliseconds, convert
* to microseconds.
@@ -1021,6 +1039,12 @@ check_replication_time_lag_with_cmd(void)
token = strtok_r(NULL, " \t\n", &saveptr);
}
+ /* Update table mutation TTL based on max observed delay */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL &&
+ max_delay_us > 0)
+ pool_track_table_mutation_update_ttl(max_delay_us);
+
}
PG_CATCH();
{
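Both delay paths now pass the largest observed standby delay to pool_track_table_mutation_update_ttl(). Combined with track_table_mutation_ttl_factor and track_table_mutation_max_staleness, the staleness rule can be sketched as follows. compute_ttl_us() and table_is_stale() are hypothetical helpers. The fallback to TRACK_TABLE_MUTATION_DEFAULT_TTL_US follows the header comment ("used when replication delay is unknown"). This excerpt does not show how first_write_time is reset once the cap expires, so that step is not modeled.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define DEFAULT_TTL_US (100 * 1000)	/* mirrors TRACK_TABLE_MUTATION_DEFAULT_TTL_US */

/* TTL = max observed replica delay * ttl_factor; default when no delay is known. */
static uint64_t
compute_ttl_us(uint64_t max_delay_us, double ttl_factor)
{
	if (max_delay_us == 0)
		return DEFAULT_TTL_US;
	return (uint64_t) ((double) max_delay_us * ttl_factor);
}

/*
 * Stale while the last write is younger than the TTL, but never longer than
 * max_staleness_ms after the first write (0 disables the cap).
 */
static bool
table_is_stale(uint64_t now_us, uint64_t first_write_us, uint64_t last_write_us,
			   uint64_t ttl_us, int max_staleness_ms)
{
	if (now_us - last_write_us >= ttl_us)
		return false;
	if (max_staleness_ms > 0 &&
		now_us - first_write_us >= (uint64_t) max_staleness_ms * 1000)
		return false;
	return true;
}
```

The cap bounds the cross-session cost of a continuously written table. Without it, such a table would never become eligible for load balancing again.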
diff --git a/src/test/regression/libs.sh b/src/test/regression/libs.sh
index 7c5a0c182..1c8ae392d 100644
--- a/src/test/regression/libs.sh
+++ b/src/test/regression/libs.sh
@@ -42,6 +42,8 @@ function wait_for_failover_done {
function clean_all {
pgrep pgpool | xargs kill -9 > /dev/null 2>&1
pgrep postgres | xargs kill -9 > /dev/null 2>&1
+ # Clean up leaked SysV IPC resources left behind by kill -9
+ ipcrm --all 2>/dev/null || true
rm -f $PGSOCKET_DIR/.s.PGSQL.*
netstat -t -p 2>/dev/null|grep pgpool
}
diff --git a/src/test/regression/tests/042.track_table_mutation/test.sh b/src/test/regression/tests/042.track_table_mutation/test.sh
new file mode 100755
index 000000000..8b4dd17b8
--- /dev/null
+++ b/src/test/regression/tests/042.track_table_mutation/test.sh
@@ -0,0 +1,354 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+ rm -fr $TESTDIR
+ mkdir $TESTDIR
+ cd $TESTDIR
+
+ # Create test environment with 2 nodes
+ echo -n "creating test environment..."
+ $PGPOOL_SETUP -m $mode -n 2 || exit 1
+ echo "done."
+
+ source ./bashrc.ports
+
+ # Configure track table mutation feature via dml_adaptive_global
+ echo "disable_load_balance_on_write = 'dml_adaptive_global'" >> etc/pgpool.conf
+ echo "track_table_mutation_ttl_factor = 5.0" >> etc/pgpool.conf
+ echo "track_table_mutation_cold_start_duration = 10000" >> etc/pgpool.conf
+
+ # Enable load balancing explicitly
+ echo "load_balance_mode = on" >> etc/pgpool.conf
+
+ # Configure weights so we can distinguish routing
+ # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+ # This means load balanced queries go to node 1 by default
+ echo "backend_weight0 = 0" >> etc/pgpool.conf
+ echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+ # Enable debug logging to see routing decisions
+ echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+ ./startall
+
+ export PGPORT=$PGPOOL_PORT
+ export PGHOST=localhost
+
+ wait_for_pgpool_startup
+
+ # Create test tables
+ $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+ echo "=== Test 1: Cold Start Routing ==="
+ # During cold start, all queries should go to primary
+ # Restart pgpool to trigger cold start
+ ./shutdownall
+ ./startall
+ wait_for_pgpool_startup
+
+ # Immediately query - should go to primary due to cold start
+ $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for cold start message (use -a to handle binary log files)
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 1 PASSED: Cold start routing works"
+ else
+ echo "Test 1 FAILED: Cold start routing not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 2: Wait for cold start to end ==="
+ # Wait for cold start period to end (10 seconds).
+ # Use generous margin to avoid flakiness under load (e.g. full regression suite).
+ sleep 12
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Now a clean table query should load balance (go to node 1)
+ $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+ # After cold start, queries to clean tables should load balance
+ # Check that it did NOT get forced to primary due to track table mutation
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 2 FAILED: Still in cold start after waiting"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 2 PASSED: Cold start ended correctly"
+
+ echo "=== Test 3: Write-then-Read Routing ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Write to t1 and then read - use single connection to ensure same session
+ $PSQL test <<EOF
+INSERT INTO t1 VALUES (1);
+SELECT 'write_read_test' as marker, * FROM t1;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Check log for table staleness message
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 3 PASSED: Write-then-read routing works"
+ else
+ echo "Test 3 FAILED: Table staleness not detected after write"
+ # Show relevant log entries for debugging
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 4: Clean Table Still Load Balances ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Read from t2 (never written to) - should load balance
+ $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ # Should NOT see track table mutation blocking message for t2
+ if grep -a -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+ echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 4 PASSED: Clean tables still load balance"
+
+ echo "=== Test 5: UPDATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Update t2 and then read - use single connection
+ $PSQL test <<EOF
+UPDATE t2 SET i = 999 WHERE i = 0;
+SELECT 'update_test' as marker, * FROM t2;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 5 PASSED: UPDATE marks table as stale"
+ else
+ echo "Test 5 FAILED: UPDATE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 6: DELETE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Delete from t3 and then read - use single connection
+ $PSQL test <<EOF
+DELETE FROM t3 WHERE i = 0;
+SELECT 'delete_test' as marker, * FROM t3;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 6 PASSED: DELETE marks table as stale"
+ else
+ echo "Test 6 FAILED: DELETE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 7: TRUNCATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for TRUNCATE test
+ $PSQL test -c "CREATE TABLE t_truncate(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_truncate VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Truncate and then read - use single connection
+ $PSQL test <<EOF
+TRUNCATE t_truncate;
+SELECT 'truncate_test' as marker, * FROM t_truncate;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 7 PASSED: TRUNCATE marks table as stale"
+ else
+ echo "Test 7 FAILED: TRUNCATE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 8: WITH Clause (CTE with DELETE) Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for WITH test
+ $PSQL test -c "CREATE TABLE t_cte(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_cte VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use WITH clause with DELETE, then read from the table
+ $PSQL test <<EOF
+WITH deleted AS (DELETE FROM t_cte WHERE i = 1 RETURNING *)
+SELECT * FROM deleted;
+SELECT 'cte_test' as marker, * FROM t_cte;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 8 PASSED: WITH clause (CTE) marks table as stale"
+ else
+ echo "Test 8 FAILED: WITH clause (CTE) did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ # Test 9: MERGE (PostgreSQL 15+ only)
+ PG_MAJOR_VERSION=$($PSQL -t -c "SELECT substring(version() from 'PostgreSQL ([0-9]+)');" | tr -d ' ')
+ if [ "$PG_MAJOR_VERSION" -ge 15 ] 2>/dev/null; then
+ echo "=== Test 9: MERGE Marks Table as Stale (PostgreSQL $PG_MAJOR_VERSION) ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create tables for MERGE test
+ $PSQL test -c "CREATE TABLE t_merge_target(id INTEGER PRIMARY KEY, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "CREATE TABLE t_merge_source(id INTEGER, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_target VALUES (1, 'old');" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_source VALUES (1, 'new'), (2, 'insert');" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use MERGE, then read from the target table
+ $PSQL test <<EOF
+MERGE INTO t_merge_target t
+USING t_merge_source s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT 'merge_test' as marker, * FROM t_merge_target;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 9 PASSED: MERGE marks table as stale"
+ else
+ echo "Test 9 FAILED: MERGE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ else
+ echo "=== Test 9: MERGE skipped (requires PostgreSQL 15+, have $PG_MAJOR_VERSION) ==="
+ fi
+
+ echo "=== Test 10: ROLLBACK Does NOT Mark Table as Stale ==="
+ # Create a fresh table for rollback test
+ $PSQL test -c "CREATE TABLE t_rollback(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then rollback
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_rollback VALUES (1);
+ROLLBACK;
+SELECT 'rollback_test' as marker, * FROM t_rollback;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Should NOT see t_rollback marked as stale since the write was rolled back
+ if grep -a -q "could not load balance because table.*t_rollback.*was recently written" log/pgpool.log; then
+ echo "Test 10 FAILED: Rolled-back write incorrectly marked table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 10 PASSED: ROLLBACK does not mark table as stale"
+
+ echo "=== Test 11: COMMIT Marks Table as Stale ==="
+ # Create a fresh table for commit test
+ $PSQL test -c "CREATE TABLE t_commit(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then commit, then read
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_commit VALUES (1);
+COMMIT;
+SELECT 'commit_test' as marker, * FROM t_commit;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 11 PASSED: COMMIT marks table as stale"
+ else
+ echo "Test 11 FAILED: Committed write did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo ""
+ echo "=== All Track Table Mutation Tests PASSED ==="
+
+ ./shutdownall
+
+ cd ..
+done
+
+exit 0
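Tests 10 and 11 pin down the transaction semantics. A write that is rolled back must not make the table stale, while a committed write must. The hunks in this excerpt do not show how the patch achieves that, so the code below is only a sketch under an assumption: DML target OIDs are buffered per session while a transaction is open and handed to the shared map on COMMIT.

`PendingWrites`, `record_write()`, `on_begin()`, `on_commit()`, `on_rollback()` and the `marked[]` array (a stand-in for `pool_track_table_mutation_mark_tables_written()`) are all hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-in for the shared-memory map: records marked OIDs. */
#define MAX_MARKED 16
static int	marked[MAX_MARKED];
static int	num_marked = 0;

static void
mark_table_written(int table_oid)
{
	if (num_marked < MAX_MARKED)
		marked[num_marked++] = table_oid;
}

/* Hypothetical per-session list of tables written in the open transaction. */
typedef struct
{
	int		   *oids;
	int			count;
	int			capacity;
	bool		in_transaction;
} PendingWrites;

/* Append an OID, growing the array as needed; false if realloc fails. */
static bool
pending_push(PendingWrites *pw, int table_oid)
{
	if (pw->count == pw->capacity)
	{
		int			newcap = pw->capacity > 0 ? pw->capacity * 2 : 8;
		int		   *p = realloc(pw->oids, newcap * sizeof(int));

		if (p == NULL)
			return false;
		pw->oids = p;
		pw->capacity = newcap;
	}
	assert(pw->count < pw->capacity);
	pw->oids[pw->count++] = table_oid;
	return true;
}

/* DML target: mark at once in autocommit mode, otherwise defer to COMMIT. */
static void
record_write(PendingWrites *pw, int table_oid)
{
	/* If the list cannot grow, mark now rather than lose the write. */
	if (!pw->in_transaction || !pending_push(pw, table_oid))
		mark_table_written(table_oid);
}

static void
on_begin(PendingWrites *pw)
{
	pw->in_transaction = true;
	pw->count = 0;
}

/* COMMIT: the writes become visible, so publish them to the map. */
static void
on_commit(PendingWrites *pw)
{
	for (int i = 0; i < pw->count; i++)
		mark_table_written(pw->oids[i]);
	pw->count = 0;
	pw->in_transaction = false;
}

/* ROLLBACK: the writes never become visible, so drop the pending list. */
static void
on_rollback(PendingWrites *pw)
{
	pw->count = 0;
	pw->in_transaction = false;
}
```

Deferring the mark to COMMIT also means the TTL window starts when the rows become visible on the primary, not when the INSERT ran inside the transaction. That difference matters for long transactions.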
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh b/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
new file mode 100755
index 000000000..c50c213d6
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
@@ -0,0 +1,184 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# Test script for track table mutation global cold start
+# on watchdog leader change.
+#
+# Uses $WATCHDOG_SETUP to create a 2-node watchdog cluster,
+# then verifies that when the leader is stopped the new
+# leader triggers a global cold start.
+#-------------------------------------------------------------------
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+success_count=0
+
+dir=`pwd`
+rm -fr $TESTDIR
+mkdir $TESTDIR
+cd $TESTDIR
+
+# Create 2-node watchdog cluster
+$WATCHDOG_SETUP -wn 2 || exit 1
+
+# Ensure per-node scripts are executable
+# (sed -i in watchdog_setup can strip permissions)
+chmod 755 pgpool*/startall pgpool*/shutdownall
+
+# Append track_table_mutation config to both nodes
+for i in 0 1
+do
+ cat >> pgpool${i}/etc/pgpool.conf <<EOF
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_cold_start_duration = 2000
+enable_consensus_with_half_votes = on
+log_min_messages = debug1
+EOF
+done
+
+./startall
+export PCPPASSFILE=$dir/$TESTDIR/pgpool0/pcppass
+
+# Wait for watchdog lifecheck on node 0
+echo -n "waiting for watchdog node 0 starting up..."
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "lifecheck started" \
+ pgpool0/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ break
+ fi
+ sleep 2
+done
+echo "done."
+
+# Test 1: Verify leader came up
+echo "=== Test 1: Waiting for the pgpool leader... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "I am the cluster leader node" \
+ pgpool0/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 1 PASSED: Leader brought up."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 1 ]; then
+ echo "Test 1 FAILED: Leader did not start"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 2: Verify standby joined cluster
+echo "=== Test 2: Waiting for standby to join... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "successfully joined the watchdog cluster" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 2 PASSED: Standby joined."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 2 ]; then
+ echo "Test 2 FAILED: Standby did not join"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 3: Verify track_table_mutation initialized
+echo "=== Test 3: Verify feature initialized ==="
+if grep -a "track_table_mutation: initialized" \
+ pgpool0/log/pgpool.log > /dev/null 2>&1; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 3 PASSED: Feature initialized."
+else
+ echo "Test 3 FAILED: Feature not initialized"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 4: Stop leader (pgpool0) to trigger failover
+echo "=== Test 4: Stopping leader... ==="
+cd pgpool0
+source ./bashrc.ports
+$PGPOOL_INSTALL_DIR/bin/pgpool \
+ -f etc/pgpool.conf -m f stop
+cd ..
+
+echo "Checking standby detected shutdown..."
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "is shutting down" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 4 PASSED: Shutdown detected."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 4 ]; then
+ echo "Test 4 FAILED: Shutdown not detected"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 5: Verify standby became new leader
+echo "=== Test 5: Checking standby takes over... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "I am the cluster leader node" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 5 PASSED: Standby became leader."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 5 ]; then
+ echo "Test 5 FAILED: Standby did not become leader"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 6: Verify global cold start was triggered
+echo "=== Test 6: Checking global cold start... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "track_table_mutation: global cold start" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 6 PASSED: Global cold start triggered."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+# Cleanup
+./shutdownall
+
+echo ""
+echo "$success_count out of 6 successful"
+
+if test $success_count -eq 6
+then
+ echo "=== All Watchdog Tests PASSED ==="
+ exit 0
+fi
+
+exit 1
diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c
new file mode 100644
index 000000000..ee09b3f50
--- /dev/null
+++ b/src/utils/pool_track_table_mutation.c
@@ -0,0 +1,1453 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.c: In-memory tracking of recently
+ * written tables to prevent stale reads from replicas.
+ *
+ * Based on the "lagless" architecture from Tailor Brands.
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "context/pool_session_context.h"
+#include "utils/pool_track_table_mutation.h"
+#include "utils/elog.h"
+#include "utils/pool_ipc.h"
+#include "utils/palloc.h"
+#include "utils/pool_relcache.h"
+
+#define DATABASE_TO_OID_QUERY \
+ "SELECT oid FROM pg_catalog.pg_database" \
+ " WHERE datname = '%s'"
+
+/*
+ * Helper macro: true when the feature is not active.
+ */
+#define TRACK_TABLE_MUTATION_DISABLED() \
+ (pool_config->disable_load_balance_on_write != \
+ DLBOW_DML_ADAPTIVE_GLOBAL || \
+ track_table_mutation_shmem == NULL)
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static TrackTableMutationShmem *track_table_mutation_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for flexible arrays in shared memory
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+ ((int *)((char *)(map) + \
+ sizeof(TrackTableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+ ((TrackTableMutationEntry *)((char *)(map) + \
+ sizeof(TrackTableMutationHashTable) + \
+ (map)->num_buckets * sizeof(int)))
+
+/* Get pointer to bucket array in parse cache */
+#define PARSE_CACHE_BUCKETS(cache) \
+ ((int *)((char *)(cache) + sizeof(QueryParseCache)))
+
+/* Get pointer to entry array in parse cache */
+#define PARSE_CACHE_ENTRIES(cache) \
+ ((QueryParseEntry *)((char *)(cache) + \
+ sizeof(QueryParseCache) + \
+ (cache)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Semaphore lock helpers
+ * ----------------
+ */
+
+static inline void
+table_map_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+table_map_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+parse_cache_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_QUERY_SEM);
+}
+
+static inline void
+parse_cache_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_QUERY_SEM);
+}
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * FNV-1a hash for table/database oid pair
+ */
+static uint32
+fnv1a_hash_table_key(int table_oid, int dboid)
+{
+ uint32 hash = 2166136261u; /* FNV offset basis */
+ uint32 data[2];
+ const unsigned char *bytes;
+ size_t i;
+
+ data[0] = (uint32) table_oid;
+ data[1] = (uint32) dboid;
+ bytes = (const unsigned char *) data;
+
+ for (i = 0; i < sizeof(data); i++)
+ {
+ hash ^= bytes[i];
+ hash *= 16777619u; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/*
+ * 64-bit FNV-1a hash of a byte string
+ */
+static uint64
+fnv1a_hash_64(const char *str, size_t len)
+{
+ /* FNV offset basis for 64-bit */
+ uint64 hash = 14695981039346656037ULL;
+ size_t i;
+
+ for (i = 0; i < len; i++)
+ {
+ hash ^= (uint8)str[i];
+ hash *= 1099511628211ULL; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Get elapsed time in microseconds between two timevals
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+ return ((int64)(end->tv_sec - start->tv_sec) * 1000000)
+ + (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Get current time
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+ gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Database oid lookup
+ * ----------------
+ */
+
+static int
+track_table_mutation_get_database_oid_internal(void)
+{
+ int oid = 0;
+ static POOL_RELCACHE *relcache;
+ POOL_CONNECTION_POOL *backend;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Safety check: must have shmem initialized */
+ if (track_table_mutation_shmem == NULL)
+ return oid;
+
+ session_context = pool_get_session_context(false);
+ if (session_context == NULL)
+ return oid;
+
+ backend = session_context->backend;
+ if (backend == NULL ||
+ MAIN_CONNECTION(backend) == NULL ||
+ MAIN_CONNECTION(backend)->sp == NULL)
+ return oid;
+
+ /* Ensure database name is valid */
+ if (MAIN_CONNECTION(backend)->sp->database == NULL)
+ return oid;
+
+ if (!relcache)
+ {
+ relcache = pool_create_relcache(
+ pool_config->relcache_size,
+ DATABASE_TO_OID_QUERY,
+ int_register_func,
+ int_unregister_func,
+ false);
+ if (relcache == NULL)
+ {
+ ereport(LOG,
+ (errmsg("track_table_mutation: "
+ "error creating relcache")));
+ return oid;
+ }
+ }
+
+ oid = (int) (intptr_t) pool_search_relcache(
+ relcache, backend,
+ MAIN_CONNECTION(backend)->sp->database);
+ return oid;
+}
+
+int
+pool_track_table_mutation_get_database_oid(void)
+{
+ return track_table_mutation_get_database_oid_internal();
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table
+ */
+static void
+table_map_init(TrackTableMutationHashTable *map,
+ int num_buckets, int max_entries)
+{
+ int *buckets;
+ TrackTableMutationEntry *entries;
+ int i;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ map->num_buckets = num_buckets;
+ map->max_entries = max_entries;
+ map->num_entries = 0;
+ map->free_list_head = 0;
+
+ buckets = TABLE_MAP_BUCKETS(map);
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = invalid;
+
+ /* Initialize free list - chain all entries */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ?
+ i + 1 : invalid;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "table map init %d buckets, "
+ "%d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list
+ */
+static int
+table_map_alloc_entry(TrackTableMutationHashTable *map)
+{
+ TrackTableMutationEntry *entries;
+ int idx;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ if (map->free_list_head == invalid)
+ return invalid;
+
+ idx = map->free_list_head;
+ map->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = invalid;
+ map->num_entries++;
+
+ return idx;
+}
+
+/*
+ * Free an entry back to the free list
+ */
+static void
+table_map_free_entry(TrackTableMutationHashTable *map,
+ int idx)
+{
+ TrackTableMutationEntry *entries;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ entries[idx].in_use = false;
+ entries[idx].next = map->free_list_head;
+ map->free_list_head = idx;
+ map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table.
+ * Returns entry index or INVALID_INDEX if not found.
+ * Must be called with lock held.
+ */
+static int
+table_map_lookup(TrackTableMutationHashTable *map,
+ int table_oid, int dboid,
+ uint32 hash)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries;
+ int bucket = hash % map->num_buckets;
+ int idx = buckets[bucket];
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ while (idx != invalid)
+ {
+ if (entries[idx].hash == hash &&
+ entries[idx].table_oid == table_oid &&
+ entries[idx].dboid == dboid)
+ {
+ return idx;
+ }
+ idx = entries[idx].next;
+ }
+
+ return invalid;
+}
+
+/*
+ * Insert or update a table entry.
+ * Must be called with lock held.
+ */
+static void
+table_map_insert(TrackTableMutationHashTable *map,
+ int table_oid, int dboid,
+ uint32 hash,
+ struct timeval *write_time)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries;
+ int bucket = hash % map->num_buckets;
+ int idx;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Check if entry already exists */
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != invalid)
+ {
+ /* Update last write time; keep first_write_time */
+ entries[idx].last_write_time = *write_time;
+ return;
+ }
+
+ /* Allocate new entry */
+ idx = table_map_alloc_entry(map);
+ if (idx == invalid)
+ {
+ int b;
+
+ /* Table is full: evict the head of the first non-empty bucket */
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ if (buckets[b] != invalid)
+ {
+ int victim = buckets[b];
+
+ buckets[b] = entries[victim].next;
+ table_map_free_entry(map, victim);
+ idx = table_map_alloc_entry(map);
+ break;
+ }
+ }
+
+ if (idx == invalid)
+ {
+ ereport(WARNING,
+ (errmsg("track_table_mutation: "
+ "failed to allocate entry "
+ "for oid %d (dboid %d)",
+ table_oid, dboid)));
+ return;
+ }
+ }
+
+ /* Initialize new entry */
+ entries[idx].table_oid = table_oid;
+ entries[idx].dboid = dboid;
+ entries[idx].hash = hash;
+ entries[idx].first_write_time = *write_time;
+ entries[idx].last_write_time = *write_time;
+
+ /* Insert at head of bucket chain */
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: "
+ "marked oid %d (dboid %d) written",
+ table_oid, dboid)));
+}
+
+/*
+ * Remove expired entries from the table map.
+ * Must be called with lock held.
+ */
+static void
+table_map_cleanup_expired(
+ TrackTableMutationHashTable *map, uint64 ttl_us)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries;
+ struct timeval now;
+ int64 max_stale_us;
+ int removed = 0;
+ int b;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+ get_current_time(&now);
+
+ max_stale_us = (int64)pool_config
+ ->track_table_mutation_max_staleness * 1000LL;
+
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *prev_ptr = &buckets[b];
+ int idx = buckets[b];
+
+ while (idx != invalid)
+ {
+ int64 age;
+ int64 total_age;
+ bool expired;
+
+ age = elapsed_us(
+ &entries[idx].last_write_time, &now);
+ expired = (age > (int64)ttl_us);
+
+ /*
+ * Also evict entries that exceed
+ * max_staleness from first write.
+ */
+ if (!expired && max_stale_us > 0)
+ {
+ total_age = elapsed_us(
+ &entries[idx].first_write_time,
+ &now);
+ expired = (total_age >= max_stale_us);
+ }
+
+ if (expired)
+ {
+ /* Entry has expired - remove it */
+ int next = entries[idx].next;
+
+ *prev_ptr = next;
+ table_map_free_entry(map, idx);
+ idx = next;
+ removed++;
+ }
+ else
+ {
+ prev_ptr = &entries[idx].next;
+ idx = entries[idx].next;
+ }
+ }
+ }
+
+ if (removed > 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "cleaned up %d expired entries",
+ removed)));
+ }
+}
+
+/* ----------------
+ * Parse cache operations
+ * ----------------
+ */
+
+/*
+ * Initialize parse cache
+ */
+static void
+parse_cache_init(QueryParseCache *cache,
+ int num_buckets, int max_entries)
+{
+ int *buckets;
+ QueryParseEntry *entries;
+ int i;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ cache->num_buckets = num_buckets;
+ cache->max_entries = max_entries;
+ cache->num_entries = 0;
+ cache->free_list_head = 0;
+ cache->lru_head = invalid;
+ cache->lru_tail = invalid;
+
+ buckets = PARSE_CACHE_BUCKETS(cache);
+ entries = PARSE_CACHE_ENTRIES(cache);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = invalid;
+
+ /* Initialize free list */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ?
+ i + 1 : invalid;
+ entries[i].lru_prev = invalid;
+ entries[i].lru_next = invalid;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "parse cache init %d buckets, "
+ "%d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Move entry to front of LRU list (most recently used)
+ */
+static void
+parse_cache_lru_touch(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache);
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ /* Already at head? */
+ if (cache->lru_head == idx)
+ return;
+
+ /* Remove from current position */
+ if (entries[idx].lru_prev != invalid)
+ entries[entries[idx].lru_prev].lru_next =
+ entries[idx].lru_next;
+ if (entries[idx].lru_next != invalid)
+ entries[entries[idx].lru_next].lru_prev =
+ entries[idx].lru_prev;
+ if (cache->lru_tail == idx)
+ cache->lru_tail = entries[idx].lru_prev;
+
+ /* Insert at head */
+ entries[idx].lru_prev = invalid;
+ entries[idx].lru_next = cache->lru_head;
+ if (cache->lru_head != invalid)
+ entries[cache->lru_head].lru_prev = idx;
+ cache->lru_head = idx;
+ if (cache->lru_tail == invalid)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Add entry to LRU list (at head)
+ */
+static void
+parse_cache_lru_add(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache);
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries[idx].lru_prev = invalid;
+ entries[idx].lru_next = cache->lru_head;
+
+ if (cache->lru_head != invalid)
+ entries[cache->lru_head].lru_prev = idx;
+
+ cache->lru_head = idx;
+
+ if (cache->lru_tail == invalid)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Remove entry from LRU list
+ */
+static void
+parse_cache_lru_remove(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache);
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ if (entries[idx].lru_prev != invalid)
+ entries[entries[idx].lru_prev].lru_next =
+ entries[idx].lru_next;
+ else
+ cache->lru_head = entries[idx].lru_next;
+
+ if (entries[idx].lru_next != invalid)
+ entries[entries[idx].lru_next].lru_prev =
+ entries[idx].lru_prev;
+ else
+ cache->lru_tail = entries[idx].lru_prev;
+
+ entries[idx].lru_prev = invalid;
+ entries[idx].lru_next = invalid;
+}
+
+/*
+ * Allocate entry from free list, evicting LRU if needed
+ */
+static int
+parse_cache_alloc_entry(QueryParseCache *cache)
+{
+ QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache);
+ int *buckets = PARSE_CACHE_BUCKETS(cache);
+ int idx;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ if (cache->free_list_head != invalid)
+ {
+ idx = cache->free_list_head;
+ cache->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = invalid;
+ cache->num_entries++;
+ return idx;
+ }
+
+ /* No free entries - evict LRU */
+ if (cache->lru_tail == invalid)
+ return invalid;
+
+ idx = cache->lru_tail;
+
+ /* Remove from hash bucket */
+ {
+ int bucket;
+ int *prev_ptr;
+ int curr;
+
+ bucket = entries[idx].query_hash %
+ cache->num_buckets;
+ prev_ptr = &buckets[bucket];
+ curr = buckets[bucket];
+
+ while (curr != invalid)
+ {
+ if (curr == idx)
+ {
+ *prev_ptr = entries[curr].next;
+ break;
+ }
+ prev_ptr = &entries[curr].next;
+ curr = entries[curr].next;
+ }
+ }
+
+ /* Remove from LRU list */
+ parse_cache_lru_remove(cache, idx);
+
+ /* Reinitialize entry */
+ entries[idx].in_use = true;
+ entries[idx].next = invalid;
+
+ return idx;
+}
+
+/*
+ * Look up a query in the parse cache
+ */
+static int
+parse_cache_lookup(QueryParseCache *cache, uint64 hash)
+{
+ int *buckets = PARSE_CACHE_BUCKETS(cache);
+ QueryParseEntry *entries = PARSE_CACHE_ENTRIES(cache);
+ int bucket = hash % cache->num_buckets;
+ int idx = buckets[bucket];
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ while (idx != invalid)
+ {
+ if (entries[idx].query_hash == hash)
+ return idx;
+ idx = entries[idx].next;
+ }
+
+ return invalid;
+}
+
+/* ----------------
+ * Query normalization
+ * ----------------
+ */
+
+/*
+ * Simple query normalization:
+ * - Strip comments (-- and C-style block comments)
+ * - Collapse whitespace
+ * - Convert to lowercase (except inside strings)
+ * - Replace literal values with placeholders
+ */
+static size_t
+normalize_query(const char *query, char *output,
+ size_t output_size)
+{
+ const char *src = query;
+ char *dst = output;
+ char *dst_end = output + output_size - 1;
+ bool in_string = false;
+ char string_char = 0;
+ bool last_was_space = true;
+
+ while (*src && dst < dst_end)
+ {
+ /* Handle string literals */
+ if (in_string)
+ {
+ if (*src == string_char)
+ {
+ if (*(src + 1) == string_char)
+ {
+ /* Escaped quote */
+ src += 2;
+ continue;
+ }
+ in_string = false;
+ *dst++ = '$'; /* placeholder for the literal */
+ last_was_space = false;
+ }
+ src++;
+ continue;
+ }
+
+ /* Single quotes start a literal; "..." is an identifier */
+ if (*src == '\'')
+ {
+ in_string = true;
+ string_char = *src;
+ src++;
+ continue;
+ }
+
+ /* Handle single-line comments */
+ if (*src == '-' && *(src + 1) == '-')
+ {
+ while (*src && *src != '\n')
+ src++;
+ continue;
+ }
+
+ /* Handle multi-line comments */
+ if (*src == '/' && *(src + 1) == '*')
+ {
+ src += 2;
+ while (*src &&
+ !(*src == '*' && *(src + 1) == '/'))
+ src++;
+ if (*src)
+ src += 2;
+ continue;
+ }
+
+ /* Handle whitespace */
+ if (*src == ' ' || *src == '\t' ||
+ *src == '\n' || *src == '\r')
+ {
+ if (!last_was_space)
+ {
+ *dst++ = ' ';
+ last_was_space = true;
+ }
+ src++;
+ continue;
+ }
+
+ /* Numeric literal (not a digit inside an identifier like t1) */
+ if (((*src >= '0' && *src <= '9') ||
+ (*src == '.' && *(src + 1) >= '0' &&
+ *(src + 1) <= '9')) &&
+ !(dst > output && (*(dst - 1) == '_' ||
+ (*(dst - 1) >= 'a' && *(dst - 1) <= 'z') ||
+ (*(dst - 1) >= '0' && *(dst - 1) <= '9'))))
+ {
+ while (*src && ((*src >= '0' && *src <= '9') || *src == '.'))
+ src++;
+ if (dst == output || *(dst - 1) != '$')
+ *dst++ = '$';
+ last_was_space = false;
+ continue;
+ }
+
+ /* Regular character - convert to lowercase */
+ if (*src >= 'A' && *src <= 'Z')
+ *dst++ = *src + 32;
+ else
+ *dst++ = *src;
+
+ last_was_space = false;
+ src++;
+ }
+
+ /* Remove trailing space */
+ if (dst > output && *(dst - 1) == ' ')
+ dst--;
+
+ *dst = '\0';
+ return dst - output;
+}
+
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+/*
+ * Calculate the total shared memory size required
+ * for the track table mutation feature.
+ */
+Size
+pool_track_table_mutation_shmem_size(void)
+{
+ Size size = 0;
+ int tbl_bkt;
+ int tbl_sz;
+ int qry_bkt;
+ int qry_sz;
+
+ tbl_bkt = pool_config->track_table_mutation_table_buckets;
+ tbl_sz = pool_config->track_table_mutation_table_size;
+ qry_bkt = pool_config->track_table_mutation_query_buckets;
+ qry_sz = pool_config->track_table_mutation_query_parse_cache_size;
+
+ /* Main structure */
+ size += sizeof(TrackTableMutationShmem);
+
+ /* Table mutation hash table */
+ size += sizeof(TrackTableMutationHashTable);
+ size += tbl_bkt * sizeof(int);
+ size += tbl_sz * sizeof(TrackTableMutationEntry);
+
+ /* Parse cache */
+ size += sizeof(QueryParseCache);
+ size += qry_bkt * sizeof(int);
+ size += qry_sz * sizeof(QueryParseEntry);
+
+ return size;
+}
+
+/*
+ * Initialize shared memory structures for the
+ * track table mutation feature. Allocates and sets
+ * up the table map and parse cache in shared memory.
+ * Called once from the pgpool main process at startup.
+ */
+void
+pool_track_table_mutation_init(void)
+{
+#ifndef POOL_PRIVATE
+ Size shmem_size;
+ char *shmem_ptr;
+ TrackTableMutationState *st;
+ int tbl_bkt;
+ int tbl_sz;
+ int qry_bkt;
+ int qry_sz;
+
+ if (pool_config->disable_load_balance_on_write !=
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "feature disabled")));
+ return;
+ }
+
+ tbl_bkt = pool_config->track_table_mutation_table_buckets;
+ tbl_sz = pool_config->track_table_mutation_table_size;
+ qry_bkt = pool_config->track_table_mutation_query_buckets;
+ qry_sz = pool_config->track_table_mutation_query_parse_cache_size;
+
+ shmem_size = pool_track_table_mutation_shmem_size();
+
+ /*
+ * Allocate from the main shared memory segment.
+ * Memory is zeroed by
+ * initialize_shared_memory_main_segment().
+ */
+ shmem_ptr = pool_shared_memory_segment_get_chunk(
+ shmem_size);
+ if (shmem_ptr == NULL)
+ {
+ ereport(ERROR,
+ (errmsg("track_table_mutation: "
+ "failed to allocate %zu bytes",
+ shmem_size)));
+ return;
+ }
+
+ /* Set up pointers within shared memory */
+ track_table_mutation_shmem =
+ (TrackTableMutationShmem *) shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationShmem);
+
+ track_table_mutation_shmem->table_map =
+ (TrackTableMutationHashTable *) shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationHashTable);
+ shmem_ptr += tbl_bkt * sizeof(int);
+ shmem_ptr += tbl_sz * sizeof(TrackTableMutationEntry);
+
+ track_table_mutation_shmem->query_cache =
+ (QueryParseCache *) shmem_ptr;
+
+ /* Initialize structures */
+ table_map_init(
+ track_table_mutation_shmem->table_map,
+ tbl_bkt, tbl_sz);
+
+ parse_cache_init(
+ track_table_mutation_shmem->query_cache,
+ qry_bkt, qry_sz);
+
+ /* Initialize global state */
+ st = &track_table_mutation_shmem->state;
+ st->initialized = true;
+ st->current_ttl_us = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+ get_current_time(&st->ttl_last_updated);
+ get_current_time(&st->last_cleanup_time);
+ st->global_cold_start_until.tv_sec = 0;
+ st->global_cold_start_until.tv_usec = 0;
+ st->stats_queries_checked = 0;
+ st->stats_forced_primary = 0;
+ st->stats_allowed_replica = 0;
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: "
+ "initialized with %zu bytes shmem",
+ shmem_size)));
+#endif
+}
+
+/*
+ * Initialize per-child process state.
+ * Records the process start time for cold start
+ * period tracking. Called when a child process starts.
+ */
+void
+pool_track_table_mutation_child_init(void)
+{
+ int dur;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ get_current_time(&process_start_time);
+ cold_start_initialized = true;
+ dur = pool_config->track_table_mutation_cold_start_duration;
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "child init, cold start %d ms",
+ dur)));
+}
+
+/*
+ * Check if the process is in cold start period.
+ * During cold start, all queries are routed to
+ * primary to avoid stale reads. Checks both
+ * per-process and global (watchdog) cold start.
+ */
+bool
+pool_track_table_mutation_in_cold_start(void)
+{
+ struct timeval now;
+ int64 elapsed_ms;
+ int dur;
+ TrackTableMutationState *st;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return false;
+
+ dur = pool_config->track_table_mutation_cold_start_duration;
+ if (dur <= 0)
+ return false;
+
+ get_current_time(&now);
+ st = &track_table_mutation_shmem->state;
+
+ /* Check watchdog-triggered global cold start */
+ if (st->global_cold_start_until.tv_sec != 0 &&
+ elapsed_us(&now,
+ &st->global_cold_start_until) > 0)
+ {
+ return true;
+ }
+
+ /* Check per-process cold start */
+ if (!cold_start_initialized)
+ return false;
+
+ elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;
+
+ if (elapsed_ms < dur)
+ {
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: "
+ "cold start (%ld/%d ms)",
+ (long)elapsed_ms, dur)));
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Trigger a global cold start for all processes.
+ * Sets the cold start end time in shared memory.
+ * Called after watchdog leader change to force all
+ * queries to primary during the transition.
+ */
+void
+pool_track_table_mutation_trigger_global_cold_start(void)
+{
+ struct timeval now;
+ struct timeval *until;
+ int dur;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ dur = pool_config->track_table_mutation_cold_start_duration;
+ if (dur <= 0)
+ return;
+
+ get_current_time(&now);
+ until = &track_table_mutation_shmem->state
+ .global_cold_start_until;
+ *until = now;
+ until->tv_sec += dur / 1000;
+ until->tv_usec += (dur % 1000) * 1000;
+ if (until->tv_usec >= 1000000)
+ {
+ until->tv_sec += until->tv_usec / 1000000;
+ until->tv_usec %= 1000000;
+ }
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: "
+ "global cold start for %d ms",
+ dur)));
+}
+
+/*
+ * Check if a table was recently written (is "stale").
+ * Returns true if reads should go to primary because
+ * the table was written within the current TTL window.
+ */
+bool
+pool_track_table_mutation_table_is_stale(
+ int table_oid, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ struct timeval now;
+ uint64 ttl_us;
+ uint32 hash;
+ int idx;
+ bool is_stale = false;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return false;
+
+ if (table_oid <= 0 || dboid <= 0)
+ {
+ is_stale = true;
+ goto update_stats;
+ }
+
+ map = track_table_mutation_shmem->table_map;
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+
+ table_map_lock();
+
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ TrackTableMutationEntry *entries;
+ int64 age;
+ int64 total_age;
+ int64 max_stale_us;
+
+ entries = TABLE_MAP_ENTRIES(map);
+ get_current_time(&now);
+ ttl_us = track_table_mutation_shmem->state
+ .current_ttl_us;
+
+ age = elapsed_us(
+ &entries[idx].last_write_time, &now);
+ is_stale = (age < (int64)ttl_us);
+
+ /*
+ * Enforce max_staleness hard cap: no entry
+ * can force primary routing longer than
+ * max_staleness from its first write.
+ */
+ if (is_stale)
+ {
+ max_stale_us = (int64)pool_config
+ ->track_table_mutation_max_staleness
+ * 1000LL;
+ if (max_stale_us > 0)
+ {
+ total_age = elapsed_us(
+ &entries[idx].first_write_time,
+ &now);
+ if (total_age >= max_stale_us)
+ is_stale = false;
+ }
+ }
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: "
+ "oid %d dboid %d "
+ "elapsed=%ld ttl=%lu stale=%d",
+ table_oid, dboid,
+ (long)age,
+ (unsigned long)ttl_us,
+ is_stale)));
+ }
+
+ table_map_unlock();
+
+update_stats:
+ /* Update statistics under the table map semaphore */
+ if (track_table_mutation_shmem != NULL)
+ {
+ TrackTableMutationState *st;
+
+ table_map_lock();
+ st = &track_table_mutation_shmem->state;
+ st->stats_queries_checked++;
+ if (is_stale)
+ st->stats_forced_primary++;
+ else
+ st->stats_allowed_replica++;
+ table_map_unlock();
+ }
+
+ return is_stale;
+}
+
+/*
+ * Mark multiple tables as recently written.
+ * Called after DML queries complete to record
+ * which tables were modified.
+ */
+void
+pool_track_table_mutation_mark_tables_written(
+ const int *table_oids, int num_tables, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ TrackTableMutationState *st;
+ struct timeval now;
+ int i;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ if (num_tables <= 0 || table_oids == NULL ||
+ dboid <= 0)
+ return;
+
+ map = track_table_mutation_shmem->table_map;
+ st = &track_table_mutation_shmem->state;
+ get_current_time(&now);
+
+ table_map_lock();
+
+ /* Once the map is over 75% full, purge expired entries (rate-limited) */
+ if (map->num_entries > map->max_entries * 3 / 4)
+ {
+ int64 since_cleanup;
+
+ since_cleanup = elapsed_us(
+ &st->last_cleanup_time, &now);
+ /* 100ms interval */
+ if (since_cleanup > 100000)
+ {
+ table_map_cleanup_expired(
+ map, st->current_ttl_us);
+ st->last_cleanup_time = now;
+ }
+ }
+
+ for (i = 0; i < num_tables; i++)
+ {
+ uint32 hash;
+ int table_oid = table_oids[i];
+
+ if (table_oid > 0)
+ {
+ hash = fnv1a_hash_table_key(
+ table_oid, dboid);
+ table_map_insert(map, table_oid,
+ dboid, hash, &now);
+ }
+ }
+
+ table_map_unlock();
+}
+
+/*
+ * Mark a single table as recently written.
+ */
+void
+pool_track_table_mutation_mark_table_written(
+ int table_oid, int dboid)
+{
+ if (table_oid > 0 && dboid > 0)
+ {
+ const int tables[1] = { table_oid };
+
+ pool_track_table_mutation_mark_tables_written(
+ tables, 1, dboid);
+ }
+}
+
+/*
+ * Update the staleness TTL based on observed
+ * replication delay. New TTL = delay * factor,
+ * clamped to [default_ttl, 1 hour].
+ */
+void
+pool_track_table_mutation_update_ttl(uint64 delay_us)
+{
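+ uint64 new_ttl;
+ double factor;
+ double ttl_d;
+ TrackTableMutationState *st;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ factor = pool_config->track_table_mutation_ttl_factor;
+
+ /*
+ * Clamp in the double domain before converting:
+ * casting an out-of-range double to uint64 is
+ * undefined behavior. The negated comparison also
+ * maps NaN or a negative product to the default.
+ */
+ ttl_d = (double) delay_us * factor;
+ if (!(ttl_d >= (double) TRACK_TABLE_MUTATION_DEFAULT_TTL_US))
+ new_ttl = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+ else if (ttl_d > 3600.0 * 1000000.0)
+ new_ttl = 3600ULL * 1000000ULL; /* maximum TTL of 1 hour */
+ else
+ new_ttl = (uint64) ttl_d;
+
+ /*
+ * Readers fetch current_ttl_us under the table map
+ * lock, so update it under the same lock.
+ */
+ st = &track_table_mutation_shmem->state;
+ table_map_lock();
+ st->current_ttl_us = new_ttl;
+ get_current_time(&st->ttl_last_updated);
+ table_map_unlock();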
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "TTL=%lu us (delay=%lu factor=%.1f)",
+ (unsigned long)new_ttl,
+ (unsigned long)delay_us,
+ factor)));
+}
+
+/*
+ * Look up a cached parse result by query hash.
+ * Returns true and fills output parameters if
+ * the query was found in the parse cache.
+ */
+bool
+pool_track_table_mutation_get_cached_parse(
+ uint64 hash, bool *is_write,
+ char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int *num_tables)
+{
+ QueryParseCache *cache;
+ int idx;
+ bool found = false;
+ int max_tables;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return false;
+
+ max_tables = TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY;
+ cache = track_table_mutation_shmem->query_cache;
+
+ parse_cache_lock();
+
+ idx = parse_cache_lookup(cache, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ QueryParseEntry *entries;
+ int i;
+ int namelen;
+
+ entries = PARSE_CACHE_ENTRIES(cache);
+ namelen = TRACK_TABLE_MUTATION_TABLE_NAME_LEN;
+ *is_write = entries[idx].is_write;
+ *num_tables = entries[idx].num_tables;
+
+ for (i = 0;
+ i < entries[idx].num_tables &&
+ i < max_tables;
+ i++)
+ {
+ strlcpy(table_names[i],
+ entries[idx].table_names[i],
+ namelen);
+ }
+
+ /* Move to front of LRU */
+ parse_cache_lru_touch(cache, idx);
+ found = true;
+ }
+
+ parse_cache_unlock();
+
+ return found;
+}
+
+/*
+ * Store a parse result in the shared cache.
+ * Evicts the LRU entry if the cache is full.
+ */
+void
+pool_track_table_mutation_cache_parse(
+ uint64 hash, bool is_write,
+ const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int num_tables)
+{
+ QueryParseCache *cache;
+ int *buckets;
+ QueryParseEntry *entries;
+ int idx;
+ int bucket;
+ int max_tables;
+ int namelen;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ max_tables = TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY;
+ namelen = TRACK_TABLE_MUTATION_TABLE_NAME_LEN;
+ cache = track_table_mutation_shmem->query_cache;
+
+ parse_cache_lock();
+
+ /* Check if already exists */
+ idx = parse_cache_lookup(cache, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ parse_cache_unlock();
+ return;
+ }
+
+ /* Allocate new entry (may evict LRU) */
+ idx = parse_cache_alloc_entry(cache);
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ parse_cache_unlock();
+ ereport(WARNING,
+ (errmsg("track_table_mutation: "
+ "parse cache alloc failed")));
+ return;
+ }
+
+ entries = PARSE_CACHE_ENTRIES(cache);
+ buckets = PARSE_CACHE_BUCKETS(cache);
+
+ /* Fill in entry */
+ entries[idx].query_hash = hash;
+ entries[idx].is_write = is_write;
+ entries[idx].num_tables =
+ (num_tables > max_tables) ?
+ max_tables : num_tables;
+
+ {
+ int i;
+
+ for (i = 0; i < entries[idx].num_tables; i++)
+ {
+ strlcpy(entries[idx].table_names[i],
+ table_names[i], namelen);
+ }
+ }
+
+ /* Insert into hash bucket */
+ bucket = hash % cache->num_buckets;
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ /* Add to LRU list */
+ parse_cache_lru_add(cache, idx);
+
+ parse_cache_unlock();
+}
+
+/*
+ * Normalize a SQL query and compute its 64-bit hash.
+ * Strips comments, collapses whitespace, lowercases,
+ * and replaces literals with placeholders.
+ */
+uint64
+pool_track_table_mutation_normalize_and_hash(
+ const char *query)
+{
+ char normalized[8192];
+ size_t len;
+
+ if (query == NULL || query[0] == '\0')
+ return 0;
+
+ len = normalize_query(query, normalized,
+ sizeof(normalized));
+ if (len == 0)
+ return 0;
+
+ return fnv1a_hash_64(normalized, len);
+}
--
2.53.0
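To make the routing rule in pool_track_table_mutation_table_is_stale() and pool_track_table_mutation_update_ttl() easier to review, here is a minimal standalone sketch of the same decision logic. It is not the patch code: the sketch_* names are hypothetical, the 1 s default TTL is an assumption (the value of TRACK_TABLE_MUTATION_DEFAULT_TTL_US is not shown in this part of the patch), and times are plain microsecond counters instead of struct timeval. max_staleness is taken in milliseconds, matching the `* 1000LL` conversion in the patch.

```c
#include <assert.h>   /* used by the usage checks below */
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins; the real default TTL value is an assumption. */
#define SKETCH_DEFAULT_TTL_US 1000000ULL            /* assumed: 1 s */
#define SKETCH_MAX_TTL_US     (3600ULL * 1000000ULL) /* 1 hour, as in the patch */

/* TTL = delay * factor, clamped to [default, 1 hour]. */
static uint64_t
sketch_compute_ttl(uint64_t delay_us, double factor)
{
	double		ttl = (double) delay_us * factor;

	/* Clamp as a double first; the negated test also catches NaN. */
	if (!(ttl >= (double) SKETCH_DEFAULT_TTL_US))
		return SKETCH_DEFAULT_TTL_US;
	if (ttl > (double) SKETCH_MAX_TTL_US)
		return SKETCH_MAX_TTL_US;
	return (uint64_t) ttl;
}

/*
 * Return true if reads of a table should be forced to the primary.
 * All times are microsecond timestamps; max_staleness_ms <= 0 disables
 * the hard cap.
 */
static bool
sketch_force_primary(int64_t now_us, int64_t last_write_us,
					 int64_t first_write_us, uint64_t ttl_us,
					 int64_t max_staleness_ms)
{
	/* Written within the TTL window: a replica may not have it yet. */
	bool		stale = (now_us - last_write_us) < (int64_t) ttl_us;

	/* Hard cap measured from the first write, not the latest one. */
	if (stale && max_staleness_ms > 0 &&
		now_us - first_write_us >= max_staleness_ms * 1000LL)
		stale = false;
	return stale;
}
```

With the assumed 1 s default, a 2 s replication delay and factor 1.5 give a 3 s TTL, and a zero delay falls back to the default. A table last written 0.5 s ago with a 1 s TTL is routed to the primary unless its first write is already older than max_staleness, which is what keeps a continuously written table from pinning reads to the primary forever.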