public inbox for [email protected]
From: Nadav Shatz <[email protected]>
To: Tatsuo Ishii <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Recent mutated table tracking in memory
Date: Sun, 19 Apr 2026 17:29:42 +0300
Message-ID: <CACeKOO1-FUo1E=1iUeQeua7XrVzYVoy2FawQM4d3SG-X5f2mdQ@mail.gmail.com>
In-Reply-To: <[email protected]>
References: <CACeKOO2VQ1o9TGk77-hfJzwcGeEd0cCx7AqEDXphqkG4xzG+7w@mail.gmail.com>
<[email protected]>
<CACeKOO3k8K0=u9AKmctyFVivrCuMtN5OdcCDXt0ew=qwbrQGcA@mail.gmail.com>
<[email protected]>
Hi Tatsuo,
Rebased onto current master, renumbered the regression tests
(043/044 to avoid collision with the new 042.ssl_reload), and
combined everything into a single commit.
Attached: v2-0001-Feature-load-balancing-control-by-table-tracking.patch
Looking forward to your review.
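For reviewers, here is a minimal model of the commit-time marking rule described in the patch's documentation. Inside an explicit transaction, written tables are published to shared memory only on COMMIT and discarded on ROLLBACK. Autocommit writes are published when the command completes. SessionWrites, publish_mark(), the fixed MAX_PENDING capacity and the overflow fallback are all hypothetical. The sketch illustrates the rule; it is not the patch's implementation.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_PENDING 64			/* hypothetical per-session capacity */

/* Hypothetical per-session bookkeeping (not the patch's structure). */
typedef struct
{
	bool		in_transaction;
	uint32_t	pending[MAX_PENDING];	/* table OIDs written in this txn */
	size_t		npending;
} SessionWrites;

/* Stand-in for the shared-memory table map: counts published marks. */
static size_t shared_marks = 0;

static void
publish_mark(uint32_t reloid)
{
	(void) reloid;				/* a real map would record (dboid, reloid) */
	shared_marks++;
}

/* Called when a write command (INSERT/UPDATE/DELETE/...) completes. */
static void
on_write_complete(SessionWrites *s, uint32_t reloid)
{
	if (!s->in_transaction || s->npending == MAX_PENDING)
		publish_mark(reloid);	/* autocommit, or overflow: mark now */
	else
		s->pending[s->npending++] = reloid;	/* defer until COMMIT */
}

/* COMMIT: every deferred table becomes visible to other sessions. */
static void
on_commit(SessionWrites *s)
{
	for (size_t i = 0; i < s->npending; i++)
		publish_mark(s->pending[i]);
	s->npending = 0;
	s->in_transaction = false;
}

/* ROLLBACK: nothing reached the replicas, so nothing is published. */
static void
on_rollback(SessionWrites *s)
{
	s->npending = 0;
	s->in_transaction = false;
}
```

The overflow branch is the design choice worth noting. Publishing early is conservative: at worst a rolled-back write sends some reads to the primary for a while. Silently dropping the mark could instead allow a stale read.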
On Sun, Apr 19, 2026 at 10:25 AM Tatsuo Ishii <[email protected]> wrote:
> > Hi Tatsuo,
> >
> > Thank you for the detailed review. The attached patch addresses all items.
>
> I guess the attached patch is on top of
> v1-0001-Feature-load-balancing-control-by-table-tracking.patch. To
> apply v2-0001-address-review.patch, we need to apply
> v1-0001-Feature-load-balancing-control-by-table-tracking.patch first.
> Unfortunately, due to a recent commit, it no longer applies. Can you
> please provide v1 + v2 rebased against the latest master branch?
> Also, regression test number 042 is already used by a recent commit. Can you
> renumber 042.track_table_mutation and
> 043.track_table_mutation_watchdog to 043.track_table_mutation and
> 044.track_table_mutation_watchdog accordingly?
>
> Looking forward to seeing new patch.
>
> Regards,
> --
> Tatsuo Ishii
> SRA OSS K.K.
> English: http://www.sraoss.co.jp/index_en/
> Japanese:http://www.sraoss.co.jp
>
>
> > memqcache bug fix
> > -----------------
> >
> > Good catch. The root cause: pool_set_writing_transaction() was
> > explicitly skipping dml_adaptive_global, so
> > pool_is_writing_transaction() always returned false in this mode.
> > The query cache fetch guard at pool_proto_modules.c:270
> > (!pool_is_writing_transaction()) then served stale cached results
> > after DML in the same transaction.
> >
> > Fix: pool_set_writing_transaction() now sets the flag for
> > dml_adaptive_global (only 'off' and 'dml_adaptive' skip it). This
> > ensures the query cache is properly bypassed after writes within
> > the same transaction.
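Here is a minimal sketch of the corrected decision, for readers following the memqcache thread. It uses a simplified stand-in enum for the disable_load_balance_on_write values, in which DLBOW_OFF and DLBOW_TRANSACTION are assumed names. should_mark_writing() and may_fetch_from_query_cache() are hypothetical helpers that model pool_set_writing_transaction() and the guard at pool_proto_modules.c:270. They are not code from the patch.

```c
#include <stdbool.h>

/* Simplified stand-in for the disable_load_balance_on_write settings.
 * DLBOW_OFF and DLBOW_TRANSACTION are assumed names. */
typedef enum
{
	DLBOW_OFF,
	DLBOW_TRANSACTION,
	DLBOW_TRANS_TRANSACTION,
	DLBOW_ALWAYS,
	DLBOW_DML_ADAPTIVE,
	DLBOW_DML_ADAPTIVE_GLOBAL
} DlbowMode;

/*
 * Hypothetical model of pool_set_writing_transaction() after the fix:
 * only 'off' and 'dml_adaptive' skip setting the flag, so
 * dml_adaptive_global now records that the transaction has written.
 */
static bool
should_mark_writing(DlbowMode mode)
{
	return mode != DLBOW_OFF && mode != DLBOW_DML_ADAPTIVE;
}

/*
 * Hypothetical model of the query cache fetch guard: a cached result
 * may be served only if the current transaction has not written.
 */
static bool
may_fetch_from_query_cache(bool is_writing_transaction)
{
	return !is_writing_transaction;
}
```

Replay the failing jdbctest step with this model. After UPDATE t1 SET i = 2 in an explicit transaction under dml_adaptive_global, should_mark_writing() returns true. may_fetch_from_query_cache() therefore returns false, so the following SELECT bypasses the cache and sees "2".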
> >
> > Removed dead query parse cache code (~700 lines)
> > -------------------------------------------------
> >
> > You're right -- pool_track_table_mutation_get_cached_parse,
> > pool_track_table_mutation_cache_parse, and
> > pool_track_table_mutation_normalize_and_hash were never called.
> > These were leftover from an earlier design where we planned to
> > cache SQL parse results in shared memory. The feature ended up
> > using pgpool's existing parser directly, and this code was never
> > wired up.
> >
> > Removed: QueryParseCache and QueryParseEntry structs, all related
> > static functions, the TRACK_TABLE_MUTATION_QUERY_SEM semaphore,
> > and the track_table_mutation_query_buckets /
> > track_table_mutation_query_parse_cache_size configuration
> > parameters. This also reduces shared memory usage from ~6.4 MB
> > to ~80 KB with default settings.
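The shared memory figures can be reproduced from the per-entry sizes quoted in the v1 documentation: about 40 bytes per tracked table and about 640 bytes per query parse cache entry. These are the documentation's approximations, not measured struct sizes, and the helper names below are hypothetical.

```c
#include <stddef.h>

/* Approximate per-entry sizes quoted in the v1 documentation. */
#define TABLE_ENTRY_BYTES		40
#define PARSE_CACHE_ENTRY_BYTES	640

/* v2 layout: table tracking only. */
static size_t
table_tracking_bytes(size_t table_size)
{
	return table_size * TABLE_ENTRY_BYTES;
}

/* Removed v1 layout: table tracking plus the query parse cache. */
static size_t
v1_total_bytes(size_t table_size, size_t parse_cache_size)
{
	return table_tracking_bytes(table_size) +
		parse_cache_size * PARSE_CACHE_ENTRY_BYTES;
}
```

With the defaults, table_tracking_bytes(2048) is 81920 bytes (80 KiB). v1_total_bytes(2048, 10000) is 6481920 bytes, in line with the rough ~6.4 MB figure. Setting track_table_mutation_table_size to 4096 gives 163840 bytes, the ~160 KB quoted in the configuration example.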
> >
> > check_object_relationship_list scope
> > -------------------------------------
> >
> > You're correct -- dml_adaptive_global does not use
> > dml_adaptive_object_relationship_list. Changed
> > check_object_relationship_list() to check for DLBOW_DML_ADAPTIVE
> > only, not DLBOW_IS_DML_ADAPTIVE (which includes global).
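This sketch contrasts the two checks to illustrate the scope change. The macro body is an assumption inferred from the description above, where DLBOW_IS_DML_ADAPTIVE matches both adaptive modes. The real definition is in the patch's pool_config.h and is not reproduced here. The enum is a trimmed stand-in (DLBOW_OFF is an assumed name), and both helper functions are hypothetical.

```c
#include <stdbool.h>

/* Trimmed stand-in enum; DLBOW_OFF is an assumed name for 'off'. */
typedef enum
{
	DLBOW_OFF,
	DLBOW_DML_ADAPTIVE,
	DLBOW_DML_ADAPTIVE_GLOBAL
} DlbowMode;

/* Assumed definition: true for both dml_adaptive and dml_adaptive_global. */
#define DLBOW_IS_DML_ADAPTIVE(mode) \
	((mode) == DLBOW_DML_ADAPTIVE || (mode) == DLBOW_DML_ADAPTIVE_GLOBAL)

/*
 * Paths shared by both modes, such as the per-transaction write list
 * consulted by is_select_object_in_temp_write_list(), use the macro.
 */
static bool
uses_temp_write_list(DlbowMode mode)
{
	return DLBOW_IS_DML_ADAPTIVE(mode);
}

/*
 * check_object_relationship_list() is limited to plain dml_adaptive,
 * since dml_adaptive_global does not consult
 * dml_adaptive_object_relationship_list.
 */
static bool
uses_object_relationship_list(DlbowMode mode)
{
	return mode == DLBOW_DML_ADAPTIVE;
}
```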
> >
> > Documentation fixes
> > -------------------
> >
> > - Removed "(Lagless Replica Reads)" from section title and
> > "lagless" language from description.
> >
> > - Described fallback behavior when neither
> > replication_delay_source_cmd nor delay_threshold_by_time is
> > configured (TTL stays at 100ms default minimum).
> >
> > - "query cache" references removed (the query parse cache is gone).
> >
> > - Added 128-table-per-SELECT limit to Limitations section
> > (uses POOL_MAX_SELECT_OIDS).
> >
> > Code style fixes
> > ----------------
> >
> > - DLBOW_IS_DML_ADAPTIVE() calls no longer split across lines.
> >
> > - Split the long errmsg line in
> > is_select_object_in_temp_write_list.
> >
> > - Removed redundant is_adaptive variable in
> > is_select_object_in_temp_write_list (the check at function
> > entry already guarantees it).
> >
> > Thanks!
> >
> > On Wed, Apr 15, 2026 at 1:43 AM Tatsuo Ishii <[email protected]> wrote:
> >
> >> Hi Nadav,
> >>
> >> > Hi Tatsuo,
> >> >
> >> > Looks good to me thanks!
> >> >
> >> > Please go ahead with your review. Waiting to hear back from you.
> >>
> >> Here are the code review results.
> >>
> >> diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
> >> index 9e1e7b39b..7384ce81a 100644
> >> --- a/doc/src/sgml/loadbalance.sgml
> >> +++ b/doc/src/sgml/loadbalance.sgml
> >> :
> >> + <sect2 id="runtime-config-table-mutation-map">
> >> + <title>Table Mutation Map Configuration (Lagless Replica Reads)</title>
> >>
> >> "(Lagless Replica Reads)" sounds like an advertisement to me. It
> >> should be removed.
> >>
> >> + <para>
> >> + These parameters configure the track table mutation feature, which is activated by setting
> >> + <xref linkend="guc-disable-load-balance-on-write"> to <literal>dml_adaptive_global</literal>.
> >> + The feature tracks recently written tables to prevent stale reads from replica nodes during
> >> + replication lag, implementing the "lagless" architecture pattern for distributed systems
> >> + with read replicas.
> >>
> >> I think the feature does not guarantee "lagless" reads at all times, in all cases.
> >>
> >> + <para>
> >> + This feature requires time-based replication delay monitoring. This can be provided by either
> >> + <xref linkend="guc-replication-delay-source-cmd"> (external command mode) or by setting
> >> + <xref linkend="guc-delay-threshold-by-time"> (which uses <literal>pg_stat_replication.replay_lag</literal>
> >> + from PostgreSQL 10+). At least one of these must be configured for the TTL calculation to work.
> >>
> >> If one of these is not set, what happens? Error? Need to describe it.
> >>
> >> + </para>
> >> +
> >> + <warning>
> >> + <para>
> >> + Enabling <literal>dml_adaptive_global</literal> increases shared memory consumption. With default settings,
> >> + the feature requires approximately 6.4 MB of shared memory (0.1 MB for table tracking + 6.3 MB for query cache).
> >>
> >> "query cache" should be "query parse cache".
> >>
> >> + Memory usage scales with configuration parameters:
> >> + </para>
> >> + <itemizedlist>
> >> + <listitem>
> >> + <para>
> >> + Table tracking: <literal>track_table_mutation_table_size * 40 bytes</literal> (default: 2048 * 40 = ~80 KB)
> >> + </para>
> >> + </listitem>
> >> + <listitem>
> >> + <para>
> >> + Query cache: <literal>track_table_mutation_query_parse_cache_size * 640 bytes</literal> (default: 10000 * 640 = ~6.3 MB)
> >>
> >> "query cache" should be "query parse cache".
> >>
> >> + <title>Limitations</title>
> >>
> >> I think the number of tables tracked in a SELECT is limited to 8. It
> >> should be mentioned.
> >>
> >> diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
> >> index a056ac596..0190d3673 100644
> >> --- a/src/context/pool_query_context.c
> >> +++ b/src/context/pool_query_context.c
> >> @@ -1828,15 +1829,23 @@ is_in_list(char *name, List *list)
> >> static bool
> >> is_select_object_in_temp_write_list(Node *node, void *context)
> >> {
> >> - if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
> >> + if (node == NULL ||
> >> + !DLBOW_IS_DML_ADAPTIVE(
> >> + pool_config->disable_load_balance_on_write))
> >>
> >> You don't need to split the line.
> >>
> >> + is_adaptive = DLBOW_IS_DML_ADAPTIVE(
> >> + pool_config->disable_load_balance_on_write);
> >>
> >> You don't need to split the line.
> >>
> >> - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction)
> >> + if (is_adaptive &&
> >> + session_context->is_in_transaction)
> >> {
> >> ereport(DEBUG1,
> >> (errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname)));
> >>
> >> This line is too long. Please split.
> >>
> >> @@ -1880,7 +1889,13 @@ static char *get_associated_object_from_dml_adaptive_relations
> >> void
> >> check_object_relationship_list(char *name, bool is_func_name)
> >> {
> >> - if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list)
> >> + bool is_adaptive;
> >> +
> >> + is_adaptive = DLBOW_IS_DML_ADAPTIVE(
> >> + pool_config->disable_load_balance_on_write);
> >>
> >> I wrote in the commit message:
> >>
> >> modifications are only detected in the same transaction). Note,
> >> however, you cannot use dml_adaptive_object_relationship_list to track
> >> dependency among table and other objects.
> >>
> >> In my understanding the feature does not use
> >> dml_adaptive_object_relationship_list. If this is correct, why is
> >> check_object_relationship_list() called here in the
> >> dml_adaptive_global case? If the feature uses
> >> dml_adaptive_object_relationship_list, test cases should be included.
> >>
> >> diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c
> >> new file mode 100644
> >> index 000000000..9be46b28f
> >> --- /dev/null
> >> +++ b/src/utils/pool_track_table_mutation.c
> >>
> >> It seems the following functions are not used anywhere. I wonder if
> >> this feature actually uses the "query parse cache".
> >>
> >> pool_track_table_mutation_get_cached_parse
> >> pool_track_table_mutation_cache_parse
> >> pool_track_table_mutation_normalize_and_hash
> >>
> >> Besides the code review, I modified one of the regression tests to check
> >> whether the feature coexists with the existing in-memory query cache
> >> feature. After applying the attached patch, I ran 006.memqcache and got
> >> the following result.
> >>
> >> cd src/test/regression
> >> ./regress.sh 006
> >> creating pgpool-II temporary installation ...
> >> moving pgpool_setup to temporary installation path ...
> >> moving watchdog_setup to temporary installation path ...
> >> using pgpool-II at /home/t-ishii/work/Pgpool-II/current/pgpool2/src/test/regression/temp/installed
> >> *************************
> >> REGRESSION MODE : install
> >> Pgpool-II version : pgpool-II version 4.8devel (mitsukakeboshi)
> >> Pgpool-II install path : /home/t-ishii/work/Pgpool-II/current/pgpool2/src/test/regression/temp/installed
> >> PostgreSQL bin : /usr/local/pgsql/bin
> >> PostgreSQL Major version : 18
> >> pgbench : /usr/local/pgsql/bin/pgbench
> >> PostgreSQL jdbc : /usr/local/pgsql/share/postgresql-9.2-1003.jdbc4.jar
> >> *************************
> >> testing 006.memqcache...failed.
> >> out of 1 ok:0 failed:1 timeout:0
> >>
> >> log/006.memqcache shows:
> >>
> >> ../expected.txt result.txt differ: char 1, line 1
> >>
> >> So I checked the test script and found the error was generated by a
> >> Java program test.
> >>
> >> java jdbctest > result.txt 2>&1
> >> cmp ../expected.txt result.txt
> >> if [ $? != 0 ];then
> >> ./shutdownall
> >> exit 1
> >> fi
> >>
> >> In jdbctest.java:
> >>
> >> /*
> >> * Cache test in an explicit transaction
> >> */
> >> conn.setAutoCommit(false);
> >> // execute DML. This should prevent SELECTs from using query cache in the transaction.
> >> sql = "UPDATE t1 SET i = 2;";
> >> pst = conn.createStatement();
> >> pst.executeUpdate(sql);
> >> pst.close();
> >> // should not use the cache and should return "2", rather than "1"
> >> prest = conn.prepareStatement("SELECT * FROM t1");
> >> rs = prest.executeQuery();
> >>
> >> The expected file (expected.txt) has "2", but the result file
> >> (testdir/result.txt) has "1". This is why the test failed. I wonder
> >> if there's something wrong with the feature when the query cache is
> >> enabled. Can you look into this?
> >>
> >> Regards,
> >> --
> >> Tatsuo Ishii
> >> SRA OSS K.K.
> >> English: http://www.sraoss.co.jp/index_en/
> >> Japanese:http://www.sraoss.co.jp
> >>
> >
> >
> > --
> > Nadav Shatz
> > Tailor Brands | CTO
>
--
Nadav Shatz
Tailor Brands | CTO
Attachments:
[application/octet-stream] v2-0001-Feature-load-balancing-control-by-table-tracking.patch (89.7K, 3-v2-0001-Feature-load-balancing-control-by-table-tracking.patch)
From 1ad39659cf4cec0baeabfc3d02ea9b88163e9046 Mon Sep 17 00:00:00 2001
From: Nadav Shatz <[email protected]>
Date: Sun, 19 Apr 2026 17:10:24 +0300
Subject: [PATCH v2] Feature: load balancing control by table tracking.
Prevent routing of read-only queries to a standby if the tables used
in the query were modified recently enough that the standby may not
have replayed the change yet, judged by the replication delay collected
by the streaming replication check process. To enable this feature,
set disable_load_balance_on_write to dml_adaptive_global.
In this mode, when tables are modified by
INSERT/UPDATE/DELETE/TRUNCATE/MERGE/data-modifying WITH, SELECTs
using those tables are not load balanced for a certain period:
i.e. they are routed to the primary PostgreSQL server to avoid data
staleness caused by replication delay.
Unlike dml_adaptive mode, the table modifications described above are
detected even if they happen in other sessions (in dml_adaptive, table
modifications are only detected in the same transaction). Note,
however, that you cannot use dml_adaptive_object_relationship_list to
track dependencies between tables and other objects.
Besides dml_adaptive_global, there are some tuning knobs for the
feature:
- track_table_mutation_ttl_factor
Multiplier used to calculate the TTL of each tracked table entry.
- track_table_mutation_max_staleness
Maximum duration in milliseconds that a single table entry can
continuously force queries to primary.
- track_table_mutation_cold_start_duration
Duration in milliseconds to route all queries to primary after a
child process starts.
- track_table_mutation_table_buckets
Number of hash buckets for the track table mutation hash table.
- track_table_mutation_table_size
Maximum number of tables that can be tracked simultaneously in the
track table mutation.
Author: Nadav Shatz <[email protected]>
Reviewed-by: Tatsuo Ishii <[email protected]>
Discussion: https://www.postgresql.org/message-id/flat/20260407.181009.1762204033074164841.ishii%40postgresql.org#58c139c1a7f8d5562865921d0733667b
---
doc/src/sgml/loadbalance.sgml | 288 ++++++
src/Makefile.am | 1 +
src/config/pool_config_variables.c | 65 ++
src/context/pool_query_context.c | 242 ++++-
src/context/pool_session_context.c | 17 +-
src/include/pool.h | 3 +-
src/include/pool_config.h | 24 +-
src/include/utils/pool_track_table_mutation.h | 167 ++++
src/main/pgpool_main.c | 29 +-
src/protocol/CommandComplete.c | 28 +
src/protocol/child.c | 8 +
src/protocol/pool_proto_modules.c | 6 +-
src/sample/pgpool.conf.sample-stream | 45 +
src/streaming_replication/pool_worker_child.c | 24 +
src/test/regression/libs.sh | 2 +
.../tests/043.track_table_mutation/test.sh | 354 +++++++
.../044.track_table_mutation_watchdog/test.sh | 184 ++++
src/tools/pgindent/typedefs.list | 4 +
src/utils/pool_track_table_mutation.c | 902 ++++++++++++++++++
19 files changed, 2368 insertions(+), 25 deletions(-)
create mode 100644 src/include/utils/pool_track_table_mutation.h
create mode 100755 src/test/regression/tests/043.track_table_mutation/test.sh
create mode 100755 src/test/regression/tests/044.track_table_mutation_watchdog/test.sh
create mode 100644 src/utils/pool_track_table_mutation.c
diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
index 9e1e7b39b..d4fbcf1a5 100644
--- a/doc/src/sgml/loadbalance.sgml
+++ b/doc/src/sgml/loadbalance.sgml
@@ -1110,6 +1110,18 @@ app_name_redirect_preference_list > database_redirect_preference_list > us
Dependent functions, triggers, and views on the tables can be configured
using <xref linkend="guc-dml-adaptive-object-relationship-list">
</para>
+
+ <para>
+ If this parameter is set to <varname>dml_adaptive_global</varname>,
+ <productname>Pgpool-II</> behaves like <varname>dml_adaptive</varname>
+ (per-transaction write tracking) and additionally uses shared memory to track
+ recently written tables across all sessions cluster-wide. When a table is
+ written in any session, subsequent reads of that table from any session are
+ routed to the primary until a TTL (based on the measured replication delay) expires.
+ This prevents stale reads after writes even across different connections.
+ See <xref linkend="runtime-config-table-mutation-map"> for the sub-parameters
+ that control the shared-memory tracking behavior.
+ </para>
</listitem>
</varlistentry>
@@ -1195,4 +1207,280 @@ dml_adaptive_object_relationship_list = 'table_1:table_2'
</variablelist>
</sect2>
+
+ <sect2 id="runtime-config-table-mutation-map">
+ <title>Table Mutation Tracking Configuration</title>
+
+ <para>
+ These parameters configure the track table mutation feature, which is activated by setting
+ <xref linkend="guc-disable-load-balance-on-write"> to <literal>dml_adaptive_global</literal>.
+ The feature tracks recently written tables to prevent stale reads from replica nodes during
+ replication lag.
+ </para>
+
+ <para>
+ When a table is modified (for example by INSERT, UPDATE, or DELETE), it is marked as "stale" for a TTL period
+ (<literal>replication_delay * track_table_mutation_ttl_factor</literal>). Any SELECT queries on stale tables are routed
+ to the primary node instead of replicas, which provides read-after-write consistency as long as the TTL covers the actual replication delay.
+ </para>
+
+ <para>
+ This feature requires time-based replication delay monitoring. This can be provided by either
+ <xref linkend="guc-replication-delay-source-cmd"> (external command mode) or by setting
+ <xref linkend="guc-delay-threshold-by-time"> (which uses <literal>pg_stat_replication.replay_lag</literal>
+ from PostgreSQL 10+). If neither is configured, the TTL remains at its default minimum value
+ (100 milliseconds) and is never updated based on actual replication delay, which may result
+ in suboptimal routing decisions.
+ </para>
+
+ <warning>
+ <para>
+ Enabling <literal>dml_adaptive_global</literal> increases shared memory consumption. With default settings,
+ the feature requires approximately 80 KB of shared memory for table tracking:
+ <literal>track_table_mutation_table_size * 40 bytes</literal> (default: 2048 * 40 = ~80 KB).
+ </para>
+ </warning>
+
+ <variablelist>
+
+ <varlistentry id="guc-track-table-mutation-ttl-factor" xreflabel="track_table_mutation_ttl_factor">
+ <term><varname>track_table_mutation_ttl_factor</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_ttl_factor</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Multiplier for calculating the TTL: <literal>TTL = replication_delay * track_table_mutation_ttl_factor</literal>.
+ Higher values provide more safety margin but may reduce read replica utilization.
+ </para>
+ <para>
+ Valid range: 1.0-100.0. Default is <literal>5.0</literal>.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-max-staleness" xreflabel="track_table_mutation_max_staleness">
+ <term><varname>track_table_mutation_max_staleness</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_max_staleness</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum duration in milliseconds that a single table entry can continuously force queries to primary,
+ measured from when the table was first marked stale. When this cap is reached, the entry is expired
+ regardless of recent writes. If the table is written to again after expiry, a fresh tracking entry
+ is created.
+ </para>
+ <para>
+ This parameter bounds the cross-session impact of table mutation tracking. Even if a table is written
+ to in a tight loop, its effect on other sessions' load balancing is limited to this duration. For
+ legitimately busy tables, the gap between forced expiry and the next write re-marking the table is
+ negligible (typically milliseconds).
+ </para>
+ <para>
+ Set to 0 to disable the cap (not recommended for production).
+ Valid range: 0-3600000 ms. Default is <literal>60000</literal> (60 seconds).
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-cold-start-duration" xreflabel="track_table_mutation_cold_start_duration">
+ <term><varname>track_table_mutation_cold_start_duration</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_cold_start_duration</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Duration in milliseconds to route all queries to primary after a child process starts.
+ This prevents stale reads when a new connection is established before the track table mutation
+ is populated with recent write history.
+ </para>
+ <para>
+ When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a
+ global cold start for this duration to avoid stale reads after leadership changes.
+ </para>
+ <para>
+ Valid range: 0-60000 ms. Default is <literal>2000</literal> (2 seconds).
+ Set to 0 to disable cold start behavior.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-buckets" xreflabel="track_table_mutation_table_buckets">
+ <term><varname>track_table_mutation_table_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the track table mutation hash table.
+ Higher values reduce hash collisions and improve lookup performance.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>1024</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-track-table-mutation-table-size" xreflabel="track_table_mutation_table_size">
+ <term><varname>track_table_mutation_table_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>track_table_mutation_table_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of tables that can be tracked simultaneously in the track table mutation.
+ When full, oldest entries are evicted using a simple eviction strategy.
+ </para>
+ <para>
+ Valid range: 128-131072. Default is <literal>2048</literal>.
+ Memory usage: approximately 40 bytes per entry.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ <sect3 id="runtime-config-track-table-mutation-example">
+ <title>Track Table Mutation Configuration Example</title>
+ <para>
+ To enable track table mutation with replication delay monitoring:
+ </para>
+ <programlisting>
+# Enable dml_adaptive_global mode (includes track table mutation)
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_ttl_factor = 5.0
+track_table_mutation_max_staleness = 60000
+track_table_mutation_cold_start_duration = 2000
+
+# Option A: Use external command for replication delay
+replication_delay_source_cmd = '/path/to/get-replication-delay.sh'
+replication_delay_source_timeout = 10
+
+# Option B: Use pg_stat_replication replay_lag (PG 10+)
+# delay_threshold_by_time = 1000
+
+# Adjust table map size based on workload
+track_table_mutation_table_size = 4096
+ </programlisting>
+ <para>
+ Shared memory required for above configuration: approximately 160 KB for the table map.
+ Default configuration (2048 tables) requires approximately 80 KB.
+ </para>
+ </sect3>
+
+ <sect3 id="runtime-config-track-table-mutation-limitations">
+ <title>Limitations</title>
+ <para>
+ The track table mutation feature has the following limitations:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>PREPARE</literal> statements are not tracked. When a prepared statement
+ containing data modification is executed, the table mutation is not recorded.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ A maximum of 128 tables can be tracked per SELECT query for staleness checking.
+ This limit is shared with the query cache subsystem
+ (<literal>POOL_MAX_SELECT_OIDS</literal>).
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ If your application uses prepared statements and requires read-after-write consistency,
+ consider using explicit transaction routing or the <literal>/*NO LOAD BALANCE*/</literal>
+ comment directive for affected queries.
+ </para>
+ <para>
+ The following statement types <emphasis>are</emphasis> tracked and will mark tables as stale:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>INSERT</literal>, <literal>UPDATE</literal>, <literal>DELETE</literal>
+ statements (including those with <literal>RETURNING</literal> clauses).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>TRUNCATE</literal> statements (including multiple tables).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>MERGE</literal> statements (PostgreSQL 15+).
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>WITH</literal> clauses containing data modifications (Common Table Expressions
+ with <literal>INSERT</literal>, <literal>UPDATE</literal>, or <literal>DELETE</literal>).
+ For example, <literal>WITH deleted AS (DELETE FROM t1 RETURNING *) SELECT * FROM deleted</literal>
+ will properly mark table <literal>t1</literal> as stale.
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ <emphasis>Transaction Rollback Behavior:</emphasis> Within explicit transactions, tables
+ are only marked as stale in shared memory when the transaction is committed. If the
+ transaction is rolled back, no tables are marked, since no actual data modification
+ occurred on replicas. This prevents rolled-back transactions from unnecessarily
+ disabling load balancing. For autocommit statements (outside explicit transactions),
+ tables are marked immediately upon command completion.
+ </para>
+
+ <para>
+ <emphasis>Cross-Session Impact and Safety Bounds:</emphasis>
+ Unlike <literal>dml_adaptive</literal> (which only affects the session that issued the write),
+ <literal>dml_adaptive_global</literal> affects all sessions reading the same table in the same database.
+ The following safety mechanisms bound this cross-session impact:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ <emphasis>Maximum staleness cap:</emphasis> The <xref linkend="guc-track-table-mutation-max-staleness">
+ parameter (default: 60 seconds) limits how long any single table entry can continuously force primary
+ routing. Even under sustained writes, the entry expires after this period and is only renewed by
+ subsequent committed writes.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <emphasis>Database isolation:</emphasis> Table staleness tracking is scoped by database OID. Writes
+ in one database never affect load balancing decisions for sessions connected to a different database.
+ In multi-tenant deployments where tenants use separate databases, one tenant's write activity cannot
+ influence another tenant's query routing.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <emphasis>Committed writes only:</emphasis> Only committed transactions mark tables as stale.
+ Rolled-back transactions have no effect on the shared tracking state.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <emphasis>Bounded table map size:</emphasis> The shared memory table map has a fixed maximum size
+ (<xref linkend="guc-track-table-mutation-table-size">). At most this many tables can be marked stale
+ simultaneously, providing a natural ceiling on the feature's impact.
+ </para>
+ </listitem>
+ </itemizedlist>
+ </sect3>
+
+ </sect2>
+
</sect1>
diff --git a/src/Makefile.am b/src/Makefile.am
index 4678ab530..39588af58 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \
rewrite/pool_timestamp.c \
rewrite/pool_lobj.c \
utils/pool_select_walker.c \
+ utils/pool_track_table_mutation.c \
utils/strlcpy.c \
utils/psprintf.c \
utils/pool_params.c \
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index b775b2106..3039e32f0 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -290,6 +290,7 @@ static const struct config_enum_entry disable_load_balance_on_write_options[] =
{"trans_transaction", DLBOW_TRANS_TRANSACTION, false},
{"always", DLBOW_ALWAYS, false},
{"dml_adaptive", DLBOW_DML_ADAPTIVE, false},
+ {"dml_adaptive_global", DLBOW_DML_ADAPTIVE_GLOBAL, false},
{NULL, 0, false}
};
@@ -1777,6 +1778,19 @@ static struct config_int_array ConfigureNamesIntArray[] =
static struct config_double ConfigureNamesDouble[] =
{
+ {
+ {"track_table_mutation_ttl_factor",
+ CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "TTL multiplier for track table mutation "
+ "(TTL = replication_delay * factor)",
+ CONFIG_VAR_TYPE_DOUBLE, false, 0
+ },
+ &g_pool_config.track_table_mutation_ttl_factor,
+ 5.0, /* boot value: 5x replication delay */
+ 1.0, 100.0, /* min, max */
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_DOUBLE
};
@@ -2397,6 +2411,57 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"track_table_mutation_max_staleness",
+ CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Maximum duration in milliseconds that a "
+ "table can be marked stale from its first "
+ "write. 0 disables the cap.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.track_table_mutation_max_staleness,
+ 60000, /* 60 seconds */
+ 0, 3600000, /* 0 to 1 hour */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_cold_start_duration",
+ CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Duration in milliseconds to force queries "
+ "to primary after child process starts.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.track_table_mutation_cold_start_duration,
+ 2000, /* 2 seconds */
+ 0, 60000, /* 0 to 60 seconds */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_buckets",
+ CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_buckets,
+ 1024,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"track_table_mutation_table_size",
+ CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in track table mutation.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.track_table_mutation_table_size,
+ 2048,
+ 128, 131072,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_INT
};
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index a056ac596..c20a3a420 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -29,6 +29,7 @@
#include "utils/statistics.h"
#include "utils/pool_select_walker.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_session_context.h"
#include "context/pool_query_context.h"
#include "parser/nodes.h"
@@ -1828,20 +1829,26 @@ is_in_list(char *name, List *list)
static bool
is_select_object_in_temp_write_list(Node *node, void *context)
{
- if (node == NULL || pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (node == NULL ||
+ !DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
return false;
if (IsA(node, RangeVar))
{
RangeVar *rgv = (RangeVar *) node;
- POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
+ POOL_SESSION_CONTEXT *session_context;
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context->is_in_transaction)
+ session_context = pool_get_session_context(false);
+
+ if (session_context->is_in_transaction)
{
ereport(DEBUG1,
- (errmsg("is_select_object_in_temp_write_list: \"%s\", found relation \"%s\"", (char *) context, rgv->relname)));
+ (errmsg("is_select_object_in_temp_write_list:"
+ " \"%s\", found relation \"%s\"",
+ (char *) context, rgv->relname)));
- return is_in_list(rgv->relname, session_context->transaction_temp_write_list);
+ return is_in_list(rgv->relname,
+ session_context->transaction_temp_write_list);
}
}
@@ -1880,15 +1887,22 @@ static char *get_associated_object_from_dml_adaptive_relations
void
check_object_relationship_list(char *name, bool is_func_name)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && pool_config->parsed_dml_adaptive_object_relationship_list)
+ bool is_adaptive;
+
+ is_adaptive =
+ (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE);
+
+ if (is_adaptive &&
+ pool_config->parsed_dml_adaptive_object_relationship_list)
{
POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
if (session_context->is_in_transaction)
{
char *right_token =
- get_associated_object_from_dml_adaptive_relations
- (name, is_func_name ? OBJECT_TYPE_FUNCTION : OBJECT_TYPE_RELATION);
+ get_associated_object_from_dml_adaptive_relations
+ (name, is_func_name ? OBJECT_TYPE_FUNCTION : OBJECT_TYPE_RELATION);
if (right_token)
{
@@ -1947,7 +1961,7 @@ add_object_into_temp_write_list(Node *node, void *context)
static void
dml_adaptive(Node *node, char *query)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
/* Set/Unset transaction status flags */
if (IsA(node, TransactionStmt))
@@ -1966,6 +1980,45 @@ dml_adaptive(Node *node, char *query)
}
else if (is_commit_or_rollback_query(node))
{
+ /*
+ * For dml_adaptive_global: on COMMIT, flush the accumulated
+ * table writes to shared memory. On ROLLBACK, skip -- the
+ * writes never committed so no stale-read risk exists. This
+ * prevents polluting the table map with rolled-back
+ * transactions.
+ */
+ int dlbow =
+ pool_config->disable_load_balance_on_write;
+ List *wlist =
+ session_context->transaction_temp_write_list;
+
+ if (dlbow == DLBOW_DML_ADAPTIVE_GLOBAL &&
+ is_commit_query(node) &&
+ wlist != NIL)
+ {
+ ListCell *cell;
+ int dboid;
+
+ dboid =
+ pool_track_table_mutation_get_database_oid();
+ if (dboid > 0)
+ {
+ foreach(cell, wlist)
+ {
+ char *tname;
+ int toid;
+
+ tname = (char *) lfirst(cell);
+ toid =
+ pool_table_name_to_oid(tname);
+
+ if (toid > 0)
+ pool_track_table_mutation_mark_table_written(
+ toid, dboid);
+ }
+ }
+ }
+
session_context->is_in_transaction = false;
if (session_context->transaction_temp_write_list != NIL)
@@ -2008,7 +2061,7 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
session_context = pool_get_session_context(false);
backend = session_context->backend;
- /*
+ /*
* Collect/discard information for disable_load_balance_on_write =
* dml_adaptive case.
*/
@@ -2022,6 +2075,20 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
if (dest == POOL_PRIMARY)
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+
+ /*
+ * Resolve table and database OIDs now to populate relcache. This
+ * avoids potential hangs in CommandComplete where we shouldn't be
+ * running new queries against the backend.
+ */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ int *oids;
+
+ pool_extract_table_oids(node, &oids);
+ pool_track_table_mutation_get_database_oid();
+ }
}
/* Should be sent to both primary and standby? */
else if (dest == POOL_BOTH)
@@ -2151,6 +2218,153 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
+
+ /*
+ * Check track table mutation for recently written tables. If
+ * in cold start or any table was recently written, route to
+ * primary to avoid stale reads.
+ */
+ else if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ bool force_primary = false;
+ int lb_node;
+ POOL_QUERY_CONTEXT *qctx =
+ session_context->query_context;
+
+ if (pool_track_table_mutation_in_cold_start())
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance"
+ " because of track table"
+ " mutation cold start"),
+ errdetail("destination = PRIMARY"
+ " for query= \"%s\"",
+ query)));
+ force_primary = true;
+ }
+ else
+ {
+ SelectContext ctx;
+ int dboid;
+ int num_oids;
+ int i;
+
+ memset(&ctx, 0, sizeof(ctx));
+ num_oids =
+ pool_extract_table_oids_from_select_stmt(
+ node, &ctx);
+ if (num_oids > 0)
+ {
+ dboid =
+ pool_track_table_mutation_get_database_oid();
+
+ if (dboid <= 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load"
+ " balance because"
+ " database oid was"
+ " unavailable"),
+ errdetail("destination"
+ " = PRIMARY for"
+ " query= \"%s\"",
+ query)));
+ force_primary = true;
+ }
+ else
+ {
+ for (i = 0; i < num_oids; i++)
+ {
+ bool stale;
+
+ stale =
+ pool_track_table_mutation_table_is_stale(
+ ctx.table_oids[i],
+ dboid);
+ if (stale)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load"
+ " balance because"
+ " table \"%s\" was"
+ " recently written",
+ ctx.table_names[i]),
+ errdetail("destination"
+ " = PRIMARY for"
+ " query= \"%s\"",
+ query)));
+ force_primary = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (force_primary)
+ {
+ pool_set_node_to_be_sent(
+ query_context,
+ PRIMARY_NODE_ID);
+ }
+ else
+ {
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id =
+ select_load_balancing_node();
+ }
+
+ /*
+ * If replication delay is too much, and
+ * prefer_lower_delay_standby is true, select the
+ * lowest-delayed node; otherwise send to primary.
+ */
+ lb_node =
+ session_context->load_balance_node_id;
+ if (STREAM &&
+ check_replication_delay(lb_node))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load"
+ " balance because of"
+ " too much replication"
+ " delay"),
+ errdetail("destination"
+ " = %d for"
+ " query= \"%s\"",
+ dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ lb_node =
+ select_load_balancing_node();
+ session_context->load_balance_node_id =
+ lb_node;
+ qctx->load_balance_node_id =
+ lb_node;
+ pool_set_node_to_be_sent(
+ query_context,
+ lb_node);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(
+ query_context,
+ PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ qctx->load_balance_node_id =
+ session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(
+ query_context,
+ qctx->load_balance_node_id);
+ }
+ }
+ }
else
{
if (pool_config->statement_level_load_balance)
@@ -2171,7 +2385,8 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
errdetail("destination = %d for query= \"%s\"", dest, query)));
/*
- * If prefer_lower_delay_standby is on, choose lower delay standby.
+ * If prefer_lower_delay_standby is on, choose lower
+ * delay standby.
*/
if (pool_config->prefer_lower_delay_standby)
{
@@ -2181,7 +2396,8 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
}
- else /* delay is too much. prefer to send to primary */
+ else /* delay is too much. prefer to send to
+ * primary */
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
@@ -2191,7 +2407,7 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
* Not streaming replication mode, or delay_threshold is 0
* or replication delay is small enough.
*/
- else
+ else
{
session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
pool_set_node_to_be_sent(query_context,
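The new dml_adaptive_global branch in where_to_send_main_replica() checks conditions in a fixed order. It checks cold start first, then a missing database oid, then per-table staleness, and only after those the usual replication-delay handling. The model below restates that order as a pure function so it can be exercised in isolation. It is a sketch, not pgpool code, and it makes these simplifications:

- `Route`, `RouteInput`, and `route_select()` are hypothetical.
- Staleness is reduced to one precomputed flag per referenced table.
- The statement_level_load_balance re-selection is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical routing outcomes; not pgpool types. */
typedef enum
{
	ROUTE_PRIMARY,
	ROUTE_LB_NODE,				/* the session's load balance node */
	ROUTE_LOWER_DELAY			/* a re-selected, lower-delay standby */
} Route;

/* Hypothetical inputs gathered before the routing decision. */
typedef struct
{
	bool		in_cold_start;	/* pool_track_table_mutation_in_cold_start() */
	int			dboid;			/* <= 0 means the oid was unavailable */
	const bool *table_is_stale; /* one flag per table in the SELECT */
	size_t		num_tables;
	bool		delay_too_high; /* STREAM && check_replication_delay() */
	bool		prefer_lower_delay_standby;
} RouteInput;

/*
 * Same order of checks as the patch: cold start, missing database oid,
 * any stale table, then replication delay.
 */
Route
route_select(const RouteInput *in)
{
	size_t		i;

	assert(in != NULL);

	if (in->in_cold_start)
		return ROUTE_PRIMARY;

	if (in->num_tables > 0)
	{
		if (in->dboid <= 0)
			return ROUTE_PRIMARY;	/* cannot check staleness safely */

		for (i = 0; i < in->num_tables; i++)
			if (in->table_is_stale[i])
				return ROUTE_PRIMARY;	/* first stale table decides */
	}

	if (in->delay_too_high)
		return in->prefer_lower_delay_standby ? ROUTE_LOWER_DELAY : ROUTE_PRIMARY;

	return ROUTE_LB_NODE;
}
```

Returning at the first stale table matches the `break` in the patch. One recently written table is enough to pin the whole query to the primary.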
diff --git a/src/context/pool_session_context.c b/src/context/pool_session_context.c
index ded41c7fc..be30f1a7c 100644
--- a/src/context/pool_session_context.c
+++ b/src/context/pool_session_context.c
@@ -532,7 +532,7 @@ dump_sent_message(char *caller, POOL_SENT_MESSAGE *m)
static void
dml_adaptive_init(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE)
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write))
{
session_context->is_in_transaction = false;
session_context->transaction_temp_write_list = NIL;
@@ -542,7 +542,9 @@ dml_adaptive_init(void)
static void
dml_adaptive_destroy(void)
{
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && session_context)
+ if (DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write) &&
+ session_context)
{
if (session_context->transaction_temp_write_list != NIL)
list_free_deep(session_context->transaction_temp_write_list);
@@ -738,10 +740,15 @@ void
pool_set_writing_transaction(void)
{
/*
- * If disable_transaction_on_write is 'off' or 'dml_adaptive', then never
- * turn on writing transaction flag.
+ * If disable_load_balance_on_write is 'off' or 'dml_adaptive', then never
+ * turn on writing transaction flag. For dml_adaptive_global we do set it
+ * so that the query cache (memqcache) is properly skipped after DML
+ * within the same transaction.
*/
- if (pool_config->disable_load_balance_on_write != DLBOW_OFF && pool_config->disable_load_balance_on_write != DLBOW_DML_ADAPTIVE)
+ if (pool_config->disable_load_balance_on_write !=
+ DLBOW_OFF &&
+ pool_config->disable_load_balance_on_write !=
+ DLBOW_DML_ADAPTIVE)
{
pool_get_session_context(false)->writing_transaction = true;
ereport(DEBUG5,
diff --git a/src/include/pool.h b/src/include/pool.h
index 65907dcf1..79d7988fc 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -424,7 +424,7 @@ typedef enum
#define Min(x, y) ((x) < (y) ? (x) : (y))
-#define MAX_NUM_SEMAPHORES 8
+#define MAX_NUM_SEMAPHORES 9
#define CONN_COUNTER_SEM 0
#define REQUEST_INFO_SEM 1
#define QUERY_CACHE_STATS_SEM 2
@@ -434,6 +434,7 @@ typedef enum
#define FOLLOW_PRIMARY_SEM 6
#define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main
* process */
+#define TRACK_TABLE_MUTATION_TABLE_SEM 8
#define MAX_REQUEST_QUEUE_SIZE 10
#define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index 9a397d166..b8abadd50 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -105,9 +105,13 @@ typedef enum DLBOW_OPTION
DLBOW_TRANSACTION,
DLBOW_TRANS_TRANSACTION,
DLBOW_ALWAYS,
- DLBOW_DML_ADAPTIVE
+ DLBOW_DML_ADAPTIVE,
+ DLBOW_DML_ADAPTIVE_GLOBAL
} DLBOW_OPTION;
+#define DLBOW_IS_DML_ADAPTIVE(opt) \
+ ((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL)
+
typedef enum RELQTARGET_OPTION
{
RELQTARGET_PRIMARY = 1,
@@ -363,8 +367,22 @@ typedef struct
char *sr_check_password; /* password for sr_check_user */
char *sr_check_database; /* PostgreSQL database name for streaming
* replication check */
- char *replication_delay_source_cmd; /* external command for replication delay */
- int replication_delay_source_timeout; /* timeout for external command in seconds */
+ char *replication_delay_source_cmd; /* external command for
+ * replication delay */
+ int replication_delay_source_timeout; /* timeout for external
+ * command in seconds */
+
+ /* Track table mutation configuration */
+ double track_table_mutation_ttl_factor; /* TTL multiplier for
+ * replication delay */
+ int track_table_mutation_max_staleness; /* max staleness duration
+ * ms */
+ int track_table_mutation_cold_start_duration; /* cold start duration
+ * ms */
+ int track_table_mutation_table_buckets; /* hash buckets for table
+ * map */
+ int track_table_mutation_table_size; /* max table map entries */
+
char *failover_command; /* execute command when failover happens */
char *follow_primary_command; /* execute command when failover is
* ended */
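DLBOW_IS_DML_ADAPTIVE() lets every existing dml_adaptive code path also run for dml_adaptive_global. pool_set_writing_transaction(), however, still tells the two modes apart. The sketch below restates both predicates over a copy of DLBOW_OPTION. The members before DLBOW_TRANSACTION are not visible in this hunk, so placing DLBOW_OFF first with value 1 is an assumption. `sets_writing_transaction()` is a hypothetical helper, not a pgpool function.

```c
#include <assert.h>
#include <stdbool.h>

/*
 * Copy of DLBOW_OPTION for illustration. The members before
 * DLBOW_TRANSACTION are not shown in the patch; DLBOW_OFF = 1 is assumed.
 */
typedef enum DLBOW_OPTION
{
	DLBOW_OFF = 1,
	DLBOW_TRANSACTION,
	DLBOW_TRANS_TRANSACTION,
	DLBOW_ALWAYS,
	DLBOW_DML_ADAPTIVE,
	DLBOW_DML_ADAPTIVE_GLOBAL
} DLBOW_OPTION;

/* Same definition as the macro added to pool_config.h. */
#define DLBOW_IS_DML_ADAPTIVE(opt) \
	((opt) == DLBOW_DML_ADAPTIVE || (opt) == DLBOW_DML_ADAPTIVE_GLOBAL)

/*
 * Hypothetical helper restating the test in pool_set_writing_transaction():
 * only 'off' and plain 'dml_adaptive' leave the flag unset, so
 * dml_adaptive_global sets it and memqcache is skipped after DML in the
 * same transaction.
 */
bool
sets_writing_transaction(DLBOW_OPTION opt)
{
	assert(opt >= DLBOW_OFF && opt <= DLBOW_DML_ADAPTIVE_GLOBAL);
	return opt != DLBOW_OFF && opt != DLBOW_DML_ADAPTIVE;
}
```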
diff --git a/src/include/utils/pool_track_table_mutation.h b/src/include/utils/pool_track_table_mutation.h
new file mode 100644
index 000000000..dfbac666d
--- /dev/null
+++ b/src/include/utils/pool_track_table_mutation.h
@@ -0,0 +1,167 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.h: In-memory tracking of
+ * recently written tables to prevent stale reads.
+ */
+
+#ifndef POOL_TRACK_TABLE_MUTATION_H
+#define POOL_TRACK_TABLE_MUTATION_H
+
+#include "pool.h"
+#include <sys/time.h>
+
+/*
+ * Invalid index marker for linked lists
+ */
+#define TRACK_TABLE_MUTATION_INVALID_INDEX (-1)
+
+/*
+ * Default TTL in microseconds (100ms) used when replication delay is unknown
+ */
+#define TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 * 1000)
+
+/*
+ * Entry in the table mutation hash table (keyed by table/database oids)
+ */
+typedef struct TrackTableMutationEntry
+{
+ int table_oid; /* Table oid */
+ int dboid; /* Database oid */
+ struct timeval first_write_time; /* When the entry was first created */
+ struct timeval last_write_time; /* When the table was last written */
+ uint32 hash; /* Pre-computed hash value */
+ int next; /* Next in collision chain */
+ bool in_use; /* Is this entry in use? */
+} TrackTableMutationEntry;
+
+/*
+ * Header for the table mutation hash table in shared memory
+ */
+typedef struct TrackTableMutationHashTable
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+
+ /*
+ * Followed in shared memory by the arrays int buckets[num_buckets] and
+ * TrackTableMutationEntry entries[max_entries], not declared here.
+ */
+} TrackTableMutationHashTable;
+
+/*
+ * Global state for track table mutation feature
+ */
+typedef struct TrackTableMutationState
+{
+ bool initialized; /* Shmem initialized? */
+ uint64 current_ttl_us; /* Current TTL in microseconds */
+ struct timeval ttl_last_updated; /* When TTL was last updated */
+ struct timeval last_cleanup_time; /* When last expired cleanup ran */
+ struct timeval global_cold_start_until; /* Global cold start end time */
+ uint32 stats_queries_checked; /* Queries checked */
+ uint32 stats_forced_primary; /* Forced to primary */
+ uint32 stats_allowed_replica; /* Allowed to replica */
+} TrackTableMutationState;
+
+/*
+ * Main shared memory structure containing all components
+ */
+typedef struct TrackTableMutationShmem
+{
+ TrackTableMutationState state;
+ TrackTableMutationHashTable *table_map;
+} TrackTableMutationShmem;
+
+/* ----------------
+ * Public API functions
+ * ----------------
+ */
+
+/*
+ * Initialize shared memory structures for track table mutation.
+ * Called from pgpool_main.c after pool_init_pool_info().
+ */
+extern void pool_track_table_mutation_init(void);
+
+/*
+ * Initialize per-child process state for track table mutation.
+ * Called from child.c when a new child process starts.
+ * Sets up cold start tracking.
+ */
+extern void pool_track_table_mutation_child_init(void);
+
+/*
+ * Check if the child process is in cold start period.
+ * During cold start, all queries are routed to primary.
+ * Returns true if in cold start, false otherwise.
+ */
+extern bool pool_track_table_mutation_in_cold_start(void);
+
+/*
+ * Trigger a global cold start period for all processes.
+ * Used after watchdog leader change to avoid stale reads.
+ */
+extern void pool_track_table_mutation_trigger_global_cold_start(void);
+
+/*
+ * Get the oid of the current database; returns <= 0 if unavailable.
+ */
+extern int pool_track_table_mutation_get_database_oid(void);
+
+/*
+ * Check if a table was recently written to (is "stale").
+ * If stale, reads from this table should go to primary.
+ * Returns true if table is stale (recently written), false otherwise.
+ */
+extern bool pool_track_table_mutation_table_is_stale(
+ int table_oid, int dboid);
+
+/*
+ * Mark tables as recently written.
+ * Called after INSERT/UPDATE/DELETE queries complete.
+ * table_oids: array of table oids
+ * num_tables: number of tables in array
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_tables_written(
+ const int *table_oids, int num_tables, int dboid);
+
+/*
+ * Convenience function to mark a single table as written.
+ * table_oid: table oid
+ * dboid: database oid
+ */
+extern void pool_track_table_mutation_mark_table_written(
+ int table_oid, int dboid);
+
+/*
+ * Update the TTL based on current replication delay.
+ * Called from pool_worker_child.c when replication delay is updated.
+ * delay_us: replication delay in microseconds
+ */
+extern void pool_track_table_mutation_update_ttl(uint64 delay_us);
+
+/*
+ * Calculate required shared memory size for track table mutation.
+ */
+extern Size pool_track_table_mutation_shmem_size(void);
+
+#endif /* POOL_TRACK_TABLE_MUTATION_H */
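pool_track_table_mutation.h describes a fixed-capacity hash table stored in a single shared memory block. The block holds a TrackTableMutationHashTable header, then an int bucket array, then an entry array. Collision chains and the free list are linked by array index and terminated by TRACK_TABLE_MUTATION_INVALID_INDEX.

The sketch below shows one way such a layout can work, under these assumptions:

- It runs in one process, with malloc() standing in for shared memory.
- The entry is trimmed to its key, hash, and link fields.
- `ttm_create()`, `ttm_lookup()`, `ttm_insert()`, and the hash mixing are hypothetical.
- Locking with TRACK_TABLE_MUTATION_TABLE_SEM, timestamps, expiry, and eviction are omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#define TTM_INVALID_INDEX (-1)	/* like TRACK_TABLE_MUTATION_INVALID_INDEX */

/* Trimmed entry: key, cached hash, and link only (hypothetical). */
typedef struct
{
	int			table_oid;
	int			dboid;
	uint32_t	hash;
	int			next;			/* next in bucket chain, or in free list */
} TtmEntry;

/* Same fields as TrackTableMutationHashTable; arrays follow it. */
typedef struct
{
	int			num_buckets;
	int			max_entries;
	int			num_entries;
	int			free_list_head;
} TtmTable;

#define TTM_BUCKETS(t)	((int *) ((t) + 1))
#define TTM_ENTRIES(t)	((TtmEntry *) (TTM_BUCKETS(t) + (t)->num_buckets))

/* Arbitrary mixing of the two oids, for illustration only. */
static uint32_t
ttm_hash(int table_oid, int dboid)
{
	return ((uint32_t) table_oid * 2654435761u) ^ ((uint32_t) dboid * 40503u);
}

/* malloc() stands in for carving the block out of shared memory. */
TtmTable *
ttm_create(int num_buckets, int max_entries)
{
	TtmTable   *t;
	int			i;

	assert(num_buckets > 0 && max_entries > 0);
	t = malloc(sizeof(TtmTable) + sizeof(int) * (size_t) num_buckets +
			   sizeof(TtmEntry) * (size_t) max_entries);
	if (t == NULL)
		return NULL;
	t->num_buckets = num_buckets;
	t->max_entries = max_entries;
	t->num_entries = 0;
	t->free_list_head = 0;		/* every entry starts on the free list */
	for (i = 0; i < num_buckets; i++)
		TTM_BUCKETS(t)[i] = TTM_INVALID_INDEX;
	for (i = 0; i < max_entries; i++)
		TTM_ENTRIES(t)[i].next = (i + 1 < max_entries) ? i + 1 : TTM_INVALID_INDEX;
	return t;
}

/* Walk one bucket's chain; returns an entry index or TTM_INVALID_INDEX. */
int
ttm_lookup(TtmTable *t, int table_oid, int dboid)
{
	uint32_t	h = ttm_hash(table_oid, dboid);
	int			idx = TTM_BUCKETS(t)[h % (uint32_t) t->num_buckets];

	while (idx != TTM_INVALID_INDEX)
	{
		TtmEntry   *e = &TTM_ENTRIES(t)[idx];

		if (e->hash == h && e->table_oid == table_oid && e->dboid == dboid)
			return idx;
		idx = e->next;
	}
	return TTM_INVALID_INDEX;
}

/* Pop an entry off the free list and push it onto its bucket's chain. */
int
ttm_insert(TtmTable *t, int table_oid, int dboid)
{
	int			idx = ttm_lookup(t, table_oid, dboid);
	uint32_t	h;
	int			b;
	TtmEntry   *e;

	if (idx != TTM_INVALID_INDEX)
		return idx;				/* already tracked */
	if (t->free_list_head == TTM_INVALID_INDEX)
		return TTM_INVALID_INDEX;	/* full; eviction not modeled */
	h = ttm_hash(table_oid, dboid);
	b = (int) (h % (uint32_t) t->num_buckets);
	idx = t->free_list_head;
	e = &TTM_ENTRIES(t)[idx];
	t->free_list_head = e->next;
	e->table_oid = table_oid;
	e->dboid = dboid;
	e->hash = h;
	e->next = TTM_BUCKETS(t)[b];
	TTM_BUCKETS(t)[b] = idx;
	t->num_entries++;
	return idx;
}
```

Because links are indexes rather than pointers, the structure does not depend on where the block is mapped. The real entry contains struct timeval members, so its array offset would also need MAXALIGN-style alignment.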
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 32bcb0a1f..e41c575be 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -57,6 +57,7 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
#include "query_cache/pool_memqcache.h"
+#include "utils/pool_track_table_mutation.h"
#include "watchdog/wd_internal_commands.h"
#include "watchdog/wd_lifecheck.h"
#include "watchdog/watchdog.h"
@@ -1501,11 +1502,14 @@ sigusr1_interrupt_processor(void)
if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
{
+ WD_STATES wd_state;
+
ereport(LOG,
(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
- if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY)
+ wd_state = wd_internal_get_watchdog_local_node_state();
+ if (wd_state == WD_STANDBY)
{
ereport(LOG,
(errmsg("we have joined the watchdog cluster as STANDBY node"),
@@ -1519,6 +1523,12 @@ sigusr1_interrupt_processor(void)
*/
pool_release_follow_primary_lock(true);
}
+ else if (wd_state == WD_COORDINATOR &&
+ pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_trigger_global_cold_start();
+ }
}
if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
{
@@ -3084,6 +3094,16 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size()));
}
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ size += MAXALIGN(pool_track_table_mutation_shmem_size());
+ elog(DEBUG1,
+ "track_table_mutation: %zu bytes requested"
+ " for shared memory",
+ MAXALIGN(pool_track_table_mutation_shmem_size()));
+ }
+
initialize_shared_memory_main_segment(size);
/* Move the backend descriptors to shared memory */
@@ -3200,6 +3220,13 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
wd_ipc_initialize_data();
}
+ /* Initialize track table mutation for recently written tables */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_init();
+ }
+
}
/*
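initialize_shared_mem_objects() reserves MAXALIGN(pool_track_table_mutation_shmem_size()) only when dml_adaptive_global is configured. That function's body is not part of this excerpt. The estimate below therefore assumes the size is dominated by the hash table header, the bucket array, and the entry array, and it ignores the small TrackTableMutationState struct. The struct copies mirror the header, with uint32 spelled as uint32_t. `ALIGN8` is a hypothetical stand-in for MAXALIGN that assumes 8-byte maximum alignment.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/time.h>

/* Local copy of TrackTableMutationEntry from pool_track_table_mutation.h. */
typedef struct
{
	int			table_oid;
	int			dboid;
	struct timeval first_write_time;
	struct timeval last_write_time;
	uint32_t	hash;
	int			next;
	bool		in_use;
} TrackTableMutationEntry;

/* Local copy of the TrackTableMutationHashTable header. */
typedef struct
{
	int			num_buckets;
	int			max_entries;
	int			num_entries;
	int			free_list_head;
} TrackTableMutationHashTable;

/* Hypothetical stand-in for MAXALIGN, assuming 8-byte maximum alignment. */
#define ALIGN8(len) (((size_t) (len) + 7) & ~((size_t) 7))

/* Estimated bytes for header + buckets + entries (state struct ignored). */
size_t
estimate_table_map_size(int num_buckets, int max_entries)
{
	size_t		size = 0;

	assert(num_buckets > 0 && max_entries > 0);
	size += ALIGN8(sizeof(TrackTableMutationHashTable));
	size += ALIGN8(sizeof(int) * (size_t) num_buckets);
	size += ALIGN8(sizeof(TrackTableMutationEntry) * (size_t) max_entries);
	return size;
}
```

On a typical LP64 Linux build, struct timeval is 16 bytes and the entry pads to 56 bytes. The defaults (1024 buckets, 2048 entries) therefore come to 16 + 4096 + 114688 = 118800 bytes, or about 116 KiB. Other ABIs give different totals.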
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index a3b8f0ea1..f445f268b 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -38,6 +38,8 @@
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
+#include "query_cache/pool_memqcache.h"
static int extract_ntuples(char *message);
static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
@@ -304,6 +306,32 @@ handle_query_context(POOL_CONNECTION_POOL *backend)
node = session_context->query_context->parse_tree;
+ /*
+ * Track table writes for dml_adaptive_global feature. For autocommit
+ * statements (not in explicit transaction), mark tables immediately. For
+ * explicit transactions, marking is deferred to COMMIT in dml_adaptive()
+ * so that ROLLBACKed writes don't pollute the shared memory table map.
+ */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL &&
+ node != NULL &&
+ !session_context->is_in_transaction)
+ {
+ int *oids;
+ int num_oids;
+
+ num_oids = pool_extract_table_oids(node, &oids);
+ if (num_oids > 0)
+ {
+ int dboid;
+
+ dboid = pool_track_table_mutation_get_database_oid();
+ if (dboid > 0)
+ pool_track_table_mutation_mark_tables_written(
+ oids, num_oids, dboid);
+ }
+ }
+
if (IsA(node, PrepareStmt))
{
if (session_context->uncompleted_message)
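Together with the COMMIT handling added to dml_adaptive(), the CommandComplete hook gives two paths into the shared table map:

- An autocommit write is published as soon as the statement completes.
- A write inside an explicit transaction is only remembered in the session and published on COMMIT. On ROLLBACK it is dropped.

The model below isolates that policy. In the patch, the session list holds table names that are resolved with pool_table_name_to_oid() at COMMIT. Here both the list and the shared map are a hypothetical fixed-size `OidSet`, and `on_write()` and `on_transaction_end()` are illustrative names.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

#define MAX_TRACKED 16

/* Hypothetical fixed-size set of table oids; not a pgpool structure. */
typedef struct
{
	int			oids[MAX_TRACKED];
	int			count;
} OidSet;

typedef struct
{
	bool		in_transaction; /* like session_context->is_in_transaction */
	OidSet		pending;		/* like transaction_temp_write_list */
} Session;

static void
oidset_add(OidSet *s, int oid)
{
	int			i;

	for (i = 0; i < s->count; i++)
		if (s->oids[i] == oid)
			return;
	assert(s->count < MAX_TRACKED);
	s->oids[s->count++] = oid;
}

/* Record a write: buffer it inside a transaction, publish it otherwise. */
void
on_write(Session *sess, OidSet *shared_map, int table_oid)
{
	if (sess->in_transaction)
		oidset_add(&sess->pending, table_oid);	/* defer until COMMIT */
	else
		oidset_add(shared_map, table_oid);	/* autocommit: publish now */
}

/* An explicit transaction ended (the dml_adaptive() path). */
void
on_transaction_end(Session *sess, OidSet *shared_map, bool committed)
{
	int			i;

	if (committed)
		for (i = 0; i < sess->pending.count; i++)
			oidset_add(shared_map, sess->pending.oids[i]);
	/* The buffer is cleared either way; ROLLBACK never publishes it. */
	memset(&sess->pending, 0, sizeof(sess->pending));
	sess->in_transaction = false;
}
```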
diff --git a/src/protocol/child.c b/src/protocol/child.c
index 761876f53..4a527c84c 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -57,6 +57,7 @@
#include "utils/elog.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -213,6 +214,13 @@ do_child(int *fds)
/* Initialize per process context */
pool_init_process_context();
+ /* Initialize track table mutation child state for cold start tracking */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ pool_track_table_mutation_child_init();
+ }
+
/* initialize connection pool */
if (pool_init_cp())
{
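Cold start has two triggers in the patch:

- pool_track_table_mutation_child_init(), called when a child starts.
- pool_track_table_mutation_trigger_global_cold_start(), called when this pgpool becomes the watchdog coordinator.

The implementation is not in this excerpt. The sketch assumes a child is in cold start in either of two cases:

- The current time is before the child's own start time plus track_table_mutation_cold_start_duration.
- The current time is before the shared global_cold_start_until timestamp.

Whether a duration of 0 also disables the global window is not visible here. Treating the two windows independently is therefore an assumption, as are the helper names.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/time.h>

/* Add a non-negative duration in milliseconds to a timeval. */
struct timeval
timeval_add_ms(struct timeval t, int ms)
{
	assert(ms >= 0);
	t.tv_sec += ms / 1000;
	t.tv_usec += (ms % 1000) * 1000;
	if (t.tv_usec >= 1000000)	/* carry into seconds */
	{
		t.tv_sec += 1;
		t.tv_usec -= 1000000;
	}
	return t;
}

/* True if a is strictly earlier than b. */
static bool
timeval_before(struct timeval a, struct timeval b)
{
	return a.tv_sec < b.tv_sec ||
		(a.tv_sec == b.tv_sec && a.tv_usec < b.tv_usec);
}

/*
 * Hypothetical cold start test. duration_ms == 0 disables the per-child
 * window; the global window (e.g. after becoming watchdog coordinator)
 * is checked independently.
 */
bool
in_cold_start_sketch(struct timeval now, struct timeval child_start,
					 int duration_ms, struct timeval global_until)
{
	if (duration_ms > 0 &&
		timeval_before(now, timeval_add_ms(child_start, duration_ms)))
		return true;
	return timeval_before(now, global_until);
}
```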
diff --git a/src/protocol/pool_proto_modules.c b/src/protocol/pool_proto_modules.c
index f9458bb55..74ee00d16 100644
--- a/src/protocol/pool_proto_modules.c
+++ b/src/protocol/pool_proto_modules.c
@@ -1461,7 +1461,9 @@ Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
pool_where_to_send(query_context, query_context->original_query,
query_context->parse_tree);
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0)
+ if (DLBOW_IS_DML_ADAPTIVE(
+ pool_config->disable_load_balance_on_write)
+ && strlen(name) != 0)
pool_setall_node_to_be_sent(query_context);
if (REPLICATION)
@@ -1804,7 +1806,7 @@ Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
return POOL_END;
}
- if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
+ if (DLBOW_IS_DML_ADAPTIVE(pool_config->disable_load_balance_on_write) &&
TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
{
pool_where_to_send(query_context, query_context->original_query,
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 1ac982907..ce9b92da0 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -478,6 +478,14 @@ backend_clustering_mode = streaming_replication
# modified within the current explicit transaction will
# not be load balanced until the end of the transaction.
#
+ # dml_adaptive_global:
+ # Superset of dml_adaptive. In addition to per-transaction
+ # tracking, uses shared memory to track recently written
+ # tables across all sessions. Reads from recently written
+ # tables are routed to primary until a TTL (based on
+ # replication delay) expires. Requires additional shared
+ # memory. See track_table_mutation_* parameters below.
+ #
# always:
# if a write query is issued, read queries will
# not be load balanced until the session ends.
@@ -499,6 +507,43 @@ backend_clustering_mode = streaming_replication
#statement_level_load_balance = off
# Enables statement level load balancing
+# - Track Table Mutation (used by dml_adaptive_global) -
+ # WARNING: dml_adaptive_global increases shared memory usage
+ # Default settings require ~120 KB shared memory for table tracking (64-bit)
+
+#track_table_mutation_ttl_factor = 5.0
+ # TTL multiplier: TTL = replication_delay * factor
+ # Higher values provide more safety margin
+ # Range: 1.0-100.0 (default: 5.0)
+ # (change requires reload)
+
+#track_table_mutation_max_staleness = 60000
+ # Maximum duration (ms) a table can be marked stale
+ # from its first write. Bounds cross-session impact:
+ # even under continuous writes, staleness expires
+ # after this period and is only renewed by new writes.
+ # 0 disables the cap. Range: 0-3600000 (default: 60000 = 60s)
+ # (change requires reload)
+
+#track_table_mutation_cold_start_duration = 2000
+ # Duration in milliseconds to route all queries to primary
+ # after child process starts (cold start period)
+ # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+ # Set to 0 to disable cold start behavior
+ # (change requires reload)
+
+#track_table_mutation_table_buckets = 1024
+ # Number of hash buckets for track table mutation
+ # Higher values reduce hash collisions
+ # Range: 64-65536 (default: 1024)
+ # (change requires restart)
+
+#track_table_mutation_table_size = 2048
+ # Maximum number of tables to track simultaneously
+ # Range: 128-131072 (default: 2048)
+ # (change requires restart)
+
+
#------------------------------------------------------------------------------
# STREAMING REPLICATION MODE
#------------------------------------------------------------------------------
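The configuration comments describe two limits on how long a table stays stale:

- A TTL equal to the replication delay times track_table_mutation_ttl_factor.
- An optional track_table_mutation_max_staleness cap, counted from the first write.

TrackTableMutationEntry keeps both first_write_time and last_write_time. That fits measuring the TTL from the last write and the cap from the first. pool_track_table_mutation_table_is_stale() itself is not shown, so the predicate below is one reading of that description, with all times in microseconds. `table_is_stale_sketch()` is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical staleness test; all times are microseconds on one clock.
 * ttl_us:           current TTL (replication delay * ttl_factor)
 * max_staleness_ms: cap counted from the first write; 0 disables it
 */
bool
table_is_stale_sketch(uint64_t now_us, uint64_t first_write_us,
					  uint64_t last_write_us, uint64_t ttl_us,
					  int max_staleness_ms)
{
	assert(first_write_us <= last_write_us && last_write_us <= now_us);

	/* Capped: never stale longer than max_staleness after the first write. */
	if (max_staleness_ms > 0 &&
		now_us - first_write_us >= (uint64_t) max_staleness_ms * 1000)
		return false;

	/* Otherwise stale until the TTL has elapsed since the last write. */
	return now_us - last_write_us < ttl_us;
}
```

Under this reading, writes arriving faster than the TTL stop pinning reads to the primary once the cap is reached.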
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 311b63865..cdd570396 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
#include "utils/pool_ip.h"
#include "utils/ps_status.h"
#include "utils/pool_stream.h"
+#include "utils/pool_track_table_mutation.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -419,6 +420,7 @@ check_replication_time_lag(void)
BackendInfo *bkinfo;
uint64 lag;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0;
ErrorContextCallback callback;
int active_standby_node;
bool replication_delay_by_time;
@@ -643,6 +645,10 @@ check_replication_time_lag(void)
* seconds to micro
* seconds */
+ /* Track max delay for mutation TTL */
+ if (lag > max_delay_us)
+ max_delay_us = lag;
+
/* Log delay if necessary */
if ((pool_config->log_standby_delay == LSD_ALWAYS && lag > 0) ||
(pool_config->log_standby_delay == LSD_OVER_THRESHOLD &&
@@ -668,6 +674,13 @@ check_replication_time_lag(void)
}
}
+ /* Update track table mutation TTL from the max time-based delay */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL &&
+ replication_delay_by_time &&
+ max_delay_us > 0)
+ pool_track_table_mutation_update_ttl(max_delay_us);
+
error_context_stack = callback.previous;
}
@@ -695,6 +708,7 @@ check_replication_time_lag_with_cmd(void)
double delay_ms;
uint64 delay;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0; /* Track max delay for mutation map */
int token_count = 0;
int primary_node_id;
int save_errno;
@@ -1003,6 +1017,10 @@ check_replication_time_lag_with_cmd(void)
bkinfo->standby_delay = delay;
bkinfo->standby_delay_by_time = true;
+ /* Track maximum delay for table mutation map TTL calculation */
+ if (delay > max_delay_us)
+ max_delay_us = delay;
+
/*
* Log delay if necessary. threshold is in milliseconds, convert
* to microseconds.
@@ -1021,6 +1039,12 @@ check_replication_time_lag_with_cmd(void)
token = strtok_r(NULL, " \t\n", &saveptr);
}
+ /* Update table mutation TTL based on max observed delay */
+ if (pool_config->disable_load_balance_on_write ==
+ DLBOW_DML_ADAPTIVE_GLOBAL &&
+ max_delay_us > 0)
+ pool_track_table_mutation_update_ttl(max_delay_us);
+
}
PG_CATCH();
{
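Both lag checks now accumulate the largest time-based standby delay and hand it to pool_track_table_mutation_update_ttl(), skipping the call when that delay is 0. The sample configuration says TTL = replication delay times track_table_mutation_ttl_factor. Assuming that, and assuming TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 ms) applies when no delay is known, the two steps look roughly like this. The function names are hypothetical, and any clamping the real function does is not modeled.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Same value as TRACK_TABLE_MUTATION_DEFAULT_TTL_US (100 ms). */
#define DEFAULT_TTL_US (100 * 1000)

/* Largest delay across standbys, as both lag-check loops accumulate it. */
uint64_t
max_standby_delay_us(const uint64_t *delays_us, size_t n)
{
	uint64_t	max_delay_us = 0;
	size_t		i;

	for (i = 0; i < n; i++)
		if (delays_us[i] > max_delay_us)
			max_delay_us = delays_us[i];
	return max_delay_us;
}

/* Hypothetical TTL rule: delay * factor, or the default if delay is unknown. */
uint64_t
ttl_from_delay_us(uint64_t delay_us, double ttl_factor)
{
	assert(ttl_factor >= 1.0 && ttl_factor <= 100.0);	/* documented range */
	if (delay_us == 0)
		return DEFAULT_TTL_US;
	return (uint64_t) ((double) delay_us * ttl_factor);
}
```

With the default factor of 5.0, a worst standby delay of 15 ms gives a 75 ms TTL under this rule.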
diff --git a/src/test/regression/libs.sh b/src/test/regression/libs.sh
index 7c5a0c182..1c8ae392d 100644
--- a/src/test/regression/libs.sh
+++ b/src/test/regression/libs.sh
@@ -42,6 +42,8 @@ function wait_for_failover_done {
function clean_all {
pgrep pgpool | xargs kill -9 > /dev/null 2>&1
pgrep postgres | xargs kill -9 > /dev/null 2>&1
+ # Remove SysV IPC objects leaked by kill -9 (removes all the caller may delete, not just pgpool's)
+ ipcrm --all 2>/dev/null || true
rm -f $PGSOCKET_DIR/.s.PGSQL.*
netstat -t -p 2>/dev/null|grep pgpool
}
diff --git a/src/test/regression/tests/043.track_table_mutation/test.sh b/src/test/regression/tests/043.track_table_mutation/test.sh
new file mode 100755
index 000000000..8b4dd17b8
--- /dev/null
+++ b/src/test/regression/tests/043.track_table_mutation/test.sh
@@ -0,0 +1,354 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for track table mutation feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+ rm -fr $TESTDIR
+ mkdir $TESTDIR
+ cd $TESTDIR
+
+ # Create test environment with 2 nodes
+ echo -n "creating test environment..."
+ $PGPOOL_SETUP -m $mode -n 2 || exit 1
+ echo "done."
+
+ source ./bashrc.ports
+
+ # Configure track table mutation feature via dml_adaptive_global
+ echo "disable_load_balance_on_write = 'dml_adaptive_global'" >> etc/pgpool.conf
+ echo "track_table_mutation_ttl_factor = 5.0" >> etc/pgpool.conf
+ echo "track_table_mutation_cold_start_duration = 10000" >> etc/pgpool.conf
+
+ # Enable load balancing explicitly
+ echo "load_balance_mode = on" >> etc/pgpool.conf
+
+ # Configure weights so we can distinguish routing
+ # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+ # This means load balanced queries go to node 1 by default
+ echo "backend_weight0 = 0" >> etc/pgpool.conf
+ echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+ # Enable debug logging to see routing decisions
+ echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+ ./startall
+
+ export PGPORT=$PGPOOL_PORT
+ export PGHOST=localhost
+
+ wait_for_pgpool_startup
+
+ # Create test tables
+ $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+ echo "=== Test 1: Cold Start Routing ==="
+ # During cold start, all queries should go to primary
+ # Restart pgpool to trigger cold start
+ ./shutdownall
+ ./startall
+ wait_for_pgpool_startup
+
+ # Immediately query - should go to primary due to cold start
+ $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for cold start message (use -a to handle binary log files)
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 1 PASSED: Cold start routing works"
+ else
+ echo "Test 1 FAILED: Cold start routing not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 2: Wait for cold start to end ==="
+ # Wait for cold start period to end (10 seconds).
+ # Use generous margin to avoid flakiness under load (e.g. full regression suite).
+ sleep 12
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Now a clean table query should load balance (go to node 1)
+ $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+ # After cold start, queries to clean tables should load balance
+ # Check that it did NOT get forced to primary due to track table mutation
+ if grep -a -q "could not load balance because of track table mutation cold start" log/pgpool.log; then
+ echo "Test 2 FAILED: Still in cold start after waiting"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 2 PASSED: Cold start ended correctly"
+
+ echo "=== Test 3: Write-then-Read Routing ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Write to t1 and then read - use single connection to ensure same session
+ $PSQL test <<EOF
+INSERT INTO t1 VALUES (1);
+SELECT 'write_read_test' as marker, * FROM t1;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Check log for table staleness message
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 3 PASSED: Write-then-read routing works"
+ else
+ echo "Test 3 FAILED: Table staleness not detected after write"
+ # Show relevant log entries for debugging
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 4: Clean Table Still Load Balances ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Read from t2 (never written to) - should load balance
+ $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ # Should NOT see track table mutation blocking message for t2
+ if grep -a -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+ echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 4 PASSED: Clean tables still load balance"
+
+ echo "=== Test 5: UPDATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Update t2 and then read - use single connection
+ $PSQL test <<EOF
+UPDATE t2 SET i = 999 WHERE i = 0;
+SELECT 'update_test' as marker, * FROM t2;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 5 PASSED: UPDATE marks table as stale"
+ else
+ echo "Test 5 FAILED: UPDATE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 6: DELETE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Delete from t3 and then read - use single connection
+ $PSQL test <<EOF
+DELETE FROM t3 WHERE i = 0;
+SELECT 'delete_test' as marker, * FROM t3;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 6 PASSED: DELETE marks table as stale"
+ else
+ echo "Test 6 FAILED: DELETE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 7: TRUNCATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for TRUNCATE test
+ $PSQL test -c "CREATE TABLE t_truncate(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_truncate VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Let the setup INSERT's staleness window (TTL) expire so only the TRUNCATE can mark the table
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Truncate and then read - use single connection
+ $PSQL test <<EOF
+TRUNCATE t_truncate;
+SELECT 'truncate_test' as marker, * FROM t_truncate;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 7 PASSED: TRUNCATE marks table as stale"
+ else
+ echo "Test 7 FAILED: TRUNCATE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 8: WITH Clause (CTE with DELETE) Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a fresh table for WITH test
+ $PSQL test -c "CREATE TABLE t_cte(i INTEGER);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_cte VALUES (1), (2), (3);" > /dev/null 2>&1
+
+ # Let the setup INSERT's staleness window (TTL) expire so only the CTE's DELETE can mark the table
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use WITH clause with DELETE, then read from the table
+ $PSQL test <<EOF
+WITH deleted AS (DELETE FROM t_cte WHERE i = 1 RETURNING *)
+SELECT * FROM deleted;
+SELECT 'cte_test' as marker, * FROM t_cte;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 8 PASSED: WITH clause (CTE) marks table as stale"
+ else
+ echo "Test 8 FAILED: WITH clause (CTE) did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ # Test 9: MERGE (PostgreSQL 15+ only)
+ PG_MAJOR_VERSION=$($PSQL test -t -c "SELECT substring(version() from 'PostgreSQL ([0-9]+)');" | tr -d ' ')
+ if [ "$PG_MAJOR_VERSION" -ge 15 ] 2>/dev/null; then
+ echo "=== Test 9: MERGE Marks Table as Stale (PostgreSQL $PG_MAJOR_VERSION) ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create tables for MERGE test
+ $PSQL test -c "CREATE TABLE t_merge_target(id INTEGER PRIMARY KEY, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "CREATE TABLE t_merge_source(id INTEGER, val TEXT);" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_target VALUES (1, 'old');" > /dev/null 2>&1
+ $PSQL test -c "INSERT INTO t_merge_source VALUES (1, 'new'), (2, 'insert');" > /dev/null 2>&1
+
+ # Let the setup INSERTs' staleness window (TTL) expire so only the MERGE can mark the target table
+ sleep 3
+
+ # Clear the log again
+ > log/pgpool.log
+
+ # Use MERGE, then read from the target table
+ $PSQL test <<EOF
+MERGE INTO t_merge_target t
+USING t_merge_source s ON t.id = s.id
+WHEN MATCHED THEN UPDATE SET val = s.val
+WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.val);
+SELECT 'merge_test' as marker, * FROM t_merge_target;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 9 PASSED: MERGE marks table as stale"
+ else
+ echo "Test 9 FAILED: MERGE did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ else
+ echo "=== Test 9: MERGE skipped (requires PostgreSQL 15+, have $PG_MAJOR_VERSION) ==="
+ fi
+
+ echo "=== Test 10: ROLLBACK Does NOT Mark Table as Stale ==="
+ # Create a fresh table for rollback test
+ $PSQL test -c "CREATE TABLE t_rollback(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then rollback
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_rollback VALUES (1);
+ROLLBACK;
+SELECT 'rollback_test' as marker, * FROM t_rollback;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ # Should NOT see t_rollback marked as stale since the write was rolled back
+ if grep -a -q "could not load balance because table.*t_rollback.*was recently written" log/pgpool.log; then
+ echo "Test 10 FAILED: Rolled-back write incorrectly marked table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 10 PASSED: ROLLBACK does not mark table as stale"
+
+ echo "=== Test 11: COMMIT Marks Table as Stale ==="
+ # Create a fresh table for commit test
+ $PSQL test -c "CREATE TABLE t_commit(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for any TTL to expire
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Write inside a transaction, then commit, then read
+ $PSQL test <<EOF
+BEGIN;
+INSERT INTO t_commit VALUES (1);
+COMMIT;
+SELECT 'commit_test' as marker, * FROM t_commit;
+EOF
+
+ # Small delay to ensure log is flushed
+ sleep 0.5
+
+ if grep -a -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 11 PASSED: COMMIT marks table as stale"
+ else
+ echo "Test 11 FAILED: Committed write did not mark table as stale"
+ grep -a -i "track_table_mutation" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo ""
+ echo "=== All Track Table Mutation Tests PASSED ==="
+
+ ./shutdownall
+
+ cd ..
+done
+
+exit 0
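Tests 10 and 11 pin down the transaction semantics: a write inside a transaction must not mark its table until COMMIT, and a ROLLBACK must discard it. Below is a minimal, self-contained C sketch of one way to get that behavior by buffering table OIDs per session. It is not the code in this patch; PendingWrites, on_write(), on_begin(), on_commit(), on_rollback() and the mark_tables_written() stand-in are hypothetical names, and autocommit writes are assumed to be marked immediately.

```c
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical per-session buffer of tables written in the open transaction. */
typedef struct
{
	int		   *oids;
	int			count;
	int			capacity;
	bool		in_transaction;
} PendingWrites;

/* Stand-in for pool_track_table_mutation_mark_tables_written(). */
static int	total_marked = 0;
static int	last_marked_oid = 0;

static void
mark_tables_written(const int *oids, int n)
{
	for (int i = 0; i < n; i++)
	{
		total_marked++;
		last_marked_oid = oids[i];
	}
}

/* Called after a DML statement on table_oid; false only on allocation failure. */
static bool
on_write(PendingWrites *pw, int table_oid)
{
	if (!pw->in_transaction)
	{
		/* Autocommit: the write is already committed, mark it now. */
		mark_tables_written(&table_oid, 1);
		return true;
	}
	if (pw->count == pw->capacity)
	{
		int			newcap = pw->capacity ? pw->capacity * 2 : 8;
		int		   *p = realloc(pw->oids, newcap * sizeof(int));

		if (p == NULL)
			return false;
		pw->oids = p;
		pw->capacity = newcap;
	}
	pw->oids[pw->count++] = table_oid;
	return true;
}

static void
on_begin(PendingWrites *pw)
{
	pw->in_transaction = true;
	pw->count = 0;
}

/* Buffered writes become visible to other sessions only at COMMIT. */
static void
on_commit(PendingWrites *pw)
{
	mark_tables_written(pw->oids, pw->count);
	pw->count = 0;
	pw->in_transaction = false;
}

/* Rolled-back writes never reach the standbys, so nothing is marked. */
static void
on_rollback(PendingWrites *pw)
{
	pw->count = 0;
	pw->in_transaction = false;
}
```

Marking at COMMIT rather than at the write matters because the staleness window starts when the table is marked: if a long transaction marked its tables early, the window could close before the commit reaches the standby.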
diff --git a/src/test/regression/tests/044.track_table_mutation_watchdog/test.sh b/src/test/regression/tests/044.track_table_mutation_watchdog/test.sh
new file mode 100755
index 000000000..c50c213d6
--- /dev/null
+++ b/src/test/regression/tests/044.track_table_mutation_watchdog/test.sh
@@ -0,0 +1,184 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# Test script for track table mutation global cold start
+# on watchdog leader change.
+#
+# Uses $WATCHDOG_SETUP to create a 2-node watchdog cluster,
+# then verifies that when the leader is stopped the new
+# leader triggers a global cold start.
+#-------------------------------------------------------------------
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+success_count=0
+
+dir=`pwd`
+rm -fr $TESTDIR
+mkdir $TESTDIR
+cd $TESTDIR
+
+# Create 2-node watchdog cluster
+$WATCHDOG_SETUP -wn 2 || exit 1
+
+# Ensure per-node scripts are executable
+# (sed -i in watchdog_setup can strip permissions)
+chmod 755 pgpool*/startall pgpool*/shutdownall
+
+# Append track_table_mutation config to both nodes
+for i in 0 1
+do
+ cat >> pgpool${i}/etc/pgpool.conf <<EOF
+disable_load_balance_on_write = 'dml_adaptive_global'
+track_table_mutation_cold_start_duration = 2000
+enable_consensus_with_half_votes = on
+log_min_messages = debug1
+EOF
+done
+
+./startall
+export PCPPASSFILE=$dir/$TESTDIR/pgpool0/pcppass
+
+# Wait for watchdog lifecheck on node 0
+echo -n "waiting for watchdog node 0 starting up..."
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "lifecheck started" \
+ pgpool0/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ break
+ fi
+ sleep 2
+done
+echo "done."
+
+# Test 1: Verify leader came up
+echo "=== Test 1: Waiting for the pgpool leader... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "I am the cluster leader node" \
+ pgpool0/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 1 PASSED: Leader brought up."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 1 ]; then
+ echo "Test 1 FAILED: Leader did not start"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 2: Verify standby joined cluster
+echo "=== Test 2: Waiting for standby to join... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep "successfully joined the watchdog cluster" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 2 PASSED: Standby joined."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 2 ]; then
+ echo "Test 2 FAILED: Standby did not join"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 3: Verify track_table_mutation initialized
+echo "=== Test 3: Verify feature initialized ==="
+if grep -a "track_table_mutation: initialized" \
+ pgpool0/log/pgpool.log > /dev/null 2>&1; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 3 PASSED: Feature initialized."
+else
+ echo "Test 3 FAILED: Feature not initialized"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 4: Stop the leader (pgpool0) to trigger a watchdog leader change
+echo "=== Test 4: Stopping leader... ==="
+cd pgpool0
+source ./bashrc.ports
+$PGPOOL_INSTALL_DIR/bin/pgpool \
+ -f etc/pgpool.conf -m f stop
+cd ..
+
+echo "Checking standby detected shutdown..."
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "is shutting down" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 4 PASSED: Shutdown detected."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 4 ]; then
+ echo "Test 4 FAILED: Shutdown not detected"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 5: Verify standby became new leader
+echo "=== Test 5: Checking standby takes over... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "I am the cluster leader node" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 5 PASSED: Standby became leader."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+if [ $success_count -lt 5 ]; then
+ echo "Test 5 FAILED: Standby did not become leader"
+ ./shutdownall
+ exit 1
+fi
+
+# Test 6: Verify global cold start was triggered
+echo "=== Test 6: Checking global cold start... ==="
+for i in 1 2 3 4 5 6 7 8 9 10
+do
+ grep -a "track_table_mutation: global cold start" \
+ pgpool1/log/pgpool.log > /dev/null 2>&1
+ if [ $? = 0 ]; then
+ success_count=$(( success_count + 1 ))
+ echo "Test 6 PASSED: Global cold start triggered."
+ break
+ fi
+ echo "[check] $i times"
+ sleep 2
+done
+
+# Cleanup
+./shutdownall
+
+echo ""
+echo "$success_count out of 6 successful"
+
+if test $success_count -eq 6
+then
+ echo "=== All Watchdog Tests PASSED ==="
+ exit 0
+fi
+
+exit 1
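The watchdog test checks that the new leader triggers a global cold start. The deadline arithmetic behind pool_track_table_mutation_trigger_global_cold_start() and the global check in pool_track_table_mutation_in_cold_start() (add the duration in milliseconds to a struct timeval, normalize tv_usec, then treat "now < until" as inside the window) can be exercised in isolation with the sketch below. The helper names are hypothetical; the logic follows the patch, with a zero tv_sec meaning no global cold start has been triggered.

```c
#include <stdbool.h>
#include <stdint.h>
#include <sys/time.h>

/* Add dur_ms (>= 0) milliseconds to *tv, keeping tv_usec below 1000000. */
static void
timeval_add_ms(struct timeval *tv, int dur_ms)
{
	tv->tv_sec += dur_ms / 1000;
	tv->tv_usec += (dur_ms % 1000) * 1000;
	if (tv->tv_usec >= 1000000)
	{
		tv->tv_sec += tv->tv_usec / 1000000;
		tv->tv_usec %= 1000000;
	}
}

/* Microseconds from start to end; negative if end is earlier. */
static int64_t
elapsed_us(const struct timeval *start, const struct timeval *end)
{
	return (int64_t) (end->tv_sec - start->tv_sec) * 1000000
		+ (end->tv_usec - start->tv_usec);
}

/* tv_sec == 0 means no global cold start has been triggered. */
static bool
in_global_cold_start(const struct timeval *now, const struct timeval *until)
{
	return until->tv_sec != 0 && elapsed_us(now, until) > 0;
}

/* Hypothetical helper: deadline = now + duration. */
static struct timeval
global_cold_start_deadline(struct timeval now, int dur_ms)
{
	timeval_add_ms(&now, dur_ms);
	return now;
}
```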
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 939200965..467ec114c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -519,6 +519,10 @@ TableLikeClause
TableSampleClause
TargetEntry
TokenizedLine
+TrackTableMutationEntry
+TrackTableMutationHashTable
+TrackTableMutationShmem
+TrackTableMutationState
TransactionId
TransactionStmt
TransactionStmtKind
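TrackTableMutationHashTable is laid out in one shared memory chunk: the header, then num_buckets ints (bucket heads), then max_entries entries, reached through TABLE_MAP_BUCKETS() and TABLE_MAP_ENTRIES(). The C sketch below reproduces that layout with simplified stand-in structs (HashHeader, Entry and all helper names are hypothetical) and malloc() in place of the shared segment. One difference is deliberate: the sketch rounds the entry offset up to alignof(Entry). Assuming the real header holds only the four int fields the .c file touches and each entry holds struct timeval members, the patch's unrounded offset is 4 mod 8 on common LP64 platforms whenever the bucket count is odd, which misaligns the timevals. If that holds, TABLE_MAP_ENTRIES() and pool_track_table_mutation_shmem_size() would need the same padding.

```c
#include <stdalign.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>

#define INVALID_INDEX (-1)

/* Simplified stand-ins for TrackTableMutationHashTable / TrackTableMutationEntry. */
typedef struct
{
	int			num_buckets;
	int			max_entries;
	int			num_entries;
	int			free_list_head;
} HashHeader;

typedef struct
{
	int			table_oid;
	int			dboid;
	uint32_t	hash;
	int			next;
	bool		in_use;
	struct timeval first_write_time;
	struct timeval last_write_time;
} Entry;

/* Round off up to a multiple of align (a power of two). */
static size_t
align_up(size_t off, size_t align)
{
	return (off + align - 1) & ~(align - 1);
}

/* Entries follow the bucket array, padded to the entry's alignment. */
static size_t
entries_offset(int num_buckets)
{
	size_t		raw = sizeof(HashHeader) + (size_t) num_buckets * sizeof(int);

	return align_up(raw, alignof(Entry));
}

static size_t
table_size(int num_buckets, int max_entries)
{
	return entries_offset(num_buckets) + (size_t) max_entries * sizeof(Entry);
}

/* Bucket heads start right after the header (an int array needs no padding). */
static int *
table_buckets(HashHeader *h)
{
	return (int *) ((char *) h + sizeof(HashHeader));
}

static Entry *
table_entries(HashHeader *h)
{
	return (Entry *) ((char *) h + entries_offset(h->num_buckets));
}

/* Same initial state as table_map_init(): empty buckets, all entries free. */
static HashHeader *
table_create(int num_buckets, int max_entries)
{
	HashHeader *h = malloc(table_size(num_buckets, max_entries));
	int		   *b;
	Entry	   *e;

	if (h == NULL)
		return NULL;
	h->num_buckets = num_buckets;
	h->max_entries = max_entries;
	h->num_entries = 0;
	h->free_list_head = (max_entries > 0) ? 0 : INVALID_INDEX;
	b = table_buckets(h);
	e = table_entries(h);
	for (int i = 0; i < num_buckets; i++)
		b[i] = INVALID_INDEX;
	for (int i = 0; i < max_entries; i++)
	{
		e[i].in_use = false;
		e[i].next = (i < max_entries - 1) ? i + 1 : INVALID_INDEX;
	}
	return h;
}
```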
diff --git a/src/utils/pool_track_table_mutation.c b/src/utils/pool_track_table_mutation.c
new file mode 100644
index 000000000..e7771e7bf
--- /dev/null
+++ b/src/utils/pool_track_table_mutation.c
@@ -0,0 +1,902 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_track_table_mutation.c: In-memory tracking of recently
+ * written tables to prevent stale reads from replicas.
+ *
+ * Based on the "lagless" architecture from Tailor Brands.
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "context/pool_session_context.h"
+#include "utils/pool_track_table_mutation.h"
+#include "utils/elog.h"
+#include "utils/pool_ipc.h"
+#include "utils/palloc.h"
+#include "utils/pool_relcache.h"
+
+#define DATABASE_TO_OID_QUERY \
+ "SELECT oid FROM pg_catalog.pg_database" \
+ " WHERE datname = '%s'"
+
+/*
+ * Helper macro: true when the feature is not active.
+ */
+#define TRACK_TABLE_MUTATION_DISABLED() \
+ (pool_config->disable_load_balance_on_write != \
+ DLBOW_DML_ADAPTIVE_GLOBAL || \
+ track_table_mutation_shmem == NULL)
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static TrackTableMutationShmem *track_table_mutation_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for flexible arrays in shared memory
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+ ((int *)((char *)(map) + \
+ sizeof(TrackTableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+ ((TrackTableMutationEntry *)((char *)(map) + \
+ sizeof(TrackTableMutationHashTable) + \
+ (map)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Semaphore lock helpers
+ * ----------------
+ */
+
+static inline void
+table_map_lock(void)
+{
+ pool_semaphore_lock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+static inline void
+table_map_unlock(void)
+{
+ pool_semaphore_unlock(TRACK_TABLE_MUTATION_TABLE_SEM);
+}
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * FNV-1a hash for table/database oid pair
+ */
+static uint32
+fnv1a_hash_table_key(int table_oid, int dboid)
+{
+ uint32 hash = 2166136261u; /* FNV offset basis */
+ uint32 data[2];
+ const unsigned char *bytes;
+ size_t i;
+
+ data[0] = (uint32) table_oid;
+ data[1] = (uint32) dboid;
+ bytes = (const unsigned char *) data;
+
+ for (i = 0; i < sizeof(data); i++)
+ {
+ hash ^= bytes[i];
+ hash *= 16777619u; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Get elapsed time in microseconds between two timevals
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+ return ((int64) (end->tv_sec - start->tv_sec) * 1000000)
+ + (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Get current time
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+ gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Database oid lookup
+ * ----------------
+ */
+
+static int
+track_table_mutation_get_database_oid_internal(void)
+{
+ int oid = 0;
+ static POOL_RELCACHE *relcache;
+ POOL_CONNECTION_POOL *backend;
+ POOL_SESSION_CONTEXT *session_context;
+
+ /* Safety check: must have shmem initialized */
+ if (track_table_mutation_shmem == NULL)
+ return oid;
+
+ session_context = pool_get_session_context(false);
+ if (session_context == NULL)
+ return oid;
+
+ backend = session_context->backend;
+ if (backend == NULL ||
+ MAIN_CONNECTION(backend) == NULL ||
+ MAIN_CONNECTION(backend)->sp == NULL)
+ return oid;
+
+ /* Ensure database name is valid */
+ if (MAIN_CONNECTION(backend)->sp->database == NULL)
+ return oid;
+
+ if (!relcache)
+ {
+ relcache = pool_create_relcache(
+ pool_config->relcache_size,
+ DATABASE_TO_OID_QUERY,
+ int_register_func,
+ int_unregister_func,
+ false);
+ if (relcache == NULL)
+ {
+ ereport(LOG,
+ (errmsg("track_table_mutation: "
+ "error creating relcache")));
+ return oid;
+ }
+ }
+
+ oid = (int) (intptr_t) pool_search_relcache(
+ relcache, backend,
+ MAIN_CONNECTION(backend)->sp->database);
+ return oid;
+}
+
+int
+pool_track_table_mutation_get_database_oid(void)
+{
+ return track_table_mutation_get_database_oid_internal();
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table
+ */
+static void
+table_map_init(TrackTableMutationHashTable *map,
+ int num_buckets, int max_entries)
+{
+ int *buckets;
+ TrackTableMutationEntry *entries;
+ int i;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ map->num_buckets = num_buckets;
+ map->max_entries = max_entries;
+ map->num_entries = 0;
+ map->free_list_head = (max_entries > 0) ? 0 : invalid;
+
+ buckets = TABLE_MAP_BUCKETS(map);
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = invalid;
+
+ /* Initialize free list - chain all entries */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ?
+ i + 1 : invalid;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "table map init %d buckets, "
+ "%d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list
+ */
+static int
+table_map_alloc_entry(TrackTableMutationHashTable *map)
+{
+ TrackTableMutationEntry *entries;
+ int idx;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ if (map->free_list_head == invalid)
+ return invalid;
+
+ idx = map->free_list_head;
+ map->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = invalid;
+ map->num_entries++;
+
+ return idx;
+}
+
+/*
+ * Free an entry back to the free list
+ */
+static void
+table_map_free_entry(TrackTableMutationHashTable *map,
+ int idx)
+{
+ TrackTableMutationEntry *entries;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ entries[idx].in_use = false;
+ entries[idx].next = map->free_list_head;
+ map->free_list_head = idx;
+ map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table.
+ * Returns entry index or INVALID_INDEX if not found.
+ * Must be called with lock held.
+ */
+static int
+table_map_lookup(TrackTableMutationHashTable *map,
+ int table_oid, int dboid,
+ uint32 hash)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries;
+ int bucket = hash % map->num_buckets;
+ int idx = buckets[bucket];
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ while (idx != invalid)
+ {
+ if (entries[idx].hash == hash &&
+ entries[idx].table_oid == table_oid &&
+ entries[idx].dboid == dboid)
+ {
+ return idx;
+ }
+ idx = entries[idx].next;
+ }
+
+ return invalid;
+}
+
+/*
+ * Insert or update a table entry.
+ * Must be called with lock held.
+ */
+static void
+table_map_insert(TrackTableMutationHashTable *map,
+ int table_oid, int dboid,
+ uint32 hash,
+ struct timeval *write_time)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries;
+ int bucket = hash % map->num_buckets;
+ int idx;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Check if entry already exists */
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != invalid)
+ {
+ /* Update last write time; keep first_write_time */
+ entries[idx].last_write_time = *write_time;
+ return;
+ }
+
+ /* Allocate new entry */
+ idx = table_map_alloc_entry(map);
+ if (idx == invalid)
+ {
+ int b;
+
+ /* Table is full: evict the head entry of the first non-empty bucket (not necessarily the oldest) */
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ if (buckets[b] != invalid)
+ {
+ int victim = buckets[b];
+
+ buckets[b] = entries[victim].next;
+ table_map_free_entry(map, victim);
+ idx = table_map_alloc_entry(map);
+ break;
+ }
+ }
+
+ if (idx == invalid)
+ {
+ ereport(WARNING,
+ (errmsg("track_table_mutation: "
+ "failed to allocate entry "
+ "for oid %d (dboid %d)",
+ table_oid, dboid)));
+ return;
+ }
+ }
+
+ /* Initialize new entry */
+ entries[idx].table_oid = table_oid;
+ entries[idx].dboid = dboid;
+ entries[idx].hash = hash;
+ entries[idx].first_write_time = *write_time;
+ entries[idx].last_write_time = *write_time;
+
+ /* Insert at head of bucket chain */
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: "
+ "marked oid %d (dboid %d) written",
+ table_oid, dboid)));
+}
+
+/*
+ * Remove expired entries from the table map.
+ * Must be called with lock held.
+ */
+static void
+table_map_cleanup_expired(
+ TrackTableMutationHashTable *map, uint64 ttl_us)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TrackTableMutationEntry *entries;
+ struct timeval now;
+ int64 max_stale_us;
+ int removed = 0;
+ int b;
+ int invalid = TRACK_TABLE_MUTATION_INVALID_INDEX;
+
+ entries = TABLE_MAP_ENTRIES(map);
+ get_current_time(&now);
+
+ max_stale_us = (int64) pool_config
+ ->track_table_mutation_max_staleness * 1000LL;
+
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *prev_ptr = &buckets[b];
+ int idx = buckets[b];
+
+ while (idx != invalid)
+ {
+ int64 age;
+ int64 total_age;
+ bool expired;
+
+ age = elapsed_us(
+ &entries[idx].last_write_time, &now);
+ expired = (age > (int64) ttl_us);
+
+ /*
+ * Also evict entries that exceed max_staleness from first write.
+ */
+ if (!expired && max_stale_us > 0)
+ {
+ total_age = elapsed_us(
+ &entries[idx].first_write_time,
+ &now);
+ expired = (total_age >= max_stale_us);
+ }
+
+ if (expired)
+ {
+ /* Entry has expired - remove it */
+ int next = entries[idx].next;
+
+ *prev_ptr = next;
+ table_map_free_entry(map, idx);
+ idx = next;
+ removed++;
+ }
+ else
+ {
+ prev_ptr = &entries[idx].next;
+ idx = entries[idx].next;
+ }
+ }
+ }
+
+ if (removed > 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "cleaned up %d expired entries",
+ removed)));
+ }
+}
+
+
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+/*
+ * Calculate the total shared memory size required
+ * for the track table mutation feature.
+ */
+Size
+pool_track_table_mutation_shmem_size(void)
+{
+ Size size = 0;
+ int tbl_bkt;
+ int tbl_sz;
+
+ tbl_bkt = pool_config->track_table_mutation_table_buckets;
+ tbl_sz = pool_config->track_table_mutation_table_size;
+
+ /* Main structure */
+ size += sizeof(TrackTableMutationShmem);
+
+ /* Table mutation hash table */
+ size += sizeof(TrackTableMutationHashTable);
+ size += tbl_bkt * sizeof(int);
+ size += tbl_sz * sizeof(TrackTableMutationEntry);
+
+ return size;
+}
+
+/*
+ * Initialize shared memory structures for the
+ * track table mutation feature. Allocates and sets
+ * up the table map in shared memory.
+ * Called once from pgpool main process at startup.
+ */
+void
+pool_track_table_mutation_init(void)
+{
+#ifndef POOL_PRIVATE
+ Size shmem_size;
+ char *shmem_ptr;
+ TrackTableMutationState *st;
+ int tbl_bkt;
+ int tbl_sz;
+
+ if (pool_config->disable_load_balance_on_write !=
+ DLBOW_DML_ADAPTIVE_GLOBAL)
+ {
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "feature disabled")));
+ return;
+ }
+
+ tbl_bkt = pool_config->track_table_mutation_table_buckets;
+ tbl_sz = pool_config->track_table_mutation_table_size;
+
+ shmem_size = pool_track_table_mutation_shmem_size();
+
+ /*
+ * Allocate from the main shared memory segment. Memory is zeroed by
+ * initialize_shared_memory_main_segment().
+ */
+ shmem_ptr = pool_shared_memory_segment_get_chunk(
+ shmem_size);
+ if (shmem_ptr == NULL)
+ {
+ ereport(ERROR,
+ (errmsg("track_table_mutation: "
+ "failed to allocate %zu bytes",
+ shmem_size)));
+ return;
+ }
+
+ /* Set up pointers within shared memory */
+ track_table_mutation_shmem =
+ (TrackTableMutationShmem *) shmem_ptr;
+ shmem_ptr += sizeof(TrackTableMutationShmem);
+
+ track_table_mutation_shmem->table_map =
+ (TrackTableMutationHashTable *) shmem_ptr;
+
+ /* Initialize table map */
+ table_map_init(
+ track_table_mutation_shmem->table_map,
+ tbl_bkt, tbl_sz);
+
+ /* Initialize global state */
+ st = &track_table_mutation_shmem->state;
+ st->initialized = true;
+ st->current_ttl_us = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+ get_current_time(&st->ttl_last_updated);
+ get_current_time(&st->last_cleanup_time);
+ st->global_cold_start_until.tv_sec = 0;
+ st->global_cold_start_until.tv_usec = 0;
+ st->stats_queries_checked = 0;
+ st->stats_forced_primary = 0;
+ st->stats_allowed_replica = 0;
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: "
+ "initialized with %zu bytes shmem",
+ shmem_size)));
+#endif
+}
+
+/*
+ * Initialize per-child process state.
+ * Records the process start time for cold start
+ * period tracking. Called when a child process starts.
+ */
+void
+pool_track_table_mutation_child_init(void)
+{
+ int dur;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ get_current_time(&process_start_time);
+ cold_start_initialized = true;
+ dur = pool_config->track_table_mutation_cold_start_duration;
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "child init, cold start %d ms",
+ dur)));
+}
+
+/*
+ * Check if the process is in cold start period.
+ * During cold start, all queries are routed to
+ * primary to avoid stale reads. Checks both
+ * per-process and global (watchdog) cold start.
+ */
+bool
+pool_track_table_mutation_in_cold_start(void)
+{
+ struct timeval now;
+ int64 elapsed_ms;
+ int dur;
+ TrackTableMutationState *st;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return false;
+
+ dur = pool_config->track_table_mutation_cold_start_duration;
+ if (dur <= 0)
+ return false;
+
+ get_current_time(&now);
+ st = &track_table_mutation_shmem->state;
+
+ /* Check watchdog-triggered global cold start */
+ if (st->global_cold_start_until.tv_sec != 0 &&
+ elapsed_us(&now,
+ &st->global_cold_start_until) > 0)
+ {
+ return true;
+ }
+
+ /* Check per-process cold start */
+ if (!cold_start_initialized)
+ return false;
+
+ elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;
+
+ if (elapsed_ms < dur)
+ {
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: "
+ "cold start (%ld/%d ms)",
+ (long) elapsed_ms, dur)));
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Trigger a global cold start for all processes.
+ * Sets the cold start end time in shared memory.
+ * Called after watchdog leader change to force all
+ * queries to primary during the transition.
+ */
+void
+pool_track_table_mutation_trigger_global_cold_start(void)
+{
+ struct timeval now;
+ struct timeval *until;
+ int dur;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ dur = pool_config->track_table_mutation_cold_start_duration;
+ if (dur <= 0)
+ return;
+
+ get_current_time(&now);
+ until = &track_table_mutation_shmem->state
+ .global_cold_start_until;
+ *until = now;
+ until->tv_sec += dur / 1000;
+ until->tv_usec += (dur % 1000) * 1000;
+ if (until->tv_usec >= 1000000)
+ {
+ until->tv_sec += until->tv_usec / 1000000;
+ until->tv_usec %= 1000000;
+ }
+
+ ereport(LOG,
+ (errmsg("track_table_mutation: "
+ "global cold start for %d ms",
+ dur)));
+}
+
+/*
+ * Check if a table was recently written (is "stale").
+ * Returns true if reads should go to primary because
+ * the table was written within the current TTL window.
+ */
+bool
+pool_track_table_mutation_table_is_stale(
+ int table_oid, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ struct timeval now;
+ uint64 ttl_us;
+ uint32 hash;
+ int idx;
+ bool is_stale = false;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return false;
+
+ if (table_oid <= 0 || dboid <= 0)
+ {
+ is_stale = true;
+ goto update_stats;
+ }
+
+ map = track_table_mutation_shmem->table_map;
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+
+ table_map_lock();
+
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TRACK_TABLE_MUTATION_INVALID_INDEX)
+ {
+ TrackTableMutationEntry *entries;
+ int64 age;
+ int64 total_age;
+ int64 max_stale_us;
+
+ entries = TABLE_MAP_ENTRIES(map);
+ get_current_time(&now);
+ ttl_us = track_table_mutation_shmem->state
+ .current_ttl_us;
+
+ age = elapsed_us(
+ &entries[idx].last_write_time, &now);
+ is_stale = (age < (int64) ttl_us);
+
+ /*
+ * Enforce max_staleness hard cap: no entry can force primary routing
+ * longer than max_staleness from its first write.
+ */
+ if (is_stale)
+ {
+ max_stale_us = (int64) pool_config
+ ->track_table_mutation_max_staleness
+ * 1000LL;
+ if (max_stale_us > 0)
+ {
+ total_age = elapsed_us(
+ &entries[idx].first_write_time,
+ &now);
+ if (total_age >= max_stale_us)
+ is_stale = false;
+ }
+ }
+
+ ereport(DEBUG2,
+ (errmsg("track_table_mutation: "
+ "oid %d dboid %d "
+ "elapsed=%ld ttl=%lu stale=%d",
+ table_oid, dboid,
+ (long) age,
+ (unsigned long) ttl_us,
+ is_stale)));
+ }
+
+ table_map_unlock();
+
+update_stats:
+ /* Update statistics using semaphore */
+ if (track_table_mutation_shmem != NULL)
+ {
+ TrackTableMutationState *st;
+
+ table_map_lock();
+ st = &track_table_mutation_shmem->state;
+ st->stats_queries_checked++;
+ if (is_stale)
+ st->stats_forced_primary++;
+ else
+ st->stats_allowed_replica++;
+ table_map_unlock();
+ }
+
+ return is_stale;
+}
+
+/*
+ * Mark multiple tables as recently written.
+ * Called after DML queries complete to record
+ * which tables were modified.
+ */
+void
+pool_track_table_mutation_mark_tables_written(
+ const int *table_oids, int num_tables, int dboid)
+{
+ TrackTableMutationHashTable *map;
+ TrackTableMutationState *st;
+ struct timeval now;
+ int i;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ if (num_tables <= 0 || table_oids == NULL ||
+ dboid <= 0)
+ return;
+
+ map = track_table_mutation_shmem->table_map;
+ st = &track_table_mutation_shmem->state;
+ get_current_time(&now);
+
+ table_map_lock();
+
+ /* Periodically clean up expired entries */
+ if (map->num_entries > map->max_entries * 3 / 4)
+ {
+ int64 since_cleanup;
+
+ since_cleanup = elapsed_us(
+ &st->last_cleanup_time, &now);
+ /* 100ms interval */
+ if (since_cleanup > 100000)
+ {
+ table_map_cleanup_expired(
+ map, st->current_ttl_us);
+ st->last_cleanup_time = now;
+ }
+ }
+
+ for (i = 0; i < num_tables; i++)
+ {
+ uint32 hash;
+ int table_oid = table_oids[i];
+
+ if (table_oid > 0)
+ {
+ hash = fnv1a_hash_table_key(
+ table_oid, dboid);
+ table_map_insert(map, table_oid,
+ dboid, hash, &now);
+ }
+ }
+
+ table_map_unlock();
+}
+
+/*
+ * Mark a single table as recently written.
+ */
+void
+pool_track_table_mutation_mark_table_written(
+ int table_oid, int dboid)
+{
+ if (table_oid > 0 && dboid > 0)
+ {
+ const int tables[1] = {table_oid};
+
+ pool_track_table_mutation_mark_tables_written(
+ tables, 1, dboid);
+ }
+}
+
+/*
+ * Update the staleness TTL based on observed
+ * replication delay. New TTL = delay * factor,
+ * clamped to [default_ttl, 1 hour].
+ */
+void
+pool_track_table_mutation_update_ttl(uint64 delay_us)
+{
+ uint64 new_ttl;
+ double factor;
+ TrackTableMutationState *st;
+
+ if (TRACK_TABLE_MUTATION_DISABLED())
+ return;
+
+ factor = pool_config->track_table_mutation_ttl_factor;
+ new_ttl = (uint64) (delay_us * factor);
+ if (new_ttl < TRACK_TABLE_MUTATION_DEFAULT_TTL_US)
+ new_ttl = TRACK_TABLE_MUTATION_DEFAULT_TTL_US;
+
+ /* Maximum TTL of 1 hour */
+ if (new_ttl > 3600ULL * 1000000ULL)
+ new_ttl = 3600ULL * 1000000ULL;
+
+ st = &track_table_mutation_shmem->state;
+ st->current_ttl_us = new_ttl;
+ get_current_time(&st->ttl_last_updated);
+
+ ereport(DEBUG1,
+ (errmsg("track_table_mutation: "
+ "TTL=%lu us (delay=%lu factor=%.1f)",
+ (unsigned long) new_ttl,
+ (unsigned long) delay_us,
+ factor)));
+}
--
2.53.0
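For review, the routing decision in the staleness check above can be restated as a small pure function. This is a minimal sketch, not code from the patch. The name must_route_to_primary() and its parameters are hypothetical. All ages are in microseconds. max_staleness is assumed to be in milliseconds, because the patch multiplies track_table_mutation_max_staleness by 1000 before comparing it with a microsecond age.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Hypothetical restatement of the staleness check (not the patch's API).
 * since_last_write_us:  age of the table's most recent write
 * since_first_write_us: age of the entry's first write
 * max_staleness_ms:     hard cap in ms (assumed unit); <= 0 disables it
 */
static bool
must_route_to_primary(int64_t since_last_write_us,
                      int64_t since_first_write_us,
                      uint64_t ttl_us,
                      int64_t max_staleness_ms)
{
    int64_t max_stale_us;

    /* The TTL is compared as a signed value, so it must fit in int64. */
    assert(ttl_us <= (uint64_t) INT64_MAX);

    /* Not written within the current TTL: replicas may serve the query. */
    if (since_last_write_us >= (int64_t) ttl_us)
        return false;

    /* Hard cap measured from the first write, mirroring max_stale_us > 0. */
    max_stale_us = max_staleness_ms * 1000;
    if (max_stale_us > 0 && since_first_write_us >= max_stale_us)
        return false;

    return true;
}
```

With a 1 ms TTL and a 1 s cap, a table last written 0.5 ms ago is routed to the primary only while its first write is less than 1 s old; a cap of 0 turns the hard limit off.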
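The cleanup trigger in pool_track_table_mutation_mark_tables_written() combines a load threshold (more than 3/4 of max_entries in use) with a rate limit (more than 100 ms since the last pass). A minimal sketch of just that predicate follows. The name should_run_cleanup() and the CLEANUP_INTERVAL_US macro are hypothetical; the thresholds are taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Minimum gap between cleanup passes: 100 ms, as in the patch. */
#define CLEANUP_INTERVAL_US 100000

/* Hypothetical predicate: should expired entries be purged now? */
static bool
should_run_cleanup(int num_entries, int max_entries, int64_t since_cleanup_us)
{
    assert(max_entries > 0);

    /* Only consider cleanup once the map is more than 3/4 full. */
    if (num_entries <= max_entries * 3 / 4)
        return false;

    /* Rate-limit the scan so a burst of writes does not rescan every call. */
    return since_cleanup_us > CLEANUP_INTERVAL_US;
}
```

With max_entries = 1000, cleanup is considered only from the 751st entry onward, and then runs at most about ten times per second while writes keep arriving.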
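pool_track_table_mutation_update_ttl() computes delay * factor and clamps it to [TRACK_TABLE_MUTATION_DEFAULT_TTL_US, 1 hour]. The sketch below restates that rule as a pure function. compute_ttl_us(), TTL_MAX_US and min_ttl_us are hypothetical names; min_ttl_us stands in for the default TTL constant, whose value is not shown in this excerpt. One deliberate difference from the patch: the sketch clamps in floating point before converting to an integer, because converting a double outside the range of uint64 is undefined behavior in C, whereas the patch casts first and clamps afterwards.

```c
#include <assert.h>
#include <stdint.h>

/* Upper bound used by the patch: 1 hour in microseconds. */
#define TTL_MAX_US (3600ULL * 1000000ULL)

/* Hypothetical: ttl = delay * factor, clamped to [min_ttl_us, TTL_MAX_US]. */
static uint64_t
compute_ttl_us(uint64_t delay_us, double factor, uint64_t min_ttl_us)
{
    double scaled;

    assert(factor >= 0.0 && min_ttl_us <= TTL_MAX_US);

    /* Clamp while still a double so the final cast is always in range. */
    scaled = (double) delay_us * factor;
    if (scaled >= (double) TTL_MAX_US)
        return TTL_MAX_US;
    if (scaled <= (double) min_ttl_us)
        return min_ttl_us;
    return (uint64_t) scaled;
}
```

For example, a 1 s replication delay with a factor of 2.0 gives a 2 s TTL, while any product at or above one hour saturates at 3600000000 us.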