From: Nadav Shatz <[email protected]>
To: Tatsuo Ishii <[email protected]>
Cc: [email protected]
Subject: Re: Proposal: Recent mutated table tracking in memory
Date: Sat, 31 Jan 2026 19:11:14 +0200
Message-ID: <CACeKOO1W=NxxqxBFtVa++skatF67DawxJwF5R6iMnqYw=OHoHA@mail.gmail.com>
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
<CACeKOO36wVT2k5b9G2J4XYvjNQTYf-ARoh8Lp-jbh7Yo8Qo+-w@mail.gmail.com>
<[email protected]>
Thank you for the comments!
I agree with all of them. Let me know what you think of the changes and the new
naming.
Please find another version of the patch attached.
On Fri, Jan 30, 2026 at 10:10 AM Tatsuo Ishii <[email protected]> wrote:
> > yes indeed, please find attached.
>
> Thanks. Here are some comments on the patch:
>
> - It seems you use a table name (and schema) for a key to identify the
> TableMutationEntry and other objects. I think you should use table
> oids for the key because the same table name could exist in
> different schemas. Moreover, if the database is different from the
> database when the map entry was created, a map lookup could return
> an incorrect result. In summary, the key should be table oid and
> database oid (which is already done by the query cache subsystem).
>
> - In the patch spin lock primitives are introduced. Why can't we use
> a semaphore instead? A spin lock uses a busy loop, which could increase
> the system load if the lock is held for a long time.
>
> - What would happen if the leader watchdog fails and another watchdog
> node takes over the leader role?
>
> - pool_memory_map_get_ttl() and pool_memory_map_get_stats() are
> defined but are not used anywhere. Why do you have them?
>
> - I think "memory_map" is too generic a name. Can we use a more specific
> name for the feature?
>
> Best regards,
> --
> Tatsuo Ishii
> SRA OSS K.K.
> English: http://www.sraoss.co.jp/index_en/
> Japanese:http://www.sraoss.co.jp
>
--
Nadav Shatz
Tailor Brands | CTO
Attachments:
[application/octet-stream] mutated_table.patch (76.5K, 3-mutated_table.patch)
From 403ed46f0d2b33858c05a25d74be2b027db7d21b Mon Sep 17 00:00:00 2001
From: Nadav Shatz <[email protected]>
Date: Tue, 6 Jan 2026 12:41:50 +0200
Subject: [PATCH] Feature: add in-memory table tracking to prevent stale reads
from replicas
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Implement the "table mutation map" feature, which tracks recently written
database tables in shared memory to prevent stale reads during replication lag.
When a write (INSERT/UPDATE/DELETE) occurs on a table, that table is
marked as "dirty" for a configurable TTL period. Any SELECT on a dirty
table within the TTL window is routed to primary instead of replica.
Key features:
- Shared memory hash table for tracking table mutations with TTL
- Query parse cache with LRU eviction for performance
- Cold start protection (routes all queries to primary initially)
- Automatic TTL calculation: replication_delay × configurable factor
- Per-table staleness tracking with microsecond precision
New configuration parameters:
- table_mutation_map_enabled: Enable/disable the feature (default: off)
- table_mutation_map_ttl_factor: TTL multiplier for replication delay (default: 5.0)
- table_mutation_map_cold_start_duration: Cold start period in ms (default: 2000)
- table_mutation_map_table_buckets: Hash buckets for table map (default: 1024)
- table_mutation_map_table_size: Max tracked tables (default: 2048)
- table_mutation_map_query_buckets: Hash buckets for query cache (default: 2048)
- table_mutation_map_query_cache_size: Max cached queries (default: 10000)
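
A minimal sketch of the TTL rule stated above, assuming times are kept as
microsecond counters. The helper names (sketch_ttl_us, sketch_table_is_dirty)
are hypothetical and are not the functions in this patch, whose implementation
may differ:

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: TTL in microseconds from a measured replication delay. */
static uint64_t
sketch_ttl_us(uint64_t replication_delay_us, double ttl_factor)
{
	/* The patch documents a valid range of 1.0-100.0 for the factor. */
	assert(ttl_factor >= 1.0 && ttl_factor <= 100.0);
	return (uint64_t) ((double) replication_delay_us * ttl_factor);
}

/* Hypothetical helper: a table stays "dirty" while now - last_write < TTL. */
static bool
sketch_table_is_dirty(uint64_t last_write_us, uint64_t now_us, uint64_t ttl_us)
{
	/* If the clock appears to have gone backwards, stay on the safe side. */
	if (now_us < last_write_us)
		return true;
	return (now_us - last_write_us) < ttl_us;
}
```

With a measured delay of 200 ms and the default factor of 5.0, the window is
1 second: a SELECT arriving 999,999 microseconds after the write is sent to the
primary, and one arriving at 1,000,000 microseconds may be load balanced.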
diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
index ee19fabebab2210cd4abe59a711a036ac0ac8943..dce8dec199371e3a24d92baaad6647757b7edf5f 100644
--- a/doc/src/sgml/loadbalance.sgml
+++ b/doc/src/sgml/loadbalance.sgml
@@ -1193,4 +1193,214 @@ dml_adaptive_object_relationship_list = 'table_1:table_2'
</variablelist>
</sect2>
+
+ <sect2 id="runtime-config-table-mutation-map">
+ <title>Table Mutation Map Configuration (Lagless Replica Reads)</title>
+
+ <para>
+ These parameters configure the table mutation map feature, which tracks recently written tables
+ to prevent stale reads from replica nodes during replication lag. This implements the
+ "lagless" architecture pattern for distributed systems with read replicas.
+ </para>
+
+ <para>
+ When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period
+ (<literal>replication_delay * table_mutation_map_ttl_factor</literal>). Any SELECT queries on stale tables are routed
+ to the primary node instead of replicas, ensuring read-after-write consistency.
+ </para>
+
+ <para>
+ This feature requires <xref linkend="guc-replication-delay-source-cmd"> to be configured
+ for monitoring replication delay from replicas.
+ </para>
+
+ <warning>
+ <para>
+ Enabling the table mutation map feature increases shared memory consumption. With default settings,
+ the feature requires approximately 6.5 MB of shared memory (0.1 MB for table tracking + 6.4 MB for query cache).
+ Memory usage scales with configuration parameters:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Table tracking: <literal>table_mutation_map_table_size * 40 bytes</literal> (default: 2048 * 40 = ~80 KB)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Query cache: <literal>table_mutation_map_query_cache_size * 640 bytes</literal> (default: 10000 * 640 = ~6.4 MB)
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ For high-traffic systems with large cache sizes (e.g., <literal>table_mutation_map_query_cache_size = 100000</literal>),
+ memory usage can reach 64 MB or more. Consider your system's available shared memory when enabling this feature.
+ </para>
+ </warning>
+
+ <variablelist>
+
+ <varlistentry id="guc-table-mutation-map-enabled" xreflabel="table_mutation_map_enabled">
+ <term><varname>table_mutation_map_enabled</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_enabled</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables in-memory tracking of recently written tables. When enabled, tables are marked
+ as stale after write operations, and reads are routed to primary until the TTL expires.
+ </para>
+ <para>
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ Default is <literal>off</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-table-mutation-map-ttl-factor" xreflabel="table_mutation_map_ttl_factor">
+ <term><varname>table_mutation_map_ttl_factor</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_ttl_factor</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Multiplier for calculating the TTL: <literal>TTL = replication_delay * table_mutation_map_ttl_factor</literal>.
+ Higher values provide more safety margin but may reduce read replica utilization.
+ </para>
+ <para>
+ Valid range: 1.0-100.0. Default is <literal>5.0</literal>.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-table-mutation-map-cold-start-duration" xreflabel="table_mutation_map_cold_start_duration">
+ <term><varname>table_mutation_map_cold_start_duration</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_cold_start_duration</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Duration in milliseconds to route all queries to primary after a child process starts.
+ This prevents stale reads when a new connection is established before the table mutation map
+ is populated with recent write history.
+ </para>
+ <para>
+ When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a
+ global cold start for this duration to avoid stale reads after leadership changes.
+ </para>
+ <para>
+ Valid range: 0-60000 ms. Default is <literal>2000</literal> (2 seconds).
+ Set to 0 to disable cold start behavior.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-table-mutation-map-table-buckets" xreflabel="table_mutation_map_table_buckets">
+ <term><varname>table_mutation_map_table_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_table_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the table mutation tracking hash table.
+ Higher values reduce hash collisions and improve lookup performance.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>1024</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-table-mutation-map-table-size" xreflabel="table_mutation_map_table_size">
+ <term><varname>table_mutation_map_table_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_table_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of tables that can be tracked simultaneously in the table mutation map.
+ When full, oldest entries are evicted using a simple eviction strategy.
+ </para>
+ <para>
+ Valid range: 128-131072. Default is <literal>2048</literal>.
+ Memory usage: approximately 40 bytes per entry.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-table-mutation-map-query-buckets" xreflabel="table_mutation_map_query_buckets">
+ <term><varname>table_mutation_map_query_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_query_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the query parse cache. The cache stores normalized
+ query strings mapped to their table dependencies to avoid repeated parsing.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>2048</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-table-mutation-map-query-cache-size" xreflabel="table_mutation_map_query_cache_size">
+ <term><varname>table_mutation_map_query_cache_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>table_mutation_map_query_cache_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of query parse results to cache. Uses LRU eviction when full.
+ Larger caches reduce parsing overhead but consume more shared memory.
+ </para>
+ <para>
+ Valid range: 100-1000000. Default is <literal>10000</literal>.
+ Memory usage: approximately 640 bytes per entry (~6.4 MB for default, ~64 MB for 100000 entries).
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ <sect3 id="runtime-config-table-mutation-map-example">
+ <title>Table Mutation Map Configuration Example</title>
+ <para>
+ To enable table mutation map with replication delay monitoring:
+ </para>
+ <programlisting>
+# Enable table mutation map feature
+table_mutation_map_enabled = on
+table_mutation_map_ttl_factor = 5.0
+table_mutation_map_cold_start_duration = 2000
+
+# Configure external replication delay monitoring
+replication_delay_source_cmd = '/path/to/get-replication-delay.sh'
+replication_delay_source_timeout = 10
+
+# Adjust cache sizes based on workload (increases memory usage)
+table_mutation_map_table_size = 4096 # Track up to 4096 tables (~160 KB)
+table_mutation_map_query_cache_size = 50000 # Cache 50k queries (~32 MB)
+ </programlisting>
+ <para>
+ Total shared memory required for above configuration: approximately 32.2 MB (32 MB query cache + 0.2 MB table map), plus overhead.
+ Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.5 MB.
+ </para>
+ </sect3>
+
+ </sect2>
+
</sect1>
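
The sizing rule in the documentation hunk above can be worked through as
follows. This is only arithmetic on the documented per-entry figures (40 and
640 bytes); the function and macro names are hypothetical, and bucket arrays
and headers are ignored:

```c
#include <assert.h>
#include <stddef.h>

/* Per-entry sizes as stated in the documentation, not measured with sizeof. */
#define SKETCH_TABLE_ENTRY_BYTES 40
#define SKETCH_QUERY_ENTRY_BYTES 640

/* Hypothetical helper: estimated bytes of shared memory for the entry arrays. */
static size_t
sketch_estimate_shmem_bytes(size_t table_size, size_t query_cache_size)
{
	assert(table_size > 0 && query_cache_size > 0);
	return table_size * SKETCH_TABLE_ENTRY_BYTES
		+ query_cache_size * SKETCH_QUERY_ENTRY_BYTES;
}
```

For the defaults (2048 tables, 10000 cached queries) this gives 6,481,920
bytes; for the example configuration (4096 tables, 50000 cached queries) it
gives 32,163,840 bytes.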
diff --git a/src/Makefile.am b/src/Makefile.am
index 4678ab53055e828a37b6477801640aff17ff84a7..fc69bb98c8907d23855837cefaad0a972b4e2171 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \
rewrite/pool_timestamp.c \
rewrite/pool_lobj.c \
utils/pool_select_walker.c \
+ utils/pool_table_mutation_map.c \
utils/strlcpy.c \
utils/psprintf.c \
utils/pool_params.c \
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index 68abb7f41cb96d856c824a148842748bfb7a4d12..099191af7629c0ca145628e9a9e9ac92c4bb2f6e 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -783,6 +783,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"table_mutation_map_enabled", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Enable in-memory tracking of recently written tables to avoid stale reads from replicas",
+ CONFIG_VAR_TYPE_BOOL, false, 0
+ },
+ &g_pool_config.table_mutation_map_enabled,
+ false,
+ NULL, NULL, NULL
+ },
+
{
{"auto_failback", CFGCXT_RELOAD, FAILOVER_CONFIG,
"Enables nodes automatically reattach, when detached node continue streaming replication.",
@@ -1757,6 +1767,17 @@ static struct config_int_array ConfigureNamesIntArray[] =
static struct config_double ConfigureNamesDouble[] =
{
+ {
+ {"table_mutation_map_ttl_factor", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "TTL multiplier for table mutation map (TTL = replication_delay * factor)",
+ CONFIG_VAR_TYPE_DOUBLE, false, 0
+ },
+ &g_pool_config.table_mutation_map_ttl_factor,
+ 5.0, /* boot value: 5x replication delay */
+ 1.0, 100.0, /* min, max */
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_DOUBLE
};
@@ -2355,6 +2376,61 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"table_mutation_map_cold_start_duration", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Duration in milliseconds to force queries to primary after child process starts.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.table_mutation_map_cold_start_duration,
+ 2000, /* 2 seconds */
+ 0, 60000, /* 0 to 60 seconds */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"table_mutation_map_table_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for table mutation map.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.table_mutation_map_table_buckets,
+ 1024,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"table_mutation_map_table_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in table mutation map.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.table_mutation_map_table_size,
+ 2048,
+ 128, 131072,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"table_mutation_map_query_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.table_mutation_map_query_buckets,
+ 2048,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"table_mutation_map_query_cache_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.table_mutation_map_query_cache_size,
+ 10000,
+ 100, 1000000,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_INT
};
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 7cf9813eb7d58678bc86a0aaa38bd3c6445b6687..2dbbee8abce8daff6a98bf8f202bdc10bf324006 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -29,6 +29,7 @@
#include "utils/statistics.h"
#include "utils/pool_select_walker.h"
#include "utils/pool_stream.h"
+#include "utils/pool_table_mutation_map.h"
#include "context/pool_session_context.h"
#include "context/pool_query_context.h"
#include "parser/nodes.h"
@@ -2010,6 +2011,19 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
if (dest == POOL_PRIMARY)
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+
+ /*
+ * Resolve table and database OIDs now to populate relcache.
+ * This avoids potential hangs in CommandComplete where we shouldn't
+ * be running new queries against the backend.
+ */
+ if (pool_config->table_mutation_map_enabled &&
+ (IsA(node, InsertStmt) || IsA(node, UpdateStmt) || IsA(node, DeleteStmt)))
+ {
+ int *oids;
+ pool_extract_table_oids(node, &oids);
+ pool_table_mutation_map_get_database_oid();
+ }
}
/* Should be sent to both primary and standby? */
else if (dest == POOL_BOTH)
@@ -2139,6 +2153,107 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
+ /*
+ * Check table mutation map for recently written tables.
+ * If in cold start or any table was recently written,
+ * route to primary to avoid stale reads.
+ */
+ else if (pool_config->table_mutation_map_enabled)
+ {
+ bool force_primary = false;
+
+ /* During cold start, route everything to primary */
+ if (pool_table_mutation_map_in_cold_start())
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of table mutation map cold start"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ /* Extract table oids and check if any are stale */
+ SelectContext ctx;
+ int dboid;
+ int num_oids;
+ int i;
+
+ memset(&ctx, 0, sizeof(ctx));
+ num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
+ if (num_oids > 0)
+ {
+ dboid = pool_table_mutation_map_get_database_oid();
+
+ if (dboid <= 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because database oid was unavailable"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ for (i = 0; i < num_oids; i++)
+ {
+ if (pool_table_mutation_map_table_is_stale(ctx.table_oids[i], dboid))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because table \"%s\" was recently written",
+ ctx.table_names[i]),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if (force_primary)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else
+ {
+ /* Proceed with load balancing */
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id = select_load_balancing_node();
+ }
+
+ /*
+ * As streaming replication delay is too much, if
+ * prefer_lower_delay_standby is true then elect new load
+ * balance node which is lowest delayed, false then send
+ * to the primary.
+ */
+ if (STREAM && check_replication_delay(session_context->load_balance_node_id))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of too much replication delay"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ int new_load_balancing_node = select_load_balancing_node();
+
+ session_context->load_balance_node_id = new_load_balancing_node;
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context,
+ session_context->query_context->load_balance_node_id);
+ }
+ }
+ }
else
{
if (pool_config->statement_level_load_balance)
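
The routing decision added to where_to_send_main_replica() above can be
summarized as a pure function. This is a sketch for illustration only: the
enum and function names are hypothetical, and the per-table staleness results
are passed in as an array instead of being looked up in shared memory:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum
{
	SKETCH_DEST_PRIMARY,
	SKETCH_DEST_LOAD_BALANCE
} SketchDest;

/*
 * Hypothetical summary of the SELECT routing rule:
 *  1. during cold start, always use the primary;
 *  2. if the query touches tables but the database oid is unknown, use the primary;
 *  3. if any touched table is stale, use the primary;
 *  4. otherwise the query may be load balanced.
 */
static SketchDest
sketch_route_select(bool in_cold_start, int dboid,
					const bool *table_is_stale, int num_tables)
{
	assert(num_tables >= 0);

	if (in_cold_start)
		return SKETCH_DEST_PRIMARY;

	if (num_tables > 0)
	{
		if (dboid <= 0)
			return SKETCH_DEST_PRIMARY;

		for (int i = 0; i < num_tables; i++)
		{
			if (table_is_stale[i])
				return SKETCH_DEST_PRIMARY;
		}
	}
	return SKETCH_DEST_LOAD_BALANCE;
}
```

A query that references no tables skips the database oid check entirely, which
mirrors the num_oids > 0 guard in the hunk.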
diff --git a/src/include/pool.h b/src/include/pool.h
index ea6f87e120af866b8ed3a15790d9d8a8e009fe91..c0d8170a8d6c23afdaa1e30dcf7c4bd4d88e7edd 100644
--- a/src/include/pool.h
+++ b/src/include/pool.h
@@ -424,7 +424,7 @@ typedef enum
#define Min(x, y) ((x) < (y) ? (x) : (y))
-#define MAX_NUM_SEMAPHORES 8
+#define MAX_NUM_SEMAPHORES 10
#define CONN_COUNTER_SEM 0
#define REQUEST_INFO_SEM 1
#define QUERY_CACHE_STATS_SEM 2
@@ -434,6 +434,8 @@ typedef enum
#define FOLLOW_PRIMARY_SEM 6
#define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main
* process */
+#define TABLE_MUTATION_MAP_TABLE_SEM 8
+#define TABLE_MUTATION_MAP_QUERY_SEM 9
#define MAX_REQUEST_QUEUE_SIZE 10
#define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index 741de6cc5fc3368f813d6b6efa68eb7f8a79506b..e6c727823dedeedd0225420b66be8382f5bb83fe 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -365,6 +365,16 @@ typedef struct
* replication check */
char *replication_delay_source_cmd; /* external command for replication delay */
int replication_delay_source_timeout; /* timeout for external command in seconds */
+
+ /* Table mutation map configuration for tracking recently written tables */
+ bool table_mutation_map_enabled; /* Enable in-memory table tracking */
+ double table_mutation_map_ttl_factor; /* TTL multiplier for replication delay */
+ int table_mutation_map_cold_start_duration; /* Cold start duration in ms */
+ int table_mutation_map_table_buckets; /* Number of hash buckets for table map */
+ int table_mutation_map_table_size; /* Max entries in table map */
+ int table_mutation_map_query_buckets; /* Number of hash buckets for query cache */
+ int table_mutation_map_query_cache_size; /* Max entries in query cache */
+
char *failover_command; /* execute command when failover happens */
char *follow_primary_command; /* execute command when failover is
* ended */
diff --git a/src/include/utils/pool_table_mutation_map.h b/src/include/utils/pool_table_mutation_map.h
new file mode 100644
index 0000000000000000000000000000000000000000..4c96cb9085107a2682be2948b83f83835fe555c8
--- /dev/null
+++ b/src/include/utils/pool_table_mutation_map.h
@@ -0,0 +1,237 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_table_mutation_map.h: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ */
+
+#ifndef POOL_TABLE_MUTATION_MAP_H
+#define POOL_TABLE_MUTATION_MAP_H
+
+#include "pool.h"
+#include <sys/time.h>
+
+/*
+ * Maximum table name length including schema: "schema"."table"
+ * Using NAMEDATALEN * 2 + 4 for quotes and dot
+ */
+#define TABLE_MUTATION_MAP_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
+
+/*
+ * Maximum number of tables we track per query
+ */
+#define TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY 8
+
+/*
+ * Invalid index marker for linked lists
+ */
+#define TABLE_MUTATION_MAP_INVALID_INDEX (-1)
+
+/*
+ * Default TTL in microseconds (100ms) used when replication delay is unknown
+ */
+#define TABLE_MUTATION_MAP_DEFAULT_TTL_US (100 * 1000)
+
+/*
+ * Entry in the table mutation hash table (keyed by table/database oids)
+ */
+typedef struct TableMutationEntry
+{
+ int table_oid; /* Table oid */
+ int dboid; /* Database oid */
+ struct timeval last_write_time; /* When the table was last written */
+ uint32 hash; /* Pre-computed hash value */
+ int next; /* Next entry in collision chain (-1 if none) */
+ bool in_use; /* Is this entry in use? */
+} TableMutationEntry;
+
+/*
+ * Header for the table mutation hash table in shared memory
+ */
+typedef struct TableMutationHashTable
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * TableMutationEntry entries[max_entries];
+ */
+} TableMutationHashTable;
+
+/*
+ * Entry in the query parse cache
+ */
+typedef struct QueryParseEntry
+{
+ uint64 query_hash; /* Hash of normalized query */
+ bool is_write; /* True if INSERT/UPDATE/DELETE */
+ int num_tables; /* Number of tables in query */
+ char table_names[TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY][TABLE_MUTATION_MAP_TABLE_NAME_LEN];
+ int next; /* Next entry in collision chain */
+ int lru_prev; /* Previous in LRU list */
+ int lru_next; /* Next in LRU list */
+ bool in_use; /* Is this entry in use? */
+} QueryParseEntry;
+
+/*
+ * Header for the query parse cache in shared memory
+ */
+typedef struct QueryParseCache
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ int lru_head; /* Most recently used */
+ int lru_tail; /* Least recently used */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * QueryParseEntry entries[max_entries];
+ */
+} QueryParseCache;
+
+/*
+ * Global state for table mutation map feature
+ */
+typedef struct TableMutationMapState
+{
+ bool initialized; /* Has shared memory been initialized? */
+ uint64 current_ttl_us; /* Current TTL in microseconds */
+ struct timeval ttl_last_updated; /* When TTL was last updated */
+ struct timeval last_cleanup_time; /* When last expired cleanup ran */
+ struct timeval global_cold_start_until; /* Global cold start end time */
+ volatile uint32 stats_queries_checked; /* Number of queries checked */
+ volatile uint32 stats_forced_primary; /* Queries forced to primary */
+ volatile uint32 stats_allowed_replica; /* Queries allowed to replica */
+} TableMutationMapState;
+
+/*
+ * Main shared memory structure containing all components
+ */
+typedef struct TableMutationMapShmem
+{
+ TableMutationMapState state;
+ TableMutationHashTable *table_map;
+ QueryParseCache *query_cache;
+} TableMutationMapShmem;
+
+/* ----------------
+ * Public API functions
+ * ----------------
+ */
+
+/*
+ * Initialize shared memory structures for table mutation map.
+ * Called from pgpool_main.c after pool_init_pool_info().
+ */
+extern void pool_table_mutation_map_init(void);
+
+/*
+ * Initialize per-child process state for table mutation map.
+ * Called from child.c when a new child process starts.
+ * Sets up cold start tracking.
+ */
+extern void pool_table_mutation_map_child_init(void);
+
+/*
+ * Check if the child process is in cold start period.
+ * During cold start, all queries are routed to primary.
+ * Returns true if in cold start, false otherwise.
+ */
+extern bool pool_table_mutation_map_in_cold_start(void);
+
+/*
+ * Trigger a global cold start period for all processes.
+ * Used after watchdog leader change to avoid stale reads.
+ */
+extern void pool_table_mutation_map_trigger_global_cold_start(void);
+
+/*
+ * Get oid of current database.
+ */
+extern int pool_table_mutation_map_get_database_oid(void);
+
+/*
+ * Check if a table was recently written to (is "stale").
+ * If stale, reads from this table should go to primary.
+ * Returns true if table is stale (recently written), false otherwise.
+ */
+extern bool pool_table_mutation_map_table_is_stale(int table_oid, int dboid);
+
+/*
+ * Mark tables as recently written.
+ * Called after INSERT/UPDATE/DELETE queries complete.
+ * table_oids: array of table oids
+ * num_tables: number of tables in array
+ * dboid: database oid
+ */
+extern void pool_table_mutation_map_mark_tables_written(const int *table_oids, int num_tables, int dboid);
+
+/*
+ * Convenience function to mark a single table as written.
+ * table_oid: table oid
+ * dboid: database oid
+ */
+extern void pool_table_mutation_map_mark_table_written(int table_oid, int dboid);
+
+/*
+ * Update the TTL based on current replication delay.
+ * Called from pool_worker_child.c when replication delay is updated.
+ * delay_us: replication delay in microseconds
+ */
+extern void pool_table_mutation_map_update_ttl(uint64 delay_us);
+
+/*
+ * Look up cached parse result for a query.
+ * hash: hash of normalized query
+ * is_write: output - true if query is a write
+ * table_names: output - array to fill with table names
+ * num_tables: output - number of tables found
+ * Returns true if found in cache, false otherwise.
+ */
+extern bool pool_table_mutation_map_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN],
+ int *num_tables);
+
+/*
+ * Cache a parse result for a query.
+ * hash: hash of normalized query
+ * is_write: true if query is a write
+ * table_names: array of table names
+ * num_tables: number of tables
+ */
+extern void pool_table_mutation_map_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN],
+ int num_tables);
+
+/*
+ * Normalize a query and compute its hash.
+ * Strips comments, normalizes whitespace and literals.
+ * query: input SQL query string
+ * Returns: 64-bit hash of normalized query
+ */
+extern uint64 pool_table_mutation_map_normalize_and_hash(const char *query);
+
+/*
+ * Calculate required shared memory size for table mutation map.
+ */
+extern Size pool_table_mutation_map_shmem_size(void);
+
+#endif /* POOL_TABLE_MUTATION_MAP_H */
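
To illustrate why the (table oid, database oid) key matters, here is a small
self-contained sketch of an index-chained hash table in the style of
TableMutationEntry. The sizes, the hash function, and all sketch_ names are
assumptions made for the example; locking and eviction are omitted, and the
patch's own implementation may differ:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define SKETCH_BUCKETS 8
#define SKETCH_ENTRIES 16
#define SKETCH_INVALID (-1)

typedef struct SketchEntry
{
	int			table_oid;
	int			dboid;
	uint64_t	last_write_us;
	int			next;			/* next index in chain or free list */
	bool		in_use;
} SketchEntry;

typedef struct SketchMap
{
	int			buckets[SKETCH_BUCKETS];
	SketchEntry entries[SKETCH_ENTRIES];
	int			free_list_head;
} SketchMap;

/* Assumed hash: mixes both oids so equal table oids in different DBs differ. */
static uint32_t
sketch_hash(int table_oid, int dboid)
{
	uint32_t	h = (uint32_t) table_oid * 2654435761u;

	h ^= (uint32_t) dboid + 0x9e3779b9u + (h << 6) + (h >> 2);
	return h;
}

static void
sketch_init(SketchMap *m)
{
	for (int i = 0; i < SKETCH_BUCKETS; i++)
		m->buckets[i] = SKETCH_INVALID;
	for (int i = 0; i < SKETCH_ENTRIES; i++)
	{
		m->entries[i].in_use = false;
		m->entries[i].next = (i + 1 < SKETCH_ENTRIES) ? i + 1 : SKETCH_INVALID;
	}
	m->free_list_head = 0;
}

/* Returns the entry index for the key, or SKETCH_INVALID if absent. */
static int
sketch_find(const SketchMap *m, int table_oid, int dboid)
{
	int			idx = m->buckets[sketch_hash(table_oid, dboid) % SKETCH_BUCKETS];

	while (idx != SKETCH_INVALID)
	{
		const SketchEntry *e = &m->entries[idx];

		if (e->table_oid == table_oid && e->dboid == dboid)
			return idx;
		idx = e->next;
	}
	return SKETCH_INVALID;
}

/* Records a write; returns false when the map is full (no eviction here). */
static bool
sketch_mark_written(SketchMap *m, int table_oid, int dboid, uint64_t now_us)
{
	int			idx;

	assert(m != NULL);
	idx = sketch_find(m, table_oid, dboid);
	if (idx == SKETCH_INVALID)
	{
		uint32_t	b = sketch_hash(table_oid, dboid) % SKETCH_BUCKETS;

		idx = m->free_list_head;
		if (idx == SKETCH_INVALID)
			return false;
		m->free_list_head = m->entries[idx].next;
		m->entries[idx].table_oid = table_oid;
		m->entries[idx].dboid = dboid;
		m->entries[idx].in_use = true;
		m->entries[idx].next = m->buckets[b];	/* push onto the bucket chain */
		m->buckets[b] = idx;
	}
	m->entries[idx].last_write_us = now_us;
	return true;
}
```

After marking table oid 16384 in database 5, a lookup for the same table oid
in database 6 finds nothing, so a write in one database does not force reads
in another database to the primary.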
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index fa05e15e7ac435e072298063f918c70aa4e5680c..87dc2c4f09a62e1cd680b8020975e3ecf0813ec0 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -57,6 +57,7 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
#include "query_cache/pool_memqcache.h"
+#include "utils/pool_table_mutation_map.h"
#include "watchdog/wd_internal_commands.h"
#include "watchdog/wd_lifecheck.h"
#include "watchdog/watchdog.h"
@@ -1485,11 +1486,14 @@ sigusr1_interrupt_processor(void)
if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED])
{
+ WD_STATES wd_state;
+
ereport(LOG,
(errmsg("Pgpool-II parent process received watchdog state change signal from watchdog")));
user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false;
- if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY)
+ wd_state = wd_internal_get_watchdog_local_node_state();
+ if (wd_state == WD_STANDBY)
{
ereport(LOG,
(errmsg("we have joined the watchdog cluster as STANDBY node"),
@@ -1503,6 +1507,10 @@ sigusr1_interrupt_processor(void)
*/
pool_release_follow_primary_lock(true);
}
+ else if (wd_state == WD_COORDINATOR && pool_config->table_mutation_map_enabled)
+ {
+ pool_table_mutation_map_trigger_global_cold_start();
+ }
}
if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT])
{
@@ -3068,6 +3076,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size()));
}
+ if (pool_config->table_mutation_map_enabled)
+ {
+ size += MAXALIGN(pool_table_mutation_map_shmem_size());
+ elog(DEBUG1, "table_mutation_map: %zu bytes requested for shared memory", MAXALIGN(pool_table_mutation_map_shmem_size()));
+ }
+
initialize_shared_memory_main_segment(size);
/* Move the backend descriptors to shared memory */
@@ -3184,6 +3198,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
wd_ipc_initialize_data();
}
+ /* Initialize table mutation map for tracking recently written tables */
+ if (pool_config->table_mutation_map_enabled)
+ {
+ pool_table_mutation_map_init();
+ }
+
}
/*
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index a3b8f0ea194ffecc79e58566be80562a46eb75ab..2de467496194dd219437eb3721ba9d8c8f999bb6 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -38,6 +38,8 @@
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/pool_stream.h"
+#include "utils/pool_table_mutation_map.h"
+#include "utils/pool_select_walker.h"
static int extract_ntuples(char *message);
static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
@@ -304,6 +306,45 @@ handle_query_context(POOL_CONNECTION_POOL *backend)
node = session_context->query_context->parse_tree;
+ /*
+ * Track table writes for table mutation map feature.
+ * Mark tables as written when INSERT/UPDATE/DELETE completes.
+ */
+ if (pool_config->table_mutation_map_enabled)
+ {
+ char *table_name = NULL;
+ int table_oid = 0;
+ int dboid = 0;
+
+ if (IsA(node, InsertStmt))
+ {
+ InsertStmt *stmt = (InsertStmt *) node;
+ table_name = make_table_name_from_rangevar(stmt->relation);
+ }
+ else if (IsA(node, UpdateStmt))
+ {
+ UpdateStmt *stmt = (UpdateStmt *) node;
+ table_name = make_table_name_from_rangevar(stmt->relation);
+ }
+ else if (IsA(node, DeleteStmt))
+ {
+ DeleteStmt *stmt = (DeleteStmt *) node;
+ table_name = make_table_name_from_rangevar(stmt->relation);
+ }
+
+ if (table_name != NULL)
+ {
+ table_oid = pool_table_name_to_oid(table_name);
+ dboid = pool_table_mutation_map_get_database_oid();
+ if (table_oid > 0 && dboid > 0)
+ {
+ pool_table_mutation_map_mark_table_written(table_oid, dboid);
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: marked table \"%s\" as written", table_name)));
+ }
+ }
+ }
+
if (IsA(node, PrepareStmt))
{
if (session_context->uncompleted_message)
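A minimal sketch of the tracking idea in the hunk above, not the patch's implementation: a write records a timestamp under a (table OID, database OID) key, and a later read counts as recently written while the elapsed time is within the TTL. The names SketchEntry, sketch_mark_written and sketch_recently_written, the fixed-size linear array, and the explicit now_us argument are assumptions made for illustration; the patch itself keeps its entries in a shared-memory hash table.

```c
#include <stdbool.h>
#include <stdint.h>

#define SKETCH_MAX_ENTRIES 8	/* hypothetical capacity, for illustration only */

/* Hypothetical stand-in for one tracked table, keyed by (table_oid, dboid). */
typedef struct
{
	bool		in_use;
	int			table_oid;
	int			dboid;
	int64_t		last_write_us;	/* time of the last tracked write */
} SketchEntry;

static SketchEntry sketch_entries[SKETCH_MAX_ENTRIES];

/* Record a write. Returns false when no slot is free. */
static bool
sketch_mark_written(int table_oid, int dboid, int64_t now_us)
{
	int			free_slot = -1;
	int			i;

	for (i = 0; i < SKETCH_MAX_ENTRIES; i++)
	{
		SketchEntry *e = &sketch_entries[i];

		if (e->in_use && e->table_oid == table_oid && e->dboid == dboid)
		{
			e->last_write_us = now_us;	/* refresh an existing entry */
			return true;
		}
		if (!e->in_use && free_slot < 0)
			free_slot = i;			/* remember the first free slot */
	}

	if (free_slot < 0)
		return false;

	sketch_entries[free_slot].in_use = true;
	sketch_entries[free_slot].table_oid = table_oid;
	sketch_entries[free_slot].dboid = dboid;
	sketch_entries[free_slot].last_write_us = now_us;
	return true;
}

/* True while the last write to (table_oid, dboid) is within ttl_us. */
static bool
sketch_recently_written(int table_oid, int dboid, int64_t now_us, int64_t ttl_us)
{
	int			i;

	for (i = 0; i < SKETCH_MAX_ENTRIES; i++)
	{
		const SketchEntry *e = &sketch_entries[i];

		if (e->in_use && e->table_oid == table_oid && e->dboid == dboid)
			return (now_us - e->last_write_us) <= ttl_us;
	}
	return false;				/* never written: safe to load balance */
}
```

Keying on both OIDs is what keeps a table with the same OID or name in another database from being treated as stale.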
diff --git a/src/protocol/child.c b/src/protocol/child.c
index c34f057281be62feaf39db1bb605062f56dc398c..26d7cf1d1a6768c109850a43b57373141f9f7eaf 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -57,6 +57,7 @@
#include "utils/elog.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
+#include "utils/pool_table_mutation_map.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -213,6 +214,12 @@ do_child(int *fds)
/* Initialize per process context */
pool_init_process_context();
+ /* Initialize table mutation map child state for cold start tracking */
+ if (pool_config->table_mutation_map_enabled)
+ {
+ pool_table_mutation_map_child_init();
+ }
+
/* initialize connection pool */
if (pool_init_cp())
{
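The cold start window initialized in the hunk above can be pictured as a simple elapsed-time check. This is only a sketch: sketch_in_cold_start and its millisecond arguments are hypothetical, whereas the patch records the process start time as a struct timeval. Treating a duration of 0 as "disabled" follows the sample configuration comment for table_mutation_map_cold_start_duration.

```c
#include <stdbool.h>
#include <stdint.h>

/*
 * Return true while a child process is still inside its cold start window,
 * i.e. while every read should be routed to the primary.
 */
static bool
sketch_in_cold_start(int64_t start_ms, int64_t now_ms, int duration_ms)
{
	if (duration_ms <= 0)
		return false;			/* 0 disables cold start behavior */

	return (now_ms - start_ms) < duration_ms;
}
```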
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 454fdb9e5d1fd65437b6a67f12ab62658ea08f49..46052bad37bbd1f4affec8e08e5cecd3d4903976 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -499,6 +499,51 @@ backend_clustering_mode = streaming_replication
#statement_level_load_balance = off
# Enables statement level load balancing
+# - Table Mutation Map (Lagless Read Replica) -
+ # WARNING: Enabling this feature increases shared memory usage
+ # Default settings require ~6.4 MB shared memory
+ # (0.1 MB table tracking + 6.3 MB query cache)
+
+#table_mutation_map_enabled = off
+ # Enable in-memory tracking of recently written tables
+ # to prevent stale reads from replicas during replication lag
+ # (change requires reload)
+
+#table_mutation_map_ttl_factor = 5.0
+ # TTL multiplier: TTL = replication_delay * factor
+ # Higher values provide more safety margin
+ # Range: 1.0-100.0 (default: 5.0)
+ # (change requires reload)
+
+#table_mutation_map_cold_start_duration = 2000
+ # Duration in milliseconds to route all queries to primary
+ # after child process starts (cold start period)
+ # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+ # Set to 0 to disable cold start behavior
+ # (change requires reload)
+
+#table_mutation_map_table_buckets = 1024
+ # Number of hash buckets for table mutation tracking
+ # Higher values reduce hash collisions
+ # Range: 64-65536 (default: 1024)
+ # (change requires restart)
+
+#table_mutation_map_table_size = 2048
+ # Maximum number of tables to track simultaneously
+ # Range: 128-131072 (default: 2048)
+ # (change requires restart)
+
+#table_mutation_map_query_buckets = 2048
+ # Number of hash buckets for query parse cache
+ # Range: 64-65536 (default: 2048)
+ # (change requires restart)
+
+#table_mutation_map_query_cache_size = 10000
+ # Maximum number of query parse results to cache
+ # Range: 100-1000000 (default: 10000)
+ # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000)
+ # (change requires restart)
+
#------------------------------------------------------------------------------
# STREAMING REPLICATION MODE
#------------------------------------------------------------------------------
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 311b638658e66ebb56162ad9fa4392315b2df64e..38bd217be1972af57f80c26c8d726aad704d56bd 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
#include "utils/pool_ip.h"
#include "utils/ps_status.h"
#include "utils/pool_stream.h"
+#include "utils/pool_table_mutation_map.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -695,6 +696,7 @@ check_replication_time_lag_with_cmd(void)
double delay_ms;
uint64 delay;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0; /* Track maximum delay for table mutation map */
int token_count = 0;
int primary_node_id;
int save_errno;
@@ -1003,6 +1005,10 @@ check_replication_time_lag_with_cmd(void)
bkinfo->standby_delay = delay;
bkinfo->standby_delay_by_time = true;
+ /* Track maximum delay for table mutation map TTL calculation */
+ if (delay > max_delay_us)
+ max_delay_us = delay;
+
/*
* Log delay if necessary. threshold is in milliseconds, convert
* to microseconds.
@@ -1021,6 +1027,10 @@ check_replication_time_lag_with_cmd(void)
token = strtok_r(NULL, " \t\n", &saveptr);
}
+ /* Update table mutation map TTL based on maximum observed delay */
+ if (pool_config->table_mutation_map_enabled && max_delay_us > 0)
+ pool_table_mutation_map_update_ttl(max_delay_us);
+
}
PG_CATCH();
{
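For illustration, the TTL derivation described in the sample configuration (TTL = replication_delay * factor) could look like the sketch below. sketch_ttl_us is a hypothetical name, and clamping the factor to the documented 1.0-100.0 range at this point is an assumption; the body of pool_table_mutation_map_update_ttl() may differ.

```c
#include <stdint.h>

/*
 * Derive a TTL in microseconds from the largest observed standby delay.
 * The factor is clamped to the range documented in the sample config.
 */
static uint64_t
sketch_ttl_us(uint64_t max_delay_us, double ttl_factor)
{
	if (ttl_factor < 1.0)
		ttl_factor = 1.0;
	if (ttl_factor > 100.0)
		ttl_factor = 100.0;

	return (uint64_t) ((double) max_delay_us * ttl_factor);
}
```

With the default factor of 5.0, a 200 ms maximum delay (200000 microseconds) yields a 1 second TTL.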
diff --git a/src/test/regression/tests/045.table_mutation_map/test.sh b/src/test/regression/tests/045.table_mutation_map/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..e0f229ee88a70e4643df1745a6e6992b867354ae
--- /dev/null
+++ b/src/test/regression/tests/045.table_mutation_map/test.sh
@@ -0,0 +1,228 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for table mutation map feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+ rm -fr $TESTDIR
+ mkdir $TESTDIR
+ cd $TESTDIR
+
+ # Create test environment with 2 nodes
+ echo -n "creating test environment..."
+ $PGPOOL_SETUP -m $mode -n 2 || exit 1
+ echo "done."
+
+ source ./bashrc.ports
+
+ # Configure table mutation map feature
+ echo "table_mutation_map_enabled = on" >> etc/pgpool.conf
+ echo "table_mutation_map_ttl_factor = 5.0" >> etc/pgpool.conf
+ echo "table_mutation_map_cold_start_duration = 2000" >> etc/pgpool.conf
+
+ # Configure weights so we can distinguish routing
+ # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+ # This means load balanced queries go to node 1 by default
+ echo "backend_weight0 = 0" >> etc/pgpool.conf
+ echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+ # Enable debug logging to see routing decisions
+ echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+ ./startall
+
+ export PGPORT=$PGPOOL_PORT
+
+ wait_for_pgpool_startup
+
+ # Create test tables
+ $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+ echo "=== Test 1: Cold Start Routing ==="
+ # During cold start, all queries should go to primary
+ # Restart pgpool to trigger cold start
+ ./shutdownall
+ ./startall
+ wait_for_pgpool_startup
+
+ # Immediately query - should go to primary due to cold start
+ $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for cold start message
+ if grep -q "could not load balance because of table mutation map cold start" log/pgpool.log; then
+ echo "Test 1 PASSED: Cold start routing works"
+ else
+ echo "Test 1 FAILED: Cold start routing not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 2: Wait for cold start to end ==="
+ # Wait for cold start period to end (2 seconds)
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Now a clean table query should load balance (go to node 1)
+ $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+ # After cold start, queries to clean tables should load balance
+ # Check that it did NOT get forced to primary due to table mutation map
+ if grep -q "could not load balance because of table mutation map cold start" log/pgpool.log; then
+ echo "Test 2 FAILED: Still in cold start after waiting"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 2 PASSED: Cold start ended correctly"
+
+ echo "=== Test 3: Write-then-Read Routing ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Write to t1
+ $PSQL test -c "INSERT INTO t1 VALUES (1);" > /dev/null 2>&1
+
+ # Immediately read from t1 - should go to primary due to recent write
+ $PSQL test -c "SELECT 'write_read_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for table staleness message
+ if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 3 PASSED: Write-then-read routing works"
+ else
+ echo "Test 3 FAILED: Table staleness not detected after write"
+ # Show relevant log entries for debugging
+ grep -i "table_mutation_map" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 4: Clean Table Still Load Balances ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Read from t2 (never written to) - should load balance
+ $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ # Should NOT see table mutation map blocking message for t2
+ if grep -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+ echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 4 PASSED: Clean tables still load balance"
+
+ echo "=== Test 5: UPDATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Update t2 (matches no rows, but the completed statement still marks t2 as written)
+ $PSQL test -c "UPDATE t2 SET i = 999 WHERE i = 0;" > /dev/null 2>&1
+
+ # Immediately read from t2 - should go to primary
+ $PSQL test -c "SELECT 'update_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 5 PASSED: UPDATE marks table as stale"
+ else
+ echo "Test 5 FAILED: UPDATE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 6: DELETE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Delete from t3 (matches no rows, but the completed statement still marks t3 as written)
+ $PSQL test -c "DELETE FROM t3 WHERE i = 0;" > /dev/null 2>&1
+
+ # Immediately read from t3 - should go to primary
+ $PSQL test -c "SELECT 'delete_test' as marker, * FROM t3;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 6 PASSED: DELETE marks table as stale"
+ else
+ echo "Test 6 FAILED: DELETE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 7: Multi-Table Query with One Stale Table ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a new clean table
+ $PSQL test -c "CREATE TABLE t4(i INTEGER);" > /dev/null 2>&1
+
+ # Wait a bit for TTL to expire on other tables if factor is low
+ sleep 1
+
+ # Write to t1 only
+ $PSQL test -c "INSERT INTO t1 VALUES (100);" > /dev/null 2>&1
+
+ # Query joining t1 and t4 - should go to primary because t1 is stale
+ $PSQL test -c "SELECT 'multi_table_test' as marker FROM t1, t4;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+ echo "Test 7 PASSED: Multi-table query routes to primary when one table is stale"
+ else
+ echo "Test 7 FAILED: Multi-table staleness not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 8: Different Databases with Same Table Name ==="
+ # Create another database and a table with the same name
+ $PSQL test -c "CREATE DATABASE test2;" > /dev/null 2>&1
+ $PSQL test2 -c "CREATE TABLE t1(i INTEGER);" > /dev/null 2>&1
+
+ # Wait for TTL to expire
+ sleep 1
+
+ # Write to t1 in 'test' database
+ $PSQL test -c "INSERT INTO t1 VALUES (500);" > /dev/null 2>&1
+
+ # Read from t1 in 'test2' database - should load balance (Node 1)
+ # because it's a different database, even if table name is same
+ > log/pgpool.log
+ $PSQL test2 -c "SELECT 'diff_db_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+ echo "Test 8 FAILED: Table marked as stale in wrong database"
+ ./shutdownall
+ exit 1
+ fi
+
+ # Read from t1 in 'test' database - should go to primary
+ $PSQL test -c "SELECT 'same_db_test' as marker, * FROM t1;" > /dev/null 2>&1
+ if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+ echo "Test 8 PASSED: Correctly distinguishes between databases"
+ else
+ echo "Test 8 FAILED: Table staleness not detected in correct database"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo ""
+ echo "=== All Table Mutation Map Tests PASSED ==="
+
+ ./shutdownall
+
+ cd ..
+done
+
+exit 0
diff --git a/src/utils/pool_table_mutation_map.c b/src/utils/pool_table_mutation_map.c
new file mode 100644
index 0000000000000000000000000000000000000000..300c230ad18aa2204a09974d13ecf8e8958ff36f
--- /dev/null
+++ b/src/utils/pool_table_mutation_map.c
@@ -0,0 +1,1166 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_table_mutation_map.c: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ *
+ * Based on the "lagless" architecture from Tailor Brands:
+ * https://medium.com/tailor-tech/using-database-read-replicas-in-distributed-systems-d80eaf6bbf8a
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "context/pool_session_context.h"
+#include "utils/pool_table_mutation_map.h"
+#include "utils/elog.h"
+#include "utils/pool_ipc.h"
+#include "utils/palloc.h"
+#include "utils/pool_relcache.h"
+
+#define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_catalog.pg_database WHERE datname = '%s'"
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static TableMutationMapShmem *table_mutation_map_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for accessing flexible arrays in shared memory
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+ ((int *)((char *)(map) + sizeof(TableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+ ((TableMutationEntry *)((char *)(map) + sizeof(TableMutationHashTable) + \
+ (map)->num_buckets * sizeof(int)))
+
+/* Get pointer to bucket array in query cache */
+#define QUERY_CACHE_BUCKETS(cache) \
+ ((int *)((char *)(cache) + sizeof(QueryParseCache)))
+
+/* Get pointer to entry array in query cache */
+#define QUERY_CACHE_ENTRIES(cache) \
+ ((QueryParseEntry *)((char *)(cache) + sizeof(QueryParseCache) + \
+ (cache)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Semaphore lock helpers
+ * ----------------
+ */
+
+static inline void
+table_map_lock(void)
+{
+ pool_semaphore_lock(TABLE_MUTATION_MAP_TABLE_SEM);
+}
+
+static inline void
+table_map_unlock(void)
+{
+ pool_semaphore_unlock(TABLE_MUTATION_MAP_TABLE_SEM);
+}
+
+static inline void
+query_cache_lock(void)
+{
+ pool_semaphore_lock(TABLE_MUTATION_MAP_QUERY_SEM);
+}
+
+static inline void
+query_cache_unlock(void)
+{
+ pool_semaphore_unlock(TABLE_MUTATION_MAP_QUERY_SEM);
+}
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * FNV-1a hash for table/database oid pair
+ */
+static uint32
+fnv1a_hash_table_key(int table_oid, int dboid)
+{
+ uint32 hash = 2166136261u; /* FNV offset basis */
+ uint32 data[2];
+ const unsigned char *bytes;
+ size_t i;
+
+ data[0] = (uint32) table_oid;
+ data[1] = (uint32) dboid;
+ bytes = (const unsigned char *) data;
+
+ for (i = 0; i < sizeof(data); i++)
+ {
+ hash ^= bytes[i];
+ hash *= 16777619u; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/*
+ * 64-bit FNV-1a hash of a byte string of the given length
+ */
+static uint64
+fnv1a_hash_64(const char *str, size_t len)
+{
+ uint64 hash = 14695981039346656037ULL; /* FNV offset basis for 64-bit */
+ size_t i;
+
+ for (i = 0; i < len; i++)
+ {
+ hash ^= (uint8)str[i];
+ hash *= 1099511628211ULL; /* FNV prime for 64-bit */
+ }
+
+ return hash;
+}
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Get elapsed time in microseconds between two timevals
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+ return ((int64)(end->tv_sec - start->tv_sec) * 1000000) +
+ (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Get current time
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+ gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Database oid lookup
+ * ----------------
+ */
+
+static int
+table_mutation_map_get_database_oid_internal(void)
+{
+ int oid = 0;
+ static POOL_RELCACHE *relcache;
+ POOL_CONNECTION_POOL *backend;
+
+ backend = pool_get_session_context(false)->backend;
+ if (backend == NULL || MAIN_CONNECTION(backend) == NULL || MAIN_CONNECTION(backend)->sp == NULL)
+ return oid;
+
+ if (!relcache)
+ {
+ relcache = pool_create_relcache(pool_config->relcache_size,
+ DATABASE_TO_OID_QUERY,
+ int_register_func,
+ int_unregister_func,
+ false);
+ if (relcache == NULL)
+ {
+ ereport(LOG,
+ (errmsg("table_mutation_map: error creating relcache while getting database OID")));
+ return oid;
+ }
+ }
+
+ oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
+ MAIN_CONNECTION(backend)->sp->database);
+ return oid;
+}
+
+int
+pool_table_mutation_map_get_database_oid(void)
+{
+ return table_mutation_map_get_database_oid_internal();
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table
+ */
+static void
+table_map_init(TableMutationHashTable *map, int num_buckets, int max_entries)
+{
+ int *buckets;
+ TableMutationEntry *entries;
+ int i;
+
+ map->num_buckets = num_buckets;
+ map->max_entries = max_entries;
+ map->num_entries = 0;
+ map->free_list_head = 0;
+
+ buckets = TABLE_MAP_BUCKETS(map);
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+ /* Initialize free list - chain all entries */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : TABLE_MUTATION_MAP_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: initialized table map with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list
+ */
+static int
+table_map_alloc_entry(TableMutationHashTable *map)
+{
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int idx;
+
+ if (map->free_list_head == TABLE_MUTATION_MAP_INVALID_INDEX)
+ return TABLE_MUTATION_MAP_INVALID_INDEX;
+
+ idx = map->free_list_head;
+ map->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = TABLE_MUTATION_MAP_INVALID_INDEX;
+ map->num_entries++;
+
+ return idx;
+}
+
+/*
+ * Free an entry back to the free list
+ */
+static void
+table_map_free_entry(TableMutationHashTable *map, int idx)
+{
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+
+ entries[idx].in_use = false;
+ entries[idx].next = map->free_list_head;
+ map->free_list_head = idx;
+ map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table
+ * Returns entry index or TABLE_MUTATION_MAP_INVALID_INDEX if not found
+ * Must be called with lock held
+ */
+static int
+table_map_lookup(TableMutationHashTable *map, int table_oid, int dboid, uint32 hash)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ if (entries[idx].hash == hash &&
+ entries[idx].table_oid == table_oid &&
+ entries[idx].dboid == dboid)
+ {
+ return idx;
+ }
+ idx = entries[idx].next;
+ }
+
+ return TABLE_MUTATION_MAP_INVALID_INDEX;
+}
+
+/*
+ * Insert or update a table entry
+ * Must be called with lock held
+ */
+static void
+table_map_insert(TableMutationHashTable *map, int table_oid, int dboid,
+ uint32 hash, struct timeval *write_time)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx;
+
+ /* Check if entry already exists */
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ /* Update existing entry */
+ entries[idx].last_write_time = *write_time;
+ return;
+ }
+
+ /* Allocate new entry */
+ idx = table_map_alloc_entry(map);
+ if (idx == TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ int b;
+ /* Table is full - evict an entry */
+ /* For simplicity, just use the first entry in first non-empty bucket */
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ if (buckets[b] != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ int victim = buckets[b];
+ buckets[b] = entries[victim].next;
+ table_map_free_entry(map, victim);
+ idx = table_map_alloc_entry(map);
+ break;
+ }
+ }
+
+ if (idx == TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ ereport(WARNING,
+ (errmsg("table_mutation_map: failed to allocate entry for table oid %d (dboid %d)",
+ table_oid, dboid)));
+ return;
+ }
+ }
+
+ /* Initialize new entry */
+ entries[idx].table_oid = table_oid;
+ entries[idx].dboid = dboid;
+ entries[idx].hash = hash;
+ entries[idx].last_write_time = *write_time;
+
+ /* Insert at head of bucket chain */
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ ereport(DEBUG2,
+ (errmsg("table_mutation_map: marked table oid %d (dboid %d) as written",
+ table_oid, dboid)));
+}
+
+/*
+ * Remove expired entries from the table map
+ * Must be called with lock held
+ */
+static void
+table_map_cleanup_expired(TableMutationHashTable *map, uint64 ttl_us)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ struct timeval now;
+ int removed = 0;
+ int b;
+
+ get_current_time(&now);
+
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *prev_ptr = &buckets[b];
+ int idx = buckets[b];
+
+ while (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+
+ if (elapsed > (int64)ttl_us)
+ {
+ /* Entry has expired - remove it */
+ int next = entries[idx].next;
+ *prev_ptr = next;
+ table_map_free_entry(map, idx);
+ idx = next;
+ removed++;
+ }
+ else
+ {
+ prev_ptr = &entries[idx].next;
+ idx = entries[idx].next;
+ }
+ }
+ }
+
+ if (removed > 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: cleaned up %d expired table entries", removed)));
+ }
+}
+
+/* ----------------
+ * Query parse cache operations
+ * ----------------
+ */
+
+/*
+ * Initialize query parse cache
+ */
+static void
+query_cache_init(QueryParseCache *cache, int num_buckets, int max_entries)
+{
+ int *buckets;
+ QueryParseEntry *entries;
+ int i;
+
+ cache->num_buckets = num_buckets;
+ cache->max_entries = max_entries;
+ cache->num_entries = 0;
+ cache->free_list_head = 0;
+ cache->lru_head = TABLE_MUTATION_MAP_INVALID_INDEX;
+ cache->lru_tail = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+ buckets = QUERY_CACHE_BUCKETS(cache);
+ entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+ /* Initialize free list */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : TABLE_MUTATION_MAP_INVALID_INDEX;
+ entries[i].lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+ entries[i].lru_next = TABLE_MUTATION_MAP_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: initialized query cache with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Move entry to front of LRU list (most recently used)
+ */
+static void
+query_cache_lru_touch(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Already at head? */
+ if (cache->lru_head == idx)
+ return;
+
+ /* Remove from current position */
+ if (entries[idx].lru_prev != TABLE_MUTATION_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ if (entries[idx].lru_next != TABLE_MUTATION_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ if (cache->lru_tail == idx)
+ cache->lru_tail = entries[idx].lru_prev;
+
+ /* Insert at head */
+ entries[idx].lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+ if (cache->lru_head != TABLE_MUTATION_MAP_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+ cache->lru_head = idx;
+ if (cache->lru_tail == TABLE_MUTATION_MAP_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Add entry to LRU list (at head)
+ */
+static void
+query_cache_lru_add(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ entries[idx].lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+
+ if (cache->lru_head != TABLE_MUTATION_MAP_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+
+ cache->lru_head = idx;
+
+ if (cache->lru_tail == TABLE_MUTATION_MAP_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Remove entry from LRU list
+ */
+static void
+query_cache_lru_remove(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ if (entries[idx].lru_prev != TABLE_MUTATION_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ else
+ cache->lru_head = entries[idx].lru_next;
+
+ if (entries[idx].lru_next != TABLE_MUTATION_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ else
+ cache->lru_tail = entries[idx].lru_prev;
+
+ entries[idx].lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+ entries[idx].lru_next = TABLE_MUTATION_MAP_INVALID_INDEX;
+}
+
+/*
+ * Allocate entry from free list, evicting LRU if necessary
+ */
+static int
+query_cache_alloc_entry(QueryParseCache *cache)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ int idx;
+
+ if (cache->free_list_head != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ idx = cache->free_list_head;
+ cache->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = TABLE_MUTATION_MAP_INVALID_INDEX;
+ cache->num_entries++;
+ return idx;
+ }
+
+ /* No free entries - evict LRU */
+ if (cache->lru_tail == TABLE_MUTATION_MAP_INVALID_INDEX)
+ return TABLE_MUTATION_MAP_INVALID_INDEX;
+
+ idx = cache->lru_tail;
+
+ /* Remove from hash bucket */
+ int bucket = entries[idx].query_hash % cache->num_buckets;
+ int *prev_ptr = &buckets[bucket];
+ int curr = buckets[bucket];
+
+ while (curr != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ if (curr == idx)
+ {
+ *prev_ptr = entries[curr].next;
+ break;
+ }
+ prev_ptr = &entries[curr].next;
+ curr = entries[curr].next;
+ }
+
+ /* Remove from LRU list */
+ query_cache_lru_remove(cache, idx);
+
+ /* Reinitialize entry */
+ entries[idx].in_use = true;
+ entries[idx].next = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+ return idx;
+}
+
+/*
+ * Look up a query in the cache
+ */
+static int
+query_cache_lookup(QueryParseCache *cache, uint64 hash)
+{
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int bucket = hash % cache->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ if (entries[idx].query_hash == hash)
+ return idx;
+ idx = entries[idx].next;
+ }
+
+ return TABLE_MUTATION_MAP_INVALID_INDEX;
+}
+
+/* ----------------
+ * Query normalization
+ * ----------------
+ */
+
+/*
+ * Simple query normalization:
+ * - Strip comments (-- style and C-style block comments)
+ * - Collapse whitespace
+ * - Convert to lowercase (except inside strings)
+ * - Replace literal values with placeholders
+ *
+ * This is a simplified version - Pgpool-II already does this elsewhere,
+ * but we need a standalone version for the table mutation map feature.
+ */
+static size_t
+normalize_query(const char *query, char *output, size_t output_size)
+{
+ const char *src = query;
+ char *dst = output;
+ char *dst_end = output + output_size - 1;
+ bool in_string = false;
+ char string_char = 0;
+ bool last_was_space = true; /* Start true to skip leading space */
+
+ while (*src && dst < dst_end)
+ {
+ /* Handle string literals */
+ if (in_string)
+ {
+ if (*src == string_char)
+ {
+ if (*(src + 1) == string_char)
+ {
+ /* Escaped quote */
+ src += 2;
+ continue;
+ }
+ in_string = false;
+ *dst++ = '$'; /* Replace string content with placeholder */
+ }
+ src++;
+ continue;
+ }
+
+ /* Check for string literal start; double-quoted identifiers are not literals and are copied */
+ if (*src == '\'')
+ {
+ in_string = true;
+ string_char = *src;
+ src++;
+ continue;
+ }
+
+ /* Handle single-line comments */
+ if (*src == '-' && *(src + 1) == '-')
+ {
+ while (*src && *src != '\n')
+ src++;
+ continue;
+ }
+
+ /* Handle multi-line comments */
+ if (*src == '/' && *(src + 1) == '*')
+ {
+ src += 2;
+ while (*src && !(*src == '*' && *(src + 1) == '/'))
+ src++;
+ if (*src)
+ src += 2;
+ continue;
+ }
+
+ /* Handle whitespace */
+ if (*src == ' ' || *src == '\t' || *src == '\n' || *src == '\r')
+ {
+ if (!last_was_space)
+ {
+ *dst++ = ' ';
+ last_was_space = true;
+ }
+ src++;
+ continue;
+ }
+
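+ /* Handle numeric literals; digits that continue an identifier (e.g. t1) are kept as-is */
+ if (((*src >= '0' && *src <= '9') || (*src == '.' && *(src + 1) >= '0' && *(src + 1) <= '9')) &&
+ !(dst > output && (*(dst - 1) == '_' || (*(dst - 1) >= 'a' && *(dst - 1) <= 'z') || (*(dst - 1) >= '0' && *(dst - 1) <= '9'))))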
+ {
+ while (*src && ((*src >= '0' && *src <= '9') || *src == '.'))
+ src++;
+ if (!last_was_space && dst > output && *(dst - 1) != '$')
+ *dst++ = '$';
+ last_was_space = false;
+ continue;
+ }
+
+ /* Regular character - convert to lowercase */
+ if (*src >= 'A' && *src <= 'Z')
+ *dst++ = *src + 32;
+ else
+ *dst++ = *src;
+
+ last_was_space = false;
+ src++;
+ }
+
+ /* Remove trailing space */
+ if (dst > output && *(dst - 1) == ' ')
+ dst--;
+
+ *dst = '\0';
+ return dst - output;
+}
+
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+Size
+pool_table_mutation_map_shmem_size(void)
+{
+ Size size = 0;
+ int table_buckets = pool_config->table_mutation_map_table_buckets;
+ int table_size = pool_config->table_mutation_map_table_size;
+ int query_buckets = pool_config->table_mutation_map_query_buckets;
+ int query_cache_size = pool_config->table_mutation_map_query_cache_size;
+
+ /* Main structure */
+ size += sizeof(TableMutationMapShmem);
+
+ /* Table mutation hash table */
+ size += sizeof(TableMutationHashTable);
+ size += table_buckets * sizeof(int); /* buckets array */
+ size += table_size * sizeof(TableMutationEntry); /* entries array */
+
+ /* Query parse cache */
+ size += sizeof(QueryParseCache);
+ size += query_buckets * sizeof(int); /* buckets array */
+ size += query_cache_size * sizeof(QueryParseEntry); /* entries array */
+
+ return size;
+}
+
+void
+pool_table_mutation_map_init(void)
+{
+#ifndef POOL_PRIVATE
+ Size shmem_size;
+ char *shmem_ptr;
+
+ if (!pool_config->table_mutation_map_enabled)
+ {
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: feature disabled")));
+ return;
+ }
+
+ shmem_size = pool_table_mutation_map_shmem_size();
+
+ /*
+ * Allocate from the main shared memory segment.
+ * Memory is already zeroed by initialize_shared_memory_main_segment().
+ */
+ shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size);
+ if (shmem_ptr == NULL)
+ {
+ ereport(ERROR,
+ (errmsg("table_mutation_map: failed to allocate %zu bytes of shared memory",
+ shmem_size)));
+ return;
+ }
+
+ /* Set up pointers to structures within shared memory */
+ table_mutation_map_shmem = (TableMutationMapShmem *)shmem_ptr;
+ shmem_ptr += sizeof(TableMutationMapShmem);
+
+ table_mutation_map_shmem->table_map = (TableMutationHashTable *)shmem_ptr;
+ shmem_ptr += sizeof(TableMutationHashTable);
+ shmem_ptr += pool_config->table_mutation_map_table_buckets * sizeof(int);
+ shmem_ptr += pool_config->table_mutation_map_table_size * sizeof(TableMutationEntry);
+
+ table_mutation_map_shmem->query_cache = (QueryParseCache *)shmem_ptr;
+
+ /* Initialize structures */
+ table_map_init(table_mutation_map_shmem->table_map,
+ pool_config->table_mutation_map_table_buckets,
+ pool_config->table_mutation_map_table_size);
+
+ query_cache_init(table_mutation_map_shmem->query_cache,
+ pool_config->table_mutation_map_query_buckets,
+ pool_config->table_mutation_map_query_cache_size);
+
+ /* Initialize global state */
+ table_mutation_map_shmem->state.initialized = true;
+ table_mutation_map_shmem->state.current_ttl_us = TABLE_MUTATION_MAP_DEFAULT_TTL_US;
+ get_current_time(&table_mutation_map_shmem->state.ttl_last_updated);
+ get_current_time(&table_mutation_map_shmem->state.last_cleanup_time);
+ table_mutation_map_shmem->state.global_cold_start_until.tv_sec = 0;
+ table_mutation_map_shmem->state.global_cold_start_until.tv_usec = 0;
+ table_mutation_map_shmem->state.stats_queries_checked = 0;
+ table_mutation_map_shmem->state.stats_forced_primary = 0;
+ table_mutation_map_shmem->state.stats_allowed_replica = 0;
+
+ ereport(LOG,
+ (errmsg("table_mutation_map: initialized with %zu bytes shared memory",
+ shmem_size)));
+#endif
+}
+
+void
+pool_table_mutation_map_child_init(void)
+{
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return;
+
+ get_current_time(&process_start_time);
+ cold_start_initialized = true;
+
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: child initialized, cold start period %d ms",
+ pool_config->table_mutation_map_cold_start_duration)));
+}
+
+bool
+pool_table_mutation_map_in_cold_start(void)
+{
+ struct timeval now;
+ int64 elapsed_ms;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return false;
+
+ if (pool_config->table_mutation_map_cold_start_duration <= 0)
+ return false;
+
+ get_current_time(&now);
+
+ if (table_mutation_map_shmem->state.global_cold_start_until.tv_sec != 0 &&
+ elapsed_us(&now, &table_mutation_map_shmem->state.global_cold_start_until) > 0)
+ {
+ return true;
+ }
+
+ if (!cold_start_initialized)
+ return false;
+
+ elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;
+
+ if (elapsed_ms < pool_config->table_mutation_map_cold_start_duration)
+ {
+ ereport(DEBUG2,
+ (errmsg("table_mutation_map: in cold start (%ld/%d ms)",
+ (long)elapsed_ms, pool_config->table_mutation_map_cold_start_duration)));
+ return true;
+ }
+
+ return false;
+}
+
+void
+pool_table_mutation_map_trigger_global_cold_start(void)
+{
+ struct timeval now;
+ int duration_ms;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return;
+
+ duration_ms = pool_config->table_mutation_map_cold_start_duration;
+ if (duration_ms <= 0)
+ return;
+
+ get_current_time(&now);
+ table_mutation_map_shmem->state.global_cold_start_until = now;
+ table_mutation_map_shmem->state.global_cold_start_until.tv_sec += duration_ms / 1000;
+ table_mutation_map_shmem->state.global_cold_start_until.tv_usec += (duration_ms % 1000) * 1000;
+ if (table_mutation_map_shmem->state.global_cold_start_until.tv_usec >= 1000000)
+ {
+ table_mutation_map_shmem->state.global_cold_start_until.tv_sec +=
+ table_mutation_map_shmem->state.global_cold_start_until.tv_usec / 1000000;
+ table_mutation_map_shmem->state.global_cold_start_until.tv_usec %=
+ 1000000;
+ }
+
+ ereport(LOG,
+ (errmsg("table_mutation_map: entering global cold start for %d ms",
+ duration_ms)));
+}
+
+bool
+pool_table_mutation_map_table_is_stale(int table_oid, int dboid)
+{
+ TableMutationHashTable *map;
+ struct timeval now;
+ uint64 ttl_us;
+ uint32 hash;
+ int idx;
+ bool is_stale = false;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return false;
+
+ if (table_oid <= 0 || dboid <= 0)
+ {
+ is_stale = true;
+ goto update_stats;
+ }
+
+ map = table_mutation_map_shmem->table_map;
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+
+ table_map_lock();
+
+ idx = table_map_lookup(map, table_oid, dboid, hash);
+ if (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int64 elapsed;
+
+ get_current_time(&now);
+ ttl_us = table_mutation_map_shmem->state.current_ttl_us;
+
+ /* A table written less than one TTL ago may not have replicated yet */
+ elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+ is_stale = (elapsed < (int64)ttl_us);
+
+ ereport(DEBUG2,
+ (errmsg("table_mutation_map: table oid %d (dboid %d) elapsed=%ld us, ttl=%lu us, stale=%d",
+ table_oid, dboid, (long)elapsed, (unsigned long)ttl_us, is_stale)));
+ }
+
+ table_map_unlock();
+
+update_stats:
+ /* Update statistics */
+ __sync_fetch_and_add(&table_mutation_map_shmem->state.stats_queries_checked, 1);
+ if (is_stale)
+ __sync_fetch_and_add(&table_mutation_map_shmem->state.stats_forced_primary, 1);
+ else
+ __sync_fetch_and_add(&table_mutation_map_shmem->state.stats_allowed_replica, 1);
+
+ return is_stale;
+}
+
+void
+pool_table_mutation_map_mark_tables_written(const int *table_oids, int num_tables, int dboid)
+{
+ TableMutationHashTable *map;
+ struct timeval now;
+ int i;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return;
+
+ if (num_tables <= 0 || table_oids == NULL || dboid <= 0)
+ return;
+
+ map = table_mutation_map_shmem->table_map;
+ get_current_time(&now);
+
+ table_map_lock();
+
+ /* Clean up expired entries once the map is more than 3/4 full */
+ if (map->num_entries > map->max_entries * 3 / 4)
+ {
+ /* Rate-limit to one cleanup per 100 ms to avoid an O(N) scan on every write */
+ if (elapsed_us(&table_mutation_map_shmem->state.last_cleanup_time, &now) > 100000)
+ {
+ table_map_cleanup_expired(map, table_mutation_map_shmem->state.current_ttl_us);
+ table_mutation_map_shmem->state.last_cleanup_time = now;
+ }
+ }
+
+ for (i = 0; i < num_tables; i++)
+ {
+ uint32 hash;
+ int table_oid = table_oids[i];
+
+ if (table_oid > 0)
+ {
+ hash = fnv1a_hash_table_key(table_oid, dboid);
+ table_map_insert(map, table_oid, dboid, hash, &now);
+ }
+ }
+
+ table_map_unlock();
+}
+
+/*
+ * Convenience function to mark a single table as written
+ */
+void
+pool_table_mutation_map_mark_table_written(int table_oid, int dboid)
+{
+ if (table_oid > 0 && dboid > 0)
+ {
+ const int tables[1] = { table_oid };
+ pool_table_mutation_map_mark_tables_written(tables, 1, dboid);
+ }
+}
+
+void
+pool_table_mutation_map_update_ttl(uint64 delay_us)
+{
+ uint64 new_ttl;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return;
+
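+ /*
+ * Calculate new TTL: delay * factor, clamped to [default TTL, 1 hour].
+ * Clamp in floating point first: converting an out-of-range double to
+ * uint64 is undefined behavior.
+ */
+ {
+ double scaled = (double)delay_us * pool_config->table_mutation_map_ttl_factor;
+
+ if (!(scaled > (double)TABLE_MUTATION_MAP_DEFAULT_TTL_US))
+ new_ttl = TABLE_MUTATION_MAP_DEFAULT_TTL_US;
+ else if (scaled >= 3600.0 * 1000000.0)
+ new_ttl = 3600ULL * 1000000ULL; /* Maximum TTL of 1 hour */
+ else
+ new_ttl = (uint64)scaled;
+ }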
+
+ table_mutation_map_shmem->state.current_ttl_us = new_ttl;
+ get_current_time(&table_mutation_map_shmem->state.ttl_last_updated);
+
+ ereport(DEBUG1,
+ (errmsg("table_mutation_map: updated TTL to %lu us (delay=%lu us, factor=%.1f)",
+ (unsigned long)new_ttl, (unsigned long)delay_us,
+ pool_config->table_mutation_map_ttl_factor)));
+}
+
+bool
+pool_table_mutation_map_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN],
+ int *num_tables)
+{
+ QueryParseCache *cache;
+ int idx;
+ bool found = false;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return false;
+
+ cache = table_mutation_map_shmem->query_cache;
+
+ query_cache_lock();
+
+ idx = query_cache_lookup(cache, hash);
+ if (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int i;
+
+ *is_write = entries[idx].is_write;
+ *num_tables = entries[idx].num_tables;
+
+ for (i = 0; i < entries[idx].num_tables && i < TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY; i++)
+ {
+ strlcpy(table_names[i], entries[idx].table_names[i], TABLE_MUTATION_MAP_TABLE_NAME_LEN);
+ }
+
+ /* Move to front of LRU */
+ query_cache_lru_touch(cache, idx);
+ found = true;
+ }
+
+ query_cache_unlock();
+
+ return found;
+}
+
+void
+pool_table_mutation_map_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN],
+ int num_tables)
+{
+ QueryParseCache *cache;
+ int *buckets;
+ QueryParseEntry *entries;
+ int idx;
+ int bucket;
+
+ if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+ return;
+
+ cache = table_mutation_map_shmem->query_cache;
+
+ query_cache_lock();
+
+ /* Check if already exists */
+ idx = query_cache_lookup(cache, hash);
+ if (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ query_cache_unlock();
+ return;
+ }
+
+ /* Allocate new entry (may evict LRU) */
+ idx = query_cache_alloc_entry(cache);
+ if (idx == TABLE_MUTATION_MAP_INVALID_INDEX)
+ {
+ query_cache_unlock();
+ ereport(WARNING,
+ (errmsg("table_mutation_map: failed to allocate query cache entry")));
+ return;
+ }
+
+ entries = QUERY_CACHE_ENTRIES(cache);
+ buckets = QUERY_CACHE_BUCKETS(cache);
+
+ /* Fill in entry */
+ entries[idx].query_hash = hash;
+ entries[idx].is_write = is_write;
+ entries[idx].num_tables = (num_tables > TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY) ?
+ TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY : num_tables;
+
+ {
+ int i;
+ for (i = 0; i < entries[idx].num_tables; i++)
+ {
+ strlcpy(entries[idx].table_names[i], table_names[i], TABLE_MUTATION_MAP_TABLE_NAME_LEN);
+ }
+ }
+
+ /* Insert into hash bucket */
+ bucket = hash % cache->num_buckets;
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ /* Add to LRU list */
+ query_cache_lru_add(cache, idx);
+
+ query_cache_unlock();
+}
+
+uint64
+pool_table_mutation_map_normalize_and_hash(const char *query)
+{
+ char normalized[8192]; /* longer queries are truncated before hashing */
+ size_t len;
+
+ if (query == NULL || query[0] == '\0')
+ return 0;
+
+ len = normalize_query(query, normalized, sizeof(normalized));
+ if (len == 0)
+ return 0;
+
+ return fnv1a_hash_64(normalized, len);
+}
--
2.52.0
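
For readers following pool_table_mutation_map_normalize_and_hash(): the helper fnv1a_hash_64() it calls is defined outside this part of the patch, so its exact body is not shown here. The sketch below is the textbook FNV-1a 64-bit algorithm that the helper's name suggests, written as an assumption rather than a copy of the patch. The name sketch_fnv1a_64 and the two macros are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Standard FNV-1a 64-bit parameters (public constants, not taken from the patch). */
#define FNV64_OFFSET_BASIS UINT64_C(0xcbf29ce484222325)
#define FNV64_PRIME        UINT64_C(0x100000001b3)

/* Hypothetical stand-in for the patch's fnv1a_hash_64(data, len). */
static uint64_t
sketch_fnv1a_64(const char *data, size_t len)
{
    uint64_t hash = FNV64_OFFSET_BASIS;
    size_t   i;

    /* A NULL pointer is only acceptable for an empty input. */
    assert(data != NULL || len == 0);

    for (i = 0; i < len; i++)
    {
        hash ^= (unsigned char) data[i];    /* XOR the byte in first ... */
        hash *= FNV64_PRIME;                /* ... then multiply (the "1a" order) */
    }
    return hash;
}
```

The parse cache in the patch is keyed only by this 64-bit value, so two normalized queries that hash alike would share one cache entry. The normalization step therefore has to keep queries that touch different tables textually distinct. Note also that pool_table_mutation_map_normalize_and_hash() returns 0 for an empty query, so 0 doubles as "no hash".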