public inbox for [email protected]
From: Nadav Shatz <[email protected]>
To: Tatsuo Ishii <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Recent mutated table tracking in memory
Date: Thu, 12 Feb 2026 11:05:19 +0200
Message-ID: <CACeKOO21jgrafj6RzO+ibLjBDUdDeQ0Tuu3tatkmGUdQi+GAAQ@mail.gmail.com>
In-Reply-To: <[email protected]>
References: <[email protected]>
<CACeKOO2HVSKqu+Zp8WC+QhE7yS9LBLiifwRpQVfjOEAka_YpKw@mail.gmail.com>
<CACeKOO2QqpnML1OkQqqCCc+xG1d2M+sS7y7zE9vw5W-DXu+xKQ@mail.gmail.com>
<[email protected]>
Hi Tatsuo,
Thank you for the careful review. You raised an important concern. I've
addressed it in the updated patch — here's the explanation:
The attack scenario you describe is now handled. In the updated patch,
writes inside explicit transactions are only flushed to the shared-memory
table map at COMMIT time. If the transaction is rolled back, the table is
never marked as stale. So the attack pattern:
BEGIN;
UPDATE t1 SET i = 1 WHERE FALSE;
ROLLBACK;
has zero effect on the shared-memory table map. The dml_adaptive_global
mode piggybacks on the existing dml_adaptive per-transaction write list
(transaction_temp_write_list). On COMMIT, the accumulated table names are
resolved to OIDs and flushed to shared memory. On ROLLBACK,
the list is simply discarded (the existing dml_adaptive behavior).
For autocommit statements (outside explicit transactions), tables are
marked immediately — but in that case the write is committed, so this is
correct.
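To make the flow concrete, here is a simplified, self-contained sketch of the deferral logic. It is not the code in the patch: the Session and SharedMap types, the function names, and the fixed-size arrays are made up for illustration, and the real implementation resolves table names to OIDs and writes to shared memory under a semaphore.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_PENDING 16   /* hypothetical cap on tables written per transaction */
#define MAX_SHARED  64   /* hypothetical capacity of the shared table map */

/* Stand-in for the shared-memory table map: a plain set of table OIDs. */
typedef struct
{
    int oids[MAX_SHARED];
    int count;
} SharedMap;

/* Stand-in for the per-session state that dml_adaptive already keeps. */
typedef struct
{
    bool in_transaction;
    int  pending[MAX_PENDING];  /* tables written in the open transaction */
    int  npending;
} Session;

/* Mark a table as recently written in the shared map (no duplicates). */
static void shared_mark(SharedMap *map, int oid)
{
    for (int i = 0; i < map->count; i++)
        if (map->oids[i] == oid)
            return;
    if (map->count < MAX_SHARED)
        map->oids[map->count++] = oid;
}

static bool shared_is_marked(const SharedMap *map, int oid)
{
    for (int i = 0; i < map->count; i++)
        if (map->oids[i] == oid)
            return true;
    return false;
}

/* BEGIN: start collecting writes locally. */
static void session_begin(Session *s)
{
    s->in_transaction = true;
    s->npending = 0;
}

/* A write is deferred inside a transaction and marked at once in autocommit. */
static void session_write(Session *s, SharedMap *map, int oid)
{
    if (!s->in_transaction)
    {
        shared_mark(map, oid);
        return;
    }
    if (s->npending < MAX_PENDING)
        s->pending[s->npending++] = oid;
}

/* COMMIT flushes the local list to the shared map; ROLLBACK just drops it. */
static void session_end(Session *s, SharedMap *map, bool commit)
{
    assert(s->in_transaction);
    if (commit)
        for (int i = 0; i < s->npending; i++)
            shared_mark(map, s->pending[i]);
    s->npending = 0;
    s->in_transaction = false;
}
```

In this sketch, session_begin / session_write / session_end(commit = false) leaves the shared map untouched, which is the property the BEGIN/UPDATE/ROLLBACK loop relies on being absent.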
Regression test included. Test 042 now includes:
- Test 10: verifies that BEGIN; INSERT; ROLLBACK; SELECT does NOT route
the SELECT to primary
- Test 11: verifies that BEGIN; INSERT; COMMIT; SELECT DOES route the
SELECT to primary
Additional context on the threat model:
1. This feature requires disable_load_balance_on_write =
'dml_adaptive_global' — it is opt-in, not enabled by default. Operators who
enable it accept documented trade-offs (additional shared memory, TTL-based
staleness window).
2. An attacker who can connect and execute SQL against pgpool already has
the ability to cause far more damage (DROP TABLE, mass DELETEs, resource
exhaustion via expensive queries, connection flooding, etc.). The
table-marking via committed writes is a minor concern compared to
those vectors. Authentication, connection limits, and network security
are the appropriate defenses at that layer.
3. Even in the worst case (an attacker commits real writes in a loop),
the impact is bounded: the stale marking is temporary (TTL-based, typically
a few seconds), and only affects load-balancing decisions — it doesn't
cause data loss or correctness issues.
4. The existing dml_adaptive mode has analogous behavior: within a
transaction, a write to table T causes all reads of T to go to primary for
the remainder of that transaction. The only difference is scope —
dml_adaptive_global extends this across sessions with a TTL.
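For reference, the TTL bound in point 3 follows from TTL = replication_delay * track_table_mutation_ttl_factor. Below is a rough sketch of that check, under the assumption that timestamps and the measured delay are both in milliseconds. The function names are hypothetical and do not match the patch; only the 1.0-100.0 factor range is taken from the documented parameter limits.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Valid range documented for track_table_mutation_ttl_factor. */
#define TTL_FACTOR_MIN 1.0
#define TTL_FACTOR_MAX 100.0

/* Keep the factor inside the documented range. */
static double clamp_ttl_factor(double factor)
{
    if (factor < TTL_FACTOR_MIN)
        return TTL_FACTOR_MIN;
    if (factor > TTL_FACTOR_MAX)
        return TTL_FACTOR_MAX;
    return factor;
}

/* TTL in milliseconds: measured replication delay times the factor. */
static int64_t stale_ttl_ms(int64_t replication_delay_ms, double ttl_factor)
{
    assert(replication_delay_ms >= 0);
    return (int64_t) ((double) replication_delay_ms * clamp_ttl_factor(ttl_factor));
}

/* A table counts as stale while less than one TTL has passed since the write. */
static bool table_is_stale(int64_t written_at_ms, int64_t now_ms,
                           int64_t replication_delay_ms, double ttl_factor)
{
    return (now_ms - written_at_ms) < stale_ttl_ms(replication_delay_ms, ttl_factor);
}
```

With a measured delay of 400 ms and the default factor of 5.0, a committed write keeps reads of that table on the primary for 2000 ms, after which load balancing resumes for it.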
Thanks!
On Wed, Feb 11, 2026 at 12:28 PM Tatsuo Ishii <[email protected]> wrote:
> > Hi Tatsuo,
> >
> > After reading more about disable_load_balance_on_write=dml_adaptive, I
> > came to the conclusion that this feature is actually an "extension" of
> > it, since it covers "global" and not just per-transaction behavior. In
> > any case, I think it makes more sense for it to sit under
> > disable_load_balance_on_write rather than as a standalone setting, for
> > clarity.
> >
> > I'm attaching below an updated patch with these adjustments.
> >
> > Please let me know what you think.
>
> I worry about the transactional behavior with the patch:
>
> + This means that if a transaction is rolled back, the table remains marked as stale until
> + the TTL expires, even though no actual data modification occurred. This is by design:
>
> This allows attackers to issue a simple command repeatedly to
> effectively disable load balancing (and increase the load on the
> primary) across the whole system:
>
> BEGIN;
> UPDATE t1 SET i = 1 WHERE FALSE;
> ROLLBACK;
>
> I think if the patch allows that, we cannot accept the patch.
>
> Best regards,
> --
> Tatsuo Ishii
> SRA OSS K.K.
> English: http://www.sraoss.co.jp/index_en/
> Japanese:http://www.sraoss.co.jp
>
> > On Fri, Feb 6, 2026 at 1:29 PM Nadav Shatz <[email protected]> wrote:
> >
> >> Hi Tatsuo,
> >>
> >> Thank you for all the great comments and questions! I took all of
> >> them into consideration, either adding support/tests or detailing the
> >> limitations in the docs.
> >>
> >> Let me know what you think of the latest patch attached here.
> >>
> >> On Wed, Feb 4, 2026 at 1:23 AM Tatsuo Ishii <[email protected]> wrote:
> >>
> >>> From: Tatsuo Ishii <[email protected]>
> >>> Subject: Re: Proposal: Recent mutated table tracking in memory
> >>> Date: Tue, 03 Feb 2026 16:43:53 +0900 (JST)
> >>> Message-ID: <[email protected]>
> >>>
> >>> > Hi Nadav,
> >>> >
> >>> > Thank you for updating the patch!
> >>> >
> >>> >> Thank you for the comments!
> >>> >>
> >>> >> I agree with all of them. Let me know what you think of the changes
> >>> >> and new naming.
> >>> >
> >>> > I still think "memory_map" is too generic. Anything put in memory for
> >>> > data mapping could be called a "memory map". I recommend changing the
> >>> > name to a more feature-specific one: what about replacing "memory_map"
> >>> > with "track_table_mutation"? It's a slightly longer name but it
> >>> > clearly represents the feature. Any better ideas are welcome.
> >>> >
> >>> > - memory_map_enabled: Enable/disable the feature (default: off)
> >>> > - memory_map_ttl_factor: TTL multiplier for replication delay (default: 5.0)
> >>> > - memory_map_cold_start_duration: Cold start period in ms (default: 2000)
> >>> > - memory_map_table_buckets: Hash buckets for table map (default: 1024)
> >>> > - memory_map_table_size: Max tracked tables (default: 2048)
> >>> > - memory_map_query_buckets: Hash buckets for query cache (default: 2048)
> >>> > - memory_map_query_cache_size: Max cached queries (default: 10000)
> >>> >
> >>> > Also, I feel memory_map_query_cache_size is confusing because there's
> >>> > already a "query cache" feature in pgpool. Can we change it to something
> >>> > like "query_parse_cache_size"?
> >>> >
> >>> > Review comments:
> >>> >
> >>> > (1) Why is the regression test 45? Shouldn't it be 42? (The last
> >>> > feature test is 041.external_replication_delay.)
> >>> >
> >>> > (2) You enhanced the patch to deal with watchdog leader changes. That's
> >>> > good. However, I don't see a test case for it in test.sh.
> >>> >
> >>> > (3) It seems the patch does not support TRUNCATE, MERGE, PREPARE and
> >>> > WITH + updating. If so, it should be noted in the docs as a
> >>> > limitation of the feature.
> >>>
> >>> (4) It seems the patch does not consider transactions. If an UPDATE is
> >>> performed in a transaction and the transaction gets rolled back, load
> >>> balancing is disabled despite the fact that the table modification did
> >>> not happen.
> >>>
> >>> Best regards,
> >>> --
> >>> Tatsuo Ishii
> >>> SRA OSS K.K.
> >>> English: http://www.sraoss.co.jp/index_en/
> >>> Japanese:http://www.sraoss.co.jp
> >>>
> >>
> >>
> >> --
> >> Nadav Shatz
> >> Tailor Brands | CTO
> >>
> >
> >
> > --
> > Nadav Shatz
> > Tailor Brands | CTO
>
--
Nadav Shatz
Tailor Brands | CTO
Attachments:
[application/octet-stream] table_track.patch (99.1K, 3-table_track.patch)
From ad6acadf4661875c56ae8e5e901f16fafb5e78a2 Mon Sep 17 00:00:00 2001
From: Nadav Shatz <[email protected]>
Date: Tue, 6 Jan 2026 12:41:50 +0200
Subject: [PATCH] Feature: add in-memory table tracking to prevent stale reads
from replicas
Introduces 'dml_adaptive_global' as a new value for disable_load_balance_on_write.
This mode is a superset of dml_adaptive: it performs per-transaction local tracking
AND cross-session shared-memory tracking of recently written tables, routing reads
to primary until a TTL (based on measured replication delay) expires.
Sub-parameters (track_table_mutation_*) control TTL factor, cold start duration,
hash table sizing, and query parse cache sizing.
diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
index ee19fabebab2210cd4abe59a711a036ac0ac8943..74162ef2f81f38879c552438ee9321dfde34a4be 100644
--- a/doc/src/sgml/loadbalance.sgml
+++ b/doc/src/sgml/loadbalance.sgml
@@ -1108,6 +1108,18 @@ app_name_redirect_preference_list > database_redirect_preference_list > us
Dependent functions, triggers, and views on the tables can be configured
using <xref linkend="guc-dml-adaptive-object-relationship-list">
</para>
+
+ <para>
+ If this parameter is set to <varname>dml_adaptive_global</varname>,
+ <productname>Pgpool-II</> behaves like <varname>dml_adaptive</varname>
+ (per-transaction write tracking) and additionally uses shared memory to track
+ recently written tables across all sessions cluster-wide. When a table is
+ written in any session, subsequent reads of that table from any session are
+ routed to primary until a TTL (based on measured replication delay) expires.
+ This prevents stale reads after writes even across different connections.
+ See <xref linkend="runtime-config-table-mutation-map"> for the sub-parameters
+ that control the shared-memory tracking behavior.
+ </para>
</listitem>
</varlistentry>
@@ -1193,4 +1205,255 @@ dml_adaptive_object_relationship_list = 'table_1:table_2'
</variablelist>
</sect2>
+
+ <sect2 id="runtime-config-table-mutation-map">
+ <title>Table Mutation Map Configuration (Lagless Replica Reads)</title>
+
+ <para>
+ These parameters configure the track table mutation feature, which is activated by setting
+ <xref linkend="guc-disable-load-balance-on-write"> to <literal>dml_adaptive_global</literal>.
+ The feature tracks recently written tables to prevent stale reads from replica nodes during
+ replication lag, implementing the "lagless" architecture pattern for distributed systems
+ with read replicas.
+ </para>
+
+ <para>
+ When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period
+ (<literal>replication_delay * track_table_mutation_ttl_factor</literal>). Any SELECT queries on stale tables are routed
+ to the primary node instead of replicas, ensuring read-after-write consistency.
+ </para>
+
+ <para>
+ This feature requires <xref linkend="guc-replication-delay-source-cmd"> to be configured
+ for monitoring replication delay from replicas.
+ </para>
+
+ <warning>
+ <para>
+ Enabling <literal>dml_adaptive_global</literal> increases shared memory consumption. With default settings,
+ the feature requires approximately 6.4 MB of shared memory (0.1 MB for table tracking + 6.3 MB for query cache).
+ Memory usage scales with configuration parameters:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Table tracking: <literal>track_table_mutation_table_size * 40 bytes</literal> (default: 2048 * 40 = ~80 KB)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Query cache: <literal>track_table_mutation_query_parse_cache_size * 640 bytes</literal> (default: 10000 * 640 = ~6.3 MB)
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ For high-traffic systems with large cache sizes (e.g., <literal>track_table_mutation_query_parse_cache_size = 100000</literal>),
+ memory usage can reach 64 MB or more. Consider your system's available shared memory when using <literal>dml_adaptive_global</literal>.
+ </para>
+ </warning>
+
+ <variablelist>
+
+ <varlistentry id="guc-track-table-mutation-ttl-factor" xreflabel="track_table_mutation_ttl_factor">
+ <term><varname>track_table_mutation_ttl_factor</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_ttl_factor</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Multiplier for calculating the TTL: <literal>TTL = replication_delay * track_table_mutation_ttl_factor</literal>.
+ Higher values provide more safety margin but may reduce read replica utilization.
+ </para>
+ <para>
+ Valid range: 1.0-100.0. Default is <literal>5.0</literal>.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-cold-start-duration" xreflabel="track_table_mutation_cold_start_duration">
+ <term><varname>track_table_mutation_cold_start_duration</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_cold_start_duration</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Duration in milliseconds to route all queries to primary after a child process starts.
+ This prevents stale reads when a new connection is established before the table mutation map
+ is populated with recent write history.
+ </para>
+ <para>
+ When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a
+ global cold start for this duration to avoid stale reads after leadership changes.
+ </para>
+ <para>
+ Valid range: 0-60000 ms. Default is <literal>2000</literal> (2 seconds).
+ Set to 0 to disable cold start behavior.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-buckets" xreflabel="track_table_mutation_table_buckets">
+ <term><varname>track_table_mutation_table_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the track table mutation hash table.
+ Higher values reduce hash collisions and improve lookup performance.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>1024</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-size" xreflabel="track_table_mutation_table_size">
+ <term><varname>track_table_mutation_table_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of tables that can be tracked simultaneously in the table mutation map.
+ When full, oldest entries are evicted using a simple eviction strategy.
+ </para>
+ <para>
+ Valid range: 128-131072. Default is <literal>2048</literal>.
+ Memory usage: approximately 40 bytes per entry.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-query-buckets" xreflabel="track_table_mutation_query_buckets">
+ <term><varname>track_table_mutation_query_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_query_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the query parse cache. The cache stores normalized
+ query strings mapped to their table dependencies to avoid repeated parsing.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>2048</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-query-parse-cache-size" xreflabel="track_table_mutation_query_parse_cache_size">
+ <term><varname>track_table_mutation_query_parse_cache_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_query_parse_cache_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of query parse results to cache. Uses LRU eviction when full.
+ Larger caches reduce parsing overhead but consume more shared memory.
+ </para>
+ <para>
+ Valid range: 100-1000000. Default is <literal>10000</literal>.
+ Memory usage: approximately 640 bytes per entry (~6.3 MB for default, ~64 MB for 100000 entries).
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ <sect3 id="runtime-config-track-table-mutation-example">
+ <title>Track Table Mutation Configuration Example</title>
+ <para>
+ To enable track table mutation with replication delay monitoring:
+ </para>
+ <programlisting>
+# Enable dml_adaptive_global mode (includes track table mutation)
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_ttl_factor = 5.0
+track_table_mutation_cold_start_duration = 2000
+
+# Configure external replication delay monitoring
+replication_delay_source_cmd = '/path/to/get-replication-delay.sh'
+replication_delay_source_timeout = 10
+
+# Adjust cache sizes based on workload (increases memory usage)
+track_table_mutation_table_size = 4096 # Track up to 4096 tables (~160 KB)
+track_table_mutation_query_parse_cache_size = 50000 # Cache 50k queries (~31 MB)
+ </programlisting>
+ <para>
+ Total shared memory required for above configuration: approximately 31.2 MB (31 MB query cache + 0.2 MB table map + overhead).
+ Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.4 MB.
+ </para>
+ </sect3>
+
+ <sect3 id="runtime-config-track-table-mutation-limitations">
+ <title>Limitations</title>
+ <para>
+ The track table mutation feature has the following limitation:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>PREPARE</literal> statements are not tracked. When a prepared statement
+ containing data modification is executed, the table mutation is not recorded.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ If your application uses prepared statements and requires read-after-write consistency,
+ consider using explicit transaction routing or the <literal>/*NO LOAD BALANCE*/</literal>
+ comment directive for affected queries.
+ </para>
+ <para>
+ The following statement types <emphasis>are</emphasis> tracked and will mark tables as stale:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>INSERT</literal>, <literal>UPDATE</literal>, <literal>DELETE</literal>
+ statements (including those with <literal>RETURNING</literal> clauses).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>TRUNCATE</literal> statements (including multiple tables).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>MERGE</literal> statements (PostgreSQL 15+).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>WITH</literal> clauses containing data modifications (Common Table Expressions
+ with <literal>INSERT</literal>, <literal>UPDATE</literal>, or <literal>DELETE</literal>).
+ For example, <literal>WITH deleted AS (DELETE FROM t1 RETURNING *) SELECT * FROM deleted</literal>
+ will properly mark table <literal>t1</literal> as stale.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ <emphasis>Transaction Rollback Behavior:</emphasis> Within explicit transactions, tables
+ are only marked as stale in shared memory when the transaction is committed. If the
+ transaction is rolled back, no tables are marked, since no actual data modification
+ occurred on replicas. This prevents rolled-back transactions from unnecessarily
+ disabling load balancing. For autocommit statements (outside explicit transactions),
+ tables are marked immediately upon command completion.
+ </para>
+ </sect3>
+
+ </sect2>
+
</sect1>
diff --git a/src/Makefile.am b/src/Makefile.am
index 4678ab53055e828a37b6477801640aff17ff84a7..39588af58deba045dffc01ae932115b8a9dbfcf2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \
rewrite/pool_timestamp.c \
rewrite/pool_lobj.c \
utils/pool_select_walker.c \
+ utils/pool_track_table_mutation.c \
utils/strlcpy.c \
utils/psprintf.c \
utils/pool_params.c \
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index 68abb7f41cb96d856c824a148842748bfb7a4d12..623d8751677fd6f39d0e12f0e3e899171890f6e0 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -290,6 +290,7 @@ static const struct config_enum_entry disable_load_balance_on_write_options[] =
{"trans_transaction", DLBOW_TRANS_TRANSACTION, false},
{"always", DLBOW_ALWAYS, false},
{"dml_adaptive", DLBOW_DML_ADAPTIVE, false},
+ {"dml_adaptive_global", DLBOW_DML_ADAPTIVE_GLOBAL, false},
{NULL, 0, false}
};
@@ -1757,6 +1758,17 @@ static struct config_int_array ConfigureNamesIntArray[] =
static struct config_double ConfigureNamesDouble[] =
{
+ {
+ {"track_table_mutation_ttl_factor", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "TTL multiplier for track table mutation (TTL = replication_delay * factor)",
+ CONFIG_VAR_TYPE_DOUBLE, false, 0
+ },
+ &g_pool_config.track_table_mutation_ttl_factor,
+ 5.0, /* boot value: 5x replication delay */
+ 1.0, 100.0, /* min, max */
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_DOUBLE
};
@@ -2355,6 +2367,61 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"track_table_mutation_cold_start_duration", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Duration in milliseconds to force queries to primary after child process starts.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.track_table_mutation_cold_start_duration,
+ 2000, /* 2 seconds */
+ 0, 60000, /* 0 to 60 seconds */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_buckets,
+ 1024,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_size,
+ 2048,
+ 128, 131072,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_query_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_query_buckets,
+ 2048,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_query_parse_cache_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_query_parse_cache_size,
+ 10000,
+ 100, 1000000,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_INT
};
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 7cf9813eb7d58678bc86a0aaa38bd3c6445b6687..aa123222eccaa8505f984dbe3224958fc79424c8 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -29,6 +29,7 @@
#include "utils/statistics.h"
#include "utils/pool_select_walker.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_session_context.h"
#include "context/pool_query_context.h"
#include "parser/nodes.h"
@@ -1828,7 +1829,7 @@ is_in_list(char *name, List *list)
static bool
is_select_object_in_temp_write_list(Node *node, void *context)
{
- if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (node == NULL || !DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
return false;
if (IsA(node, RangeVar))
@@ -1836,7 +1837,7 @@ is_select_object_in_temp_write_list(Node *node, void *context)
RangeVar *rgv = (RangeVar *) node;
POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && session_context->is_in_transaction)
{
ereport(DEBUG1,
(errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname)));
@@ -1880,7 +1881,7 @@ static char *get_associated_object_from_dml_adaptive_relations
void
check_object_relationship_list(char *name, bool is_func_name)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && pool_config->parsed_dml_adaptive_object_relationship_list)
{
POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
@@ -1944,7 +1945,7 @@ add_object_into_temp_write_list(Node *node, void *context)
static void
dml_adaptive(Node *node, char *query)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
/* Set/Unset transaction status flags */
if (IsA(node, TransactionStmt))
@@ -1963,6 +1964,34 @@ dml_adaptive(Node *node, char *query)
}
else if (is_commit_or_rollback_query(node))
{
+ /*
+ * For dml_adaptive_global: on COMMIT, flush the accumulated
+ * table writes to shared memory. On ROLLBACK, skip — the
+ * writes never committed so no stale-read risk exists.
+ * This prevents attackers from polluting the table map with
+ * rolled-back transactions.
+ */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL &&
+ is_commit_query(node) &&
+ session_context->transaction_temp_write_list != NIL)
+ {
+ ListCell *cell;
+ int dboid;
+
+ dboid = pool_track_table_mutation_get_database_oid();
+ if (dboid > 0)
+ {
+ foreach(cell, session_context->transaction_temp_write_list)
+ {
+ char *table_name = (char *) lfirst(cell);
+ int table_oid = pool_table_name_to_oid(table_name);
+
+ if (table_oid > 0)
+ pool_track_table_mutation_mark_table_written(table_oid, dboid);
+ }
+ }
+ }
+
session_context->is_in_transaction = false;
if (session_context->transaction_temp_write_list != NIL)
@@ -2010,6 +2039,18 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
if (dest == POOL_PRIMARY)
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+
+ /*
+ * Resolve table and database OIDs now to populate relcache.
+ * This avoids potential hangs in CommandComplete where we shouldn't
+ * be running new queries against the backend.
+ */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ int *oids;
+ pool_extract_table_oids(node, &oids);
+ pool_track_table_mutation_get_database_oid();
+ }
}
/* Should be sent to both primary and standby? */
else if (dest == POOL_BOTH)
@@ -2139,6 +2180,107 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
+ /*
+ * Check track table mutation for recently written tables.
+ * If in cold start or any table was recently written,
+ * route to primary to avoid stale reads.
+ */
+ else if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ bool force_primary = false;
+
+ /* During cold start, route everything to primary */
+ if (pool_track_table_mutation_in_cold_start())
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of track table mutation cold start"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ /* Extract table oids and check if any are stale */
+ SelectContext ctx;
+ int dboid;
+ int num_oids;
+ int i;
+
+ memset(&ctx, 0, sizeof(ctx));
+ num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
+ if (num_oids > 0)
+ {
+ dboid = pool_track_table_mutation_get_database_oid();
+
+ if (dboid <= 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because database oid was unavailable"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ for (i = 0; i < num_oids; i++)
+ {
+ if (pool_track_table_mutation_table_is_stale(ctx.table_oids[i], dboid))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because table \"%s\" was recently written",
+ ctx.table_names[i]),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (force_primary)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else
+ {
+ /* Proceed with load balancing */
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id = select_load_balancing_node();
+ }
+
+ /*
+ * As streaming replication delay is too much, if
+ * prefer_lower_delay_standby is true then elect new load
+ * balance node which is lowest delayed, false then send
+ * to the primary.
+ */
+ if (STREAM && check_replication_delay(session_context->load_balance_node_id))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of too much replication delay"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ int new_load_balancing_node = select_load_balancing_node();
+
+ session_context->load_balance_node_id = new_load_balancing_node;
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context,
+ session_context->query_context->load_balance_node_id);
+ }
+ }
+ }
else
{
if (pool_config->statement_level_load_balance)
diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c
index ded41c7fc64ceba1d1fafd6f4a9f10a750872374..a9596561a7e0265e928b957a2766f46fb4e9ebaa 100644
--- a/src/context/pool_session_context.c
+++ b/src/context/pool_session_context.c
@@ -532,7 +532,7 @@ dump_sent_message(char *caller, POOL_SENT_MESSAGE *m)
static void
dml_adaptive_init(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
session_context->is_in_transaction = false;
session_context->transaction_temp_write_list = NIL;
@@ -542,7 +542,7 @@ dml_adaptive_init(void)
static void
dml_adaptive_destroy(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && session_context)
{
if (session_context->transaction_temp_write_list != NIL)
list_free_deep(session_context->transaction_temp_write_list);
@@ -738,10 +738,10 @@ void
pool_set_writing_transaction(void)
{
/*
- * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never
- * turn on writing transaction flag.
+ * If disable_load_balance_on_write is 'off' or 'dml_adaptive' or
+ * 'dml_adaptive_global', then never turn on writing transaction flag.
*/
- if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (pool_config->disable_load_balance_on_write != DLBOW_OFF && !DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
pool_get_session_context(false)->writing_transaction = true;
ereport(DEBUG5,
diff --git a/src/include/pool.h b/src/include/pool.h
index ea6f87e120af866b8ed3a15790d9d8a8e009fe91..7168c1aea877856b5978de332ad636325eb9c30c 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -424,7 +424,7 @@ typedef enum
#define Min(x, y) ((x) < (y) ? (x) : (y))
-#define MAX_NUM_SEMAPHORES 8
+#define MAX_NUM_SEMAPHORES 10
#define CONN_COUNTER_SEM 0
#define REQUEST_INFO_SEM 1
#define QUERY_CACHE_STATS_SEM 2
@@ -434,6 +434,8 @@ typedef enum
#define FOLLOW_PRIMARY_SEM 6
#define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main
* process */
+#define TRACK_TABLE_MUTATION_TABLE_SEM 8
+#define TRACK_TABLE_MUTATION_QUERY_SEM 9
#define MAX_REQUEST_QUEUE_SIZE 10
#define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index 741de6cc5fc3368f813d6b6efa68eb7f8a79506b..8798b86eb3620ab36be733bb60bbb8464b0063c8 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -105,9 +105,13 @@ typedef enum DLBOW_OPTION
DLBOW_TRANSACTION,
DLBOW_TRANS_TRANSACTION,
DLBOW_ALWAYS,
- DLBOW_DML_ADAPTIVE
+ DLBOW_DML_ADAPTIVE,
+ DLBOW_DML_ADAPTIVE_GLOBAL
} DLBOW_OPTION;
+#define DLBOW_IS_DML_ADAPTIVE(opt) \
+ ((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL)
+
typedef enum RELQTARGET_OPTION
{
RELQTARGET_PRIMARY = 1,
@@ -365,6 +369,15 @@ typedef struct
* replication check */
char *replication_delay_source_cmd; /* external command for replication delay */
int replication_delay_source_timeout; /* timeout for external command in seconds */
+
+ /* Track table mutation configuration for tracking recently written tables */
+ double track_table_mutation_ttl_factor; /* TTL multiplier for replication delay */
+ int track_table_mutation_cold_start_duration; /* Cold start duration in ms */
+ int track_table_mutation_table_buckets; /* Number of hash buckets for table map */
+ int track_table_mutation_table_size; /* Max entries in table map */
+ int track_table_mutation_query_buckets; /* Number of hash buckets for query cache */
+ int track_table_mutation_query_parse_cache_size; /* Max entries in query parse cache */
+
char *failover_command; /* execute command when failover happens */
char *follow_primary_command; /* execute command when failover is
* ended */
diff --git a/src/include/utils/pool_track_table_mutation.h b/src/include/utils/pool_track_table_mutation.h
new file mode 100644
index 0000000000000000000000000000000000000000..5cd5d4ef409645fe77e3bb02239e140456de0554
--- /dev/null
+++ b/src/include/utils/pool_track_table_mutation.h
@@ -0,0 +1,237 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.h: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ */
+
+#ifndef POOL_TRACK_TABLE_MUTATION_H
+#define POOL_TRACK_TABLE_MUTATION_H
+
+#include "pool.h"
+#include <sys/time.h>
+
+/*
+ * Maximum qualified table name length ("schema"."table"): two identifiers of
+ * up to NAMEDATALEN - 1 bytes, four quotes, one dot and the terminating NUL
+ */
+#define TRACK_TABLE_MUTATION_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
+
+/*
+ * Maximum number of tables we track per query
+ */
+#define TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY 8
+
+/*
+ * Invalid index marker for linked lists
+ */
+#define TRACK_TABLE_MUTATION_INVALID_INDEX (-1)
+
+/*
+ * Default TTL in microseconds (100ms) used when replication delay is unknown
+ */
+#define TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 * 1000)
+
+/*
+ * Entry in the table mutation hash table (keyed by table/database oids)
+ */
+typedef struct TrackTableMutationEntry
+{
+ int table_oid; /* Table oid */
+ int dboid; /* Database oid */
+ struct timeval last_write_time; /* When the table was last written */
+ uint32 hash; /* Pre-computed hash value */
+ int next; /* Next entry in collision chain (-1 if none) */
+ bool in_use; /* Is this entry in use? */
+} TrackTableMutationEntry;
+
+/*
+ * Header for the table mutation hash table in shared memory
+ */
+typedef struct TrackTableMutationHashTable
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * TrackTableMutationEntry entries[max_entries];
+ */
+} TrackTableMutationHashTable;
+
+/*
+ * Entry in the query parse cache
+ */
+typedef struct QueryParseEntry
+{
+ uint64 query_hash; /* Hash of normalized query */
+ bool is_write; /* True if INSERT/UPDATE/DELETE */
+ int num_tables; /* Number of tables in query */
+ char table_names[TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY][TRACK_TABLE_MUTATION_TABLE_NAME_LEN];
+ int next; /* Next entry in collision chain */
+ int lru_prev; /* Previous in LRU list */
+ int lru_next; /* Next in LRU list */
+ bool in_use; /* Is this entry in use? */
+} QueryParseEntry;
+
+/*
+ * Header for the query parse cache in shared memory
+ */
+typedef struct QueryParseCache
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ int lru_head; /* Most recently used */
+ int lru_tail; /* Least recently used */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * QueryParseEntry entries[max_entries];
+ */
+} QueryParseCache;
+
+/*
+ * Global state for track table mutation feature
+ */
+typedef struct TrackTableMutationState
+{
+ bool initialized; /* Has shared memory been initialized? */
+ uint64 current_ttl_us; /* Current TTL in microseconds */
+ struct timeval ttl_last_updated; /* When TTL was last updated */
+ struct timeval last_cleanup_time; /* When last expired cleanup ran */
+ struct timeval global_cold_start_until; /* Global cold start end time */
+ volatile uint32 stats_queries_checked; /* Number of queries checked */
+ volatile uint32 stats_forced_primary; /* Queries forced to primary */
+ volatile uint32 stats_allowed_replica; /* Queries allowed to replica */
+} TrackTableMutationState;
+
+/*
+ * Main shared memory structure containing all components
+ */
+typedef struct TrackTableMutationShmem
+{
+ TrackTableMutationState state;
+ TrackTableMutationHashTable *table_map;
+ QueryParseCache *query_cache;
+} TrackTableMutationShmem;
+
+/* ----------------
+ * Public API functions
+ * ----------------
+ */
+
+/*
+ * Initialize shared memory structures for track table mutation.
+ * Called from pgpool_main.c after pool_init_pool_info().
+ */
+extern void pool_track_table_mutation_init(void);
+
+/*
+ * Initialize per-child process state for track table mutation.
+ * Called from child.c when a new child process starts.
+ * Sets up cold start tracking.
+ */
+extern void pool_track_table_mutation_child_init(void);
+
+/*
+ * Check if the child process is in cold start period.
+ * During cold start, all queries are routed to primary.
+ * Returns true if in cold start, false otherwise.
+ */
+extern bool pool_track_table_mutation_in_cold_start(void);
+
+/*
+ * Trigger a global cold start period for all processes.
+ * Used after watchdog leader change to avoid stale reads.
+ */
+extern void pool_track_table_mutation_trigger_global_cold_start(void);
+
+/*
+ * Get oid of current database.
+ */
+extern int pool_track_table_mutation_get_database_oid(void);
+
+/*
+ * Check if a table was recently written to (is "stale").
+ * If stale, reads from this table should go to primary.
+ * Returns true if table is stale (recently written), false otherwise.
+ */
+extern bool pool_track_table_mutation_table_is_stale(int table_oid, int dboid);
+
+/*
+ * Mark tables as recently written once the write is known to be committed
+ * (autocommit statement completed, or COMMIT of an explicit transaction).
+ * table_oids: array of table oids
+ * num_tables: number of tables in array
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_tables_written(const int *table_oids, int num_tables, int dboid);
+
+/*
+ * Convenience function to mark a single table as written.
+ * table_oid: table oid
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_table_written(int table_oid, int dboid);
+
+/*
+ * Update the TTL based on current replication delay.
+ * Called from pool_worker_child.c when replication delay is updated.
+ * delay_us: replication delay in microseconds
+ */
+extern void pool_track_table_mutation_update_ttl(uint64 delay_us);
+
+/*
+ * Look up cached parse result for a query.
+ * hash: hash of normalized query
+ * is_write: output - true if query is a write
+ * table_names: output - array to fill with table names
+ * num_tables: output - number of tables found
+ * Returns true if found in cache, false otherwise.
+ */
+extern bool pool_track_table_mutation_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int *num_tables);
+
+/*
+ * Cache a parse result for a query.
+ * hash: hash of normalized query
+ * is_write: true if query is a write
+ * table_names: array of table names
+ * num_tables: number of tables
+ */
+extern void pool_track_table_mutation_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int num_tables);
+
+/*
+ * Normalize a query and compute its hash.
+ * Strips comments, normalizes whitespace and literals.
+ * query: input SQL query string
+ * Returns: 64-bit hash of normalized query
+ */
+extern uint64 pool_track_table_mutation_normalize_and_hash(const char *query);
+
+/*
+ * Calculate required shared memory size for track table mutation.
+ */
+extern Size pool_track_table_mutation_shmem_size(void);
+
+#endif /* POOL_TRACK_TABLE_MUTATION_H */
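Not part of the patch: a minimal sketch of the index-chained hash table that the header above describes (bucket array, fixed entry pool, free list, integer `next` links instead of pointers). The names `TableMap`, `hash_oids`, `map_mark_written` and `map_is_stale` are hypothetical, the hash function is an arbitrary stand-in, and timestamps are plain microsecond counters rather than `struct timeval`. Locking (the patch reserves `TRACK_TABLE_MUTATION_TABLE_SEM` for that) is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define INVALID_INDEX (-1)
#define NUM_BUCKETS   8     /* tiny values, for illustration only */
#define MAX_ENTRIES   4

typedef struct
{
    int      table_oid;
    int      dboid;
    uint64_t last_write_us; /* sketch: microsecond counter, not struct timeval */
    int      next;          /* next entry in chain or free list, -1 if none */
    bool     in_use;
} Entry;

typedef struct
{
    int   buckets[NUM_BUCKETS];
    Entry entries[MAX_ENTRIES];
    int   free_list_head;
    int   num_entries;
} TableMap;

/* Hypothetical mixing function; the patch's real hash is not shown here. */
static uint32_t hash_oids(int table_oid, int dboid)
{
    uint32_t h = (uint32_t) table_oid * 2654435761u;

    h ^= (uint32_t) dboid + 0x9e3779b9u + (h << 6) + (h >> 2);
    return h;
}

static void map_init(TableMap *m)
{
    for (int i = 0; i < NUM_BUCKETS; i++)
        m->buckets[i] = INVALID_INDEX;
    for (int i = 0; i < MAX_ENTRIES; i++)
    {
        m->entries[i].in_use = false;
        m->entries[i].next = (i + 1 < MAX_ENTRIES) ? i + 1 : INVALID_INDEX;
    }
    m->free_list_head = 0;
    m->num_entries = 0;
}

/* Walk the collision chain of the key's bucket. */
static int map_find(const TableMap *m, int table_oid, int dboid)
{
    int idx = m->buckets[hash_oids(table_oid, dboid) % NUM_BUCKETS];

    while (idx != INVALID_INDEX)
    {
        const Entry *e = &m->entries[idx];

        if (e->in_use && e->table_oid == table_oid && e->dboid == dboid)
            return idx;
        idx = e->next;
    }
    return INVALID_INDEX;
}

/* Insert or refresh an entry; returns false when the entry pool is full. */
static bool map_mark_written(TableMap *m, int table_oid, int dboid, uint64_t now_us)
{
    int idx = map_find(m, table_oid, dboid);

    if (idx == INVALID_INDEX)
    {
        uint32_t b = hash_oids(table_oid, dboid) % NUM_BUCKETS;

        idx = m->free_list_head;
        if (idx == INVALID_INDEX)
            return false;
        m->free_list_head = m->entries[idx].next;
        m->entries[idx].table_oid = table_oid;
        m->entries[idx].dboid = dboid;
        m->entries[idx].in_use = true;
        m->entries[idx].next = m->buckets[b];   /* push onto chain head */
        m->buckets[b] = idx;
        m->num_entries++;
    }
    m->entries[idx].last_write_us = now_us;
    return true;
}

/* A table is stale while its last write is younger than the TTL. */
static bool map_is_stale(const TableMap *m, int table_oid, int dboid,
                         uint64_t now_us, uint64_t ttl_us)
{
    int idx = map_find(m, table_oid, dboid);

    assert(ttl_us > 0);
    if (idx == INVALID_INDEX)
        return false;
    return now_us < m->entries[idx].last_write_us + ttl_us;
}
```

Integer links keep the structure position-independent, which is a common reason to prefer them for data that lives in a shared memory segment. What the real code does when the pool is full (evict, reuse expired entries, or refuse) is not visible in this part of the patch, so the sketch simply reports failure.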
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index fa05e15e7ac435e072298063f918c70aa4e5680c..b88b0478cb150f89bd9b6b8ab38db0d6912fddd0 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -57,6 +57,7 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
#include "query_cache/pool_memqcache.h"
+#include "utils/pool_track_table_mutation.h"
#include "watchdog/wd_internal_commands.h"
#include "watchdog/wd_lifecheck.h"
#include "watchdog/watchdog.h"
@@ -1485,11 +1486,14 @@ sigusr1_interrupt_processor(void)
if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
{
+ WD_STATES wd_state;
+
ereport(LOG,
(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
- if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY)
+ wd_state = wd_internal_get_watchdog_local_node_state();
+ if (wd_state == WD_STANDBY)
{
ereport(LOG,
(errmsg("we have joined the watchdog cluster as STANDBY node"),
@@ -1503,6 +1507,10 @@ sigusr1_interrupt_processor(void)
*/
pool_release_follow_primary_lock(true);
}
+ else if (wd_state == WD_COORDINATOR && pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_trigger_global_cold_start();
+ }
}
if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
{
@@ -3068,6 +3076,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size()));
}
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ size += MAXALIGN(pool_track_table_mutation_shmem_size());
+ elog(DEBUG1, "track_table_mutation: %zu bytes requested for shared memory", MAXALIGN(pool_track_table_mutation_shmem_size()));
+ }
+
initialize_shared_memory_main_segment(size);
/* Move the backend descriptors to shared memory */
@@ -3184,6 +3198,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
wd_ipc_initialize_data();
}
+ /* Initialize track table mutation for tracking recently written tables */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_init();
+ }
+
}
/*
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index a3b8f0ea194ffecc79e58566be80562a46eb75ab..b15db53248433cb3112246274ed771b79abe1392 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -38,6 +38,8 @@
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
+#include "query_cache/pool_memqcache.h"
static int extract_ntuples(char *message);
static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
@@ -304,6 +306,29 @@ handle_query_context(POOL_CONNECTION_POOL *backend)
node = session_context->query_context->parse_tree;
+ /*
+ * Track table writes for dml_adaptive_global feature.
+ * For autocommit statements (not in explicit transaction), mark tables
+ * immediately. For explicit transactions, marking is deferred to COMMIT
+ * in dml_adaptive() so that ROLLBACKed writes don't pollute the shared
+ * memory table map.
+ */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL &&
+ node != NULL && !session_context->is_in_transaction)
+ {
+ int *oids;
+ int num_oids;
+
+ num_oids = pool_extract_table_oids(node, &oids);
+ if (num_oids > 0)
+ {
+ int dboid = pool_track_table_mutation_get_database_oid();
+
+ if (dboid > 0)
+ pool_track_table_mutation_mark_tables_written(oids, num_oids, dboid);
+ }
+ }
+
if (IsA(node, PrepareStmt))
{
if (session_context->uncompleted_message)
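Not part of the patch: a minimal sketch of the rule the comment above states, namely that autocommit writes reach the shared map immediately while writes inside an explicit transaction are buffered and only flushed on COMMIT. `Session`, `SharedMap` and the `on_*` functions are hypothetical stand-ins. The sketch buffers OIDs directly, whereas the patch description says the per-transaction list holds table names that are resolved to OIDs at COMMIT.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_PENDING 8
#define MAX_MARKED  32

/* Stand-in for the shared-memory table map: records which OIDs were marked. */
typedef struct
{
    int marked[MAX_MARKED];
    int num_marked;
} SharedMap;

/* Stand-in for the per-session state (transaction flag plus write list). */
typedef struct
{
    bool in_transaction;
    int  pending[MAX_PENDING];
    int  num_pending;
} Session;

static void shared_mark(SharedMap *map, int oid)
{
    assert(map->num_marked < MAX_MARKED);
    map->marked[map->num_marked++] = oid;
}

static void on_begin(Session *s)
{
    s->in_transaction = true;
    s->num_pending = 0;
}

/* Called when a write statement completes. */
static void on_write(Session *s, SharedMap *map, int oid)
{
    if (!s->in_transaction)
    {
        /* autocommit: the write is already committed, mark right away */
        shared_mark(map, oid);
        return;
    }
    /* explicit transaction: remember the table, decide at COMMIT/ROLLBACK */
    assert(s->num_pending < MAX_PENDING);
    s->pending[s->num_pending++] = oid;
}

static void on_commit(Session *s, SharedMap *map)
{
    for (int i = 0; i < s->num_pending; i++)
        shared_mark(map, s->pending[i]);
    s->num_pending = 0;
    s->in_transaction = false;
}

static void on_rollback(Session *s)
{
    /* nothing was committed, so nothing reaches the shared map */
    s->num_pending = 0;
    s->in_transaction = false;
}
```

With this shape, `BEGIN; UPDATE ...; ROLLBACK;` leaves `SharedMap` untouched, which is the property regression tests 10 and 11 check from the outside.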
diff --git a/src/protocol/child.c b/src/protocol/child.c
index c34f057281be62feaf39db1bb605062f56dc398c..d53f571421968bd789d0b55f97e0a1eb68a813e5 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -57,6 +57,7 @@
#include "utils/elog.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -213,6 +214,12 @@ do_child(int *fds)
/* Initialize per process context */
pool_init_process_context();
+ /* Initialize track table mutation child state for cold start tracking */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_child_init();
+ }
+
/* initialize connection pool */
if (pool_init_cp())
{
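Not part of the patch: a minimal sketch of how a cold start window could be evaluated, assuming a per-child deadline set at process start plus a global deadline that a watchdog leader change can push forward. `ColdStart`, `add_ms` and the other helper names are hypothetical; the actual bookkeeping in `pool_track_table_mutation_child_init()` is not shown in this part of the diff.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/time.h>

typedef struct
{
    struct timeval child_until;     /* end of this child's cold start */
    struct timeval global_until;    /* end of the cluster-wide cold start */
} ColdStart;

/* Return t advanced by ms milliseconds, keeping tv_usec normalized. */
static struct timeval add_ms(struct timeval t, long ms)
{
    assert(ms >= 0);
    t.tv_sec += ms / 1000;
    t.tv_usec += (ms % 1000) * 1000;
    if (t.tv_usec >= 1000000)
    {
        t.tv_sec++;
        t.tv_usec -= 1000000;
    }
    return t;
}

static bool before(struct timeval a, struct timeval b)
{
    return a.tv_sec < b.tv_sec ||
        (a.tv_sec == b.tv_sec && a.tv_usec < b.tv_usec);
}

/* Called once when a child process starts. */
static void cold_start_child_init(ColdStart *cs, struct timeval now, long duration_ms)
{
    cs->child_until = add_ms(now, duration_ms);
}

/* Called when every process should fall back to the primary for a while. */
static void cold_start_trigger_global(ColdStart *cs, struct timeval now, long duration_ms)
{
    cs->global_until = add_ms(now, duration_ms);
}

/* While this returns true, reads are not load balanced. */
static bool in_cold_start(const ColdStart *cs, struct timeval now)
{
    return before(now, cs->child_until) || before(now, cs->global_until);
}
```

A duration of 0 makes the deadline equal to the start time, so `in_cold_start()` is false from the first query on, which matches the "set to 0 to disable" wording of the sample configuration.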
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 47b5c8f98a5b4c92d675840eea88f7e03bb18b4c..75fc7508480d79aacc281dd5e624f9e34a998833 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -1461,7 +1461,7 @@ Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && strlen(name) != 0)
pool_setall_node_to_be_sent(query_context);
if (REPLICATION)
@@ -1804,7 +1804,7 @@ Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
return POOL_END;
}
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) &&
TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
{
pool_where_to_send(query_context, query_context->original_query,
diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c
index f38f711469576342ce59469b085c97365116004c..dca93334e9e47bb7978064edece5ca0e40021ce3 100644
--- a/src/query_cache/pool_memqcache.c
+++ b/src/query_cache/pool_memqcache.c
@@ -1305,6 +1305,12 @@ pool_extract_table_oids(Node *node, int **oidsp)
}
return num_oids;
}
+ else if (IsA(node, MergeStmt))
+ {
+ MergeStmt *stmt = (MergeStmt *) node;
+
+ table = make_table_name_from_rangevar(stmt->relation);
+ }
else if (IsA(node, ExplainStmt))
{
ListCell *cell;
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 454fdb9e5d1fd65437b6a67f12ab62658ea08f49..de99a7a97ba4a1a03cb3d5589d55ea61cb6e51fa 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -478,6 +478,14 @@ backend_clustering_mode = streaming_replication
# modified within the current explicit transaction will
# not be load balanced until the end of the transaction.
#
+ # dml_adaptive_global:
+ # Superset of dml_adaptive. In addition to per-transaction
+ # tracking, uses shared memory to track recently written
+ # tables across all sessions. Reads from recently written
+ # tables are routed to primary until a TTL (based on
+ # replication delay) expires. Requires additional shared
+ # memory. See track_table_mutation_* parameters below.
+ #
# always:
# if a write query is issued, read queries will
# not be load balanced until the session ends.
@@ -499,6 +507,46 @@ backend_clustering_mode = streaming_replication
#statement_level_load_balance = off
# Enables statement level load balancing
+# - Track Table Mutation (used by dml_adaptive_global) -
+ # WARNING: dml_adaptive_global increases shared memory usage
+ # Default settings require ~11 MB shared memory
+ # (0.1 MB table tracking + 10.9 MB query cache)
+
+#track_table_mutation_ttl_factor = 5.0
+ # TTL multiplier: TTL = replication_delay * factor
+ # Higher values provide more safety margin
+ # Range: 1.0-100.0 (default: 5.0)
+ # (change requires reload)
+
+#track_table_mutation_cold_start_duration = 2000
+ # Duration in milliseconds to route all queries to primary
+ # after child process starts (cold start period)
+ # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+ # Set to 0 to disable cold start behavior
+ # (change requires reload)
+
+#track_table_mutation_table_buckets = 1024
+ # Number of hash buckets for the table mutation map
+ # Higher values reduce hash collisions
+ # Range: 64-65536 (default: 1024)
+ # (change requires restart)
+
+#track_table_mutation_table_size = 2048
+ # Maximum number of tables to track simultaneously
+ # Range: 128-131072 (default: 2048)
+ # (change requires restart)
+
+#track_table_mutation_query_buckets = 2048
+ # Number of hash buckets for query parse cache
+ # Range: 64-65536 (default: 2048)
+ # (change requires restart)
+
+#track_table_mutation_query_parse_cache_size = 10000
+ # Maximum number of query parse results to cache
+ # Range: 100-1000000 (default: 10000)
+ # Memory usage: ~1.1 KB per entry (~10.9 MB default, ~109 MB for 100000)
+ # (change requires restart)
+
#------------------------------------------------------------------------------
# STREAMING REPLICATION MODE
#------------------------------------------------------------------------------
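Not part of the patch: a rough sketch for estimating the shared memory the four sizing parameters imply, using struct layouts copied from `pool_track_table_mutation.h`. It assumes `NAMEDATALEN` is 64 and ignores the small fixed headers and any `MAXALIGN` padding, so treat the result as an approximation. `table_map_bytes` and `query_cache_bytes` are hypothetical helper names, not functions from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/time.h>

#define NAMEDATALEN          64     /* assumption: the usual PostgreSQL value */
#define TABLE_NAME_LEN       (NAMEDATALEN * 2 + 4)
#define MAX_TABLES_PER_QUERY 8

/* Same fields as TrackTableMutationEntry in the header. */
typedef struct
{
    int            table_oid;
    int            dboid;
    struct timeval last_write_time;
    uint32_t       hash;
    int            next;
    bool           in_use;
} TableEntry;

/* Same fields as QueryParseEntry in the header. */
typedef struct
{
    uint64_t query_hash;
    bool     is_write;
    int      num_tables;
    char     table_names[MAX_TABLES_PER_QUERY][TABLE_NAME_LEN];
    int      next;
    int      lru_prev;
    int      lru_next;
    bool     in_use;
} QueryEntry;

/* One int per bucket plus one fixed-size entry per slot. */
static size_t map_bytes(size_t entry_size, int buckets, int max_entries)
{
    assert(buckets > 0 && max_entries > 0);
    return sizeof(int) * (size_t) buckets + entry_size * (size_t) max_entries;
}

static size_t table_map_bytes(int buckets, int max_entries)
{
    return map_bytes(sizeof(TableEntry), buckets, max_entries);
}

static size_t query_cache_bytes(int buckets, int max_entries)
{
    return map_bytes(sizeof(QueryEntry), buckets, max_entries);
}
```

Calling `table_map_bytes(1024, 2048)` and `query_cache_bytes(2048, 10000)` gives the estimate for the default settings. The exact figures depend on the platform's struct padding, but the name array alone accounts for 8 * 132 = 1056 bytes of every query cache entry, so the query cache dominates the total.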
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 311b638658e66ebb56162ad9fa4392315b2df64e..347f88a88688309b298311a282fe1c1ef2aa0f73 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
#include "utils/pool_ip.h"
#include "utils/ps_status.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -695,6 +696,7 @@ check_replication_time_lag_with_cmd(void)
double delay_ms;
uint64 delay;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0; /* Track maximum delay for table mutation map */
int token_count = 0;
int primary_node_id;
int save_errno;
@@ -1003,6 +1005,10 @@ check_replication_time_lag_with_cmd(void)
bkinfo->standby_delay = delay;
bkinfo->standby_delay_by_time = true;
+ /* Track maximum delay for table mutation map TTL calculation */
+ if (delay > max_delay_us)
+ max_delay_us = delay;
+
/*
* Log delay if necessary. threshold is in milliseconds, convert
* to microseconds.
@@ -1021,6 +1027,10 @@ check_replication_time_lag_with_cmd(void)
token = strtok_r(NULL, " \t\n", &saveptr);
}
+ /* Update track table mutation TTL based on maximum observed delay */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL && max_delay_us > 0)
+ pool_track_table_mutation_update_ttl(max_delay_us);
+
}
PG_CATCH();
{
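Not part of the patch: a minimal sketch of the TTL derivation implied by the hunks above and by the sample configuration (`TTL = replication_delay * factor`, with a 100 ms default while the delay is unknown). `TtlState`, `ttl_update` and `on_delay_check` are hypothetical names. Whether the real `pool_track_table_mutation_update_ttl()` also applies a floor or ceiling is not visible here, so none is modeled.

```c
#include <assert.h>
#include <stdint.h>

#define DEFAULT_TTL_US (100 * 1000)     /* 100 ms, as in the header */

typedef struct
{
    uint64_t current_ttl_us;
} TtlState;

/* Until a delay has been observed, fall back to the default TTL. */
static void ttl_init(TtlState *s)
{
    s->current_ttl_us = DEFAULT_TTL_US;
}

/* TTL = replication delay * factor (factor range 1.0-100.0 in the sample). */
static void ttl_update(TtlState *s, uint64_t delay_us, double ttl_factor)
{
    assert(ttl_factor >= 1.0);
    s->current_ttl_us = (uint64_t) ((double) delay_us * ttl_factor);
}

/*
 * Mirrors the call site in the diff: take the largest delay among the
 * standbys and update the TTL only if some delay was actually observed.
 */
static void on_delay_check(TtlState *s, const uint64_t *delays_us, int n,
                           double ttl_factor)
{
    uint64_t max_delay_us = 0;

    for (int i = 0; i < n; i++)
        if (delays_us[i] > max_delay_us)
            max_delay_us = delays_us[i];

    if (max_delay_us > 0)
        ttl_update(s, max_delay_us, ttl_factor);
}
```

Using the maximum over all standbys is the conservative choice: a read may be balanced to any of them, so the slowest replica determines how long a written table has to stay pinned to the primary. Because the update is skipped when the maximum is 0, the previous TTL stays in effect in that case.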
diff --git a/src/test/regression/tests/042.track_table_mutation/test.sh b/src/test/regression/tests/042.track_table_mutation/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..ede56bd1968711fc15f45784c6958e12a4e4e589
--- /dev/null
+++ b/src/test/regression/tests/042.track_table_mutation/test.sh
@@ -0,0 +1,352 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+ rm -fr $TESTDIR
+ mkdir $TESTDIR
+ cd $TESTDIR
+
+ # Create test environment with 2 nodes
+ echo -n "creating test environment..."
+ $PGPOOL_SETUP -m $mode -n 2 || exit 1
+ echo "done."
+
+ source ./bashrc.ports
+
+ # Configure track table mutation feature via dml_adaptive_global
+ echo "disable_load_balance_on_write = 'dml_adaptive_global'" >> etc/pgpool.conf
+ echo "track_table_mutation_ttl_factor = 5.0" >> etc/pgpool.conf
+ echo "track_table_mutation_cold_start_duration = 2000" >> etc/pgpool.conf
+
+ # Enable load balancing explicitly
+ echo "load_balance_mode = on" >> etc/pgpool.conf
+
+ # Configure weights so we can distinguish routing
+ # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+ # This means load balanced queries go to node 1 by default
+ echo "backend_weight0 = 0" >> etc/pgpool.conf
+ echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+ # Enable debug logging to see routing decisions
+ echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+ ./startall
+
+ export PGPORT=$PGPOOL_PORT
+
+ wait_for_pgpool_startup
+
+ # Create test tables
+ $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+ echo "=== Test 1: Cold Start Routing ==="
+ # During cold start, all queries should go to primary
+ # Restart pgpool to trigger cold start
+ ./shutdownall
+ ./startall
+ wait_for_pgpool_startup
+
+ # Immediately query - should go to primary due to cold start
+ $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for cold start message (use -a to handle binary log files)
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 1 PASSED: Cold start routing works"
+ else
+ echo "Test 1 FAILED: Cold start routing not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 2: Wait for cold start to end ==="
+ # Wait for cold start period to end (2 seconds)
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Now a clean table query should load balance (go to node 1)
+ $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+ # After cold start, queries to clean tables should load balance
+ # Check that it did NOT get forced to primary due to track table mutation
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 2 FAILED: Still in cold start after waiting"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 2 PASSED: Cold start ended correctly"
+
+ echo "=== Test 3: Write-then-Read Routing ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Write to t1 and then read - use single connection to ensure same session
+ $PSQL test <<EOF
+INSERT INTO t1 VALUES (1);
+SELECT 'write_read_test' as marker, * FROM t1;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Check log for table staleness message
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 3 PASSED: Write-then-read routing works"
+ else
+ echo "Test 3 FAILED: Table staleness not detected after write"
+ # Show relevant log entries for debugging
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 4: Clean Table Still Load Balances ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Read from t2 (never written to) - should load balance
+ $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ # Should NOT see track table mutation blocking message for t2
+ if grep -a -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+ echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 4 PASSED: Clean tables still load balance"
+
+ echo "=== Test 5: UPDATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Update t2 and then read - use single connection
+ $PSQL test <<EOF
+UPDATE t2 SET i = 999 WHERE i = 0;
+SELECT 'update_test' as marker, * FROM t2;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 5 PASSED: UPDATE marks table as stale"
+ else
+ echo "Test 5 FAILED: UPDATE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 6: DELETE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Delete from t3 and then read - use single connection
+ $PSQL test <<EOF
+DELETE FROM t3 WHERE i = 0;
+SELECT 'delete_test' as marker, * FROM t3;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 6 PASSED: DELETE marks table as stale"
+ else
+ echo "Test 6 FAILED: DELETE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 7: TRUNCATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for TRUNCATE test
+ $PSQL test -c "CREATE TABLE t_truncate(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_truncate VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Truncate and then read - use single connection
+ $PSQL test <<EOF
+TRUNCATE t_truncate;
+SELECT 'truncate_test' as marker, * FROM t_truncate;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 7 PASSED: TRUNCATE marks table as stale"
+ else
+ echo "Test 7 FAILED: TRUNCATE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 8: WITH Clause (CTE with DELETE) Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for WITH test
+ $PSQL test -c "CREATE TABLE t_cte(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_cte VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use WITH clause with DELETE, then read from the table
+ $PSQL test <<EOF
+WITH deleted AS (DELETE FROM t_cte WHERE i = 1 RETURNING *)
+SELECT * FROM deleted;
+SELECT 'cte_test' as marker, * FROM t_cte;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 8 PASSED: WITH clause (CTE) marks table as stale"
+ else
+ echo "Test 8 FAILED: WITH clause (CTE) did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ # Test 9: MERGE (PostgreSQL 15+ only)
+ PG_MAJOR_VERSION=$($PSQL -t -c "SELECT substring(version() from 'PostgreSQL ([0-9]+)');" | tr -d ' ')
+ if [ "$PG_MAJOR_VERSION" -ge 15 ] 2>/dev/null; then
+ echo "=== Test 9: MERGE Marks Table as Stale (PostgreSQL $PG_MAJOR_VERSION) ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create tables for MERGE test
+ $PSQL test -c "CREATE TABLE t_merge_target(id INTEGER PRIMARY KEY, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "CREATE TABLE t_merge_source(id INTEGER, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_target VALUES (1, 'old');" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_source VALUES (1, 'new'), (2, 'insert');" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use MERGE, then read from the target table
+ $PSQL test <<EOF
+MERGE INTO t_merge_target t
+USING t_merge_source s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT 'merge_test' as marker, * FROM t_merge_target;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 9 PASSED: MERGE marks table as stale"
+ else
+ echo "Test 9 FAILED: MERGE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ else
+ echo "=== Test 9: MERGE skipped (requires PostgreSQL 15+, have $PG_MAJOR_VERSION) ==="
+ fi
+
+ echo "=== Test 10: ROLLBACK Does NOT Mark Table as Stale ==="
+ # Create a fresh table for rollback test
+ $PSQL test -c "CREATE TABLE t_rollback(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then rollback
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_rollback VALUES (1);
+ROLLBACK;
+SELECT 'rollback_test' as marker, * FROM t_rollback;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Should NOT see t_rollback marked as stale since the write was rolled back
+ if grep -a -q "could not load balance because table.*t_rollback.*was recently written" log/pgpool.log; then
+ echo "Test 10 FAILED: Rolled-back write incorrectly marked table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 10 PASSED: ROLLBACK does not mark table as stale"
+
+ echo "=== Test 11: COMMIT Marks Table as Stale ==="
+ # Create a fresh table for commit test
+ $PSQL test -c "CREATE TABLE t_commit(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then commit, then read
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_commit VALUES (1);
+COMMIT;
+SELECT 'commit_test' as marker, * FROM t_commit;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 11 PASSED: COMMIT marks table as stale"
+ else
+ echo "Test 11 FAILED: Committed write did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo ""
+ echo "=== All Track Table Mutation Tests PASSED ==="
+
+ ./shutdownall
+
+ cd ..
+done
+
+exit 0
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/.gitignore b/src/test/regression/tests/043.track_table_mutation_watchdog/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..fcb93d27a7e7e8a5efe6eacfb0f88f6f3c8bc765
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/.gitignore
@@ -0,0 +1,3 @@
+leader
+standby
+*.pid
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/leader.conf b/src/test/regression/tests/043.track_table_mutation_watchdog/leader.conf
new file mode 100644
index 0000000000000000000000000000000000000000..945cff9860d0357fbb0e3e9a5643124d916bd9c3
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/leader.conf
@@ -0,0 +1,25 @@
+# leader watchdog config for track_table_mutation watchdog test
+use_watchdog = on
+wd_interval = 1
+wd_priority = 2
+
+hostname0 = 'localhost'
+wd_port0 = 21004
+pgpool_port0 = 11000
+hostname1 = 'localhost'
+wd_port1 = 21104
+pgpool_port1 = 11100
+
+heartbeat_hostname0 = 'localhost'
+heartbeat_port0 = 21005
+heartbeat_hostname1 = 'localhost'
+heartbeat_port1 = 21105
+
+enable_consensus_with_half_votes = on
+
+# Enable track table mutation feature via dml_adaptive_global
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_cold_start_duration = 2000
+
+# Enable debug logging to see feature messages
+log_min_messages = debug1
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/standby.conf b/src/test/regression/tests/043.track_table_mutation_watchdog/standby.conf
new file mode 100644
index 0000000000000000000000000000000000000000..a11c3dfca427cf6b246451d067c30b0255b9c4ce
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/standby.conf
@@ -0,0 +1,27 @@
+# standby watchdog config for track_table_mutation watchdog test
+port = 11100
+pcp_port = 11105
+use_watchdog = on
+wd_interval = 1
+wd_priority = 1
+
+hostname0 = 'localhost'
+wd_port0 = 21004
+pgpool_port0 = 11000
+hostname1 = 'localhost'
+wd_port1 = 21104
+pgpool_port1 = 11100
+
+heartbeat_hostname0 = 'localhost'
+heartbeat_port0 = 21005
+heartbeat_hostname1 = 'localhost'
+heartbeat_port1 = 21105
+
+enable_consensus_with_half_votes = on
+
+# Enable track table mutation feature via dml_adaptive_global
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_cold_start_duration = 2000
+
+# Enable debug logging to see feature messages
+log_min_messages = debug1
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh b/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..752a6e6aa377fe0c54244975e606648101c98cf8
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
@@ -0,0 +1,179 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation global cold start on watchdog leader change.
+# Tests that when the watchdog leader changes, the new leader triggers
+# a global cold start to force all queries to primary.
+#
+source $TESTLIBS
+LEADER_DIR=leader
+STANDBY_DIR=standby
+PSQL=$PGBIN/psql
+success_count=0
+
+rm -fr $LEADER_DIR
+rm -fr $STANDBY_DIR
+
+mkdir $LEADER_DIR
+mkdir $STANDBY_DIR
+
+# change into the leader directory
+cd $LEADER_DIR
+
+# create leader environment with streaming replication
+echo -n "creating leader pgpool..."
+$PGPOOL_SETUP -m s -n 2 -p 11000 || exit 1
+echo "leader setup done."
+
+# copy the configurations to standby
+cp -r etc ../$STANDBY_DIR/
+
+source ./bashrc.ports
+cat ../leader.conf >> etc/pgpool.conf
+echo 0 > etc/pgpool_node_id
+
+./startall
+wait_for_pgpool_startup
+
+# back to test root dir
+cd ..
+
+# create standby environment
+mkdir $STANDBY_DIR/log
+echo -n "creating standby pgpool..."
+cat standby.conf >> $STANDBY_DIR/etc/pgpool.conf
+# since the standby uses the same pgpool-II conf as the leader, change the pid file path
+echo "pid_file_name = '$PWD/pgpool2.pid'" >> $STANDBY_DIR/etc/pgpool.conf
+echo 1 > $STANDBY_DIR/etc/pgpool_node_id
+# start the standby pgpool-II by hand
+$PGPOOL_INSTALL_DIR/bin/pgpool -D -n -f $STANDBY_DIR/etc/pgpool.conf -F $STANDBY_DIR/etc/pcp.conf -a $STANDBY_DIR/etc/pool_hba.conf > $STANDBY_DIR/log/pgpool.log 2>&1 &
+
+# Test 1: Check if leader pgpool-II started correctly
+echo "=== Test 1: Waiting for the pgpool leader... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "I am the cluster leader node. Starting escalation process" $LEADER_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 1 PASSED: Leader brought up successfully."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 1 ]; then
+ echo "Test 1 FAILED: Leader did not start"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 2: Check if standby has successfully joined
+echo "=== Test 2: Waiting for the standby to join cluster... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "successfully joined the watchdog cluster as standby node" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 2 PASSED: Standby successfully connected."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 2 ]; then
+ echo "Test 2 FAILED: Standby did not join cluster"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 3: Verify track_table_mutation is enabled and working on leader
+echo "=== Test 3: Verify track_table_mutation is enabled ==="
+if grep -a "track_table_mutation: initialized" $LEADER_DIR/log/pgpool.log > /dev/null 2>&1; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 3 PASSED: track_table_mutation initialized on leader"
+else
+ echo "Test 3 FAILED: track_table_mutation not initialized on leader"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 4: Stop leader pgpool and trigger failover
+echo "=== Test 4: Triggering leader failover... ==="
+$PGPOOL_INSTALL_DIR/bin/pgpool -f $LEADER_DIR/etc/pgpool.conf -m f stop
+
+echo "Checking if the Standby pgpool-II detected the leader shutdown..."
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a " is shutting down" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 4 PASSED: Leader shutdown detected."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 4 ]; then
+ echo "Test 4 FAILED: Leader shutdown not detected"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 5: Verify standby becomes new leader
+echo "=== Test 5: Checking if standby takes over as leader... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "I am the cluster leader node. Starting escalation process" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 5 PASSED: Standby became the new leader."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 5 ]; then
+ echo "Test 5 FAILED: Standby did not become leader"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 6: Verify global cold start was triggered on new leader
+echo "=== Test 6: Checking if global cold start was triggered... ==="
+# The new leader should trigger global cold start when it becomes coordinator
+# Look for the log message that indicates global cold start was triggered
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "track_table_mutation: entering global cold start" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 6 PASSED: Global cold start triggered on new leader."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+# Cleanup
+$PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+cd $LEADER_DIR
+./shutdownall
+
+echo ""
+echo "$success_count out of 6 successful"
+
+if test $success_count -eq 6
+then
+ echo "=== All Track Table Mutation Watchdog Tests PASSED ==="
+ exit 0
+fi
+
+exit 1
diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c
new file mode 100644
index 0000000000000000000000000000000000000000..27d4f0380d43a237f518c60cdd73aba2ff51b723
--- /dev/null
+++ b/src/utils/pool_track_table_mutation.c
@@ -0,0 +1,1188 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.c: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ *
+ * Based on the "lagless" architecture from Tailor Brands:
+ * https://medium.com/tailor-tech/using-database-read-replicas-in-distributed-systems-d80eaf6bbf8a
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "context/pool_session_context.h"
+#include "utils/pool_track_table_mutation.h"
+#include "utils/elog.h"
+#include "utils/pool_ipc.h"
+#include "utils/palloc.h"
+#include "utils/pool_relcache.h"
+
+#define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_catalog.pg_database WHERE datname = '%s'"
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static TrackTableMutationShmem *track_table_mutation_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for accessing flexible arrays in shared memory
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+ ((int *)((char *)(map) + sizeof(TrackTableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+ ((TrackTableMutationEntry *)((char *)(map) + sizeof(TrackTableMutationHashTable) + \
+ (map)->num_buckets * sizeof(int)))
+
+/* Get pointer to bucket array in query cache */
+#define QUERY_CACHE_BUCKETS(cache) \
+ ((int *)((char *)(cache) + sizeof(QueryParseCache)))
+
+/* Get pointer to entry array in query cache */
+#define QUERY_CACHE_ENTRIES(cache) \
+ ((QueryParseEntry *)((char *)(cache) + sizeof(QueryParseCache) + \
+ (cache)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Semaphore lock helpers
+ * ----------------
+ */
+
+static inline void
+table_map_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+table_map_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+query_cache_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_QUERY_SEM);
+}
+
+static inline void
+query_cache_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_QUERY_SEM);
+}
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * FNV-1a hash for table/database oid pair
+ */
+static uint32
+fnv1a_hash_table_key(int table_oid, int dboid)
+{
+ uint32 hash = 2166136261u; /* FNV offset basis */
+ uint32 data[2];
+ const unsigned char *bytes;
+ size_t i;
+
+ data[0] = (uint32) table_oid;
+ data[1] = (uint32) dboid;
+ bytes = (const unsigned char *) data;
+
+ for (i = 0; i < sizeof(data); i++)
+ {
+ hash ^= bytes[i];
+ hash *= 16777619u; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/*
+ * 64-bit FNV-1a hash of a byte string of the given length
+ */
+static uint64
+fnv1a_hash_64(const char *str, size_t len)
+{
+ uint64 hash = 14695981039346656037ULL; /* FNV offset basis for 64-bit */
+ size_t i;
+
+ for (i = 0; i < len; i++)
+ {
+ hash ^= (uint8)str[i];
+ hash *= 1099511628211ULL; /* FNV prime for 64-bit */
+ }
+
+ return hash;
+}
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Get elapsed time in microseconds between two timevals
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+ return ((int64)(end->tv_sec - start->tv_sec) * 1000000) +
+ (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Get current time
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+ gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Database oid lookup
+ * ----------------
+ */
+
+static int
+track_table_mutation_get_database_oid_internal(void)
+{
+ int oid = 0;
+ static POOL_RELCACHE *relcache;
+ POOL_CONNECTION_POOL *backend;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Safety check: must have shmem initialized */
+ if (track_table_mutation_shmem == NULL)
+ return oid;
+
+ session_context = pool_get_session_context(false);
+ if (session_context == NULL)
+ return oid;
+
+ /* Ensure we have a valid query context */
+ if (session_context->query_context == NULL)
+ return oid;
+
+ backend = session_context->backend;
+ if (backend == NULL || MAIN_CONNECTION(backend) == NULL || MAIN_CONNECTION(backend)->sp == NULL)
+ return oid;
+
+ /* Ensure database name is valid */
+ if (MAIN_CONNECTION(backend)->sp->database == NULL)
+ return oid;
+
+ if (!relcache)
+ {
+ relcache = pool_create_relcache(pool_config->relcache_size,
+ DATABASE_TO_OID_QUERY,
+ int_register_func,
+ int_unregister_func,
+ false);
+ if (relcache == NULL)
+ {
+ ereport(LOG,
+ (errmsg("track_table_mutation: error creating relcache while getting database OID")));
+ return oid;
+ }
+ }
+
+ oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
+ MAIN_CONNECTION(backend)->sp->database);
+ return oid;
+}
+
+int
+pool_track_table_mutation_get_database_oid(void)
+{
+ return track_table_mutation_get_database_oid_internal();
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table
+ */
+static void
+table_map_init(TrackTableMutationHashTable *map, int num_buckets, int max_entries)
+{
+ int *buckets;
+ TrackTableMutationEntry *entries;
+ int i;
+
+ map->num_buckets = num_buckets;
+ map->max_entries = max_entries;
+ map->num_entries = 0;
+ map->free_list_head = 0;
+
+ buckets = TABLE_MAP_BUCKETS(map);
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ /* Initialize free list - chain all entries */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : TRACK_TABLE_MUTATION_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: initialized table map with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list
+ */
+static int
+table_map_alloc_entry(TrackTableMutationHashTable *map)
+{
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int idx;
+
+ if (map->free_list_head == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ idx = map->free_list_head;
+ map->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ map->num_entries++;
+
+ return idx;
+}
+
+/*
+ * Free an entry back to the free list
+ */
+static void
+table_map_free_entry(TrackTableMutationHashTable *map, int idx)
+{
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+
+ entries[idx].in_use = false;
+ entries[idx].next = map->free_list_head;
+ map->free_list_head = idx;
+ map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table
+ * Returns entry index or TRACK_TABLE_MUTATION_INVALID_INDEX if not found
+ * Must be called with lock held
+ */
+static int
+table_map_lookup(TrackTableMutationHashTable *map, int table_oid, int dboid, uint32 hash)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ if (entries[idx].hash == hash &&
+ entries[idx].table_oid == table_oid &&
+ entries[idx].dboid == dboid)
+ {
+ return idx;
+ }
+ idx = entries[idx].next;
+ }
+
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+}
+
+/*
+ * Insert or update a table entry
+ * Must be called with lock held
+ */
+static void
+table_map_insert(TrackTableMutationHashTable *map, int table_oid, int dboid,
+ uint32 hash, struct timeval *write_time)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx;
+
+ /* Check if entry already exists */
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ /* Update existing entry */
+ entries[idx].last_write_time = *write_time;
+ return;
+ }
+
+ /* Allocate new entry */
+ idx = table_map_alloc_entry(map);
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ int b;
+ /* Table is full - evict an entry to make room */
+ /* For simplicity, evict the head of the first non-empty bucket (not necessarily the oldest entry) */
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ if (buckets[b] != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ int victim = buckets[b];
+ buckets[b] = entries[victim].next;
+ table_map_free_entry(map, victim);
+ idx = table_map_alloc_entry(map);
+ break;
+ }
+ }
+
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ ereport(WARNING,
+ (errmsg("track_table_mutation: failed to allocate entry for table oid %d (dboid %d)",
+ table_oid, dboid)));
+ return;
+ }
+ }
+
+ /* Initialize new entry */
+ entries[idx].table_oid = table_oid;
+ entries[idx].dboid = dboid;
+ entries[idx].hash = hash;
+ entries[idx].last_write_time = *write_time;
+
+ /* Insert at head of bucket chain */
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: marked table oid %d (dboid %d) as written",
+ table_oid, dboid)));
+}
+
+/*
+ * Remove expired entries from the table map
+ * Must be called with lock held
+ */
+static void
+table_map_cleanup_expired(TrackTableMutationHashTable *map, uint64 ttl_us)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ struct timeval now;
+ int removed = 0;
+ int b;
+
+ get_current_time(&now);
+
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *prev_ptr = &buckets[b];
+ int idx = buckets[b];
+
+ while (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+
+ if (elapsed > (int64)ttl_us)
+ {
+ /* Entry has expired - remove it */
+ int next = entries[idx].next;
+ *prev_ptr = next;
+ table_map_free_entry(map, idx);
+ idx = next;
+ removed++;
+ }
+ else
+ {
+ prev_ptr = &entries[idx].next;
+ idx = entries[idx].next;
+ }
+ }
+ }
+
+ if (removed > 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: cleaned up %d expired table entries", removed)));
+ }
+}
+
+/* ----------------
+ * Query parse cache operations
+ * ----------------
+ */
+
+/*
+ * Initialize query parse cache
+ */
+static void
+query_cache_init(QueryParseCache *cache, int num_buckets, int max_entries)
+{
+ int *buckets;
+ QueryParseEntry *entries;
+ int i;
+
+ cache->num_buckets = num_buckets;
+ cache->max_entries = max_entries;
+ cache->num_entries = 0;
+ cache->free_list_head = 0;
+ cache->lru_head = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ cache->lru_tail = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ buckets = QUERY_CACHE_BUCKETS(cache);
+ entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ /* Initialize free list */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[i].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[i].lru_next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: initialized query cache with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Move entry to front of LRU list (most recently used)
+ */
+static void
+query_cache_lru_touch(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Already at head? */
+ if (cache->lru_head == idx)
+ return;
+
+ /* Remove from current position */
+ if (entries[idx].lru_prev != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ if (entries[idx].lru_next != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ if (cache->lru_tail == idx)
+ cache->lru_tail = entries[idx].lru_prev;
+
+ /* Insert at head */
+ entries[idx].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+ if (cache->lru_head != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+ cache->lru_head = idx;
+ if (cache->lru_tail == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Add entry to LRU list (at head)
+ */
+static void
+query_cache_lru_add(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ entries[idx].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+
+ if (cache->lru_head != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+
+ cache->lru_head = idx;
+
+ if (cache->lru_tail == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Remove entry from LRU list
+ */
+static void
+query_cache_lru_remove(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ if (entries[idx].lru_prev != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ else
+ cache->lru_head = entries[idx].lru_next;
+
+ if (entries[idx].lru_next != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ else
+ cache->lru_tail = entries[idx].lru_prev;
+
+ entries[idx].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[idx].lru_next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+}
+
+/*
+ * Allocate entry from free list, evicting LRU if necessary
+ */
+static int
+query_cache_alloc_entry(QueryParseCache *cache)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ int idx;
+
+ if (cache->free_list_head != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ idx = cache->free_list_head;
+ cache->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ cache->num_entries++;
+ return idx;
+ }
+
+ /* No free entries - evict LRU */
+ if (cache->lru_tail == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ idx = cache->lru_tail;
+
+ /* Remove from hash bucket */
+ int bucket = entries[idx].query_hash % cache->num_buckets;
+ int *prev_ptr = &buckets[bucket];
+ int curr = buckets[bucket];
+
+ while (curr != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ if (curr == idx)
+ {
+ *prev_ptr = entries[curr].next;
+ break;
+ }
+ prev_ptr = &entries[curr].next;
+ curr = entries[curr].next;
+ }
+
+ /* Remove from LRU list */
+ query_cache_lru_remove(cache, idx);
+
+ /* Reinitialize entry */
+ entries[idx].in_use = true;
+ entries[idx].next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ return idx;
+}
+
+/*
+ * Look up a query in the cache
+ */
+static int
+query_cache_lookup(QueryParseCache *cache, uint64 hash)
+{
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int bucket = hash % cache->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ if (entries[idx].query_hash == hash)
+ return idx;
+ idx = entries[idx].next;
+ }
+
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+}
+
+/* ----------------
+ * Query normalization
+ * ----------------
+ */
+
+/*
+ * Simple query normalization:
+ * - Strip comments (-- style and C-style block comments)
+ * - Collapse runs of whitespace into a single space
+ * - Convert to lowercase (quoted text is replaced, not lowercased)
+ * - Replace quoted text ('...' or "...") with '$'; digit runs become '$' or are dropped
+ *
+ * This is a simplified version - pgpool2 already does this elsewhere,
+ * but we need a standalone version for the track table mutation feature.
+ */
+static size_t
+normalize_query(const char *query, char *output, size_t output_size)
+{
+ const char *src = query;
+ char *dst = output;
+ char *dst_end = output + output_size - 1;
+ bool in_string = false;
+ char string_char = 0;
+ bool last_was_space = true; /* Start true to skip leading space */
+
+ while (*src && dst < dst_end)
+ {
+ /* Handle string literals */
+ if (in_string)
+ {
+ if (*src == string_char)
+ {
+ if (*(src + 1) == string_char)
+ {
+ /* Escaped quote */
+ src += 2;
+ continue;
+ }
+ in_string = false;
+ *dst++ = '$'; /* Replace string content with placeholder */
+ }
+ src++;
+ continue;
+ }
+
+ /* Check for string start */
+ if (*src == '\'' || *src == '"')
+ {
+ in_string = true;
+ string_char = *src;
+ src++;
+ continue;
+ }
+
+ /* Handle single-line comments */
+ if (*src == '-' && *(src + 1) == '-')
+ {
+ while (*src && *src != '\n')
+ src++;
+ continue;
+ }
+
+ /* Handle multi-line comments */
+ if (*src == '/' && *(src + 1) == '*')
+ {
+ src += 2;
+ while (*src && !(*src == '*' && *(src + 1) == '/'))
+ src++;
+ if (*src)
+ src += 2;
+ continue;
+ }
+
+ /* Handle whitespace */
+ if (*src == ' ' || *src == '\t' || *src == '\n' || *src == '\r')
+ {
+ if (!last_was_space)
+ {
+ *dst++ = ' ';
+ last_was_space = true;
+ }
+ src++;
+ continue;
+ }
+
+ /* Handle numbers - replace with placeholder */
+ if ((*src >= '0' && *src <= '9') ||
+ (*src == '.' && *(src + 1) >= '0' && *(src + 1) <= '9'))
+ {
+ while (*src && ((*src >= '0' && *src <= '9') || *src == '.'))
+ src++;
+ if (!last_was_space && dst > output && *(dst - 1) != '$')
+ *dst++ = '$';
+ last_was_space = false;
+ continue;
+ }
+
+ /* Regular character - convert to lowercase */
+ if (*src >= 'A' && *src <= 'Z')
+ *dst++ = *src + 32;
+ else
+ *dst++ = *src;
+
+ last_was_space = false;
+ src++;
+ }
+
+ /* Remove trailing space */
+ if (dst > output && *(dst - 1) == ' ')
+ dst--;
+
+ *dst = '\0';
+ return dst - output;
+}
+
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+Size
+pool_track_table_mutation_shmem_size(void)
+{
+ Size size = 0;
+ int table_buckets = pool_config->track_table_mutation_table_buckets;
+ int table_size = pool_config->track_table_mutation_table_size;
+ int query_buckets = pool_config->track_table_mutation_query_buckets;
+ int query_cache_size = pool_config->track_table_mutation_query_parse_cache_size;
+
+ /* Main structure */
+ size += sizeof(TrackTableMutationShmem);
+
+ /* Table mutation hash table */
+ size += sizeof(TrackTableMutationHashTable);
+ size += table_buckets * sizeof(int); /* buckets array */
+ size += table_size * sizeof(TrackTableMutationEntry); /* entries array */
+
+ /* Query parse cache */
+ size += sizeof(QueryParseCache);
+ size += query_buckets * sizeof(int); /* buckets array */
+ size += query_cache_size * sizeof(QueryParseEntry); /* entries array */
+
+ return size;
+}
+
+void
+pool_track_table_mutation_init(void)
+{
+#ifndef POOL_PRIVATE
+ Size shmem_size;
+ char *shmem_ptr;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: feature disabled")));
+ return;
+ }
+
+ shmem_size = pool_track_table_mutation_shmem_size();
+
+ /*
+ * Allocate from the main shared memory segment.
+ * Memory is already zeroed by initialize_shared_memory_main_segment().
+ */
+ shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size);
+ if (shmem_ptr == NULL)
+ {
+ ereport(ERROR,
+ (errmsg("track_table_mutation: failed to allocate %zu bytes of shared memory",
+ shmem_size)));
+ return;
+ }
+
+ /* Set up pointers to structures within shared memory */
+ track_table_mutation_shmem = (TrackTableMutationShmem *)shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationShmem);
+
+ track_table_mutation_shmem->table_map = (TrackTableMutationHashTable *)shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationHashTable);
+ shmem_ptr += pool_config->track_table_mutation_table_buckets * sizeof(int);
+ shmem_ptr += pool_config->track_table_mutation_table_size * sizeof(TrackTableMutationEntry);
+
+ track_table_mutation_shmem->query_cache = (QueryParseCache *)shmem_ptr;
+
+ /* Initialize structures */
+ table_map_init(track_table_mutation_shmem->table_map,
+ pool_config->track_table_mutation_table_buckets,
+ pool_config->track_table_mutation_table_size);
+
+ query_cache_init(track_table_mutation_shmem->query_cache,
+ pool_config->track_table_mutation_query_buckets,
+ pool_config->track_table_mutation_query_parse_cache_size);
+
+ /* Initialize global state */
+ track_table_mutation_shmem->state.initialized = true;
+ track_table_mutation_shmem->state.current_ttl_us = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+ get_current_time(&track_table_mutation_shmem->state.ttl_last_updated);
+ get_current_time(&track_table_mutation_shmem->state.last_cleanup_time);
+ track_table_mutation_shmem->state.global_cold_start_until.tv_sec = 0;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec = 0;
+ track_table_mutation_shmem->state.stats_queries_checked = 0;
+ track_table_mutation_shmem->state.stats_forced_primary = 0;
+ track_table_mutation_shmem->state.stats_allowed_replica = 0;
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: initialized with %zu bytes shared memory",
+ shmem_size)));
+#endif
+}
+
+void
+pool_track_table_mutation_child_init(void)
+{
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ get_current_time(&process_start_time);
+ cold_start_initialized = true;
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: child initialized, cold start period %d ms",
+ pool_config->track_table_mutation_cold_start_duration)));
+}
+
+bool
+pool_track_table_mutation_in_cold_start(void)
+{
+ struct timeval now;
+ int64 elapsed_ms;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return false;
+
+ if (pool_config->track_table_mutation_cold_start_duration <= 0)
+ return false;
+
+ get_current_time(&now);
+
+ /* Check for watchdog-triggered global cold start first */
+ if (track_table_mutation_shmem->state.global_cold_start_until.tv_sec != 0 &&
+ elapsed_us(&now, &track_table_mutation_shmem->state.global_cold_start_until) > 0)
+ {
+ return true;
+ }
+
+ /* Check per-process cold start */
+ if (!cold_start_initialized)
+ return false;
+
+ elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;
+
+ if (elapsed_ms < pool_config->track_table_mutation_cold_start_duration)
+ {
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: in cold start (%ld/%d ms)",
+ (long)elapsed_ms, pool_config->track_table_mutation_cold_start_duration)));
+ return true;
+ }
+
+ return false;
+}
+
+void
+pool_track_table_mutation_trigger_global_cold_start(void)
+{
+ struct timeval now;
+ int duration_ms;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ duration_ms = pool_config->track_table_mutation_cold_start_duration;
+ if (duration_ms <= 0)
+ return;
+
+ get_current_time(&now);
+ track_table_mutation_shmem->state.global_cold_start_until = now;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_sec += duration_ms / 1000;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec += (duration_ms % 1000) * 1000;
+ if (track_table_mutation_shmem->state.global_cold_start_until.tv_usec >= 1000000)
+ {
+ track_table_mutation_shmem->state.global_cold_start_until.tv_sec +=
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec / 1000000;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec %=
+ 1000000;
+ }
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: entering global cold start for %d ms",
+ duration_ms)));
+}
+
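+/*
+ * Return true if the table was written less than the current TTL ago, in
+ * which case the query is counted as forced to the primary. An invalid
+ * table or database OID is conservatively reported as stale.
+ */
+bool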
+pool_track_table_mutation_table_is_stale(int table_oid, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ struct timeval now;
+ uint64 ttl_us;
+ uint32 hash;
+ int idx;
+ bool is_stale = false;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return false;
+
+ if (table_oid <= 0 || dboid <= 0)
+ {
+ is_stale = true;
+ goto update_stats;
+ }
+
+ map = track_table_mutation_shmem->table_map;
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+
+ table_map_lock();
+
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int64 elapsed;
+
+ get_current_time(&now);
+ ttl_us = track_table_mutation_shmem->state.current_ttl_us;
+
+ elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+ is_stale = (elapsed < (int64)ttl_us);
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: table oid %d (dboid %d) elapsed=%ld us, ttl=%lu us, stale=%d",
+ table_oid, dboid, (long)elapsed, (unsigned long)ttl_us, is_stale)));
+ }
+
+ table_map_unlock();
+
+update_stats:
+ /* Update statistics; the NULL check is defensive, shmem was verified on entry */
+ if (track_table_mutation_shmem != NULL)
+ {
+ __sync_fetch_and_add(&track_table_mutation_shmem->state.stats_queries_checked, 1);
+ if (is_stale)
+ __sync_fetch_and_add(&track_table_mutation_shmem->state.stats_forced_primary, 1);
+ else
+ __sync_fetch_and_add(&track_table_mutation_shmem->state.stats_allowed_replica, 1);
+ }
+
+ return is_stale;
+}
+
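+/*
+ * Record the current time as the write time of each given table in the
+ * shared table map. When the map is more than 3/4 full, expired entries
+ * are cleaned up first, at most once every 100 ms.
+ */
+void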
+pool_track_table_mutation_mark_tables_written(const int *table_oids, int num_tables, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ struct timeval now;
+ int i;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ if (num_tables <= 0 || table_oids == NULL || dboid <= 0)
+ return;
+
+ map = track_table_mutation_shmem->table_map;
+ get_current_time(&now);
+
+ table_map_lock();
+
+ /* Periodically clean up expired entries */
+ if (map->num_entries > map->max_entries * 3 / 4)
+ {
+ /* Limit cleanup frequency to avoid O(N) scan on every write */
+ /* 100ms interval */
+ if (elapsed_us(&track_table_mutation_shmem->state.last_cleanup_time, &now) > 100000)
+ {
+ table_map_cleanup_expired(map, track_table_mutation_shmem->state.current_ttl_us);
+ track_table_mutation_shmem->state.last_cleanup_time = now;
+ }
+ }
+
+ for (i = 0; i < num_tables; i++)
+ {
+ uint32 hash;
+ int table_oid = table_oids[i];
+
+ if (table_oid > 0)
+ {
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+ table_map_insert(map, table_oid, dboid, hash, &now);
+ }
+ }
+
+ table_map_unlock();
+}
+
+/*
+ * Convenience function to mark a single table as written
+ */
+void
+pool_track_table_mutation_mark_table_written(int table_oid, int dboid)
+{
+ if (table_oid > 0 && dboid > 0)
+ {
+ const int tables[1] = { table_oid };
+ pool_track_table_mutation_mark_tables_written(tables, 1, dboid);
+ }
+}
+
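+/*
+ * Recompute the staleness TTL from the given delay (in microseconds),
+ * scaled by track_table_mutation_ttl_factor and clamped to the range
+ * [TRACK_TABLE_MUTATION_DEFAULT_TTL_US, 1 hour].
+ */
+void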
+pool_track_table_mutation_update_ttl(uint64 delay_us)
+{
+ uint64 new_ttl;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ /* Calculate new TTL: delay * factor, with minimum of default TTL */
+ new_ttl = (uint64)(delay_us * pool_config->track_table_mutation_ttl_factor);
+ if (new_ttl < TRACK_TABLE_MUTATION_DEFAULT_TTL_US)
+ new_ttl = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+
+ /* Maximum TTL of 1 hour */
+ if (new_ttl > 3600ULL * 1000000ULL)
+ new_ttl = 3600ULL * 1000000ULL;
+
+ track_table_mutation_shmem->state.current_ttl_us = new_ttl;
+ get_current_time(&track_table_mutation_shmem->state.ttl_last_updated);
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: updated TTL to %lu us (delay=%lu us, factor=%.1f)",
+ (unsigned long)new_ttl, (unsigned long)delay_us,
+ pool_config->track_table_mutation_ttl_factor)));
+}
+
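+/*
+ * Look up a cached parse result by query hash. On a hit, copy the write
+ * flag and table names into the caller's buffers, move the entry to the
+ * front of the LRU list, and return true.
+ */
+bool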
+pool_track_table_mutation_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int *num_tables)
+{
+ QueryParseCache *cache;
+ int idx;
+ bool found = false;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return false;
+
+ cache = track_table_mutation_shmem->query_cache;
+
+ query_cache_lock();
+
+ idx = query_cache_lookup(cache, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int i;
+
+ *is_write = entries[idx].is_write;
+ *num_tables = entries[idx].num_tables;
+
+ for (i = 0; i < entries[idx].num_tables && i < TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY; i++)
+ {
+ strlcpy(table_names[i], entries[idx].table_names[i], TRACK_TABLE_MUTATION_TABLE_NAME_LEN);
+ }
+
+ /* Move to front of LRU */
+ query_cache_lru_touch(cache, idx);
+ found = true;
+ }
+
+ query_cache_unlock();
+
+ return found;
+}
+
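+/*
+ * Store a parse result under the given query hash unless one is already
+ * cached. At most TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY table names
+ * are kept.
+ */
+void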
+pool_track_table_mutation_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int num_tables)
+{
+ QueryParseCache *cache;
+ int *buckets;
+ QueryParseEntry *entries;
+ int idx;
+ int bucket;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ cache = track_table_mutation_shmem->query_cache;
+
+ query_cache_lock();
+
+ /* Check if already exists */
+ idx = query_cache_lookup(cache, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ query_cache_unlock();
+ return;
+ }
+
+ /* Allocate new entry (may evict LRU) */
+ idx = query_cache_alloc_entry(cache);
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ query_cache_unlock();
+ ereport(WARNING,
+ (errmsg("track_table_mutation: failed to allocate query cache entry")));
+ return;
+ }
+
+ entries = QUERY_CACHE_ENTRIES(cache);
+ buckets = QUERY_CACHE_BUCKETS(cache);
+
+ /* Fill in entry */
+ entries[idx].query_hash = hash;
+ entries[idx].is_write = is_write;
+ entries[idx].num_tables = (num_tables > TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY) ?
+ TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY : num_tables;
+
+ {
+ int i;
+ for (i = 0; i < entries[idx].num_tables; i++)
+ {
+ strlcpy(entries[idx].table_names[i], table_names[i], TRACK_TABLE_MUTATION_TABLE_NAME_LEN);
+ }
+ }
+
+ /* Insert into hash bucket */
+ bucket = hash % cache->num_buckets;
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ /* Add to LRU list */
+ query_cache_lru_add(cache, idx);
+
+ query_cache_unlock();
+}
+
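+/*
+ * Normalize the query text and return the 64-bit FNV-1a hash of the result.
+ * Returns 0 if the query is NULL or empty, or if normalize_query() returns 0.
+ */
+uint64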
+pool_track_table_mutation_normalize_and_hash(const char *query)
+{
+ char normalized[8192];
+ size_t len;
+
+ if (query == NULL || query[0] == '\0')
+ return 0;
+
+ len = normalize_query(query, normalized, sizeof(normalized));
+ if (len == 0)
+ return 0;
+
+ return fnv1a_hash_64(normalized, len);
+}
--
2.52.0