From: Nadav Shatz <[email protected]>
To: Tatsuo Ishii <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Recent mutated table tracking in memory
Date: Thu, 19 Feb 2026 13:05:41 +0200
Message-ID: <CACeKOO0gEfwBQ1J6sdpXDV_=2RYhg9wemE=x=LOEDimvyH5Xfg@mail.gmail.com>
In-Reply-To: <CACeKOO14e4h1na+J5TEPkHGuSVKA8YzL6gUaoCqgCqyoffCjHA@mail.gmail.com>
References: <CACeKOO2QqpnML1OkQqqCCc+xG1d2M+sS7y7zE9vw5W-DXu+xKQ@mail.gmail.com>
<[email protected]>
<CACeKOO21jgrafj6RzO+ibLjBDUdDeQ0Tuu3tatkmGUdQi+GAAQ@mail.gmail.com>
<[email protected]>
<CACeKOO14e4h1na+J5TEPkHGuSVKA8YzL6gUaoCqgCqyoffCjHA@mail.gmail.com>
Added some handling for possible causes - works now.
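
For reference, here is a minimal, self-contained sketch of the commit-time behavior discussed in the quoted thread below. It is a simplified model and not the patch code: SharedMap, Session, record_write and end_transaction are hypothetical names used only for illustration, and tables are tracked by name here, whereas the patch resolves the names in transaction_temp_write_list to OIDs before marking shared memory.

```c
#include <stdbool.h>
#include <string.h>

#define MAX_TABLES 16
#define NAME_LEN   64

/* Hypothetical stand-in for the shared-memory table map. */
typedef struct
{
	char	names[MAX_TABLES][NAME_LEN];
	int		count;
} SharedMap;

/* Hypothetical stand-in for the per-session transaction state. */
typedef struct
{
	bool	in_transaction;
	char	pending[MAX_TABLES][NAME_LEN];	/* writes not yet committed */
	int		num_pending;
} Session;

static bool
map_contains(const SharedMap *map, const char *table)
{
	for (int i = 0; i < map->count; i++)
		if (strcmp(map->names[i], table) == 0)
			return true;
	return false;
}

static void
map_mark(SharedMap *map, const char *table)
{
	if (map_contains(map, table) || map->count >= MAX_TABLES)
		return;
	strncpy(map->names[map->count], table, NAME_LEN - 1);
	map->names[map->count][NAME_LEN - 1] = '\0';
	map->count++;
}

static void
begin_transaction(Session *s)
{
	s->in_transaction = true;
	s->num_pending = 0;
}

/* Autocommit writes are marked at once; transactional writes are deferred. */
static void
record_write(Session *s, SharedMap *map, const char *table)
{
	if (!s->in_transaction)
	{
		map_mark(map, table);
		return;
	}
	if (s->num_pending < MAX_TABLES)
	{
		strncpy(s->pending[s->num_pending], table, NAME_LEN - 1);
		s->pending[s->num_pending][NAME_LEN - 1] = '\0';
		s->num_pending++;
	}
}

/* COMMIT flushes the pending list to the map; ROLLBACK only discards it. */
static void
end_transaction(Session *s, SharedMap *map, bool committed)
{
	if (committed)
		for (int i = 0; i < s->num_pending; i++)
			map_mark(map, s->pending[i]);
	s->num_pending = 0;
	s->in_transaction = false;
}
```

In this model, BEGIN; write to t1; ROLLBACK leaves the map untouched, while the same sequence ending in COMMIT marks t1.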
On Thu, Feb 19, 2026 at 6:40 AM Nadav Shatz <[email protected]> wrote:
> Thanks! I’ll look into it and share an updated patch
>
> Nadav Shatz
> Tailor Brands | CTO
>
>
> On Thu, Feb 19, 2026 at 1:51 AM Tatsuo Ishii <[email protected]> wrote:
>
>> > Hi Tatsuo,
>> >
>> > Thank you for the careful review. You raised an important concern. I've
>> > addressed it in the updated patch - here's the explanation:
>> >
>> > The attack scenario you describe is now handled. In the updated patch,
>> > writes inside explicit transactions are only flushed to the shared-memory
>> > table map at COMMIT time. If the transaction is rolled back, the table is
>> > never marked as stale. So the attack pattern:
>> >
>> > BEGIN;
>> > UPDATE t1 SET i = 1 WHERE FALSE;
>> > ROLLBACK;
>> >
>> > has zero effect on the shared-memory table map. The dml_adaptive_global
>> > mode piggybacks on the existing dml_adaptive per-transaction write list
>> > (transaction_temp_write_list). On COMMIT, the accumulated table names are
>> > resolved to OIDs and flushed to shared memory. On ROLLBACK,
>> > the list is simply discarded (the existing dml_adaptive behavior).
>> >
>> > For autocommit statements (outside explicit transactions), tables are
>> > marked immediately - but in that case the write is committed, so this is
>> > correct.
>> >
>> > Regression test included. Test 042 now includes:
>> > - Test 10: verifies that BEGIN; INSERT; ROLLBACK; SELECT does NOT route
>> > the SELECT to primary
>> > - Test 11: verifies that BEGIN; INSERT; COMMIT; SELECT DOES route the
>> > SELECT to primary
>> >
>> > Additional context on the threat model:
>> >
>> > 1. This feature requires disable_load_balance_on_write =
>> > 'dml_adaptive_global' - it is opt-in, not enabled by default. Operators who
>> > enable it accept documented trade-offs (additional shared memory, TTL-based
>> > staleness window).
>> > 2. An attacker who can connect and execute SQL against pgpool already has
>> > the ability to cause far more damage (DROP TABLE, mass DELETEs, resource
>> > exhaustion via expensive queries, connection flooding, etc.). The
>> > table-marking via committed writes is a minor concern compared to
>> > those vectors. Authentication, connection limits, and network security
>> > are the appropriate defenses at that layer.
>> > 3. Even in the worst case (an attacker commits real writes in a loop),
>> > the impact is bounded: the stale marking is temporary (TTL-based, typically
>> > a few seconds), and only affects load-balancing decisions - it doesn't
>> > cause data loss or correctness issues.
>> > 4. The existing dml_adaptive mode has analogous behavior: within a
>> > transaction, a write to table T causes all reads of T to go to primary for
>> > the remainder of that transaction. The only difference is scope -
>> > dml_adaptive_global extends this across sessions with a TTL.
>> >
>> > Thanks!
>>
>> Thank you for the patch. While I am looking into it, I noticed a
>> regression test failure.
>>
>> t-ishii$ ./regress.sh 04[12]
>> creating pgpool-II temporary installation ...
>> :
>> :
>> testing 041.external_replication_delay...ok.
>> testing 042.track_table_mutation...failed.
>> out of 2 ok:1 failed:1 timeout:0
>>
>> However if I run 042 only, it succeeds.
>>
>> t-ishii$ ./regress.sh 042
>> :
>> :
>> testing 042.track_table_mutation...ok.
>> out of 1 ok:1 failed:0 timeout:0
>>
>> Can you please take a look at this? log/042.track_table_mutation
>> attached.
>>
>> Best regards,
>> --
>> Tatsuo Ishii
>> SRA OSS K.K.
>> English: http://www.sraoss.co.jp/index_en/
>> Japanese:http://www.sraoss.co.jp
>>
>
--
Nadav Shatz
Tailor Brands | CTO
Attachments:
[application/octet-stream] table_track.patch (99.8K, 3-table_track.patch)
From d819632f2dac41cbe1e01363628d1d1c2f648961 Mon Sep 17 00:00:00 2001
From: Nadav Shatz <[email protected]>
Date: Tue, 6 Jan 2026 12:41:50 +0200
Subject: [PATCH] Feature: add in-memory table tracking to prevent stale reads
from replicas
Introduces 'dml_adaptive_global' as a new value for disable_load_balance_on_write.
This mode is a superset of dml_adaptive: it performs per-transaction local tracking
AND cross-session shared-memory tracking of recently written tables, routing reads
to primary until a TTL (based on measured replication delay) expires.
Sub-parameters (track_table_mutation_*) control TTL factor, cold start duration,
hash table sizing, and query parse cache sizing.
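
A rough sketch of the TTL rule and the shared-memory sizing these sub-parameters control, under stated assumptions: the function names are hypothetical and do not appear in the patch; the 100 ms fallback mirrors TRACK_TABLE_MUTATION_DEFAULT_TTL_US from the new header; treating a non-positive delay as "unknown" is an assumption; and the 40 and 640 bytes per entry are the approximate figures quoted in the documentation, ignoring bucket arrays and headers.

```c
#include <stdbool.h>
#include <stdint.h>

/* Fallback TTL (100 ms) used when the replication delay is unknown. */
#define DEFAULT_TTL_US (100 * 1000)

/*
 * TTL in microseconds: replication_delay * ttl_factor.
 * Assumption: a delay <= 0 means "unknown" and selects the fallback.
 */
static int64_t
ttl_us(int64_t replication_delay_us, double ttl_factor)
{
	if (replication_delay_us <= 0)
		return DEFAULT_TTL_US;
	return (int64_t) ((double) replication_delay_us * ttl_factor);
}

/* A table is stale while less than one TTL has passed since its last write. */
static bool
table_is_stale(int64_t last_write_us, int64_t now_us, int64_t ttl)
{
	return (now_us - last_write_us) < ttl;
}

/* Approximate shared memory: 40 bytes per table entry, 640 per cache entry. */
static int64_t
estimated_shmem_bytes(int64_t table_size, int64_t query_cache_size)
{
	return table_size * 40 + query_cache_size * 640;
}
```

With a measured delay of 200 ms and the default factor of 5.0, a written table would be read from primary for one second; the default sizes (2048 tables, 10000 cached queries) come to roughly 6.5 MB by this estimate.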
diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
index ee19fabebab2210cd4abe59a711a036ac0ac8943..74162ef2f81f38879c552438ee9321dfde34a4be 100644
--- a/doc/src/sgml/loadbalance.sgml
+++ b/doc/src/sgml/loadbalance.sgml
@@ -1108,6 +1108,18 @@ app_name_redirect_preference_list > database_redirect_preference_list > us
Dependent functions, triggers, and views on the tables can be configured
using <xref linkend="guc-dml-adaptive-object-relationship-list">
</para>
+
+ <para>
+ If this parameter is set to <varname>dml_adaptive_global</varname>,
+ <productname>Pgpool-II</> behaves like <varname>dml_adaptive</varname>
+ (per-transaction write tracking) and additionally uses shared memory to track
+ recently written tables across all sessions of the <productname>Pgpool-II</> instance. When a table is
+ written in any session, subsequent reads of that table from any session are
+ routed to primary until a TTL (based on measured replication delay) expires.
+ This prevents stale reads after writes even across different connections.
+ See <xref linkend="runtime-config-table-mutation-map"> for the sub-parameters
+ that control the shared-memory tracking behavior.
+ </para>
</listitem>
</varlistentry>
@@ -1193,4 +1205,255 @@ dml_adaptive_object_relationship_list = 'table_1:table_2'
</variablelist>
</sect2>
+
+ <sect2 id="runtime-config-table-mutation-map">
+ <title>Table Mutation Map Configuration (Lagless Replica Reads)</title>
+
+ <para>
+ These parameters configure the track table mutation feature, which is activated by setting
+ <xref linkend="guc-disable-load-balance-on-write"> to <literal>dml_adaptive_global</literal>.
+ The feature tracks recently written tables to prevent stale reads from replica nodes during
+ replication lag, implementing the "lagless" architecture pattern for distributed systems
+ with read replicas.
+ </para>
+
+ <para>
+ When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period
+ (<literal>replication_delay * track_table_mutation_ttl_factor</literal>). Any SELECT queries on stale tables are routed
+ to the primary node instead of replicas, ensuring read-after-write consistency.
+ </para>
+
+ <para>
+ This feature requires <xref linkend="guc-replication-delay-source-cmd"> to be configured
+ for monitoring replication delay from replicas.
+ </para>
+
+ <warning>
+ <para>
+ Enabling <literal>dml_adaptive_global</literal> increases shared memory consumption. With default settings,
+ the feature requires approximately 6.5 MB of shared memory (about 0.1 MB for table tracking + 6.4 MB for the query cache).
+ Memory usage scales with configuration parameters:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Table tracking: <literal>track_table_mutation_table_size * 40 bytes</literal> (default: 2048 * 40 = ~80 KB)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Query cache: <literal>track_table_mutation_query_parse_cache_size * 640 bytes</literal> (default: 10000 * 640 = ~6.4 MB)
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ For high-traffic systems with large cache sizes (e.g., <literal>track_table_mutation_query_parse_cache_size = 100000</literal>),
+ memory usage can reach 64 MB or more. Consider your system's available shared memory when using <literal>dml_adaptive_global</literal>.
+ </para>
+ </warning>
+
+ <variablelist>
+
+ <varlistentry id="guc-track-table-mutation-ttl-factor" xreflabel="track_table_mutation_ttl_factor">
+ <term><varname>track_table_mutation_ttl_factor</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_ttl_factor</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Multiplier for calculating the TTL: <literal>TTL = replication_delay * track_table_mutation_ttl_factor</literal>.
+ Higher values provide more safety margin but may reduce read replica utilization.
+ </para>
+ <para>
+ Valid range: 1.0-100.0. Default is <literal>5.0</literal>.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-cold-start-duration" xreflabel="track_table_mutation_cold_start_duration">
+ <term><varname>track_table_mutation_cold_start_duration</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_cold_start_duration</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Duration in milliseconds to route all queries to primary after a child process starts.
+ This prevents stale reads when a new connection is established before the table mutation map
+ is populated with recent write history.
+ </para>
+ <para>
+ When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a
+ global cold start for this duration to avoid stale reads after leadership changes.
+ </para>
+ <para>
+ Valid range: 0-60000 ms. Default is <literal>2000</literal> (2 seconds).
+ Set to 0 to disable cold start behavior.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-buckets" xreflabel="track_table_mutation_table_buckets">
+ <term><varname>track_table_mutation_table_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the track table mutation hash table.
+ Higher values reduce hash collisions and improve lookup performance.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>1024</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-size" xreflabel="track_table_mutation_table_size">
+ <term><varname>track_table_mutation_table_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of tables that can be tracked simultaneously in the table mutation map.
+ When full, oldest entries are evicted using a simple eviction strategy.
+ </para>
+ <para>
+ Valid range: 128-131072. Default is <literal>2048</literal>.
+ Memory usage: approximately 40 bytes per entry.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-query-buckets" xreflabel="track_table_mutation_query_buckets">
+ <term><varname>track_table_mutation_query_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_query_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the query parse cache. The cache stores normalized
+ query strings mapped to their table dependencies to avoid repeated parsing.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>2048</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-query-parse-cache-size" xreflabel="track_table_mutation_query_parse_cache_size">
+ <term><varname>track_table_mutation_query_parse_cache_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_query_parse_cache_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of query parse results to cache. Uses LRU eviction when full.
+ Larger caches reduce parsing overhead but consume more shared memory.
+ </para>
+ <para>
+ Valid range: 100-1000000. Default is <literal>10000</literal>.
+ Memory usage: approximately 640 bytes per entry (~6.4 MB for default, ~64 MB for 100000 entries).
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ <sect3 id="runtime-config-track-table-mutation-example">
+ <title>Track Table Mutation Configuration Example</title>
+ <para>
+ To enable track table mutation with replication delay monitoring:
+ </para>
+ <programlisting>
+# Enable dml_adaptive_global mode (includes track table mutation)
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_ttl_factor = 5.0
+track_table_mutation_cold_start_duration = 2000
+
+# Configure external replication delay monitoring
+replication_delay_source_cmd = '/path/to/get-replication-delay.sh'
+replication_delay_source_timeout = 10
+
+# Adjust cache sizes based on workload (increases memory usage)
+track_table_mutation_table_size = 4096 # Track up to 4096 tables (~160 KB)
+track_table_mutation_query_parse_cache_size = 50000 # Cache 50k queries (~32 MB)
+ </programlisting>
+ <para>
+ Total shared memory required for above configuration: approximately 32.2 MB (32 MB query cache + 0.2 MB table map), plus overhead.
+ Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.5 MB.
+ </para>
+ </sect3>
+
+ <sect3 id="runtime-config-track-table-mutation-limitations">
+ <title>Limitations</title>
+ <para>
+ The track table mutation feature has the following limitation:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>PREPARE</literal> statements are not tracked. When a prepared statement
+ containing data modification is executed, the table mutation is not recorded.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ If your application uses prepared statements and requires read-after-write consistency,
+ consider using explicit transaction routing or the <literal>/*NO LOAD BALANCE*/</literal>
+ comment directive for affected queries.
+ </para>
+ <para>
+ The following statement types <emphasis>are</emphasis> tracked and will mark tables as stale:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>INSERT</literal>, <literal>UPDATE</literal>, <literal>DELETE</literal>
+ statements (including those with <literal>RETURNING</literal> clauses).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>TRUNCATE</literal> statements (including multiple tables).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>MERGE</literal> statements (PostgreSQL 15+).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>WITH</literal> clauses containing data modifications (Common Table Expressions
+ with <literal>INSERT</literal>, <literal>UPDATE</literal>, or <literal>DELETE</literal>).
+ For example, <literal>WITH deleted AS (DELETE FROM t1 RETURNING *) SELECT * FROM deleted</literal>
+ will properly mark table <literal>t1</literal> as stale.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ <emphasis>Transaction Rollback Behavior:</emphasis> Within explicit transactions, tables
+ are only marked as stale in shared memory when the transaction is committed. If the
+ transaction is rolled back, no tables are marked, since no actual data modification
+ occurred on replicas. This prevents rolled-back transactions from unnecessarily
+ disabling load balancing. For autocommit statements (outside explicit transactions),
+ tables are marked immediately upon command completion.
+ </para>
+ </sect3>
+
+ </sect2>
+
</sect1>
diff --git a/src/Makefile.am b/src/Makefile.am
index 4678ab53055e828a37b6477801640aff17ff84a7..39588af58deba045dffc01ae932115b8a9dbfcf2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \
rewrite/pool_timestamp.c \
rewrite/pool_lobj.c \
utils/pool_select_walker.c \
+ utils/pool_track_table_mutation.c \
utils/strlcpy.c \
utils/psprintf.c \
utils/pool_params.c \
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index 68abb7f41cb96d856c824a148842748bfb7a4d12..623d8751677fd6f39d0e12f0e3e899171890f6e0 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -290,6 +290,7 @@ static const struct config_enum_entry disable_load_balance_on_write_options[] =
{"trans_transaction", DLBOW_TRANS_TRANSACTION, false},
{"always", DLBOW_ALWAYS, false},
{"dml_adaptive", DLBOW_DML_ADAPTIVE, false},
+ {"dml_adaptive_global", DLBOW_DML_ADAPTIVE_GLOBAL, false},
{NULL, 0, false}
};
@@ -1757,6 +1758,17 @@ static struct config_int_array ConfigureNamesIntArray[] =
static struct config_double ConfigureNamesDouble[] =
{
+ {
+ {"track_table_mutation_ttl_factor", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "TTL multiplier for track table mutation (TTL = replication_delay * factor)",
+ CONFIG_VAR_TYPE_DOUBLE, false, 0
+ },
+ &g_pool_config.track_table_mutation_ttl_factor,
+ 5.0, /* boot value: 5x replication delay */
+ 1.0, 100.0, /* min, max */
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_DOUBLE
};
@@ -2355,6 +2367,61 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"track_table_mutation_cold_start_duration", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Duration in milliseconds to force queries to primary after child process starts.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.track_table_mutation_cold_start_duration,
+ 2000, /* 2 seconds */
+ 0, 60000, /* 0 to 60 seconds */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_buckets,
+ 1024,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_size,
+ 2048,
+ 128, 131072,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_query_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_query_buckets,
+ 2048,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_query_parse_cache_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_query_parse_cache_size,
+ 10000,
+ 100, 1000000,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_INT
};
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 7cf9813eb7d58678bc86a0aaa38bd3c6445b6687..aa123222eccaa8505f984dbe3224958fc79424c8 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -29,6 +29,7 @@
#include "utils/statistics.h"
#include "utils/pool_select_walker.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_session_context.h"
#include "context/pool_query_context.h"
#include "parser/nodes.h"
@@ -1828,7 +1829,7 @@ is_in_list(char *name, List *list)
static bool
is_select_object_in_temp_write_list(Node *node, void *context)
{
- if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (node == NULL || !DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
return false;
if (IsA(node, RangeVar))
@@ -1836,7 +1837,7 @@ is_select_object_in_temp_write_list(Node *node, void *context)
RangeVar *rgv = (RangeVar *) node;
POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && session_context->is_in_transaction)
{
ereport(DEBUG1,
(errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname)));
@@ -1880,7 +1881,7 @@ static char *get_associated_object_from_dml_adaptive_relations
void
check_object_relationship_list(char *name, bool is_func_name)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && pool_config->parsed_dml_adaptive_object_relationship_list)
{
POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
@@ -1944,7 +1945,7 @@ add_object_into_temp_write_list(Node *node, void *context)
static void
dml_adaptive(Node *node, char *query)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
/* Set/Unset transaction status flags */
if (IsA(node, TransactionStmt))
@@ -1963,6 +1964,34 @@ dml_adaptive(Node *node, char *query)
}
else if (is_commit_or_rollback_query(node))
{
+ /*
+ * For dml_adaptive_global: on COMMIT, flush the accumulated
+ * table writes to shared memory. On ROLLBACK, skip — the
+ * writes never committed so no stale-read risk exists.
+ * This prevents attackers from polluting the table map with
+ * rolled-back transactions.
+ */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL &&
+ is_commit_query(node) &&
+ session_context->transaction_temp_write_list != NIL)
+ {
+ ListCell *cell;
+ int dboid;
+
+ dboid = pool_track_table_mutation_get_database_oid();
+ if (dboid > 0)
+ {
+ foreach(cell, session_context->transaction_temp_write_list)
+ {
+ char *table_name = (char *) lfirst(cell);
+ int table_oid = pool_table_name_to_oid(table_name);
+
+ if (table_oid > 0)
+ pool_track_table_mutation_mark_table_written(table_oid, dboid);
+ }
+ }
+ }
+
session_context->is_in_transaction = false;
if (session_context->transaction_temp_write_list != NIL)
@@ -2010,6 +2039,18 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
if (dest == POOL_PRIMARY)
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+
+ /*
+ * Resolve table and database OIDs now to populate relcache.
+ * This avoids potential hangs in CommandComplete where we shouldn't
+ * be running new queries against the backend.
+ */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ int *oids;
+ pool_extract_table_oids(node, &oids);
+ pool_track_table_mutation_get_database_oid();
+ }
}
/* Should be sent to both primary and standby? */
else if (dest == POOL_BOTH)
@@ -2139,6 +2180,107 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
+ /*
+ * Check track table mutation for recently written tables.
+ * If in cold start or any table was recently written,
+ * route to primary to avoid stale reads.
+ */
+ else if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ bool force_primary = false;
+
+ /* During cold start, route everything to primary */
+ if (pool_track_table_mutation_in_cold_start())
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of track table mutation cold start"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ /* Extract table oids and check if any are stale */
+ SelectContext ctx;
+ int dboid;
+ int num_oids;
+ int i;
+
+ memset(&ctx, 0, sizeof(ctx));
+ num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
+ if (num_oids > 0)
+ {
+ dboid = pool_track_table_mutation_get_database_oid();
+
+ if (dboid <= 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because database oid was unavailable"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ for (i = 0; i < num_oids; i++)
+ {
+ if (pool_track_table_mutation_table_is_stale(ctx.table_oids[i], dboid))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because table \"%s\" was recently written",
+ ctx.table_names[i]),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (force_primary)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else
+ {
+ /* Proceed with load balancing */
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id = select_load_balancing_node();
+ }
+
+ /*
+ * As streaming replication delay is too much, if
+ * prefer_lower_delay_standby is true then elect new load
+ * balance node which is lowest delayed, false then send
+ * to the primary.
+ */
+ if (STREAM && check_replication_delay(session_context->load_balance_node_id))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of too much replication delay"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ int new_load_balancing_node = select_load_balancing_node();
+
+ session_context->load_balance_node_id = new_load_balancing_node;
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context,
+ session_context->query_context->load_balance_node_id);
+ }
+ }
+ }
else
{
if (pool_config->statement_level_load_balance)
diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c
index ded41c7fc64ceba1d1fafd6f4a9f10a750872374..a9596561a7e0265e928b957a2766f46fb4e9ebaa 100644
--- a/src/context/pool_session_context.c
+++ b/src/context/pool_session_context.c
@@ -532,7 +532,7 @@ dump_sent_message(char *caller, POOL_SENT_MESSAGE *m)
static void
dml_adaptive_init(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
session_context->is_in_transaction = false;
session_context->transaction_temp_write_list = NIL;
@@ -542,7 +542,7 @@ dml_adaptive_init(void)
static void
dml_adaptive_destroy(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && session_context)
{
if (session_context->transaction_temp_write_list != NIL)
list_free_deep(session_context->transaction_temp_write_list);
@@ -738,10 +738,10 @@ void
pool_set_writing_transaction(void)
{
/*
- * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never
- * turn on writing transaction flag.
+ * If disable_load_balance_on_write is 'off' or 'dml_adaptive' or
+ * 'dml_adaptive_global', then never turn on writing transaction flag.
*/
- if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (pool_config->disable_load_balance_on_write != DLBOW_OFF && !DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
pool_get_session_context(false)->writing_transaction = true;
ereport(DEBUG5,
diff --git a/src/include/pool.h b/src/include/pool.h
index ea6f87e120af866b8ed3a15790d9d8a8e009fe91..7168c1aea877856b5978de332ad636325eb9c30c 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -424,7 +424,7 @@ typedef enum
#define Min(x, y) ((x) < (y) ? (x) : (y))
-#define MAX_NUM_SEMAPHORES 8
+#define MAX_NUM_SEMAPHORES 10
#define CONN_COUNTER_SEM 0
#define REQUEST_INFO_SEM 1
#define QUERY_CACHE_STATS_SEM 2
@@ -434,6 +434,8 @@ typedef enum
#define FOLLOW_PRIMARY_SEM 6
#define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main
* process */
+#define TRACK_TABLE_MUTATION_TABLE_SEM 8
+#define TRACK_TABLE_MUTATION_QUERY_SEM 9
#define MAX_REQUEST_QUEUE_SIZE 10
#define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index 741de6cc5fc3368f813d6b6efa68eb7f8a79506b..8798b86eb3620ab36be733bb60bbb8464b0063c8 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -105,9 +105,13 @@ typedef enum DLBOW_OPTION
DLBOW_TRANSACTION,
DLBOW_TRANS_TRANSACTION,
DLBOW_ALWAYS,
- DLBOW_DML_ADAPTIVE
+ DLBOW_DML_ADAPTIVE,
+ DLBOW_DML_ADAPTIVE_GLOBAL
} DLBOW_OPTION;
+#define DLBOW_IS_DML_ADAPTIVE(opt) \
+ ((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL)
+
typedef enum RELQTARGET_OPTION
{
RELQTARGET_PRIMARY = 1,
@@ -365,6 +369,15 @@ typedef struct
* replication check */
char *replication_delay_source_cmd; /* external command for replication delay */
int replication_delay_source_timeout; /* timeout for external command in seconds */
+
+ /* Track table mutation configuration for tracking recently written tables */
+ double track_table_mutation_ttl_factor; /* TTL multiplier for replication delay */
+ int track_table_mutation_cold_start_duration; /* Cold start duration in ms */
+ int track_table_mutation_table_buckets; /* Number of hash buckets for table map */
+ int track_table_mutation_table_size; /* Max entries in table map */
+ int track_table_mutation_query_buckets; /* Number of hash buckets for query cache */
+ int track_table_mutation_query_parse_cache_size; /* Max entries in query parse cache */
+
char *failover_command; /* execute command when failover happens */
char *follow_primary_command; /* execute command when failover is
* ended */
diff --git a/src/include/utils/pool_track_table_mutation.h b/src/include/utils/pool_track_table_mutation.h
new file mode 100644
index 0000000000000000000000000000000000000000..5cd5d4ef409645fe77e3bb02239e140456de0554
--- /dev/null
+++ b/src/include/utils/pool_track_table_mutation.h
@@ -0,0 +1,237 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.h: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ */
+
+#ifndef POOL_TRACK_TABLE_MUTATION_H
+#define POOL_TRACK_TABLE_MUTATION_H
+
+#include "pool.h"
+#include <sys/time.h>
+
+/*
+ * Maximum table name length including schema: "schema"."table"
+ * NAMEDATALEN * 2 + 4 covers both names, four quotes, the dot, and the NUL
+ */
+#define TRACK_TABLE_MUTATION_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
+
+/*
+ * Maximum number of tables we track per query
+ */
+#define TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY 8
+
+/*
+ * Invalid index marker for linked lists
+ */
+#define TRACK_TABLE_MUTATION_INVALID_INDEX (-1)
+
+/*
+ * Default TTL in microseconds (100ms) used when replication delay is unknown
+ */
+#define TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 * 1000)
+
+/*
+ * Entry in the table mutation hash table (keyed by table/database oids)
+ */
+typedef struct TrackTableMutationEntry
+{
+ int table_oid; /* Table oid */
+ int dboid; /* Database oid */
+ struct timeval last_write_time; /* When the table was last written */
+ uint32 hash; /* Pre-computed hash value */
+ int next; /* Next entry in collision chain (-1 if none) */
+ bool in_use; /* Is this entry in use? */
+} TrackTableMutationEntry;
+
+/*
+ * Header for the table mutation hash table in shared memory
+ */
+typedef struct TrackTableMutationHashTable
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * TrackTableMutationEntry entries[max_entries];
+ */
+} TrackTableMutationHashTable;
+
+/*
+ * Entry in the query parse cache
+ */
+typedef struct QueryParseEntry
+{
+ uint64 query_hash; /* Hash of normalized query */
+ bool is_write; /* True if INSERT/UPDATE/DELETE */
+ int num_tables; /* Number of tables in query */
+ char table_names[TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY][TRACK_TABLE_MUTATION_TABLE_NAME_LEN];
+ int next; /* Next entry in collision chain */
+ int lru_prev; /* Previous in LRU list */
+ int lru_next; /* Next in LRU list */
+ bool in_use; /* Is this entry in use? */
+} QueryParseEntry;
+
+/*
+ * Header for the query parse cache in shared memory
+ */
+typedef struct QueryParseCache
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ int lru_head; /* Most recently used */
+ int lru_tail; /* Least recently used */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * QueryParseEntry entries[max_entries];
+ */
+} QueryParseCache;
+
+/*
+ * Global state for track table mutation feature
+ */
+typedef struct TrackTableMutationState
+{
+ bool initialized; /* Has shared memory been initialized? */
+ uint64 current_ttl_us; /* Current TTL in microseconds */
+ struct timeval ttl_last_updated; /* When TTL was last updated */
+ struct timeval last_cleanup_time; /* When last expired cleanup ran */
+ struct timeval global_cold_start_until; /* Global cold start end time */
+ volatile uint32 stats_queries_checked; /* Number of queries checked */
+ volatile uint32 stats_forced_primary; /* Queries forced to primary */
+ volatile uint32 stats_allowed_replica; /* Queries allowed to replica */
+} TrackTableMutationState;
+
+/*
+ * Main shared memory structure containing all components
+ */
+typedef struct TrackTableMutationShmem
+{
+ TrackTableMutationState state;
+ TrackTableMutationHashTable *table_map;
+ QueryParseCache *query_cache;
+} TrackTableMutationShmem;
+
+/* ----------------
+ * Public API functions
+ * ----------------
+ */
+
+/*
+ * Initialize shared memory structures for track table mutation.
+ * Called from pgpool_main.c after pool_init_pool_info().
+ */
+extern void pool_track_table_mutation_init(void);
+
+/*
+ * Initialize per-child process state for track table mutation.
+ * Called from child.c when a new child process starts.
+ * Sets up cold start tracking.
+ */
+extern void pool_track_table_mutation_child_init(void);
+
+/*
+ * Check if the child process is in cold start period.
+ * During cold start, all queries are routed to primary.
+ * Returns true if in cold start, false otherwise.
+ */
+extern bool pool_track_table_mutation_in_cold_start(void);
+
+/*
+ * Trigger a global cold start period for all processes.
+ * Used after watchdog leader change to avoid stale reads.
+ */
+extern void pool_track_table_mutation_trigger_global_cold_start(void);
+
+/*
+ * Get oid of current database.
+ */
+extern int pool_track_table_mutation_get_database_oid(void);
+
+/*
+ * Check if a table was recently written to (is "stale").
+ * If stale, reads from this table should go to primary.
+ * Returns true if table is stale (recently written), false otherwise.
+ */
+extern bool pool_track_table_mutation_table_is_stale(int table_oid, int dboid);
+
+/*
+ * Mark tables as recently written.
+ * Called after INSERT/UPDATE/DELETE queries complete.
+ * table_oids: array of table oids
+ * num_tables: number of tables in array
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_tables_written(const int *table_oids, int num_tables, int dboid);
+
+/*
+ * Convenience function to mark a single table as written.
+ * table_oid: table oid
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_table_written(int table_oid, int dboid);
+
+/*
+ * Update the TTL based on current replication delay.
+ * Called from pool_worker_child.c when replication delay is updated.
+ * delay_us: replication delay in microseconds
+ */
+extern void pool_track_table_mutation_update_ttl(uint64 delay_us);
+
+/*
+ * Look up cached parse result for a query.
+ * hash: hash of normalized query
+ * is_write: output - true if query is a write
+ * table_names: output - array to fill with table names
+ * num_tables: output - number of tables found
+ * Returns true if found in cache, false otherwise.
+ */
+extern bool pool_track_table_mutation_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int *num_tables);
+
+/*
+ * Cache a parse result for a query.
+ * hash: hash of normalized query
+ * is_write: true if query is a write
+ * table_names: array of table names
+ * num_tables: number of tables
+ */
+extern void pool_track_table_mutation_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int num_tables);
+
+/*
+ * Normalize a query and compute its hash.
+ * Strips comments, normalizes whitespace and literals.
+ * query: input SQL query string
+ * Returns: 64-bit hash of normalized query
+ */
+extern uint64 pool_track_table_mutation_normalize_and_hash(const char *query);
+
+/*
+ * Calculate required shared memory size for track table mutation.
+ */
+extern Size pool_track_table_mutation_shmem_size(void);
+
+#endif /* POOL_TRACK_TABLE_MUTATION_H */
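The header above only declares the shared-memory layout; the lookup code itself is in a .c file that is not part of this excerpt. As a rough, self-contained sketch of how an index-chained table of this shape (bucket array of entry indexes, entries linked through "next", free entries kept on a free list) can be walked, something like the following would work. Everything named ttm_* or TTM_*, the hash mixing, and the tiny fixed sizes are illustrative assumptions, not code from the patch, and locking is omitted entirely.

```c
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define TTM_INVALID_INDEX (-1)
#define TTM_NUM_BUCKETS 8       /* illustrative; the patch sizes this from config */
#define TTM_MAX_ENTRIES 4       /* illustrative; the patch sizes this from config */

/* Simplified stand-in for TrackTableMutationEntry (no timestamp). */
typedef struct
{
    int      table_oid;
    int      dboid;
    uint32_t hash;
    int      next;      /* next index in the collision chain or free list */
    bool     in_use;
} TtmEntry;

/* Simplified stand-in for the header plus its two trailing arrays. */
typedef struct
{
    int      num_entries;
    int      free_list_head;
    int      buckets[TTM_NUM_BUCKETS];
    TtmEntry entries[TTM_MAX_ENTRIES];
} TtmTable;

/* Hypothetical hash over (table oid, database oid). */
static uint32_t ttm_hash(int table_oid, int dboid)
{
    uint32_t h = (uint32_t) table_oid * 2654435761u;

    h ^= (uint32_t) dboid * 40503u;
    h ^= h >> 16;
    return h;
}

/* Empty all buckets and thread every entry onto the free list. */
static void ttm_init(TtmTable *t)
{
    int i;

    memset(t, 0, sizeof(*t));
    for (i = 0; i < TTM_NUM_BUCKETS; i++)
        t->buckets[i] = TTM_INVALID_INDEX;
    for (i = 0; i < TTM_MAX_ENTRIES; i++)
        t->entries[i].next = (i + 1 < TTM_MAX_ENTRIES) ? i + 1 : TTM_INVALID_INDEX;
    t->free_list_head = 0;
}

/* Return the entry index for (table_oid, dboid), or TTM_INVALID_INDEX. */
static int ttm_find(const TtmTable *t, int table_oid, int dboid)
{
    uint32_t h = ttm_hash(table_oid, dboid);
    int      idx = t->buckets[h % TTM_NUM_BUCKETS];

    while (idx != TTM_INVALID_INDEX)
    {
        const TtmEntry *e = &t->entries[idx];

        if (e->hash == h && e->table_oid == table_oid && e->dboid == dboid)
            return idx;
        idx = e->next;
    }
    return TTM_INVALID_INDEX;
}

/* Insert if absent; returns the entry index, or TTM_INVALID_INDEX when full. */
static int ttm_insert(TtmTable *t, int table_oid, int dboid)
{
    int      idx = ttm_find(t, table_oid, dboid);
    uint32_t h;
    int      b;

    if (idx != TTM_INVALID_INDEX)
        return idx;                 /* already tracked */
    idx = t->free_list_head;
    if (idx == TTM_INVALID_INDEX)
        return TTM_INVALID_INDEX;   /* no free entry left */
    t->free_list_head = t->entries[idx].next;

    h = ttm_hash(table_oid, dboid);
    b = (int) (h % TTM_NUM_BUCKETS);
    t->entries[idx].table_oid = table_oid;
    t->entries[idx].dboid = dboid;
    t->entries[idx].hash = h;
    t->entries[idx].in_use = true;
    t->entries[idx].next = t->buckets[b];   /* push onto the chain head */
    t->buckets[b] = idx;
    t->num_entries++;
    return idx;
}
```

Using integer indexes rather than pointers keeps the structure valid in every process that maps the segment, which is presumably why the header uses "int next" and an invalid-index marker of -1.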
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index fa05e15e7ac435e072298063f918c70aa4e5680c..b88b0478cb150f89bd9b6b8ab38db0d6912fddd0 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -57,6 +57,7 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
#include "query_cache/pool_memqcache.h"
+#include "utils/pool_track_table_mutation.h"
#include "watchdog/wd_internal_commands.h"
#include "watchdog/wd_lifecheck.h"
#include "watchdog/watchdog.h"
@@ -1485,11 +1486,14 @@ sigusr1_interrupt_processor(void)
if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
{
+ WD_STATES wd_state;
+
ereport(LOG,
(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
- if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY)
+ wd_state = wd_internal_get_watchdog_local_node_state();
+ if (wd_state == WD_STANDBY)
{
ereport(LOG,
(errmsg("we have joined the watchdog cluster as STANDBY node"),
@@ -1503,6 +1507,10 @@ sigusr1_interrupt_processor(void)
*/
pool_release_follow_primary_lock(true);
}
+ else if (wd_state == WD_COORDINATOR && pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_trigger_global_cold_start();
+ }
}
if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
{
@@ -3068,6 +3076,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size()));
}
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ size += MAXALIGN(pool_track_table_mutation_shmem_size());
+ elog(DEBUG1, "track_table_mutation: %zu bytes requested for shared memory", MAXALIGN(pool_track_table_mutation_shmem_size()));
+ }
+
initialize_shared_memory_main_segment(size);
/* Move the backend descriptors to shared memory */
@@ -3184,6 +3198,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
wd_ipc_initialize_data();
}
+ /* Initialize track table mutation for tracking recently written tables */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_init();
+ }
+
}
/*
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index a3b8f0ea194ffecc79e58566be80562a46eb75ab..b15db53248433cb3112246274ed771b79abe1392 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -38,6 +38,8 @@
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
+#include "query_cache/pool_memqcache.h"
static int extract_ntuples(char *message);
static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
@@ -304,6 +306,29 @@ handle_query_context(POOL_CONNECTION_POOL *backend)
node = session_context->query_context->parse_tree;
+ /*
+ * Track table writes for dml_adaptive_global feature.
+ * For autocommit statements (not in explicit transaction), mark tables
+ * immediately. For explicit transactions, marking is deferred to COMMIT
+ * in dml_adaptive() so that rolled-back writes don't pollute the shared
+ * memory table map.
+ */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL &&
+ node != NULL && !session_context->is_in_transaction)
+ {
+ int *oids;
+ int num_oids;
+
+ num_oids = pool_extract_table_oids(node, &oids);
+ if (num_oids > 0)
+ {
+ int dboid = pool_track_table_mutation_get_database_oid();
+
+ if (dboid > 0)
+ pool_track_table_mutation_mark_tables_written(oids, num_oids, dboid);
+ }
+ }
+
if (IsA(node, PrepareStmt))
{
if (session_context->uncompleted_message)
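The split described in the comment above (mark immediately for autocommit, defer to COMMIT inside an explicit transaction, discard on ROLLBACK) can be modelled in a few lines. This is only a sketch under simplifying assumptions: the txn_* names are hypothetical, the pending list holds OIDs directly (the patch accumulates table names and resolves them to OIDs at COMMIT), and the "shared map" is a plain array with no TTL or locking.

```c
#include <stdbool.h>

#define TXN_MAX_PENDING 8
#define TXN_MAX_SHARED 16

/* Hypothetical per-session state: writes seen in the open transaction. */
typedef struct
{
    bool in_transaction;
    int  pending[TXN_MAX_PENDING];
    int  num_pending;
} TxnSession;

/* Hypothetical stand-in for the shared-memory table map. */
typedef struct
{
    int oids[TXN_MAX_SHARED];
    int num_oids;
} TxnSharedMap;

static bool txn_is_marked(const TxnSharedMap *m, int oid)
{
    int i;

    for (i = 0; i < m->num_oids; i++)
        if (m->oids[i] == oid)
            return true;
    return false;
}

/* Record oid in the shared map unless it is already there or the map is full. */
static void txn_mark(TxnSharedMap *m, int oid)
{
    if (!txn_is_marked(m, oid) && m->num_oids < TXN_MAX_SHARED)
        m->oids[m->num_oids++] = oid;
}

static void txn_begin(TxnSession *s)
{
    s->in_transaction = true;
    s->num_pending = 0;
}

/* A write either marks immediately (autocommit) or is queued for COMMIT. */
static void txn_write(TxnSession *s, TxnSharedMap *m, int oid)
{
    if (!s->in_transaction)
        txn_mark(m, oid);
    else if (s->num_pending < TXN_MAX_PENDING)
        s->pending[s->num_pending++] = oid;
}

/* COMMIT: flush queued writes to the shared map. */
static void txn_commit(TxnSession *s, TxnSharedMap *m)
{
    int i;

    for (i = 0; i < s->num_pending; i++)
        txn_mark(m, s->pending[i]);
    s->num_pending = 0;
    s->in_transaction = false;
}

/* ROLLBACK: drop queued writes; the shared map is never touched. */
static void txn_rollback(TxnSession *s)
{
    s->num_pending = 0;
    s->in_transaction = false;
}
```

With this shape, a BEGIN / UPDATE / ROLLBACK sequence leaves the shared map untouched, which is the property tests 10 and 11 in the regression script exercise.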
diff --git a/src/protocol/child.c b/src/protocol/child.c
index c34f057281be62feaf39db1bb605062f56dc398c..d53f571421968bd789d0b55f97e0a1eb68a813e5 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -57,6 +57,7 @@
#include "utils/elog.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -213,6 +214,12 @@ do_child(int *fds)
/* Initialize per process context */
pool_init_process_context();
+ /* Initialize track table mutation child state for cold start tracking */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_child_init();
+ }
+
/* initialize connection pool */
if (pool_init_cp())
{
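For readers unfamiliar with the cold start idea: each child routes everything to the primary for a fixed window after it starts, and a watchdog leader change opens a similar window for all processes. A minimal sketch of that check follows. The cs_* names are hypothetical, timestamps are plain microsecond counters instead of struct timeval, and it is an assumption of this sketch (not something the excerpt confirms) that the global window reuses track_table_mutation_cold_start_duration and that a duration of 0 disables both windows.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical combined view of per-child and global cold start state. */
typedef struct
{
    int64_t child_start_us;     /* when this child process started */
    int64_t global_until_us;    /* end of global cold start, 0 if never set */
    int64_t duration_ms;        /* configured cold start duration */
} CsState;

/* True while either the per-child or the global window is still open. */
static bool cs_in_cold_start(const CsState *c, int64_t now_us)
{
    if (c->duration_ms <= 0)
        return false;           /* assumed: 0 disables cold start entirely */
    if (now_us < c->child_start_us + c->duration_ms * 1000)
        return true;            /* child started too recently */
    return now_us < c->global_until_us;
}

/* Open a global window, e.g. after this node becomes watchdog leader. */
static void cs_trigger_global(CsState *c, int64_t now_us)
{
    c->global_until_us = now_us + c->duration_ms * 1000;
}
```

The window exists because a freshly started child (or a newly elected leader) has no history of recent writes, so sending reads to a standby during that time could return stale rows.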
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index 47b5c8f98a5b4c92d675840eea88f7e03bb18b4c..75fc7508480d79aacc281dd5e624f9e34a998833 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -1461,7 +1461,7 @@ Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) && strlen(name) != 0)
pool_setall_node_to_be_sent(query_context);
if (REPLICATION)
@@ -1804,7 +1804,7 @@ Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
return POOL_END;
}
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) &&
TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
{
pool_where_to_send(query_context, query_context->original_query,
diff --git a/src/query_cache/pool_memqcache.c b/src/query_cache/pool_memqcache.c
index f38f711469576342ce59469b085c97365116004c..dca93334e9e47bb7978064edece5ca0e40021ce3 100644
--- a/src/query_cache/pool_memqcache.c
+++ b/src/query_cache/pool_memqcache.c
@@ -1305,6 +1305,12 @@ pool_extract_table_oids(Node *node, int **oidsp)
}
return num_oids;
}
+ else if (IsA(node, MergeStmt))
+ {
+ MergeStmt *stmt = (MergeStmt *) node;
+
+ table = make_table_name_from_rangevar(stmt->relation);
+ }
else if (IsA(node, ExplainStmt))
{
ListCell *cell;
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 454fdb9e5d1fd65437b6a67f12ab62658ea08f49..de99a7a97ba4a1a03cb3d5589d55ea61cb6e51fa 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -478,6 +478,14 @@ backend_clustering_mode = streaming_replication
# modified within the current explicit transaction will
# not be load balanced until the end of the transaction.
#
+ # dml_adaptive_global:
+ # Superset of dml_adaptive. In addition to per-transaction
+ # tracking, uses shared memory to track recently written
+ # tables across all sessions. Reads from recently written
+ # tables are routed to primary until a TTL (based on
+ # replication delay) expires. Requires additional shared
+ # memory. See track_table_mutation_* parameters below.
+ #
# always:
# if a write query is issued, read queries will
# not be load balanced until the session ends.
@@ -499,6 +507,46 @@ backend_clustering_mode = streaming_replication
#statement_level_load_balance = off
# Enables statement level load balancing
+# - Track Table Mutation (used by dml_adaptive_global) -
+ # WARNING: dml_adaptive_global increases shared memory usage
+ # Default settings require ~6.4 MB shared memory
+ # (0.1 MB table tracking + 6.3 MB query cache)
+
+#track_table_mutation_ttl_factor = 5.0
+ # TTL multiplier: TTL = replication_delay * factor
+ # Higher values provide more safety margin
+ # Range: 1.0-100.0 (default: 5.0)
+ # (change requires reload)
+
+#track_table_mutation_cold_start_duration = 2000
+ # Duration in milliseconds to route all queries to primary
+ # after child process starts (cold start period)
+ # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+ # Set to 0 to disable cold start behavior
+ # (change requires reload)
+
+#track_table_mutation_table_buckets = 1024
+ # Number of hash buckets for the table mutation map
+ # Higher values reduce hash collisions
+ # Range: 64-65536 (default: 1024)
+ # (change requires restart)
+
+#track_table_mutation_table_size = 2048
+ # Maximum number of tables to track simultaneously
+ # Range: 128-131072 (default: 2048)
+ # (change requires restart)
+
+#track_table_mutation_query_buckets = 2048
+ # Number of hash buckets for query parse cache
+ # Range: 64-65536 (default: 2048)
+ # (change requires restart)
+
+#track_table_mutation_query_parse_cache_size = 10000
+ # Maximum number of query parse results to cache
+ # Range: 100-1000000 (default: 10000)
+ # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000)
+ # (change requires restart)
+
#------------------------------------------------------------------------------
# STREAMING REPLICATION MODE
#------------------------------------------------------------------------------
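The shared memory figures quoted in the sample comments depend on sizeof(QueryParseEntry), so they are easy to sanity-check by mirroring the struct from pool_track_table_mutation.h. The sketch below does that; the Sketch* and sketch_* names are hypothetical, NAMEDATALEN is assumed to be 64, and the fixed header is ignored. Under that assumption the table_names array alone is 8 * 132 = 1056 bytes per entry, so the result of this calculation can be compared against the per-entry estimate in the comments.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_NAMEDATALEN 64   /* assumption: the usual PostgreSQL default */
#define SKETCH_TABLE_NAME_LEN (SKETCH_NAMEDATALEN * 2 + 4)
#define SKETCH_MAX_TABLES_PER_QUERY 8

/* Field-for-field mirror of QueryParseEntry from the header in this patch. */
typedef struct
{
    uint64_t query_hash;
    bool     is_write;
    int      num_tables;
    char     table_names[SKETCH_MAX_TABLES_PER_QUERY][SKETCH_TABLE_NAME_LEN];
    int      next;
    int      lru_prev;
    int      lru_next;
    bool     in_use;
} SketchQueryParseEntry;

/* Bytes needed for the bucket array plus the entry array (header excluded). */
static size_t sketch_query_cache_bytes(size_t num_buckets, size_t max_entries)
{
    return num_buckets * sizeof(int)
        + max_entries * sizeof(SketchQueryParseEntry);
}
```

Calling sketch_query_cache_bytes(2048, 10000) gives the estimate for the default track_table_mutation_query_buckets and track_table_mutation_query_parse_cache_size values; the exact number depends on the platform's struct padding.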
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 311b638658e66ebb56162ad9fa4392315b2df64e..347f88a88688309b298311a282fe1c1ef2aa0f73 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
#include "utils/pool_ip.h"
#include "utils/ps_status.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -695,6 +696,7 @@ check_replication_time_lag_with_cmd(void)
double delay_ms;
uint64 delay;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0; /* Track maximum delay for table mutation map */
int token_count = 0;
int primary_node_id;
int save_errno;
@@ -1003,6 +1005,10 @@ check_replication_time_lag_with_cmd(void)
bkinfo->standby_delay = delay;
bkinfo->standby_delay_by_time = true;
+ /* Track maximum delay for table mutation map TTL calculation */
+ if (delay > max_delay_us)
+ max_delay_us = delay;
+
/*
* Log delay if necessary. threshold is in milliseconds, convert
* to microseconds.
@@ -1021,6 +1027,10 @@ check_replication_time_lag_with_cmd(void)
token = strtok_r(NULL, " \t\n", &saveptr);
}
+ /* Update track table mutation TTL based on maximum observed delay */
+ if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE_GLOBAL && max_delay_us > 0)
+ pool_track_table_mutation_update_ttl(max_delay_us);
+
}
PG_CATCH();
{
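The worker change above feeds the largest observed standby delay into pool_track_table_mutation_update_ttl(). How that delay becomes a TTL, and how the TTL is then used to decide staleness, can be sketched as below. The sample config documents TTL = replication_delay * factor and the header documents a 100 ms default when the delay is unknown; the sketch_* names, the microsecond counters in place of struct timeval, and the choice to treat a clock that appears to run backwards as "stale" are assumptions made for this illustration.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_DEFAULT_TTL_US (100 * 1000)  /* 100 ms, as in the header */

/* Largest delay across all standbys; 0 when the array is empty. */
static uint64_t sketch_max_delay_us(const uint64_t *delays_us, size_t n)
{
    uint64_t max = 0;
    size_t   i;

    for (i = 0; i < n; i++)
        if (delays_us[i] > max)
            max = delays_us[i];
    return max;
}

/* TTL = delay * factor, falling back to the default when delay is unknown. */
static uint64_t sketch_ttl_us(uint64_t max_delay_us, double ttl_factor)
{
    if (max_delay_us == 0)
        return SKETCH_DEFAULT_TTL_US;
    return (uint64_t) ((double) max_delay_us * ttl_factor);
}

/* A table is stale while less than ttl_us has passed since its last write. */
static bool sketch_is_stale(uint64_t last_write_us, uint64_t now_us,
                            uint64_t ttl_us)
{
    if (now_us < last_write_us)
        return true;    /* assumed: be conservative if time went backwards */
    return (now_us - last_write_us) < ttl_us;
}
```

Taking the maximum rather than the average delay means the TTL is sized for the slowest standby, since a load-balanced read may land on any of them.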
diff --git a/src/test/regression/libs.sh b/src/test/regression/libs.sh
index 7c5a0c1821191a572430b658d80ab34554110363..1c8ae392daa10056119c09c7127e839d859d700d 100644
--- a/src/test/regression/libs.sh
+++ b/src/test/regression/libs.sh
@@ -42,6 +42,8 @@ function wait_for_failover_done {
function clean_all {
pgrep pgpool | xargs kill -9 > /dev/null 2>&1
pgrep postgres | xargs kill -9 > /dev/null 2>&1
+ # Clean up leaked SysV IPC resources left behind by kill -9
+ ipcrm --all 2>/dev/null || true
rm -f $PGSOCKET_DIR/.s.PGSQL.*
netstat -t -p 2>/dev/null|grep pgpool
}
diff --git a/src/test/regression/tests/042.track_table_mutation/test.sh b/src/test/regression/tests/042.track_table_mutation/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..8b4dd17b820d36e3fc48216ac7f0544cbf0f5a9c
--- /dev/null
+++ b/src/test/regression/tests/042.track_table_mutation/test.sh
@@ -0,0 +1,354 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+ rm -fr $TESTDIR
+ mkdir $TESTDIR
+ cd $TESTDIR
+
+ # Create test environment with 2 nodes
+ echo -n "creating test environment..."
+ $PGPOOL_SETUP -m $mode -n 2 || exit 1
+ echo "done."
+
+ source ./bashrc.ports
+
+ # Configure track table mutation feature via dml_adaptive_global
+ echo "disable_load_balance_on_write = 'dml_adaptive_global'" >> etc/pgpool.conf
+ echo "track_table_mutation_ttl_factor = 5.0" >> etc/pgpool.conf
+ echo "track_table_mutation_cold_start_duration = 10000" >> etc/pgpool.conf
+
+ # Enable load balancing explicitly
+ echo "load_balance_mode = on" >> etc/pgpool.conf
+
+ # Configure weights so we can distinguish routing
+ # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+ # This means load balanced queries go to node 1 by default
+ echo "backend_weight0 = 0" >> etc/pgpool.conf
+ echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+ # Enable debug logging to see routing decisions
+ echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+ ./startall
+
+ export PGPORT=$PGPOOL_PORT
+ export PGHOST=localhost
+
+ wait_for_pgpool_startup
+
+ # Create test tables
+ $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+ echo "=== Test 1: Cold Start Routing ==="
+ # During cold start, all queries should go to primary
+ # Restart pgpool to trigger cold start
+ ./shutdownall
+ ./startall
+ wait_for_pgpool_startup
+
+ # Immediately query - should go to primary due to cold start
+ $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for cold start message (use -a to handle binary log files)
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 1 PASSED: Cold start routing works"
+ else
+ echo "Test 1 FAILED: Cold start routing not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 2: Wait for cold start to end ==="
+ # Wait for cold start period to end (10 seconds).
+ # Use generous margin to avoid flakiness under load (e.g. full regression suite).
+ sleep 12
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Now a clean table query should load balance (go to node 1)
+ $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+ # After cold start, queries to clean tables should load balance
+ # Check that it did NOT get forced to primary due to track table mutation
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 2 FAILED: Still in cold start after waiting"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 2 PASSED: Cold start ended correctly"
+
+ echo "=== Test 3: Write-then-Read Routing ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Write to t1 and then read - use single connection to ensure same session
+ $PSQL test <<EOF
+INSERT INTO t1 VALUES (1);
+SELECT 'write_read_test' as marker, * FROM t1;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Check log for table staleness message
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 3 PASSED: Write-then-read routing works"
+ else
+ echo "Test 3 FAILED: Table staleness not detected after write"
+ # Show relevant log entries for debugging
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 4: Clean Table Still Load Balances ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Read from t2 (never written to) - should load balance
+ $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ # Should NOT see track table mutation blocking message for t2
+ if grep -a -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+ echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 4 PASSED: Clean tables still load balance"
+
+ echo "=== Test 5: UPDATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Update t2 and then read - use single connection
+ $PSQL test <<EOF
+UPDATE t2 SET i = 999 WHERE i = 0;
+SELECT 'update_test' as marker, * FROM t2;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 5 PASSED: UPDATE marks table as stale"
+ else
+ echo "Test 5 FAILED: UPDATE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 6: DELETE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Delete from t3 and then read - use single connection
+ $PSQL test <<EOF
+DELETE FROM t3 WHERE i = 0;
+SELECT 'delete_test' as marker, * FROM t3;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 6 PASSED: DELETE marks table as stale"
+ else
+ echo "Test 6 FAILED: DELETE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 7: TRUNCATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for TRUNCATE test
+ $PSQL test -c "CREATE TABLE t_truncate(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_truncate VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Truncate and then read - use single connection
+ $PSQL test <<EOF
+TRUNCATE t_truncate;
+SELECT 'truncate_test' as marker, * FROM t_truncate;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 7 PASSED: TRUNCATE marks table as stale"
+ else
+ echo "Test 7 FAILED: TRUNCATE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 8: WITH Clause (CTE with DELETE) Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for WITH test
+ $PSQL test -c "CREATE TABLE t_cte(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_cte VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use WITH clause with DELETE, then read from the table
+ $PSQL test <<EOF
+WITH deleted AS (DELETE FROM t_cte WHERE i = 1 RETURNING *)
+SELECT * FROM deleted;
+SELECT 'cte_test' as marker, * FROM t_cte;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 8 PASSED: WITH clause (CTE) marks table as stale"
+ else
+ echo "Test 8 FAILED: WITH clause (CTE) did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ # Test 9: MERGE (PostgreSQL 15+ only)
+ PG_MAJOR_VERSION=$($PSQL -t -c "SELECT substring(version() from 'PostgreSQL ([0-9]+)');" | tr -d ' ')
+ if [ "$PG_MAJOR_VERSION" -ge 15 ] 2>/dev/null; then
+ echo "=== Test 9: MERGE Marks Table as Stale (PostgreSQL $PG_MAJOR_VERSION) ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create tables for MERGE test
+ $PSQL test -c "CREATE TABLE t_merge_target(id INTEGER PRIMARY KEY, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "CREATE TABLE t_merge_source(id INTEGER, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_target VALUES (1, 'old');" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_source VALUES (1, 'new'), (2, 'insert');" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use MERGE, then read from the target table
+ $PSQL test <<EOF
+MERGE INTO t_merge_target t
+USING t_merge_source s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT 'merge_test' as marker, * FROM t_merge_target;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 9 PASSED: MERGE marks table as stale"
+ else
+ echo "Test 9 FAILED: MERGE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ else
+ echo "=== Test 9: MERGE skipped (requires PostgreSQL 15+, have $PG_MAJOR_VERSION) ==="
+ fi
+
+ echo "=== Test 10: ROLLBACK Does NOT Mark Table as Stale ==="
+ # Create a fresh table for rollback test
+ $PSQL test -c "CREATE TABLE t_rollback(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then rollback
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_rollback VALUES (1);
+ROLLBACK;
+SELECT 'rollback_test' as marker, * FROM t_rollback;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Should NOT see t_rollback marked as stale since the write was rolled back
+ if grep -a -q "could not load balance because table.*t_rollback.*was recently written" log/pgpool.log; then
+ echo "Test 10 FAILED: Rolled-back write incorrectly marked table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 10 PASSED: ROLLBACK does not mark table as stale"
+
+ echo "=== Test 11: COMMIT Marks Table as Stale ==="
+ # Create a fresh table for commit test
+ $PSQL test -c "CREATE TABLE t_commit(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then commit, then read
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_commit VALUES (1);
+COMMIT;
+SELECT 'commit_test' as marker, * FROM t_commit;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 11 PASSED: COMMIT marks table as stale"
+ else
+ echo "Test 11 FAILED: Committed write did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo ""
+ echo "=== All Track Table Mutation Tests PASSED ==="
+
+ ./shutdownall
+
+ cd ..
+done
+
+exit 0
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/.gitignore b/src/test/regression/tests/043.track_table_mutation_watchdog/.gitignore
new file mode 100644
index 0000000000000000000000000000000000000000..fcb93d27a7e7e8a5efe6eacfb0f88f6f3c8bc765
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/.gitignore
@@ -0,0 +1,3 @@
+leader
+standby
+*.pid
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/leader.conf b/src/test/regression/tests/043.track_table_mutation_watchdog/leader.conf
new file mode 100644
index 0000000000000000000000000000000000000000..945cff9860d0357fbb0e3e9a5643124d916bd9c3
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/leader.conf
@@ -0,0 +1,25 @@
+# leader watchdog config for track_table_mutation watchdog test
+use_watchdog = on
+wd_interval = 1
+wd_priority = 2
+
+hostname0 = 'localhost'
+wd_port0 = 21004
+pgpool_port0 = 11000
+hostname1 = 'localhost'
+wd_port1 = 21104
+pgpool_port1 = 11100
+
+heartbeat_hostname0 = 'localhost'
+heartbeat_port0 = 21005
+heartbeat_hostname1 = 'localhost'
+heartbeat_port1 = 21105
+
+enable_consensus_with_half_votes = on
+
+# Enable track table mutation feature via dml_adaptive_global
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_cold_start_duration = 2000
+
+# Enable debug logging to see feature messages
+log_min_messages = debug1
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/standby.conf b/src/test/regression/tests/043.track_table_mutation_watchdog/standby.conf
new file mode 100644
index 0000000000000000000000000000000000000000..a11c3dfca427cf6b246451d067c30b0255b9c4ce
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/standby.conf
@@ -0,0 +1,27 @@
+# standby watchdog config for track_table_mutation watchdog test
+port = 11100
+pcp_port = 11105
+use_watchdog = on
+wd_interval = 1
+wd_priority = 1
+
+hostname0 = 'localhost'
+wd_port0 = 21004
+pgpool_port0 = 11000
+hostname1 = 'localhost'
+wd_port1 = 21104
+pgpool_port1 = 11100
+
+heartbeat_hostname0 = 'localhost'
+heartbeat_port0 = 21005
+heartbeat_hostname1 = 'localhost'
+heartbeat_port1 = 21105
+
+enable_consensus_with_half_votes = on
+
+# Enable track table mutation feature via dml_adaptive_global
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_cold_start_duration = 2000
+
+# Enable debug logging to see feature messages
+log_min_messages = debug1
diff --git a/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh b/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..752a6e6aa377fe0c54244975e606648101c98cf8
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation_watchdog/test.sh
@@ -0,0 +1,179 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation global cold start on watchdog leader change.
+# Tests that when the watchdog leader changes, the new leader triggers
+# a global cold start to force all queries to primary.
+#
+source $TESTLIBS
+LEADER_DIR=leader
+STANDBY_DIR=standby
+PSQL=$PGBIN/psql
+success_count=0
+
+rm -fr $LEADER_DIR
+rm -fr $STANDBY_DIR
+
+mkdir $LEADER_DIR
+mkdir $STANDBY_DIR
+
+# change into the leader directory
+cd $LEADER_DIR
+
+# create leader environment with streaming replication
+echo -n "creating leader pgpool..."
+$PGPOOL_SETUP -m s -n 2 -p 11000 || exit 1
+echo "leader setup done."
+
+# copy the configurations to standby
+cp -r etc ../$STANDBY_DIR/
+
+source ./bashrc.ports
+cat ../leader.conf >> etc/pgpool.conf
+echo 0 > etc/pgpool_node_id
+
+./startall
+wait_for_pgpool_startup
+
+# back to test root dir
+cd ..
+
+# create standby environment
+mkdir $STANDBY_DIR/log
+echo -n "creating standby pgpool..."
+cat standby.conf >> $STANDBY_DIR/etc/pgpool.conf
+# the standby reuses the leader's pgpool-II conf, so give it its own pid file path
+echo "pid_file_name = '$PWD/pgpool2.pid'" >> $STANDBY_DIR/etc/pgpool.conf
+echo 1 > $STANDBY_DIR/etc/pgpool_node_id
+# start the standby pgpool-II by hand
+$PGPOOL_INSTALL_DIR/bin/pgpool -D -n -f $STANDBY_DIR/etc/pgpool.conf -F $STANDBY_DIR/etc/pcp.conf -a $STANDBY_DIR/etc/pool_hba.conf > $STANDBY_DIR/log/pgpool.log 2>&1 &
+
+# Test 1: Check if leader pgpool-II started correctly
+echo "=== Test 1: Waiting for the pgpool leader... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "I am the cluster leader node. Starting escalation process" $LEADER_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 1 PASSED: Leader brought up successfully."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 1 ]; then
+ echo "Test 1 FAILED: Leader did not start"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 2: Check if standby has successfully joined
+echo "=== Test 2: Waiting for the standby to join cluster... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "successfully joined the watchdog cluster as standby node" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 2 PASSED: Standby successfully connected."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 2 ]; then
+ echo "Test 2 FAILED: Standby did not join cluster"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 3: Verify track_table_mutation is enabled and working on leader
+echo "=== Test 3: Verify track_table_mutation is enabled ==="
+if grep -a "track_table_mutation: initialized" $LEADER_DIR/log/pgpool.log > /dev/null 2>&1; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 3 PASSED: track_table_mutation initialized on leader"
+else
+ echo "Test 3 FAILED: track_table_mutation not initialized on leader"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 4: Stop leader pgpool and trigger failover
+echo "=== Test 4: Triggering leader failover... ==="
+$PGPOOL_INSTALL_DIR/bin/pgpool -f $LEADER_DIR/etc/pgpool.conf -m f stop
+
+echo "Checking if the Standby pgpool-II detected the leader shutdown..."
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a " is shutting down" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 4 PASSED: Leader shutdown detected."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 4 ]; then
+ echo "Test 4 FAILED: Leader shutdown not detected"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 5: Verify standby becomes new leader
+echo "=== Test 5: Checking if standby takes over as leader... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "I am the cluster leader node. Starting escalation process" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 5 PASSED: Standby became the new leader."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 5 ]; then
+ echo "Test 5 FAILED: Standby did not become leader"
+ $PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+ cd $LEADER_DIR && ./shutdownall
+ exit 1
+fi
+
+# Test 6: Verify global cold start was triggered on new leader
+echo "=== Test 6: Checking if global cold start was triggered... ==="
+# The new leader should trigger global cold start when it becomes coordinator
+# Look for the log message that indicates global cold start was triggered
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "track_table_mutation: entering global cold start" $STANDBY_DIR/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 6 PASSED: Global cold start triggered on new leader."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+# Cleanup
+$PGPOOL_INSTALL_DIR/bin/pgpool -f $STANDBY_DIR/etc/pgpool.conf -m f stop 2>/dev/null
+cd $LEADER_DIR
+./shutdownall
+
+echo ""
+echo "$success_count out of 6 successful"
+
+if test $success_count -eq 6
+then
+ echo "=== All Track Table Mutation Watchdog Tests PASSED ==="
+ exit 0
+fi
+
+exit 1
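The polling loops in test.sh above (Tests 1, 2, 4, 5 and 6) all repeat the same grep/retry/sleep pattern. As a minimal sketch only, and not part of the patch, they could share one helper. The name wait_for_log and its argument order are assumptions made for illustration; the defaults of 10 tries and a 2 second pause mirror the loops in the script. Unlike the original grep calls, this sketch matches the message literally (-F), so a "." in the message is not treated as a wildcard.

```shell
# Hypothetical helper (not in the patch): poll a log file for a fixed string.
#   $1 = message to look for (matched literally)
#   $2 = log file to search
#   $3 = number of attempts (assumed default: 10, as in test.sh)
#   $4 = seconds to sleep between attempts (assumed default: 2, as in test.sh)
# Returns 0 as soon as the message is found, 1 if every attempt fails.
wait_for_log() {
    pattern=$1
    logfile=$2
    retries=${3:-10}
    interval=${4:-2}
    i=1
    while [ "$i" -le "$retries" ]; do
        # -a: treat the log as text, -F: no regex interpretation
        if grep -aF -- "$pattern" "$logfile" > /dev/null 2>&1; then
            return 0
        fi
        echo "[check] $i times"
        i=$(( i + 1 ))
        sleep "$interval"
    done
    return 1
}
```

With such a helper, a check like Test 6 might reduce to calling wait_for_log with the message "track_table_mutation: entering global cold start" and the standby log path, then incrementing success_count only when it returns 0.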
diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c
new file mode 100644
index 0000000000000000000000000000000000000000..27d4f0380d43a237f518c60cdd73aba2ff51b723
--- /dev/null
+++ b/src/utils/pool_track_table_mutation.c
@@ -0,0 +1,1188 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.c: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ *
+ * Based on the "lagless" architecture from Tailor Brands:
+ * https://medium.com/tailor-tech/using-database-read-replicas-in-distributed-systems-d80eaf6bbf8a
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "context/pool_session_context.h"
+#include "utils/pool_track_table_mutation.h"
+#include "utils/elog.h"
+#include "utils/pool_ipc.h"
+#include "utils/palloc.h"
+#include "utils/pool_relcache.h"
+
+#define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_catalog.pg_database WHERE datname = '%s'"
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static TrackTableMutationShmem *track_table_mutation_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for accessing flexible arrays in shared memory
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+ ((int *)((char *)(map) + sizeof(TrackTableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+ ((TrackTableMutationEntry *)((char *)(map) + sizeof(TrackTableMutationHashTable) + \
+ (map)->num_buckets * sizeof(int)))
+
+/* Get pointer to bucket array in query cache */
+#define QUERY_CACHE_BUCKETS(cache) \
+ ((int *)((char *)(cache) + sizeof(QueryParseCache)))
+
+/* Get pointer to entry array in query cache */
+#define QUERY_CACHE_ENTRIES(cache) \
+ ((QueryParseEntry *)((char *)(cache) + sizeof(QueryParseCache) + \
+ (cache)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Semaphore lock helpers
+ * ----------------
+ */
+
+static inline void
+table_map_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+table_map_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+query_cache_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_QUERY_SEM);
+}
+
+static inline void
+query_cache_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_QUERY_SEM);
+}
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * FNV-1a hash for table/database oid pair
+ */
+static uint32
+fnv1a_hash_table_key(int table_oid, int dboid)
+{
+ uint32 hash = 2166136261u; /* FNV offset basis */
+ uint32 data[2];
+ const unsigned char *bytes;
+ size_t i;
+
+ data[0] = (uint32) table_oid;
+ data[1] = (uint32) dboid;
+ bytes = (const unsigned char *) data;
+
+ for (i = 0; i < sizeof(data); i++)
+ {
+ hash ^= bytes[i];
+ hash *= 16777619u; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/*
+ * 64-bit FNV-1a hash of a byte string of the given length
+ */
+static uint64
+fnv1a_hash_64(const char *str, size_t len)
+{
+ uint64 hash = 14695981039346656037ULL; /* FNV offset basis for 64-bit */
+ size_t i;
+
+ for (i = 0; i < len; i++)
+ {
+ hash ^= (uint8)str[i];
+ hash *= 1099511628211ULL; /* FNV prime for 64-bit */
+ }
+
+ return hash;
+}
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Get elapsed time in microseconds between two timevals
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+ return ((int64)(end->tv_sec - start->tv_sec) * 1000000) +
+ (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Get current time
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+ gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Database oid lookup
+ * ----------------
+ */
+
+static int
+track_table_mutation_get_database_oid_internal(void)
+{
+ int oid = 0;
+ static POOL_RELCACHE *relcache;
+ POOL_CONNECTION_POOL *backend;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Safety check: must have shmem initialized */
+ if (track_table_mutation_shmem == NULL)
+ return oid;
+
+ session_context = pool_get_session_context(false);
+ if (session_context == NULL)
+ return oid;
+
+ /* Ensure we have a valid query context */
+ if (session_context->query_context == NULL)
+ return oid;
+
+ backend = session_context->backend;
+ if (backend == NULL || MAIN_CONNECTION(backend) == NULL || MAIN_CONNECTION(backend)->sp == NULL)
+ return oid;
+
+ /* Ensure database name is valid */
+ if (MAIN_CONNECTION(backend)->sp->database == NULL)
+ return oid;
+
+ if (!relcache)
+ {
+ relcache = pool_create_relcache(pool_config->relcache_size,
+ DATABASE_TO_OID_QUERY,
+ int_register_func,
+ int_unregister_func,
+ false);
+ if (relcache == NULL)
+ {
+ ereport(LOG,
+ (errmsg("track_table_mutation: error creating relcache while getting database OID")));
+ return oid;
+ }
+ }
+
+ oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
+ MAIN_CONNECTION(backend)->sp->database);
+ return oid;
+}
+
+int
+pool_track_table_mutation_get_database_oid(void)
+{
+ return track_table_mutation_get_database_oid_internal();
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table
+ */
+static void
+table_map_init(TrackTableMutationHashTable *map, int num_buckets, int max_entries)
+{
+ int *buckets;
+ TrackTableMutationEntry *entries;
+ int i;
+
+ map->num_buckets = num_buckets;
+ map->max_entries = max_entries;
+ map->num_entries = 0;
+ map->free_list_head = (max_entries > 0) ? 0 : TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ buckets = TABLE_MAP_BUCKETS(map);
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ /* Initialize free list - chain all entries */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : TRACK_TABLE_MUTATION_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: initialized table map with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list
+ */
+static int
+table_map_alloc_entry(TrackTableMutationHashTable *map)
+{
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int idx;
+
+ if (map->free_list_head == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ idx = map->free_list_head;
+ map->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ map->num_entries++;
+
+ return idx;
+}
+
+/*
+ * Free an entry back to the free list
+ */
+static void
+table_map_free_entry(TrackTableMutationHashTable *map, int idx)
+{
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+
+ entries[idx].in_use = false;
+ entries[idx].next = map->free_list_head;
+ map->free_list_head = idx;
+ map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table
+ * Returns entry index or TRACK_TABLE_MUTATION_INVALID_INDEX if not found
+ * Must be called with lock held
+ */
+static int
+table_map_lookup(TrackTableMutationHashTable *map, int table_oid, int dboid, uint32 hash)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ if (entries[idx].hash == hash &&
+ entries[idx].table_oid == table_oid &&
+ entries[idx].dboid == dboid)
+ {
+ return idx;
+ }
+ idx = entries[idx].next;
+ }
+
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+}
+
+/*
+ * Insert or update a table entry
+ * Must be called with lock held
+ */
+static void
+table_map_insert(TrackTableMutationHashTable *map, int table_oid, int dboid,
+ uint32 hash, struct timeval *write_time)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx;
+
+ /* Check if entry already exists */
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ /* Update existing entry */
+ entries[idx].last_write_time = *write_time;
+ return;
+ }
+
+ /* Allocate new entry */
+ idx = table_map_alloc_entry(map);
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ int b;
+ /* Table is full - evict an entry to make room */
+ /* For simplicity, evict the head of the first non-empty bucket; it may not have expired yet */
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ if (buckets[b] != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ int victim = buckets[b];
+ buckets[b] = entries[victim].next;
+ table_map_free_entry(map, victim);
+ idx = table_map_alloc_entry(map);
+ break;
+ }
+ }
+
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ ereport(WARNING,
+ (errmsg("track_table_mutation: failed to allocate entry for table oid %d (dboid %d)",
+ table_oid, dboid)));
+ return;
+ }
+ }
+
+ /* Initialize new entry */
+ entries[idx].table_oid = table_oid;
+ entries[idx].dboid = dboid;
+ entries[idx].hash = hash;
+ entries[idx].last_write_time = *write_time;
+
+ /* Insert at head of bucket chain */
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: marked table oid %d (dboid %d) as written",
+ table_oid, dboid)));
+}
+
+/*
+ * Remove expired entries from the table map
+ * Must be called with lock held
+ */
+static void
+table_map_cleanup_expired(TrackTableMutationHashTable *map, uint64 ttl_us)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ struct timeval now;
+ int removed = 0;
+ int b;
+
+ get_current_time(&now);
+
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *prev_ptr = &buckets[b];
+ int idx = buckets[b];
+
+ while (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+
+ if (elapsed > (int64)ttl_us)
+ {
+ /* Entry has expired - remove it */
+ int next = entries[idx].next;
+ *prev_ptr = next;
+ table_map_free_entry(map, idx);
+ idx = next;
+ removed++;
+ }
+ else
+ {
+ prev_ptr = &entries[idx].next;
+ idx = entries[idx].next;
+ }
+ }
+ }
+
+ if (removed > 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: cleaned up %d expired table entries", removed)));
+ }
+}
+
+/* ----------------
+ * Query parse cache operations
+ * ----------------
+ */
+
+/*
+ * Initialize query parse cache
+ */
+static void
+query_cache_init(QueryParseCache *cache, int num_buckets, int max_entries)
+{
+ int *buckets;
+ QueryParseEntry *entries;
+ int i;
+
+ cache->num_buckets = num_buckets;
+ cache->max_entries = max_entries;
+ cache->num_entries = 0;
+ cache->free_list_head = (max_entries > 0) ? 0 : TRACK_TABLE_MUTATION_INVALID_INDEX;
+ cache->lru_head = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ cache->lru_tail = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ buckets = QUERY_CACHE_BUCKETS(cache);
+ entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ /* Initialize free list */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[i].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[i].lru_next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: initialized query cache with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Move entry to front of LRU list (most recently used)
+ */
+static void
+query_cache_lru_touch(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Already at head? */
+ if (cache->lru_head == idx)
+ return;
+
+ /* Remove from current position */
+ if (entries[idx].lru_prev != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ if (entries[idx].lru_next != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ if (cache->lru_tail == idx)
+ cache->lru_tail = entries[idx].lru_prev;
+
+ /* Insert at head */
+ entries[idx].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+ if (cache->lru_head != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+ cache->lru_head = idx;
+ if (cache->lru_tail == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Add entry to LRU list (at head)
+ */
+static void
+query_cache_lru_add(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ entries[idx].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+
+ if (cache->lru_head != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+
+ cache->lru_head = idx;
+
+ if (cache->lru_tail == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Remove entry from LRU list
+ */
+static void
+query_cache_lru_remove(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ if (entries[idx].lru_prev != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ else
+ cache->lru_head = entries[idx].lru_next;
+
+ if (entries[idx].lru_next != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ else
+ cache->lru_tail = entries[idx].lru_prev;
+
+ entries[idx].lru_prev = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ entries[idx].lru_next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+}
+
+/*
+ * Allocate entry from free list, evicting LRU if necessary
+ */
+static int
+query_cache_alloc_entry(QueryParseCache *cache)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ int idx;
+
+ if (cache->free_list_head != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ idx = cache->free_list_head;
+ cache->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+ cache->num_entries++;
+ return idx;
+ }
+
+ /* No free entries - evict LRU */
+ if (cache->lru_tail == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ idx = cache->lru_tail;
+
+ /* Remove from hash bucket */
+ int bucket = entries[idx].query_hash % cache->num_buckets;
+ int *prev_ptr = &buckets[bucket];
+ int curr = buckets[bucket];
+
+ while (curr != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ if (curr == idx)
+ {
+ *prev_ptr = entries[curr].next;
+ break;
+ }
+ prev_ptr = &entries[curr].next;
+ curr = entries[curr].next;
+ }
+
+ /* Remove from LRU list */
+ query_cache_lru_remove(cache, idx);
+
+ /* Reinitialize entry */
+ entries[idx].in_use = true;
+ entries[idx].next = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ return idx;
+}
+
+/*
+ * Look up a query in the cache
+ */
+static int
+query_cache_lookup(QueryParseCache *cache, uint64 hash)
+{
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int bucket = hash % cache->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ if (entries[idx].query_hash == hash)
+ return idx;
+ idx = entries[idx].next;
+ }
+
+ return TRACK_TABLE_MUTATION_INVALID_INDEX;
+}
+
+/* ----------------
+ * Query normalization
+ * ----------------
+ */
+
+/*
+ * Simple query normalization:
+ * - Strip comments (-- style and C-style block comments)
+ * - Collapse whitespace
+ * - Convert to lowercase (quoted text is dropped, so its case does not matter)
+ * - Replace quoted text ('...' or "...") with '$'; digit runs become '$' or are dropped
+ *
+ * This is a simplified version - Pgpool-II already does this elsewhere,
+ * but we need a standalone version for the track table mutation feature.
+ */
+static size_t
+normalize_query(const char *query, char *output, size_t output_size)
+{
+ const char *src = query;
+ char *dst = output;
+ char *dst_end = output + output_size - 1;
+ bool in_string = false;
+ char string_char = 0;
+ bool last_was_space = true; /* Start true to skip leading space */
+
+ while (*src && dst < dst_end)
+ {
+ /* Handle string literals */
+ if (in_string)
+ {
+ if (*src == string_char)
+ {
+ if (*(src + 1) == string_char)
+ {
+ /* Escaped quote */
+ src += 2;
+ continue;
+ }
+ in_string = false;
+ *dst++ = '$'; /* Replace string content with placeholder */
+ }
+ src++;
+ continue;
+ }
+
+ /* Check for string start */
+ if (*src == '\'' || *src == '"')
+ {
+ in_string = true;
+ string_char = *src;
+ src++;
+ continue;
+ }
+
+ /* Handle single-line comments */
+ if (*src == '-' && *(src + 1) == '-')
+ {
+ while (*src && *src != '\n')
+ src++;
+ continue;
+ }
+
+ /* Handle multi-line comments */
+ if (*src == '/' && *(src + 1) == '*')
+ {
+ src += 2;
+ while (*src && !(*src == '*' && *(src + 1) == '/'))
+ src++;
+ if (*src)
+ src += 2;
+ continue;
+ }
+
+ /* Handle whitespace */
+ if (*src == ' ' || *src == '\t' || *src == '\n' || *src == '\r')
+ {
+ if (!last_was_space)
+ {
+ *dst++ = ' ';
+ last_was_space = true;
+ }
+ src++;
+ continue;
+ }
+
+ /* Handle numbers - replace with placeholder */
+ if ((*src >= '0' && *src <= '9') ||
+ (*src == '.' && *(src + 1) >= '0' && *(src + 1) <= '9'))
+ {
+ while (*src && ((*src >= '0' && *src <= '9') || *src == '.'))
+ src++;
+ if (!last_was_space && dst > output && *(dst - 1) != '$')
+ *dst++ = '$';
+ last_was_space = false;
+ continue;
+ }
+
+ /* Regular character - convert to lowercase */
+ if (*src >= 'A' && *src <= 'Z')
+ *dst++ = *src + 32;
+ else
+ *dst++ = *src;
+
+ last_was_space = false;
+ src++;
+ }
+
+ /* Remove trailing space */
+ if (dst > output && *(dst - 1) == ' ')
+ dst--;
+
+ *dst = '\0';
+ return dst - output;
+}
+
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+Size
+pool_track_table_mutation_shmem_size(void)
+{
+ Size size = 0;
+ int table_buckets = pool_config->track_table_mutation_table_buckets;
+ int table_size = pool_config->track_table_mutation_table_size;
+ int query_buckets = pool_config->track_table_mutation_query_buckets;
+ int query_cache_size = pool_config->track_table_mutation_query_parse_cache_size;
+
+ /* Main structure */
+ size += sizeof(TrackTableMutationShmem);
+
+ /* Table mutation hash table */
+ size += sizeof(TrackTableMutationHashTable);
+ size += table_buckets * sizeof(int); /* buckets array */
+ size += table_size * sizeof(TrackTableMutationEntry); /* entries array */
+
+ /* Query parse cache */
+ size += sizeof(QueryParseCache);
+ size += query_buckets * sizeof(int); /* buckets array */
+ size += query_cache_size * sizeof(QueryParseEntry); /* entries array */
+
+ return size;
+}
+
+void
+pool_track_table_mutation_init(void)
+{
+#ifndef POOL_PRIVATE
+ Size shmem_size;
+ char *shmem_ptr;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: feature disabled")));
+ return;
+ }
+
+ shmem_size = pool_track_table_mutation_shmem_size();
+
+ /*
+ * Allocate from the main shared memory segment.
+ * Memory is already zeroed by initialize_shared_memory_main_segment().
+ */
+ shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size);
+ if (shmem_ptr == NULL)
+ {
+ ereport(ERROR,
+ (errmsg("track_table_mutation: failed to allocate %zu bytes of shared memory",
+ shmem_size)));
+ return;
+ }
+
+ /* Set up pointers to structures within shared memory */
+ track_table_mutation_shmem = (TrackTableMutationShmem *)shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationShmem);
+
+ track_table_mutation_shmem->table_map = (TrackTableMutationHashTable *)shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationHashTable);
+ shmem_ptr += pool_config->track_table_mutation_table_buckets * sizeof(int);
+ shmem_ptr += pool_config->track_table_mutation_table_size * sizeof(TrackTableMutationEntry);
+
+ track_table_mutation_shmem->query_cache = (QueryParseCache *)shmem_ptr;
+
+ /* Initialize structures */
+ table_map_init(track_table_mutation_shmem->table_map,
+ pool_config->track_table_mutation_table_buckets,
+ pool_config->track_table_mutation_table_size);
+
+ query_cache_init(track_table_mutation_shmem->query_cache,
+ pool_config->track_table_mutation_query_buckets,
+ pool_config->track_table_mutation_query_parse_cache_size);
+
+ /* Initialize global state */
+ track_table_mutation_shmem->state.initialized = true;
+ track_table_mutation_shmem->state.current_ttl_us = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+ get_current_time(&track_table_mutation_shmem->state.ttl_last_updated);
+ get_current_time(&track_table_mutation_shmem->state.last_cleanup_time);
+ track_table_mutation_shmem->state.global_cold_start_until.tv_sec = 0;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec = 0;
+ track_table_mutation_shmem->state.stats_queries_checked = 0;
+ track_table_mutation_shmem->state.stats_forced_primary = 0;
+ track_table_mutation_shmem->state.stats_allowed_replica = 0;
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: initialized with %zu bytes shared memory",
+ shmem_size)));
+#endif
+}
+
+void
+pool_track_table_mutation_child_init(void)
+{
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ get_current_time(&process_start_time);
+ cold_start_initialized = true;
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: child initialized, cold start period %d ms",
+ pool_config->track_table_mutation_cold_start_duration)));
+}
+
+bool
+pool_track_table_mutation_in_cold_start(void)
+{
+ struct timeval now;
+ int64 elapsed_ms;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return false;
+
+ if (pool_config->track_table_mutation_cold_start_duration <= 0)
+ return false;
+
+ get_current_time(&now);
+
+ /* Check for watchdog-triggered global cold start first */
+ if (track_table_mutation_shmem->state.global_cold_start_until.tv_sec != 0 &&
+ elapsed_us(&now, &track_table_mutation_shmem->state.global_cold_start_until) > 0)
+ {
+ return true;
+ }
+
+ /* Check per-process cold start */
+ if (!cold_start_initialized)
+ return false;
+
+ elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;
+
+ if (elapsed_ms < pool_config->track_table_mutation_cold_start_duration)
+ {
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: in cold start (%ld/%d ms)",
+ (long)elapsed_ms, pool_config->track_table_mutation_cold_start_duration)));
+ return true;
+ }
+
+ return false;
+}
+
+void
+pool_track_table_mutation_trigger_global_cold_start(void)
+{
+ struct timeval now;
+ int duration_ms;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ duration_ms = pool_config->track_table_mutation_cold_start_duration;
+ if (duration_ms <= 0)
+ return;
+
+ get_current_time(&now);
+ track_table_mutation_shmem->state.global_cold_start_until = now;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_sec += duration_ms / 1000;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec += (duration_ms % 1000) * 1000;
+ if (track_table_mutation_shmem->state.global_cold_start_until.tv_usec >= 1000000)
+ {
+ track_table_mutation_shmem->state.global_cold_start_until.tv_sec +=
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec / 1000000;
+ track_table_mutation_shmem->state.global_cold_start_until.tv_usec %=
+ 1000000;
+ }
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: entering global cold start for %d ms",
+ duration_ms)));
+}
+
+bool
+pool_track_table_mutation_table_is_stale(int table_oid, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ struct timeval now;
+ uint64 ttl_us;
+ uint32 hash;
+ int idx;
+ bool is_stale = false;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return false;
+
+ if (table_oid <= 0 || dboid <= 0)
+ {
+ is_stale = true;
+ goto update_stats;
+ }
+
+ map = track_table_mutation_shmem->table_map;
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+
+ table_map_lock();
+
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ TrackTableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ get_current_time(&now);
+ ttl_us = track_table_mutation_shmem->state.current_ttl_us;
+
+ int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+ is_stale = (elapsed < (int64)ttl_us);
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: table oid %d (dboid %d) elapsed=%ld us, ttl=%lu us, stale=%d",
+ table_oid, dboid, (long)elapsed, (unsigned long)ttl_us, is_stale)));
+ }
+
+ table_map_unlock();
+
+update_stats:
+ /* Update statistics - skip if shmem not available */
+ if (track_table_mutation_shmem != NULL)
+ {
+ __sync_fetch_and_add(&track_table_mutation_shmem->state.stats_queries_checked, 1);
+ if (is_stale)
+ __sync_fetch_and_add(&track_table_mutation_shmem->state.stats_forced_primary, 1);
+ else
+ __sync_fetch_and_add(&track_table_mutation_shmem->state.stats_allowed_replica, 1);
+ }
+
+ return is_stale;
+}
+
+void
+pool_track_table_mutation_mark_tables_written(const int *table_oids, int num_tables, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ struct timeval now;
+ int i;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ if (num_tables <= 0 || table_oids == NULL || dboid <= 0)
+ return;
+
+ map = track_table_mutation_shmem->table_map;
+ get_current_time(&now);
+
+ table_map_lock();
+
+ /* Clean up expired entries once the map is more than 3/4 full */
+ if (map->num_entries > map->max_entries * 3 / 4)
+ {
+ /* At most one cleanup per 100 ms, to avoid an O(N) scan on every write */
+ if (elapsed_us(&track_table_mutation_shmem->state.last_cleanup_time, &now) > 100000)
+ {
+ table_map_cleanup_expired(map, track_table_mutation_shmem->state.current_ttl_us);
+ track_table_mutation_shmem->state.last_cleanup_time = now;
+ }
+ }
+
+ for (i = 0; i < num_tables; i++)
+ {
+ uint32 hash;
+ int table_oid = table_oids[i];
+
+ if (table_oid > 0)
+ {
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+ table_map_insert(map, table_oid, dboid, hash, &now);
+ }
+ }
+
+ table_map_unlock();
+}
+
+/*
+ * Convenience function to mark a single table as written
+ */
+void
+pool_track_table_mutation_mark_table_written(int table_oid, int dboid)
+{
+ if (table_oid > 0 && dboid > 0)
+ {
+ const int tables[1] = { table_oid };
+ pool_track_table_mutation_mark_tables_written(tables, 1, dboid);
+ }
+}
+
+void
+pool_track_table_mutation_update_ttl(uint64 delay_us)
+{
+ uint64 new_ttl;
+ double scaled_ttl;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ /*
+ * Calculate new TTL: delay * factor, clamped to [default TTL, 1 hour].
+ * Clamp in floating point before converting back to uint64, because
+ * converting an out-of-range double to an integer type is undefined.
+ */
+ scaled_ttl = (double) delay_us * pool_config->track_table_mutation_ttl_factor;
+ if (!(scaled_ttl >= (double) TRACK_TABLE_MUTATION_DEFAULT_TTL_US))
+ new_ttl = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+ else if (scaled_ttl >= 3600.0 * 1000000.0)
+ new_ttl = 3600ULL * 1000000ULL;
+ else
+ new_ttl = (uint64) scaled_ttl;
+
+ track_table_mutation_shmem->state.current_ttl_us = new_ttl;
+ get_current_time(&track_table_mutation_shmem->state.ttl_last_updated);
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: updated TTL to %lu us (delay=%lu us, factor=%.1f)",
+ (unsigned long)new_ttl, (unsigned long)delay_us,
+ pool_config->track_table_mutation_ttl_factor)));
+}
+
+bool
+pool_track_table_mutation_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int *num_tables)
+{
+ QueryParseCache *cache;
+ int idx;
+ bool found = false;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return false;
+
+ cache = track_table_mutation_shmem->query_cache;
+
+ query_cache_lock();
+
+ idx = query_cache_lookup(cache, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int i;
+
+ *is_write = entries[idx].is_write;
+ *num_tables = entries[idx].num_tables;
+
+ for (i = 0; i < entries[idx].num_tables && i < TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY; i++)
+ {
+ strlcpy(table_names[i], entries[idx].table_names[i], TRACK_TABLE_MUTATION_TABLE_NAME_LEN);
+ }
+
+ /* Move to front of LRU */
+ query_cache_lru_touch(cache, idx);
+ found = true;
+ }
+
+ query_cache_unlock();
+
+ return found;
+}
+
+void
+pool_track_table_mutation_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][TRACK_TABLE_MUTATION_TABLE_NAME_LEN],
+ int num_tables)
+{
+ QueryParseCache *cache;
+ int *buckets;
+ QueryParseEntry *entries;
+ int idx;
+ int bucket;
+
+ if (pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE_GLOBAL || track_table_mutation_shmem == NULL)
+ return;
+
+ cache = track_table_mutation_shmem->query_cache;
+
+ query_cache_lock();
+
+ /* Check if already exists */
+ idx = query_cache_lookup(cache, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ query_cache_unlock();
+ return;
+ }
+
+ /* Allocate new entry (may evict LRU) */
+ idx = query_cache_alloc_entry(cache);
+ if (idx == TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ query_cache_unlock();
+ ereport(WARNING,
+ (errmsg("track_table_mutation: failed to allocate query cache entry")));
+ return;
+ }
+
+ entries = QUERY_CACHE_ENTRIES(cache);
+ buckets = QUERY_CACHE_BUCKETS(cache);
+
+ /* Fill in entry */
+ entries[idx].query_hash = hash;
+ entries[idx].is_write = is_write;
+ entries[idx].num_tables = (num_tables > TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY) ?
+ TRACK_TABLE_MUTATION_MAX_TABLES_PER_QUERY : num_tables;
+
+ {
+ int i;
+ for (i = 0; i < entries[idx].num_tables; i++)
+ {
+ strlcpy(entries[idx].table_names[i], table_names[i], TRACK_TABLE_MUTATION_TABLE_NAME_LEN);
+ }
+ }
+
+ /* Insert into hash bucket */
+ bucket = hash % cache->num_buckets;
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ /* Add to LRU list */
+ query_cache_lru_add(cache, idx);
+
+ query_cache_unlock();
+}
+
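+/*
+ * Normalize the query text and return its 64-bit FNV-1a hash.
+ * Returns 0 if the query is NULL, empty, or normalizes to nothing.
+ */
+uint64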
+pool_track_table_mutation_normalize_and_hash(const char *query)
+{
+ char normalized[8192];
+ size_t len;
+
+ if (query == NULL || query[0] == '\0')
+ return 0;
+
+ len = normalize_query(query, normalized, sizeof(normalized));
+ if (len == 0)
+ return 0;
+
+ return fnv1a_hash_64(normalized, len);
+}
--
2.53.0
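
For anyone reading along, the TTL clamp in pool_track_table_mutation_update_ttl() and the staleness test in pool_track_table_mutation_table_is_stale() can be sketched in isolation roughly as follows. This is only a standalone illustration, not code from the patch: the sketch_* names are hypothetical, timestamps are simplified to plain microsecond counters instead of struct timeval, and the 5-second floor is an assumed stand-in for TRACK_TABLE_MUTATION_DEFAULT_TTL_US, whose value is not shown in this part of the patch.

```c
#include <stdbool.h>
#include <stdint.h>

/* Assumed values for illustration only. */
#define SKETCH_DEFAULT_TTL_US 5000000ULL            /* hypothetical 5 s floor */
#define SKETCH_MAX_TTL_US     (3600ULL * 1000000ULL) /* 1 hour cap, as in the patch */

/*
 * Scale the measured replication delay by a factor and clamp the result
 * to [SKETCH_DEFAULT_TTL_US, SKETCH_MAX_TTL_US].
 */
static uint64_t
sketch_compute_ttl_us(uint64_t delay_us, double factor)
{
	double		scaled = (double) delay_us * factor;

	/*
	 * Clamp while still in floating point so the final cast can never be
	 * out of range. The negated comparison also sends NaN to the floor.
	 */
	if (!(scaled >= (double) SKETCH_DEFAULT_TTL_US))
		return SKETCH_DEFAULT_TTL_US;
	if (scaled >= (double) SKETCH_MAX_TTL_US)
		return SKETCH_MAX_TTL_US;
	return (uint64_t) scaled;
}

/*
 * A table counts as stale (reads go to the primary) while less than
 * ttl_us microseconds have passed since its last recorded write.
 */
static bool
sketch_is_stale(int64_t last_write_us, int64_t now_us, uint64_t ttl_us)
{
	int64_t		elapsed = now_us - last_write_us;

	return elapsed < (int64_t) ttl_us;
}
```

With these assumed constants, a measured delay of 10 seconds and a factor of 2.0 gives a 20-second TTL, while a zero delay falls back to the floor and an extreme delay is capped at one hour.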