From: Nadav Shatz <[email protected]>
To: [email protected]
Subject: Proposal: Recent mutated table tracking in memory
Date: Tue, 6 Jan 2026 13:25:54 +0200
Message-ID: <CACeKOO2hjPmstboJaa=rw8Erd7k5VhXyupU39bAosCPtUe1UBA@mail.gmail.com>
Hello,
As initially proposed under "Proposal: recent access based routing for
primary-replica setups" and then broken into separate tasks, I am attaching
a patch that tracks the most recently mutated tables and, using the
replication lag as a baseline, decides where to route queries when load
balancing and query parsing are enabled.
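The routing order the patch adds to where_to_send_main_replica() is: cold start first, then the per-table staleness check, and only then the existing load-balancing and replication-delay logic. A minimal sketch of that order, assuming the staleness of each referenced table has already been looked up; the enum, function, and parameter names are hypothetical simplifications, not pgpool-II APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical routing outcome, simplified from pgpool-II's node selection. */
typedef enum
{
	ROUTE_PRIMARY,				/* send the SELECT to the primary */
	ROUTE_LOAD_BALANCE			/* continue with normal load balancing */
} RouteDecision;

/*
 * Order of checks for a read query.  table_stale[i] is true when the i-th
 * table referenced by the SELECT was written within the current TTL.
 */
static RouteDecision
route_read(bool memory_map_enabled, bool in_cold_start,
		   const bool *table_stale, size_t num_tables)
{
	assert(table_stale != NULL || num_tables == 0);

	if (!memory_map_enabled)
		return ROUTE_LOAD_BALANCE;

	/* Cold start: the map may not reflect recent writes yet. */
	if (in_cold_start)
		return ROUTE_PRIMARY;

	/* A single recently written table sends the whole query to the primary. */
	for (size_t i = 0; i < num_tables; i++)
	{
		if (table_stale[i])
			return ROUTE_PRIMARY;
	}

	/* The existing replication-delay checks still run after this point. */
	return ROUTE_LOAD_BALANCE;
}
```

In this sketch, a join over one fresh table and one recently written table goes to the primary as a whole, which matches the patch's loop that breaks on the first stale table.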
More details, from the patch's commit message:
Feature: add in-memory table tracking to prevent stale reads from replicas
Implement "memory map" feature that tracks recently-written database
tables in shared memory to prevent stale reads during replication lag.
When a write (INSERT/UPDATE/DELETE) occurs on a table, that table is
marked as "dirty" for a configurable TTL period. Any SELECT on a dirty
table within the TTL window is routed to primary instead of replica.
Key features:
- Shared memory hash table for tracking table mutations with TTL
- Query parse cache with LRU eviction for performance
- Cold start protection (routes all queries to primary initially)
- Automatic TTL calculation: replication_delay × configurable factor
- Per-table staleness tracking with microsecond precision
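The TTL calculation and cold-start rule from the list above can be sketched as two small functions. The patch's header defines MEMORY_MAP_DEFAULT_TTL_US (100 ms) as the TTL used when the replication delay is unknown; representing "unknown" as a zero delay, and the function names below, are assumptions made for this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Fallback TTL; mirrors MEMORY_MAP_DEFAULT_TTL_US (100 ms) in the patch. */
#define SKETCH_DEFAULT_TTL_US ((uint64_t) 100 * 1000)

/*
 * TTL = replication delay x factor.  This sketch treats delay_us == 0 as
 * "delay unknown" and returns the fallback instead of a zero TTL.
 */
static uint64_t
compute_ttl_us(uint64_t delay_us, double factor)
{
	/* Same range as memory_map_ttl_factor. */
	assert(factor >= 1.0 && factor <= 100.0);

	if (delay_us == 0)
		return SKETCH_DEFAULT_TTL_US;
	return (uint64_t) ((double) delay_us * factor);
}

/*
 * True during the first cold_start_ms milliseconds after the child started
 * (both timestamps in microseconds).  cold_start_ms == 0 disables it.
 */
static bool
in_cold_start(uint64_t child_start_us, uint64_t now_us, int cold_start_ms)
{
	if (cold_start_ms <= 0)
		return false;
	if (now_us < child_start_us)
		return true;			/* clock went backwards: stay conservative */
	return now_us - child_start_us < (uint64_t) cold_start_ms * 1000;
}
```

For example, a measured delay of 20 ms with the default factor of 5.0 yields a 100 ms TTL.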
New configuration parameters:
- memory_map_enabled: Enable/disable the feature (default: off)
- memory_map_ttl_factor: TTL multiplier for replication delay (default: 5.0)
- memory_map_cold_start_duration: Cold start period in ms (default: 2000)
- memory_map_table_buckets: Hash buckets for table map (default: 1024)
- memory_map_table_size: Max tracked tables (default: 2048)
- memory_map_query_buckets: Hash buckets for query cache (default: 2048)
- memory_map_query_cache_size: Max cached queries (default: 10000)
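Because the table map and the query cache are preallocated in shared memory, their size follows from the entry structs in the patch's pool_memory_map.h. The sketch below copies those two layouts and estimates the space for a given configuration. It assumes NAMEDATALEN is 64 (PostgreSQL's default), counts one int per hash bucket plus the entry arrays, and ignores the small header structs and any padding the real pool_memory_map_shmem_size() may add; region_bytes() and estimate_memory_map_bytes() are hypothetical helpers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/time.h>

#define NAMEDATALEN 64			/* assumed; PostgreSQL's default */
#define MEMORY_MAP_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
#define MEMORY_MAP_MAX_TABLES_PER_QUERY 8

/* Entry layouts copied from the patch's pool_memory_map.h. */
typedef struct
{
	char		table_name[MEMORY_MAP_TABLE_NAME_LEN];
	struct timeval last_write_time;
	uint32_t	hash;
	int			next;
	bool		in_use;
} TableMutationEntry;

typedef struct
{
	uint64_t	query_hash;
	bool		is_write;
	int			num_tables;
	char		table_names[MEMORY_MAP_MAX_TABLES_PER_QUERY][MEMORY_MAP_TABLE_NAME_LEN];
	int			next;
	int			lru_prev;
	int			lru_next;
	bool		in_use;
} QueryParseEntry;

/* Bucket array (one int per bucket) plus the entry array. */
static size_t
region_bytes(int num_buckets, int max_entries, size_t entry_size)
{
	assert(num_buckets > 0 && max_entries > 0);
	return (size_t) num_buckets * sizeof(int) + (size_t) max_entries * entry_size;
}

/* Rough estimate for both regions, excluding the header structs. */
static size_t
estimate_memory_map_bytes(int table_buckets, int table_size,
						  int query_buckets, int query_cache_size)
{
	return region_bytes(table_buckets, table_size, sizeof(TableMutationEntry))
		+ region_bytes(query_buckets, query_cache_size, sizeof(QueryParseEntry));
}
```

On a typical x86-64 Linux build, sizeof(TableMutationEntry) is 168 bytes and sizeof(QueryParseEntry) is 1088 bytes, because each query entry reserves eight 132-byte table-name slots. With the defaults this comes to roughly 11 MB, almost all of it in the query cache.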
The patch applies cleanly and the tests pass.
I am open to any feedback. Thank you!
--
Nadav Shatz
Tailor Brands | CTO
Attachments:
[application/octet-stream] mutated_table.patch (67.1K, 3-mutated_table.patch)
From 47551f4c1eae9b6275904d4ead9b24d9a83fda4b Mon Sep 17 00:00:00 2001
From: Nadav Shatz <[email protected]>
Date: Tue, 6 Jan 2026 12:41:50 +0200
Subject: [PATCH] Feature: add in-memory table tracking to prevent stale reads
from replicas
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml
index ee19fabebab2210cd4abe59a711a036ac0ac8943..bdc929ee55b94899ffdd90880a741cfbac051aa4 100644
--- a/doc/src/sgml/loadbalance.sgml
+++ b/doc/src/sgml/loadbalance.sgml
@@ -1193,4 +1193,210 @@ dml_adaptive_object_relationship_list = 'table_1:table_2'
</variablelist>
</sect2>
+
+ <sect2 id="runtime-config-memory-map">
+ <title>Memory Map Configuration (Lagless Replica Reads)</title>
+
+ <para>
+ These parameters configure the memory map feature, which tracks recently written tables
+ to prevent stale reads from replica nodes during replication lag. This implements the
+ "lagless" architecture pattern for distributed systems with read replicas.
+ </para>
+
+ <para>
+ When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period
+ (<literal>replication_delay * memory_map_ttl_factor</literal>). SELECT queries that reference a stale table are routed
+ to the primary node instead of a replica, which gives read-after-write consistency as long as the actual replication delay stays within the TTL.
+ </para>
+
+ <para>
+ This feature requires <xref linkend="guc-replication-delay-source-cmd"> to be configured
+ for monitoring replication delay from replicas.
+ </para>
+
+ <warning>
+ <para>
+ Enabling the memory map feature increases shared memory consumption. With default settings,
+ the feature requires approximately 11 MB of shared memory (0.3 MB for table tracking + 10.9 MB for query cache).
+ Approximate memory usage on 64-bit platforms scales with these parameters:
+ </para>
+ <itemizedlist>
+ <listitem>
+ <para>
+ Table tracking: <literal>memory_map_table_size * 160 bytes</literal> (default: 2048 * 160 = ~320 KB)
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ Query cache: <literal>memory_map_query_cache_size * 1088 bytes</literal> (default: 10000 * 1088 = ~10.9 MB)
+ </para>
+ </listitem>
+ </itemizedlist>
+ <para>
+ For high-traffic systems with large cache sizes (e.g., <literal>memory_map_query_cache_size = 100000</literal>),
+ memory usage can exceed 100 MB. Consider your system's available shared memory when enabling this feature.
+ </para>
+ </warning>
+
+ <variablelist>
+
+ <varlistentry id="guc-memory-map-enabled" xreflabel="memory_map_enabled">
+ <term><varname>memory_map_enabled</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>memory_map_enabled</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables in-memory tracking of recently written tables. When enabled, tables are marked
+ as stale after write operations, and reads of those tables are routed to the primary until the TTL expires.
+ </para>
+ <para>
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ Default is <literal>off</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-memory-map-ttl-factor" xreflabel="memory_map_ttl_factor">
+ <term><varname>memory_map_ttl_factor</varname> (<type>floating point</type>)
+ <indexterm>
+ <primary><varname>memory_map_ttl_factor</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Multiplier for calculating the TTL: <literal>TTL = replication_delay * memory_map_ttl_factor</literal>.
+ Higher values provide more safety margin but may reduce read replica utilization.
+ </para>
+ <para>
+ Valid range: 1.0-100.0. Default is <literal>5.0</literal>.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-memory-map-cold-start-duration" xreflabel="memory_map_cold_start_duration">
+ <term><varname>memory_map_cold_start_duration</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>memory_map_cold_start_duration</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Duration in milliseconds to route all queries to primary after a child process starts.
+ This prevents stale reads when a new connection is established before the memory map
+ is populated with recent write history.
+ </para>
+ <para>
+ Valid range: 0-60000 ms. Default is <literal>2000</literal> (2 seconds).
+ Set to 0 to disable cold start behavior.
+ This parameter can be changed by reloading the <productname>Pgpool-II</> configurations.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-memory-map-table-buckets" xreflabel="memory_map_table_buckets">
+ <term><varname>memory_map_table_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>memory_map_table_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the table mutation tracking hash table.
+ Higher values reduce hash collisions and improve lookup performance.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>1024</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-memory-map-table-size" xreflabel="memory_map_table_size">
+ <term><varname>memory_map_table_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>memory_map_table_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of tables that can be tracked simultaneously in the memory map.
+ When the map is full, the oldest entries are evicted.
+ </para>
+ <para>
+ Valid range: 128-131072. Default is <literal>2048</literal>.
+ Memory usage: approximately 160 bytes per entry.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-memory-map-query-buckets" xreflabel="memory_map_query_buckets">
+ <term><varname>memory_map_query_buckets</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>memory_map_query_buckets</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Number of hash buckets for the query parse cache. The cache maps a hash of each normalized
+ query string to the tables it references, so that repeated queries do not have to be parsed again.
+ </para>
+ <para>
+ Valid range: 64-65536. Default is <literal>2048</literal>.
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry id="guc-memory-map-query-cache-size" xreflabel="memory_map_query_cache_size">
+ <term><varname>memory_map_query_cache_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>memory_map_query_cache_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Maximum number of query parse results to cache. Uses LRU eviction when full.
+ Larger caches reduce parsing overhead but consume more shared memory.
+ </para>
+ <para>
+ Valid range: 100-1000000. Default is <literal>10000</literal>.
+ Memory usage: approximately 1088 bytes per entry on 64-bit platforms (~10.9 MB for default, ~109 MB for 100000 entries).
+ This parameter can only be set at server start.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+
+ <sect3 id="runtime-config-memory-map-example">
+ <title>Memory Map Configuration Example</title>
+ <para>
+ To enable memory map with replication delay monitoring:
+ </para>
+ <programlisting>
+# Enable memory map feature
+memory_map_enabled = on
+memory_map_ttl_factor = 5.0
+memory_map_cold_start_duration = 2000
+
+# Configure external replication delay monitoring
+replication_delay_source_cmd = '/path/to/get-replication-delay.sh'
+replication_delay_source_timeout = 10
+
+# Adjust cache sizes based on workload (increases memory usage)
+memory_map_table_size = 4096 # Track up to 4096 tables (~640 KB)
+memory_map_query_cache_size = 50000 # Cache 50k queries (~54 MB)
+ </programlisting>
+ <para>
+ Total shared memory required for the above configuration: approximately 55 MB (54 MB query cache + 0.6 MB table map + overhead).
+ The default configuration (10000 query cache entries, 2048 tables) requires approximately 11 MB.
+ </para>
+ </sect3>
+
+ </sect2>
+
</sect1>
diff --git a/src/Makefile.am b/src/Makefile.am
index 4678ab53055e828a37b6477801640aff17ff84a7..51896ae07771fc00382ab965eaf3807c8b5f3d94 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \
rewrite/pool_timestamp.c \
rewrite/pool_lobj.c \
utils/pool_select_walker.c \
+ utils/pool_memory_map.c \
utils/strlcpy.c \
utils/psprintf.c \
utils/pool_params.c \
diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c
index 68abb7f41cb96d856c824a148842748bfb7a4d12..d9a28e7ec3369ff799cb37c37c0cd05075327606 100644
--- a/src/config/pool_config_variables.c
+++ b/src/config/pool_config_variables.c
@@ -783,6 +783,16 @@ static struct config_bool ConfigureNamesBool[] =
NULL, NULL, NULL
},
+ {
+ {"memory_map_enabled", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Enable in-memory tracking of recently written tables to avoid stale reads from replicas",
+ CONFIG_VAR_TYPE_BOOL, false, 0
+ },
+ &g_pool_config.memory_map_enabled,
+ false,
+ NULL, NULL, NULL
+ },
+
{
{"auto_failback", CFGCXT_RELOAD, FAILOVER_CONFIG,
"Enables nodes automatically reattach, when detached node continue streaming replication.",
@@ -1757,6 +1767,17 @@ static struct config_int_array ConfigureNamesIntArray[] =
static struct config_double ConfigureNamesDouble[] =
{
+ {
+ {"memory_map_ttl_factor", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "TTL multiplier for memory map (TTL = replication_delay * factor)",
+ CONFIG_VAR_TYPE_DOUBLE, false, 0
+ },
+ &g_pool_config.memory_map_ttl_factor,
+ 5.0, /* boot value: 5x replication delay */
+ 1.0, 100.0, /* min, max */
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_DOUBLE
};
@@ -2355,6 +2376,61 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"memory_map_cold_start_duration", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG,
+ "Duration in milliseconds to force queries to primary after child process starts.",
+ CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS
+ },
+ &g_pool_config.memory_map_cold_start_duration,
+ 2000, /* 2 seconds */
+ 0, 60000, /* 0 to 60 seconds */
+ NULL, NULL, NULL
+ },
+
+ {
+ {"memory_map_table_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for table mutation map.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.memory_map_table_buckets,
+ 1024,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"memory_map_table_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in table mutation map.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.memory_map_table_size,
+ 2048,
+ 128, 131072,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"memory_map_query_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Number of hash buckets for query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.memory_map_query_buckets,
+ 2048,
+ 64, 65536,
+ NULL, NULL, NULL
+ },
+
+ {
+ {"memory_map_query_cache_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG,
+ "Maximum number of entries in query parse cache.",
+ CONFIG_VAR_TYPE_INT, false, 0
+ },
+ &g_pool_config.memory_map_query_cache_size,
+ 10000,
+ 100, 1000000,
+ NULL, NULL, NULL
+ },
+
/* End-of-list marker */
EMPTY_CONFIG_INT
};
diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c
index 1a13168c6e8d3f0064dfce4ee6e4661eee69304e..47e5f2796f809dcf3208edd7d0a2bcf8dda83260 100644
--- a/src/context/pool_query_context.c
+++ b/src/context/pool_query_context.c
@@ -29,6 +29,7 @@
#include "utils/statistics.h"
#include "utils/pool_select_walker.h"
#include "utils/pool_stream.h"
+#include "utils/pool_memory_map.h"
#include "context/pool_session_context.h"
#include "context/pool_query_context.h"
#include "parser/nodes.h"
@@ -2135,6 +2136,92 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node
{
pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
}
+ /*
+ * Check memory map for recently written tables.
+ * If in cold start or any table was recently written,
+ * route to primary to avoid stale reads.
+ */
+ else if (pool_config->memory_map_enabled)
+ {
+ bool force_primary = false;
+
+ /* During cold start, route everything to primary */
+ if (pool_memory_map_in_cold_start())
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of memory map cold start"),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ }
+ else
+ {
+ /* Extract table names and check if any are stale */
+ SelectContext ctx;
+ int num_oids;
+ int i;
+
+ memset(&ctx, 0, sizeof(ctx));
+ num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
+
+ for (i = 0; i < num_oids; i++)
+ {
+ if (pool_memory_map_table_is_stale(ctx.table_names[i]))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because table \"%s\" was recently written",
+ ctx.table_names[i]),
+ errdetail("destination = PRIMARY for query= \"%s\"", query)));
+ force_primary = true;
+ break;
+ }
+ }
+ }
+
+ if (force_primary)
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ else
+ {
+ /* Proceed with load balancing */
+ if (pool_config->statement_level_load_balance)
+ {
+ session_context->load_balance_node_id = select_load_balancing_node();
+ }
+
+ /*
+ * As streaming replication delay is too much, if
+ * prefer_lower_delay_standby is true then elect new load
+ * balance node which is lowest delayed, false then send
+ * to the primary.
+ */
+ if (STREAM && check_replication_delay(session_context->load_balance_node_id))
+ {
+ ereport(DEBUG1,
+ (errmsg("could not load balance because of too much replication delay"),
+ errdetail("destination = %d for query= \"%s\"", dest, query)));
+
+ if (pool_config->prefer_lower_delay_standby)
+ {
+ int new_load_balancing_node = select_load_balancing_node();
+
+ session_context->load_balance_node_id = new_load_balancing_node;
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id);
+ }
+ else
+ {
+ pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
+ }
+ }
+ else
+ {
+ session_context->query_context->load_balance_node_id = session_context->load_balance_node_id;
+ pool_set_node_to_be_sent(query_context,
+ session_context->query_context->load_balance_node_id);
+ }
+ }
+ }
else
{
if (pool_config->statement_level_load_balance)
diff --git a/src/include/pool_config.h b/src/include/pool_config.h
index 741de6cc5fc3368f813d6b6efa68eb7f8a79506b..9675c1b65d9bae83c6412c1f1f3399364932221f 100644
--- a/src/include/pool_config.h
+++ b/src/include/pool_config.h
@@ -365,6 +365,16 @@ typedef struct
* replication check */
char *replication_delay_source_cmd; /* external command for replication delay */
int replication_delay_source_timeout; /* timeout for external command in seconds */
+
+ /* Memory map configuration for tracking recently written tables */
+ bool memory_map_enabled; /* Enable in-memory table tracking */
+ double memory_map_ttl_factor; /* TTL multiplier for replication delay */
+ int memory_map_cold_start_duration; /* Cold start duration in ms */
+ int memory_map_table_buckets; /* Number of hash buckets for table map */
+ int memory_map_table_size; /* Max entries in table map */
+ int memory_map_query_buckets; /* Number of hash buckets for query cache */
+ int memory_map_query_cache_size; /* Max entries in query cache */
+
char *failover_command; /* execute command when failover happens */
char *follow_primary_command; /* execute command when failover is
* ended */
diff --git a/src/include/utils/pool_memory_map.h b/src/include/utils/pool_memory_map.h
new file mode 100644
index 0000000000000000000000000000000000000000..511d7a45e7dbd417b1e49b9211fb994f29af1a08
--- /dev/null
+++ b/src/include/utils/pool_memory_map.h
@@ -0,0 +1,236 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_memory_map.h: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ */
+
+#ifndef POOL_MEMORY_MAP_H
+#define POOL_MEMORY_MAP_H
+
+#include "pool.h"
+#include <sys/time.h>
+
+/*
+ * Maximum table name length including schema: "schema"."table"
+ * NAMEDATALEN * 2 + 4: +4 covers the four quotes; the dot and the NUL fit in the two terminator bytes counted in NAMEDATALEN * 2
+ */
+#define MEMORY_MAP_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4)
+
+/*
+ * Maximum number of tables we track per query
+ */
+#define MEMORY_MAP_MAX_TABLES_PER_QUERY 8
+
+/*
+ * Invalid index marker for linked lists
+ */
+#define MEMORY_MAP_INVALID_INDEX (-1)
+
+/*
+ * Default TTL in microseconds (100ms) used when replication delay is unknown
+ */
+#define MEMORY_MAP_DEFAULT_TTL_US (100 * 1000)
+
+/*
+ * Entry in the table mutation hash table
+ */
+typedef struct TableMutationEntry
+{
+ char table_name[MEMORY_MAP_TABLE_NAME_LEN]; /* "schema"."table" */
+ struct timeval last_write_time; /* When the table was last written */
+ uint32 hash; /* Pre-computed hash value */
+ int next; /* Next entry in collision chain (-1 if none) */
+ bool in_use; /* Is this entry in use? */
+} TableMutationEntry;
+
+/*
+ * Header for the table mutation hash table in shared memory
+ */
+typedef struct TableMutationHashTable
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ volatile uint32 lock; /* Spinlock for concurrent access by child processes */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * TableMutationEntry entries[max_entries];
+ */
+} TableMutationHashTable;
+
+/*
+ * Entry in the query parse cache
+ */
+typedef struct QueryParseEntry
+{
+ uint64 query_hash; /* Hash of normalized query */
+ bool is_write; /* True if INSERT/UPDATE/DELETE */
+ int num_tables; /* Number of tables in query */
+ char table_names[MEMORY_MAP_MAX_TABLES_PER_QUERY][MEMORY_MAP_TABLE_NAME_LEN];
+ int next; /* Next entry in collision chain */
+ int lru_prev; /* Previous in LRU list */
+ int lru_next; /* Next in LRU list */
+ bool in_use; /* Is this entry in use? */
+} QueryParseEntry;
+
+/*
+ * Header for the query parse cache in shared memory
+ */
+typedef struct QueryParseCache
+{
+ int num_buckets; /* Number of hash buckets */
+ int max_entries; /* Maximum entries allowed */
+ int num_entries; /* Current number of entries */
+ int free_list_head; /* Head of free entry list */
+ int lru_head; /* Most recently used */
+ int lru_tail; /* Least recently used */
+ volatile uint32 lock; /* Spinlock for concurrent access by child processes */
+ /* Flexible array members follow in shared memory:
+ * int buckets[num_buckets];
+ * QueryParseEntry entries[max_entries];
+ */
+} QueryParseCache;
+
+/*
+ * Global state for memory map feature
+ */
+typedef struct MemoryMapState
+{
+ bool initialized; /* Has shared memory been initialized? */
+ uint64 current_ttl_us; /* Current TTL in microseconds */
+ struct timeval ttl_last_updated; /* When TTL was last updated */
+ volatile uint32 stats_queries_checked; /* Number of queries checked */
+ volatile uint32 stats_forced_primary; /* Queries forced to primary */
+ volatile uint32 stats_allowed_replica; /* Queries allowed to replica */
+} MemoryMapState;
+
+/*
+ * Main shared memory structure containing all components
+ */
+typedef struct MemoryMapShmem
+{
+ MemoryMapState state;
+ TableMutationHashTable *table_map;
+ QueryParseCache *query_cache;
+} MemoryMapShmem;
+
+/* ----------------
+ * Public API functions
+ * ----------------
+ */
+
+/*
+ * Initialize shared memory structures for memory map.
+ * Called from pgpool_main.c after pool_init_pool_info().
+ */
+extern void pool_memory_map_init(void);
+
+/*
+ * Initialize per-child process state for memory map.
+ * Called from child.c when a new child process starts.
+ * Sets up cold start tracking.
+ */
+extern void pool_memory_map_child_init(void);
+
+/*
+ * Check if the child process is in cold start period.
+ * During cold start, all queries are routed to primary.
+ * Returns true if in cold start, false otherwise.
+ */
+extern bool pool_memory_map_in_cold_start(void);
+
+/*
+ * Check if a table was recently written to (is "stale").
+ * If stale, reads from this table should go to primary.
+ * Returns true if table is stale (recently written), false otherwise.
+ */
+extern bool pool_memory_map_table_is_stale(const char *table_name);
+
+/*
+ * Mark tables as recently written.
+ * Called after INSERT/UPDATE/DELETE queries complete.
+ * table_names: array of table names
+ * num_tables: number of tables in array
+ */
+extern void pool_memory_map_mark_tables_written(const char **table_names, int num_tables);
+
+/*
+ * Convenience function to mark a single table as written.
+ * table_name: fully qualified table name
+ */
+extern void pool_memory_map_mark_table_written(const char *table_name);
+
+/*
+ * Update the TTL based on current replication delay.
+ * Called from pool_worker_child.c when replication delay is updated.
+ * delay_us: replication delay in microseconds
+ */
+extern void pool_memory_map_update_ttl(uint64 delay_us);
+
+/*
+ * Look up cached parse result for a query.
+ * hash: hash of normalized query
+ * is_write: output - true if query is a write
+ * table_names: output - array to fill with table names
+ * num_tables: output - number of tables found
+ * Returns true if found in cache, false otherwise.
+ */
+extern bool pool_memory_map_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][MEMORY_MAP_TABLE_NAME_LEN],
+ int *num_tables);
+
+/*
+ * Cache a parse result for a query.
+ * hash: hash of normalized query
+ * is_write: true if query is a write
+ * table_names: array of table names
+ * num_tables: number of tables
+ */
+extern void pool_memory_map_cache_parse(uint64 hash, bool is_write,
+ const char table_names[][MEMORY_MAP_TABLE_NAME_LEN],
+ int num_tables);
+
+/*
+ * Normalize a query and compute its hash.
+ * Strips comments, normalizes whitespace and literals.
+ * query: input SQL query string
+ * Returns: 64-bit hash of normalized query
+ */
+extern uint64 pool_memory_map_normalize_and_hash(const char *query);
+
+/*
+ * Get the current TTL in microseconds.
+ */
+extern uint64 pool_memory_map_get_ttl(void);
+
+/*
+ * Calculate required shared memory size for memory map.
+ */
+extern Size pool_memory_map_shmem_size(void);
+
+/*
+ * Get memory map statistics for monitoring.
+ */
+extern void pool_memory_map_get_stats(uint32 *queries_checked,
+ uint32 *forced_primary,
+ uint32 *allowed_replica,
+ uint64 *current_ttl_us);
+
+#endif /* POOL_MEMORY_MAP_H */
diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c
index 4d88c5815ea253471167dfe7e5bf39f0270323ec..f4a14c84db99100fb761168c14e77b2f2b9eff4b 100644
--- a/src/main/pgpool_main.c
+++ b/src/main/pgpool_main.c
@@ -57,6 +57,7 @@
#include "auth/pool_passwd.h"
#include "auth/pool_hba.h"
#include "query_cache/pool_memqcache.h"
+#include "utils/pool_memory_map.h"
#include "watchdog/wd_internal_commands.h"
#include "watchdog/wd_lifecheck.h"
#include "watchdog/watchdog.h"
@@ -3065,6 +3066,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size()));
}
+ if (pool_config->memory_map_enabled)
+ {
+ size += MAXALIGN(pool_memory_map_shmem_size());
+ elog(DEBUG1, "memory_map: %zu bytes requested for shared memory", MAXALIGN(pool_memory_map_shmem_size()));
+ }
+
initialize_shared_memory_main_segment(size);
/* Move the backend descriptors to shared memory */
@@ -3181,6 +3188,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps)
wd_ipc_initialize_data();
}
+ /* Initialize memory map for tracking recently written tables */
+ if (pool_config->memory_map_enabled)
+ {
+ pool_memory_map_init();
+ }
+
}
/*
diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c
index a3b8f0ea194ffecc79e58566be80562a46eb75ab..9b0681ca46ac2602d3f541ad3119770d422fb0c3 100644
--- a/src/protocol/CommandComplete.c
+++ b/src/protocol/CommandComplete.c
@@ -38,6 +38,8 @@
#include "utils/palloc.h"
#include "utils/memutils.h"
#include "utils/pool_stream.h"
+#include "utils/pool_memory_map.h"
+#include "utils/pool_select_walker.h"
static int extract_ntuples(char *message);
static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete);
@@ -304,6 +306,38 @@ handle_query_context(POOL_CONNECTION_POOL *backend)
node = session_context->query_context->parse_tree;
+ /*
+ * Track table writes for memory map feature.
+ * Mark tables as written when INSERT/UPDATE/DELETE completes.
+ */
+ if (pool_config->memory_map_enabled)
+ {
+ char *table_name = NULL;
+
+ if (IsA(node, InsertStmt))
+ {
+ InsertStmt *stmt = (InsertStmt *) node;
+ table_name = make_table_name_from_rangevar(stmt->relation);
+ }
+ else if (IsA(node, UpdateStmt))
+ {
+ UpdateStmt *stmt = (UpdateStmt *) node;
+ table_name = make_table_name_from_rangevar(stmt->relation);
+ }
+ else if (IsA(node, DeleteStmt))
+ {
+ DeleteStmt *stmt = (DeleteStmt *) node;
+ table_name = make_table_name_from_rangevar(stmt->relation);
+ }
+
+ if (table_name != NULL)
+ {
+ pool_memory_map_mark_table_written(table_name);
+ ereport(DEBUG1,
+ (errmsg("memory map: marked table \"%s\" as written", table_name)));
+ }
+ }
+
if (IsA(node, PrepareStmt))
{
if (session_context->uncompleted_message)
diff --git a/src/protocol/child.c b/src/protocol/child.c
index c34f057281be62feaf39db1bb605062f56dc398c..07ee58c6a48dcd3ef6d79970e08a6f77b8924e1d 100644
--- a/src/protocol/child.c
+++ b/src/protocol/child.c
@@ -57,6 +57,7 @@
#include "utils/elog.h"
#include "utils/ps_status.h"
#include "utils/timestamp.h"
+#include "utils/pool_memory_map.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -213,6 +214,12 @@ do_child(int *fds)
/* Initialize per process context */
pool_init_process_context();
+ /* Initialize memory map child state for cold start tracking */
+ if (pool_config->memory_map_enabled)
+ {
+ pool_memory_map_child_init();
+ }
+
/* initialize connection pool */
if (pool_init_cp())
{
diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream
index 454fdb9e5d1fd65437b6a67f12ab62658ea08f49..a245d58bf3339913602143da1b83b964fe5dcaeb 100644
--- a/src/sample/pgpool.conf.sample-stream
+++ b/src/sample/pgpool.conf.sample-stream
@@ -499,6 +499,51 @@ backend_clustering_mode = streaming_replication
#statement_level_load_balance = off
# Enables statement level load balancing
+# - Memory Map (Lagless Read Replica) -
+ # WARNING: Enabling this feature increases shared memory usage
+ # Default settings require ~11 MB shared memory
+ # (0.3 MB table tracking + 10.9 MB query cache)
+
+#memory_map_enabled = off
+ # Enable in-memory tracking of recently written tables
+ # to prevent stale reads from replicas during replication lag
+ # (change requires reload)
+
+#memory_map_ttl_factor = 5.0
+ # TTL multiplier: TTL = replication_delay * factor
+ # Higher values provide more safety margin
+ # Range: 1.0-100.0 (default: 5.0)
+ # (change requires reload)
+
+#memory_map_cold_start_duration = 2000
+ # Duration in milliseconds to route all queries to primary
+ # after child process starts (cold start period)
+ # Range: 0-60000 ms (default: 2000 ms = 2 seconds)
+ # Set to 0 to disable cold start behavior
+ # (change requires reload)
+
+#memory_map_table_buckets = 1024
+ # Number of hash buckets for table mutation tracking
+ # Higher values reduce hash collisions
+ # Range: 64-65536 (default: 1024)
+ # (change requires restart)
+
+#memory_map_table_size = 2048
+ # Maximum number of tables to track simultaneously
+ # Range: 128-131072 (default: 2048)
+ # (change requires restart)
+
+#memory_map_query_buckets = 2048
+ # Number of hash buckets for query parse cache
+ # Range: 64-65536 (default: 2048)
+ # (change requires restart)
+
+#memory_map_query_cache_size = 10000
+ # Maximum number of query parse results to cache
+ # Range: 100-1000000 (default: 10000)
+ # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000)
+ # (change requires restart)
+
#------------------------------------------------------------------------------
# STREAMING REPLICATION MODE
#------------------------------------------------------------------------------
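The shared-memory figures in the comments above follow from the layout that pool_memory_map_shmem_size() adds up. Each of the two structures is a fixed header, one int per hash bucket, and one fixed-size entry per slot. The sketch below shows only that arithmetic. The per-entry size is a parameter because the real sizeof(TableMutationEntry) and sizeof(QueryParseEntry) depend on constants in pool_memory_map.h, which is not shown here. The ~640-byte figure used in the check is taken from the sample comment.

```c
#include <assert.h>
#include <stddef.h>

/*
 * Bytes for one bucketed, fixed-capacity hash structure: a header, a bucket
 * array of int indices, and a flat array of entries.
 */
static size_t
region_bytes(size_t header, size_t num_buckets, size_t num_entries,
			 size_t entry_size)
{
	assert(entry_size > 0);
	return header + num_buckets * sizeof(int) + num_entries * entry_size;
}
```

With the defaults, the entry array dominates. 10000 query entries at ~640 bytes each is on the order of 6 MB, versus 8 KB for 2048 int buckets, so memory_map_query_cache_size is the parameter that matters when sizing.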
diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c
index 7026f0b1f0de7b9018ac912fac850f91d1c2978b..7dfce4946e268e120471db760440155787f84515 100644
--- a/src/streaming_replication/pool_worker_child.c
+++ b/src/streaming_replication/pool_worker_child.c
@@ -58,6 +58,7 @@
#include "utils/pool_ip.h"
#include "utils/ps_status.h"
#include "utils/pool_stream.h"
+#include "utils/pool_memory_map.h"
#include "context/pool_process_context.h"
#include "context/pool_session_context.h"
@@ -696,6 +697,7 @@ check_replication_time_lag_with_cmd(void)
double delay_ms;
uint64 delay;
uint64 delay_threshold_by_time;
+ uint64 max_delay_us = 0; /* Track maximum delay for memory map */
int token_count = 0;
int primary_node_id;
int save_errno;
@@ -1032,6 +1034,10 @@ check_replication_time_lag_with_cmd(void)
bkinfo->standby_delay = delay;
bkinfo->standby_delay_by_time = true;
+ /* Track maximum delay for memory map TTL calculation */
+ if (delay > max_delay_us)
+ max_delay_us = delay;
+
/* Log delay if necessary */
delay_threshold_by_time = pool_config->delay_threshold_by_time * 1000; /* threshold is in
* milliseconds, convert
@@ -1049,6 +1055,10 @@ check_replication_time_lag_with_cmd(void)
token = strtok_r(NULL, " \t\n", &saveptr);
}
+ /* Update memory map TTL based on maximum observed delay */
+ if (pool_config->memory_map_enabled && max_delay_us > 0)
+ pool_memory_map_update_ttl(max_delay_us);
+
}
PG_CATCH();
{
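The worker keeps the largest standby delay seen in one polling pass and passes it to pool_memory_map_update_ttl(). That function scales the delay by memory_map_ttl_factor and clamps the result to at least MEMORY_MAP_DEFAULT_TTL_US and at most one hour. The following is a standalone sketch of that computation. floor_us stands in for MEMORY_MAP_DEFAULT_TTL_US, whose value lives in a header not shown here, and the function names are hypothetical. Unlike the patch, the sketch clamps in floating point before converting, so the cast cannot overflow.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TTL_CEILING_US (3600ULL * 1000000ULL)	/* one hour, as in the patch */

/* Largest delay among the standbys polled in one pass (0 if none). */
static uint64_t
max_standby_delay(const uint64_t *delays_us, size_t n)
{
	uint64_t	max = 0;

	for (size_t i = 0; i < n; i++)
		if (delays_us[i] > max)
			max = delays_us[i];
	return max;
}

/* TTL = delay * factor, clamped to [floor_us, one hour]. */
static uint64_t
compute_ttl_us(uint64_t delay_us, double factor, uint64_t floor_us)
{
	double		ttl;

	assert(factor >= 1.0);
	ttl = (double) delay_us * factor;
	if (ttl > (double) TTL_CEILING_US)
		return TTL_CEILING_US;
	if (ttl < (double) floor_us)
		return floor_us;
	return (uint64_t) ttl;
}
```

For example, with the default factor of 5.0, a standby that is 450 ms behind yields a TTL of 2.25 s. The worker only calls the update when max_delay_us > 0, so a pass in which every standby reports zero delay leaves the previous TTL in place.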
diff --git a/src/test/regression/tests/045.memory_map/test.sh b/src/test/regression/tests/045.memory_map/test.sh
new file mode 100755
index 0000000000000000000000000000000000000000..ce05418262664e5133e2ffd478c7ae622b062cc7
--- /dev/null
+++ b/src/test/regression/tests/045.memory_map/test.sh
@@ -0,0 +1,196 @@
+#!/usr/bin/env bash
+#-------------------------------------------------------------------
+# test script for memory map feature (in-memory table tracking).
+# Tests routing of queries based on recently written tables.
+#
+source $TESTLIBS
+TESTDIR=testdir
+PSQL=$PGBIN/psql
+PSQLOPTS="-a -q -X"
+PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin
+export PGDATABASE=test
+
+# Only run in streaming replication mode since that's the target use case
+for mode in s
+do
+ rm -fr $TESTDIR
+ mkdir $TESTDIR
+ cd $TESTDIR
+
+ # Create test environment with 2 nodes
+ echo -n "creating test environment..."
+ $PGPOOL_SETUP -m $mode -n 2 || exit 1
+ echo "done."
+
+ source ./bashrc.ports
+
+ # Configure memory map feature
+ echo "memory_map_enabled = on" >> etc/pgpool.conf
+ echo "memory_map_ttl_factor = 5.0" >> etc/pgpool.conf
+ echo "memory_map_cold_start_duration = 2000" >> etc/pgpool.conf
+
+ # Configure weights so we can distinguish routing
+ # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+ # This means load balanced queries go to node 1 by default
+ echo "backend_weight0 = 0" >> etc/pgpool.conf
+ echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+ # Enable debug logging to see routing decisions
+ echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+ ./startall
+
+ export PGPORT=$PGPOOL_PORT
+
+ wait_for_pgpool_startup
+
+ # Create test tables
+ $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+ echo "=== Test 1: Cold Start Routing ==="
+ # During cold start, all queries should go to primary
+ # Restart pgpool to trigger cold start
+ ./shutdownall
+ ./startall
+ wait_for_pgpool_startup
+
+ # Immediately query - should go to primary due to cold start
+ $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for cold start message
+ if grep -q "could not load balance because of memory map cold start" log/pgpool.log; then
+ echo "Test 1 PASSED: Cold start routing works"
+ else
+ echo "Test 1 FAILED: Cold start routing not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 2: Wait for cold start to end ==="
+ # Wait for cold start period to end (2 seconds)
+ sleep 3
+
+ # Clear the log
+ > log/pgpool.log
+
+ # Now a clean table query should load balance (go to node 1)
+ $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+ # After cold start, queries to clean tables should load balance
+ # Check that it did NOT get forced to primary due to memory map
+ if grep -q "could not load balance because of memory map cold start" log/pgpool.log; then
+ echo "Test 2 FAILED: Still in cold start after waiting"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 2 PASSED: Cold start ended correctly"
+
+ echo "=== Test 3: Write-then-Read Routing ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Write to t1
+ $PSQL test -c "INSERT INTO t1 VALUES (1);" > /dev/null 2>&1
+
+ # Immediately read from t1 - should go to primary due to recent write
+ $PSQL test -c "SELECT 'write_read_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+ # Check log for table staleness message
+ if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 3 PASSED: Write-then-read routing works"
+ else
+ echo "Test 3 FAILED: Table staleness not detected after write"
+ # Show relevant log entries for debugging
+ grep -i "memory" log/pgpool.log | tail -20
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 4: Clean Table Still Load Balances ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Read from t2 (never written to) - should load balance
+ $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ # Should NOT see memory map blocking message for t2
+ if grep -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+ echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+ ./shutdownall
+ exit 1
+ fi
+ echo "Test 4 PASSED: Clean tables still load balance"
+
+ echo "=== Test 5: UPDATE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Update t2
+ $PSQL test -c "UPDATE t2 SET i = 999 WHERE i = 0;" > /dev/null 2>&1
+
+ # Immediately read from t2 - should go to primary
+ $PSQL test -c "SELECT 'update_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 5 PASSED: UPDATE marks table as stale"
+ else
+ echo "Test 5 FAILED: UPDATE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 6: DELETE Marks Table as Stale ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Delete from t3
+ $PSQL test -c "DELETE FROM t3 WHERE i = 0;" > /dev/null 2>&1
+
+ # Immediately read from t3 - should go to primary
+ $PSQL test -c "SELECT 'delete_test' as marker, * FROM t3;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+ echo "Test 6 PASSED: DELETE marks table as stale"
+ else
+ echo "Test 6 FAILED: DELETE did not mark table as stale"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo "=== Test 7: Multi-Table Query with One Stale Table ==="
+ # Clear the log
+ > log/pgpool.log
+
+ # Create a new clean table
+ $PSQL test -c "CREATE TABLE t4(i INTEGER);" > /dev/null 2>&1
+
+ # Short pause between the DDL and the write below. The check matches t1
+ # specifically, so it does not rely on earlier TTLs having expired
+ sleep 1
+
+ # Write to t1 only
+ $PSQL test -c "INSERT INTO t1 VALUES (100);" > /dev/null 2>&1
+
+ # Query joining t1 and t4 - should go to primary because t1 is stale
+ $PSQL test -c "SELECT 'multi_table_test' as marker FROM t1, t4;" > /dev/null 2>&1
+
+ if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+ echo "Test 7 PASSED: Multi-table query routes to primary when one table is stale"
+ else
+ echo "Test 7 FAILED: Multi-table staleness not detected"
+ ./shutdownall
+ exit 1
+ fi
+
+ echo ""
+ echo "=== All Memory Map Tests PASSED ==="
+
+ ./shutdownall
+
+ cd ..
+done
+
+exit 0
diff --git a/src/utils/pool_memory_map.c b/src/utils/pool_memory_map.c
new file mode 100644
index 0000000000000000000000000000000000000000..3f00ec1e2afef6518532804391633175fd351811
--- /dev/null
+++ b/src/utils/pool_memory_map.c
@@ -0,0 +1,1076 @@
+/* -*-pgsql-c-*- */
+/*
+ * pgpool: a language independent connection pool server for PostgreSQL
+ * written by Tatsuo Ishii
+ *
+ * Copyright (c) 2003-2026 PgPool Global Development Group
+ *
+ * Permission to use, copy, modify, and distribute this software and
+ * its documentation for any purpose and without fee is hereby
+ * granted, provided that the above copyright notice appear in all
+ * copies and that both that copyright notice and this permission
+ * notice appear in supporting documentation, and that the name of the
+ * author not be used in advertising or publicity pertaining to
+ * distribution of the software without specific, written prior
+ * permission. The author makes no representations about the
+ * suitability of this software for any purpose. It is provided "as
+ * is" without express or implied warranty.
+ *
+ * pool_memory_map.c: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ *
+ * Based on the "lagless" architecture from Tailor Brands:
+ * https://medium.com/tailor-tech/using-database-read-replicas-in-distributed-systems-d80eaf6bbf8a
+ */
+
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <unistd.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "utils/pool_memory_map.h"
+#include "utils/elog.h"
+#include "utils/palloc.h"
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static MemoryMapShmem *memory_map_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for accessing flexible arrays in shared memory
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+ ((int *)((char *)(map) + sizeof(TableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+ ((TableMutationEntry *)((char *)(map) + sizeof(TableMutationHashTable) + \
+ (map)->num_buckets * sizeof(int)))
+
+/* Get pointer to bucket array in query cache */
+#define QUERY_CACHE_BUCKETS(cache) \
+ ((int *)((char *)(cache) + sizeof(QueryParseCache)))
+
+/* Get pointer to entry array in query cache */
+#define QUERY_CACHE_ENTRIES(cache) \
+ ((QueryParseEntry *)((char *)(cache) + sizeof(QueryParseCache) + \
+ (cache)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Spinlock operations using atomic test-and-set (GCC __sync builtins)
+ * ----------------
+ */
+
+static inline void
+spin_lock(volatile uint32 *lock)
+{
+ while (__sync_lock_test_and_set(lock, 1))
+ {
+ /* Spin until we acquire the lock */
+ while (*lock)
+ ;
+ }
+}
+
+static inline void
+spin_unlock(volatile uint32 *lock)
+{
+ __sync_lock_release(lock);
+}
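spin_lock() above is a test-and-test-and-set lock. It attempts the atomic exchange, and while that fails it spins on a plain read, so waiting CPUs do not keep bouncing the cache line. The sketch below expresses the same pattern with C11 <stdatomic.h>, which makes the acquire/release ordering explicit. This is an alternative spelling for comparison, not what the patch uses, and the type and function names are hypothetical.

```c
#include <assert.h>
#include <stdatomic.h>

typedef struct
{
	atomic_uint locked;			/* 0 = free, 1 = held */
} tatas_lock;

static void
tatas_acquire(tatas_lock *l)
{
	/* exchange returns the previous value: 0 means we took the lock */
	while (atomic_exchange_explicit(&l->locked, 1u, memory_order_acquire) != 0)
	{
		/* read-only spin until the holder releases it */
		while (atomic_load_explicit(&l->locked, memory_order_relaxed) != 0)
			;
	}
}

static void
tatas_release(tatas_lock *l)
{
	assert(atomic_load_explicit(&l->locked, memory_order_relaxed) == 1u);
	atomic_store_explicit(&l->locked, 0u, memory_order_release);
}
```

For a lock shared between processes, the atomic type should be lock-free (ATOMIC_INT_LOCK_FREE == 2). The C standard expects lock-free operations to also be address-free, which is what makes them usable across separately mapped processes.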
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * FNV-1a hash for strings
+ */
+static uint32
+fnv1a_hash_string(const char *str)
+{
+ uint32 hash = 2166136261u; /* FNV offset basis */
+
+ while (*str)
+ {
+ hash ^= (uint8)*str++;
+ hash *= 16777619u; /* FNV prime */
+ }
+
+ return hash;
+}
+
+/*
+ * 64-bit FNV-1a hash of a byte buffer of the given length
+ */
+static uint64
+fnv1a_hash_64(const char *str, size_t len)
+{
+ uint64 hash = 14695981039346656037ULL; /* FNV offset basis for 64-bit */
+ size_t i;
+
+ for (i = 0; i < len; i++)
+ {
+ hash ^= (uint8)str[i];
+ hash *= 1099511628211ULL; /* FNV prime for 64-bit */
+ }
+
+ return hash;
+}
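Both helpers are standard FNV-1a, with the 32-bit and 64-bit offset bases and primes. Table-map lookups and query-cache keys depend on these hashes being identical in every process, so a check against the published FNV-1a test vectors is cheap insurance. Below is a self-contained sketch using the same constants. The function names are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

static uint32_t
fnv1a_32(const char *s, size_t len)
{
	uint32_t	h = 2166136261u;	/* 32-bit offset basis */

	for (size_t i = 0; i < len; i++)
	{
		h ^= (uint8_t) s[i];
		h *= 16777619u;			/* 32-bit FNV prime */
	}
	return h;
}

static uint64_t
fnv1a_64(const char *s, size_t len)
{
	uint64_t	h = 14695981039346656037ULL;	/* 64-bit offset basis */

	for (size_t i = 0; i < len; i++)
	{
		h ^= (uint8_t) s[i];
		h *= 1099511628211ULL;	/* 64-bit FNV prime */
	}
	return h;
}
```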
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Get elapsed time in microseconds between two timevals
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+ return ((int64)(end->tv_sec - start->tv_sec) * 1000000) +
+ (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Get current time
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+ gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table
+ */
+static void
+table_map_init(TableMutationHashTable *map, int num_buckets, int max_entries)
+{
+ int *buckets;
+ TableMutationEntry *entries;
+ int i;
+
+ map->num_buckets = num_buckets;
+ map->max_entries = max_entries;
+ map->num_entries = 0;
+ map->free_list_head = 0;
+ map->lock = 0;
+
+ buckets = TABLE_MAP_BUCKETS(map);
+ entries = TABLE_MAP_ENTRIES(map);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = MEMORY_MAP_INVALID_INDEX;
+
+ /* Initialize free list - chain all entries */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : MEMORY_MAP_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("memory_map: initialized table map with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list
+ */
+static int
+table_map_alloc_entry(TableMutationHashTable *map)
+{
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int idx;
+
+ if (map->free_list_head == MEMORY_MAP_INVALID_INDEX)
+ return MEMORY_MAP_INVALID_INDEX;
+
+ idx = map->free_list_head;
+ map->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = MEMORY_MAP_INVALID_INDEX;
+ map->num_entries++;
+
+ return idx;
+}
+
+/*
+ * Free an entry back to the free list
+ */
+static void
+table_map_free_entry(TableMutationHashTable *map, int idx)
+{
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+
+ entries[idx].in_use = false;
+ entries[idx].next = map->free_list_head;
+ map->free_list_head = idx;
+ map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table
+ * Returns entry index or MEMORY_MAP_INVALID_INDEX if not found
+ * Must be called with lock held
+ */
+static int
+table_map_lookup(TableMutationHashTable *map, const char *table_name, uint32 hash)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ if (entries[idx].hash == hash &&
+ strcmp(entries[idx].table_name, table_name) == 0)
+ {
+ return idx;
+ }
+ idx = entries[idx].next;
+ }
+
+ return MEMORY_MAP_INVALID_INDEX;
+}
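The table map stores int indices rather than pointers in its bucket heads and next links. This keeps the structure position-independent inside the shared segment and lets a free list be threaded through the unused slots. The following reduced, self-contained sketch shows that layout with fixed sizes and no locking, TTL, or eviction, and all names are hypothetical. It covers allocation from the free list, head insertion into a bucket chain, and a lookup that compares the cached hash before the full name.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define NBUCKETS 8
#define NENTRIES 4
#define NAMELEN  64
#define INVALID  (-1)

typedef struct
{
	char		name[NAMELEN];
	uint32_t	hash;
	int			next;			/* next index in bucket chain or free list */
	bool		in_use;
} slot;

typedef struct
{
	int			buckets[NBUCKETS];
	slot		slots[NENTRIES];
	int			free_head;
} index_map;

static uint32_t
hash_name(const char *s)
{
	uint32_t	h = 2166136261u;	/* FNV-1a, as in the patch */

	while (*s)
	{
		h ^= (uint8_t) *s++;
		h *= 16777619u;
	}
	return h;
}

static void
map_init(index_map *m)
{
	for (int b = 0; b < NBUCKETS; b++)
		m->buckets[b] = INVALID;
	for (int i = 0; i < NENTRIES; i++)
	{
		m->slots[i].in_use = false;
		m->slots[i].next = (i < NENTRIES - 1) ? i + 1 : INVALID;
	}
	m->free_head = 0;
}

static int
map_lookup(const index_map *m, const char *name)
{
	uint32_t	h = hash_name(name);

	for (int i = m->buckets[h % NBUCKETS]; i != INVALID; i = m->slots[i].next)
		if (m->slots[i].hash == h && strcmp(m->slots[i].name, name) == 0)
			return i;
	return INVALID;
}

/* Returns the slot index, or INVALID when the free list is exhausted. */
static int
map_insert(index_map *m, const char *name)
{
	uint32_t	h = hash_name(name);
	int			b = (int) (h % NBUCKETS);
	int			i = map_lookup(m, name);

	assert(strlen(name) < NAMELEN);
	if (i != INVALID)
		return i;				/* already tracked */
	if (m->free_head == INVALID)
		return INVALID;			/* full: the patch evicts here instead */

	i = m->free_head;			/* pop a slot off the free list */
	m->free_head = m->slots[i].next;
	strcpy(m->slots[i].name, name);
	m->slots[i].hash = h;
	m->slots[i].in_use = true;
	m->slots[i].next = m->buckets[b];	/* push onto the bucket chain */
	m->buckets[b] = i;
	return i;
}
```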
+
+/*
+ * Insert or update a table entry
+ * Must be called with lock held
+ */
+static void
+table_map_insert(TableMutationHashTable *map, const char *table_name,
+ uint32 hash, struct timeval *write_time)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int bucket = hash % map->num_buckets;
+ int idx;
+
+ /* Check if entry already exists */
+ idx = table_map_lookup(map, table_name, hash);
+ if (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ /* Update existing entry */
+ entries[idx].last_write_time = *write_time;
+ return;
+ }
+
+ /* Allocate new entry */
+ idx = table_map_alloc_entry(map);
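+ if (idx == MEMORY_MAP_INVALID_INDEX)
+ {
+ int *victim_link = NULL;
+ int victim = MEMORY_MAP_INVALID_INDEX;
+ int b;
+
+ /*
+ * Table is full: evict the entry with the oldest write time, which is
+ * the one least likely to still be inside the TTL window. Evicting a
+ * recently written table would let reads of it go to a lagging replica.
+ */
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *link = &buckets[b];
+ int cur = buckets[b];
+
+ while (cur != MEMORY_MAP_INVALID_INDEX)
+ {
+ if (victim == MEMORY_MAP_INVALID_INDEX ||
+ elapsed_us(&entries[victim].last_write_time,
+ &entries[cur].last_write_time) < 0)
+ {
+ victim = cur;
+ victim_link = link;
+ }
+ link = &entries[cur].next;
+ cur = entries[cur].next;
+ }
+ }
+
+ if (victim != MEMORY_MAP_INVALID_INDEX)
+ {
+ *victim_link = entries[victim].next;
+ table_map_free_entry(map, victim);
+ idx = table_map_alloc_entry(map);
+ }
+
+ if (idx == MEMORY_MAP_INVALID_INDEX)
+ {
+ ereport(WARNING,
+ (errmsg("memory_map: failed to allocate entry for table %s", table_name)));
+ return;
+ }
+ }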
+
+ /* Initialize new entry */
+ strlcpy(entries[idx].table_name, table_name, MEMORY_MAP_TABLE_NAME_LEN);
+ entries[idx].hash = hash;
+ entries[idx].last_write_time = *write_time;
+
+ /* Insert at head of bucket chain */
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ ereport(DEBUG2,
+ (errmsg("memory_map: marked table '%s' as written", table_name)));
+}
+
+/*
+ * Remove expired entries from the table map
+ * Must be called with lock held
+ */
+static void
+table_map_cleanup_expired(TableMutationHashTable *map, uint64 ttl_us)
+{
+ int *buckets = TABLE_MAP_BUCKETS(map);
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ struct timeval now;
+ int removed = 0;
+ int b;
+
+ get_current_time(&now);
+
+ for (b = 0; b < map->num_buckets; b++)
+ {
+ int *prev_ptr = &buckets[b];
+ int idx = buckets[b];
+
+ while (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+
+ if (elapsed > (int64)ttl_us)
+ {
+ /* Entry has expired - remove it */
+ int next = entries[idx].next;
+ *prev_ptr = next;
+ table_map_free_entry(map, idx);
+ idx = next;
+ removed++;
+ }
+ else
+ {
+ prev_ptr = &entries[idx].next;
+ idx = entries[idx].next;
+ }
+ }
+ }
+
+ if (removed > 0)
+ {
+ ereport(DEBUG1,
+ (errmsg("memory_map: cleaned up %d expired table entries", removed)));
+ }
+}
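table_map_cleanup_expired() unlinks entries during a chain walk by keeping a pointer to the link that currently points at the entry, either the bucket head or the previous entry's next field. This means removing the head needs no special case. The following minimal sketch shows that idiom over an index-linked chain with the same expiry rule (elapsed > ttl). The names and the plain integer timestamps are hypothetical simplifications, and unlike the patch the sketch does not return removed nodes to a free list.

```c
#include <assert.h>
#include <stdint.h>

#define END (-1)

typedef struct
{
	int64_t		written_us;		/* last write time */
	int			next;			/* next index in the chain, or END */
} node;

/*
 * Unlink every node on the chain at *head whose age exceeds ttl_us.
 * Returns how many nodes were removed.
 */
static int
sweep_expired(int *head, node *nodes, int64_t now_us, int64_t ttl_us)
{
	int		   *link = head;	/* the link that currently points at idx */
	int			idx = *head;
	int			removed = 0;

	assert(ttl_us >= 0);
	while (idx != END)
	{
		int			next = nodes[idx].next;

		if (now_us - nodes[idx].written_us > ttl_us)
		{
			*link = next;		/* bypass idx; link stays where it is */
			removed++;
		}
		else
			link = &nodes[idx].next;	/* keep idx; advance link */
		idx = next;
	}
	return removed;
}
```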
+
+/* ----------------
+ * Query parse cache operations
+ * ----------------
+ */
+
+/*
+ * Initialize query parse cache
+ */
+static void
+query_cache_init(QueryParseCache *cache, int num_buckets, int max_entries)
+{
+ int *buckets;
+ QueryParseEntry *entries;
+ int i;
+
+ cache->num_buckets = num_buckets;
+ cache->max_entries = max_entries;
+ cache->num_entries = 0;
+ cache->free_list_head = 0;
+ cache->lru_head = MEMORY_MAP_INVALID_INDEX;
+ cache->lru_tail = MEMORY_MAP_INVALID_INDEX;
+ cache->lock = 0;
+
+ buckets = QUERY_CACHE_BUCKETS(cache);
+ entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Initialize all buckets to empty */
+ for (i = 0; i < num_buckets; i++)
+ buckets[i] = MEMORY_MAP_INVALID_INDEX;
+
+ /* Initialize free list */
+ for (i = 0; i < max_entries; i++)
+ {
+ entries[i].in_use = false;
+ entries[i].next = (i < max_entries - 1) ? i + 1 : MEMORY_MAP_INVALID_INDEX;
+ entries[i].lru_prev = MEMORY_MAP_INVALID_INDEX;
+ entries[i].lru_next = MEMORY_MAP_INVALID_INDEX;
+ }
+
+ ereport(DEBUG1,
+ (errmsg("memory_map: initialized query cache with %d buckets, %d max entries",
+ num_buckets, max_entries)));
+}
+
+/*
+ * Move entry to front of LRU list (most recently used)
+ */
+static void
+query_cache_lru_touch(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ /* Already at head? */
+ if (cache->lru_head == idx)
+ return;
+
+ /* Remove from current position */
+ if (entries[idx].lru_prev != MEMORY_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ if (entries[idx].lru_next != MEMORY_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ if (cache->lru_tail == idx)
+ cache->lru_tail = entries[idx].lru_prev;
+
+ /* Insert at head */
+ entries[idx].lru_prev = MEMORY_MAP_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+ if (cache->lru_head != MEMORY_MAP_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+ cache->lru_head = idx;
+ if (cache->lru_tail == MEMORY_MAP_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Add entry to LRU list (at head)
+ */
+static void
+query_cache_lru_add(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ entries[idx].lru_prev = MEMORY_MAP_INVALID_INDEX;
+ entries[idx].lru_next = cache->lru_head;
+
+ if (cache->lru_head != MEMORY_MAP_INVALID_INDEX)
+ entries[cache->lru_head].lru_prev = idx;
+
+ cache->lru_head = idx;
+
+ if (cache->lru_tail == MEMORY_MAP_INVALID_INDEX)
+ cache->lru_tail = idx;
+}
+
+/*
+ * Remove entry from LRU list
+ */
+static void
+query_cache_lru_remove(QueryParseCache *cache, int idx)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+
+ if (entries[idx].lru_prev != MEMORY_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next;
+ else
+ cache->lru_head = entries[idx].lru_next;
+
+ if (entries[idx].lru_next != MEMORY_MAP_INVALID_INDEX)
+ entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev;
+ else
+ cache->lru_tail = entries[idx].lru_prev;
+
+ entries[idx].lru_prev = MEMORY_MAP_INVALID_INDEX;
+ entries[idx].lru_next = MEMORY_MAP_INVALID_INDEX;
+}
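The query cache keeps an index-based doubly linked LRU list alongside its hash chains. A hit moves the entry to the head, and allocation evicts from the tail once the free list is empty. The following standalone sketch covers just those list operations, without hashing or locking, and the names are hypothetical. It writes "touch" as unlink-then-push, so head, middle, and tail positions all go through the same code.

```c
#include <assert.h>

#define NIL (-1)
#define CAP 8

typedef struct
{
	int			prev[CAP];
	int			next[CAP];
	int			head;			/* most recently used */
	int			tail;			/* least recently used */
} lru_list;

static void
lru_init(lru_list *l)
{
	l->head = l->tail = NIL;
}

static void
lru_push_front(lru_list *l, int i)
{
	assert(i >= 0 && i < CAP);
	l->prev[i] = NIL;
	l->next[i] = l->head;
	if (l->head != NIL)
		l->prev[l->head] = i;
	l->head = i;
	if (l->tail == NIL)
		l->tail = i;
}

static void
lru_unlink(lru_list *l, int i)
{
	if (l->prev[i] != NIL)
		l->next[l->prev[i]] = l->next[i];
	else
		l->head = l->next[i];
	if (l->next[i] != NIL)
		l->prev[l->next[i]] = l->prev[i];
	else
		l->tail = l->prev[i];
	l->prev[i] = l->next[i] = NIL;
}

/* Mark i as most recently used. */
static void
lru_touch(lru_list *l, int i)
{
	if (l->head == i)
		return;
	lru_unlink(l, i);
	lru_push_front(l, i);
}

/* Remove and return the least recently used index, or NIL if empty. */
static int
lru_evict(lru_list *l)
{
	int			victim = l->tail;

	if (victim != NIL)
		lru_unlink(l, victim);
	return victim;
}
```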
+
+/*
+ * Allocate entry from free list, evicting LRU if necessary
+ */
+static int
+query_cache_alloc_entry(QueryParseCache *cache)
+{
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ int idx;
+ int bucket;
+ int *prev_ptr;
+ int curr;
+
+ if (cache->free_list_head != MEMORY_MAP_INVALID_INDEX)
+ {
+ idx = cache->free_list_head;
+ cache->free_list_head = entries[idx].next;
+ entries[idx].in_use = true;
+ entries[idx].next = MEMORY_MAP_INVALID_INDEX;
+ cache->num_entries++;
+ return idx;
+ }
+
+ /* No free entries - evict LRU */
+ if (cache->lru_tail == MEMORY_MAP_INVALID_INDEX)
+ return MEMORY_MAP_INVALID_INDEX;
+
+ idx = cache->lru_tail;
+
+ /* Remove from hash bucket */
+ bucket = entries[idx].query_hash % cache->num_buckets;
+ prev_ptr = &buckets[bucket];
+ curr = buckets[bucket];
+
+ while (curr != MEMORY_MAP_INVALID_INDEX)
+ {
+ if (curr == idx)
+ {
+ *prev_ptr = entries[curr].next;
+ break;
+ }
+ prev_ptr = &entries[curr].next;
+ curr = entries[curr].next;
+ }
+
+ /* Remove from LRU list */
+ query_cache_lru_remove(cache, idx);
+
+ /* Reinitialize entry */
+ entries[idx].in_use = true;
+ entries[idx].next = MEMORY_MAP_INVALID_INDEX;
+
+ return idx;
+}
+
+/*
+ * Look up a query in the cache
+ */
+static int
+query_cache_lookup(QueryParseCache *cache, uint64 hash)
+{
+ int *buckets = QUERY_CACHE_BUCKETS(cache);
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int bucket = hash % cache->num_buckets;
+ int idx = buckets[bucket];
+
+ while (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ if (entries[idx].query_hash == hash)
+ return idx;
+ idx = entries[idx].next;
+ }
+
+ return MEMORY_MAP_INVALID_INDEX;
+}
+
+/* ----------------
+ * Query normalization
+ * ----------------
+ */
+
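+/*
+ * Can c continue an unquoted identifier (or a $n parameter reference)?
+ * Non-ASCII bytes are treated as identifier characters.
+ */
+static bool
+is_ident_char(char c)
+{
+ return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
+ (c >= '0' && c <= '9') || c == '_' || c == '$' ||
+ (unsigned char) c >= 0x80;
+}
+
+/*
+ * Simple query normalization:
+ * - Strip comments (-- style and C-style block comments)
+ * - Collapse whitespace
+ * - Convert to lowercase, except inside double-quoted identifiers
+ * - Replace string literals and standalone numeric literals with '$'
+ *
+ * Digits that continue an identifier (t1, col_2, $1) are kept, and
+ * double-quoted identifiers are copied verbatim, so that queries on
+ * different tables never normalize to the same text and therefore
+ * never share a cached parse result.
+ *
+ * This is a simplified version - pgpool2 already does this elsewhere,
+ * but we need a standalone version for the memory map feature.
+ */
+static size_t
+normalize_query(const char *query, char *output, size_t output_size)
+{
+ const char *src = query;
+ char *dst = output;
+ char *dst_end = output + output_size - 1;
+ bool in_string = false;
+ bool last_was_space = true; /* Start true to skip leading space */
+
+ while (*src && dst < dst_end)
+ {
+ /* Handle string literals */
+ if (in_string)
+ {
+ if (*src == '\'')
+ {
+ if (*(src + 1) == '\'')
+ {
+ /* Escaped quote */
+ src += 2;
+ continue;
+ }
+ in_string = false;
+ *dst++ = '$'; /* Replace string content with placeholder */
+ last_was_space = false;
+ }
+ src++;
+ continue;
+ }
+
+ /* Check for string start (only single quotes delimit literals) */
+ if (*src == '\'')
+ {
+ in_string = true;
+ src++;
+ continue;
+ }
+
+ /* Double-quoted identifiers are case-sensitive: copy them verbatim */
+ if (*src == '"')
+ {
+ *dst++ = *src++; /* opening quote */
+ while (*src && dst < dst_end)
+ {
+ if (*src == '"' && *(src + 1) != '"')
+ {
+ *dst++ = *src++; /* closing quote */
+ break;
+ }
+ if (*src == '"')
+ *dst++ = *src++; /* first half of an escaped "" */
+ if (dst < dst_end)
+ *dst++ = *src++;
+ }
+ last_was_space = false;
+ continue;
+ }
+
+ /* Handle single-line comments */
+ if (*src == '-' && *(src + 1) == '-')
+ {
+ while (*src && *src != '\n')
+ src++;
+ continue;
+ }
+
+ /* Handle multi-line comments */
+ if (*src == '/' && *(src + 1) == '*')
+ {
+ src += 2;
+ while (*src && !(*src == '*' && *(src + 1) == '/'))
+ src++;
+ if (*src)
+ src += 2;
+ continue;
+ }
+
+ /* Handle whitespace */
+ if (*src == ' ' || *src == '\t' || *src == '\n' || *src == '\r')
+ {
+ if (!last_was_space)
+ {
+ *dst++ = ' ';
+ last_was_space = true;
+ }
+ src++;
+ continue;
+ }
+
+ /*
+ * Handle numeric literals - replace with placeholder, but only when
+ * the number starts a new token. Digits that continue an identifier
+ * (t1, col_2, $1) fall through and are copied unchanged.
+ */
+ if (((*src >= '0' && *src <= '9') ||
+ (*src == '.' && *(src + 1) >= '0' && *(src + 1) <= '9')) &&
+ !(dst > output && is_ident_char(*(dst - 1))))
+ {
+ while (*src && ((*src >= '0' && *src <= '9') || *src == '.'))
+ src++;
+ *dst++ = '$';
+ last_was_space = false;
+ continue;
+ }
+
+ /* Regular character - convert to lowercase */
+ if (*src >= 'A' && *src <= 'Z')
+ *dst++ = *src + 32;
+ else
+ *dst++ = *src;
+
+ last_was_space = false;
+ src++;
+ }
+
+ /* Remove trailing space */
+ if (dst > output && *(dst - 1) == ' ')
+ dst--;
+
+ *dst = '\0';
+ return dst - output;
+}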
+
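The parse cache relies on one property: two queries may normalize to the same text only if they reference the same tables. In particular, digits that belong to an identifier (t1 vs t2) must survive, while standalone numeric and string literals can collapse to a placeholder. The reduced sketch below shows that token-boundary rule. The names are hypothetical, and the sketch deliberately leaves out comment stripping and quoted identifiers.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* True if c can continue an unquoted identifier or a $n parameter. */
static bool
ident_char(char c)
{
	return isalnum((unsigned char) c) || c == '_' || c == '$' ||
		(unsigned char) c >= 0x80;
}

/*
 * Collapse whitespace, lowercase, and replace single-quoted strings and
 * standalone numeric literals with '$'.
 */
static void
normalize_sketch(const char *src, char *dst, size_t size)
{
	char	   *out = dst;
	char	   *end;
	bool		space = true;	/* suppress leading whitespace */

	assert(size > 0);
	end = dst + size - 1;
	while (*src && out < end)
	{
		if (*src == '\'')
		{
			/* skip the whole literal, honouring '' escapes */
			for (src++; *src; src++)
			{
				if (*src == '\'' && src[1] == '\'')
					src++;
				else if (*src == '\'')
				{
					src++;
					break;
				}
			}
			*out++ = '$';
			space = false;
		}
		else if (isspace((unsigned char) *src))
		{
			if (!space)
				*out++ = ' ';
			space = true;
			src++;
		}
		else if (isdigit((unsigned char) *src) &&
				 !(out > dst && ident_char(out[-1])))
		{
			/* a number that starts a token, not the tail of "t1" */
			while (isdigit((unsigned char) *src) || *src == '.')
				src++;
			*out++ = '$';
			space = false;
		}
		else
		{
			*out++ = (char) tolower((unsigned char) *src++);
			space = false;
		}
	}
	if (out > dst && out[-1] == ' ')
		out--;
	*out = '\0';
}
```

With this rule, "SELECT * FROM t1 WHERE i = 5" and "SELECT * FROM t1 WHERE i = 7" share a cache key, while the same query on t2 gets its own.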
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+Size
+pool_memory_map_shmem_size(void)
+{
+ Size size = 0;
+ int table_buckets = pool_config->memory_map_table_buckets;
+ int table_size = pool_config->memory_map_table_size;
+ int query_buckets = pool_config->memory_map_query_buckets;
+ int query_cache_size = pool_config->memory_map_query_cache_size;
+
+ /* Main structure */
+ size += sizeof(MemoryMapShmem);
+
+ /* Table mutation hash table */
+ size += sizeof(TableMutationHashTable);
+ size += table_buckets * sizeof(int); /* buckets array */
+ size += table_size * sizeof(TableMutationEntry); /* entries array */
+
+ /* Query parse cache */
+ size += sizeof(QueryParseCache);
+ size += query_buckets * sizeof(int); /* buckets array */
+ size += query_cache_size * sizeof(QueryParseEntry); /* entries array */
+
+ return size;
+}
+
+void
+pool_memory_map_init(void)
+{
+#ifndef POOL_PRIVATE
+ Size shmem_size;
+ char *shmem_ptr;
+
+ if (!pool_config->memory_map_enabled)
+ {
+ ereport(DEBUG1,
+ (errmsg("memory_map: feature disabled")));
+ return;
+ }
+
+ shmem_size = pool_memory_map_shmem_size();
+
+ /*
+ * Allocate from the main shared memory segment.
+ * Memory is already zeroed by initialize_shared_memory_main_segment().
+ */
+ shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size);
+ if (shmem_ptr == NULL)
+ {
+ ereport(ERROR,
+ (errmsg("memory_map: failed to allocate %zu bytes of shared memory",
+ shmem_size)));
+ return;
+ }
+
+ /* Set up pointers to structures within shared memory */
+ memory_map_shmem = (MemoryMapShmem *)shmem_ptr;
+ shmem_ptr += sizeof(MemoryMapShmem);
+
+ memory_map_shmem->table_map = (TableMutationHashTable *)shmem_ptr;
+ shmem_ptr += sizeof(TableMutationHashTable);
+ shmem_ptr += pool_config->memory_map_table_buckets * sizeof(int);
+ shmem_ptr += pool_config->memory_map_table_size * sizeof(TableMutationEntry);
+
+ memory_map_shmem->query_cache = (QueryParseCache *)shmem_ptr;
+
+ /* Initialize structures */
+ table_map_init(memory_map_shmem->table_map,
+ pool_config->memory_map_table_buckets,
+ pool_config->memory_map_table_size);
+
+ query_cache_init(memory_map_shmem->query_cache,
+ pool_config->memory_map_query_buckets,
+ pool_config->memory_map_query_cache_size);
+
+ /* Initialize global state */
+ memory_map_shmem->state.initialized = true;
+ memory_map_shmem->state.current_ttl_us = MEMORY_MAP_DEFAULT_TTL_US;
+ get_current_time(&memory_map_shmem->state.ttl_last_updated);
+ memory_map_shmem->state.stats_queries_checked = 0;
+ memory_map_shmem->state.stats_forced_primary = 0;
+ memory_map_shmem->state.stats_allowed_replica = 0;
+
+ ereport(LOG,
+ (errmsg("memory_map: initialized with %zu bytes shared memory",
+ shmem_size)));
+#endif
+}
+
+void
+pool_memory_map_child_init(void)
+{
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return;
+
+ get_current_time(&process_start_time);
+ cold_start_initialized = true;
+
+ ereport(DEBUG1,
+ (errmsg("memory_map: child initialized, cold start period %d ms",
+ pool_config->memory_map_cold_start_duration)));
+}
+
+bool
+pool_memory_map_in_cold_start(void)
+{
+ struct timeval now;
+ int64 elapsed_ms;
+
+ if (!pool_config->memory_map_enabled || !cold_start_initialized)
+ return false;
+
+ if (pool_config->memory_map_cold_start_duration <= 0)
+ return false;
+
+ get_current_time(&now);
+ elapsed_ms = elapsed_us(&process_start_time, &now) / 1000;
+
+ if (elapsed_ms < pool_config->memory_map_cold_start_duration)
+ {
+ ereport(DEBUG2,
+ (errmsg("memory_map: in cold start (%ld/%d ms)",
+ (long)elapsed_ms, pool_config->memory_map_cold_start_duration)));
+ return true;
+ }
+
+ return false;
+}
+
+bool
+pool_memory_map_table_is_stale(const char *table_name)
+{
+ TableMutationHashTable *map;
+ struct timeval now;
+ uint64 ttl_us;
+ uint32 hash;
+ int idx;
+ bool is_stale = false;
+
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return false;
+
+ map = memory_map_shmem->table_map;
+ hash = fnv1a_hash_string(table_name);
+
+ spin_lock(&map->lock);
+
+ idx = table_map_lookup(map, table_name, hash);
+ if (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+ int64 elapsed;
+
+ get_current_time(&now);
+ ttl_us = memory_map_shmem->state.current_ttl_us;
+
+ elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+ is_stale = (elapsed < (int64)ttl_us);
+
+ ereport(DEBUG2,
+ (errmsg("memory_map: table '%s' elapsed=%ld us, ttl=%lu us, stale=%d",
+ table_name, (long)elapsed, (unsigned long)ttl_us, is_stale)));
+ }
+
+ spin_unlock(&map->lock);
+
+ /* Update statistics */
+ __sync_fetch_and_add(&memory_map_shmem->state.stats_queries_checked, 1);
+ if (is_stale)
+ __sync_fetch_and_add(&memory_map_shmem->state.stats_forced_primary, 1);
+ else
+ __sync_fetch_and_add(&memory_map_shmem->state.stats_allowed_replica, 1);
+
+ return is_stale;
+}
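According to the patch description, a SELECT stays on the primary if the child is still in its cold-start window or if any referenced table is stale. Otherwise, normal load balancing applies. The caller of pool_memory_map_table_is_stale() is not part of this excerpt, so the following is only a sketch of that decision under those assumptions. The staleness check is injected as a callback so the sketch runs standalone, and all names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef bool (*stale_fn) (const char *table);

/*
 * True if the read must go to the primary: during the cold-start window,
 * or when at least one referenced table was written within the current TTL.
 */
static bool
must_read_from_primary(bool in_cold_start, const char **tables,
					   int num_tables, stale_fn is_stale)
{
	assert(num_tables >= 0);
	if (in_cold_start)
		return true;
	for (int i = 0; i < num_tables; i++)
		if (tables[i] != NULL && is_stale(tables[i]))
			return true;		/* one stale table is enough */
	return false;
}

/* Stub for illustration: pretend only "t1" was recently written. */
static bool
only_t1_is_stale(const char *table)
{
	return strcmp(table, "t1") == 0;
}
```

This mirrors test 7 above: a join of t1 and t4 goes to the primary because t1 alone is stale.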
+
+void
+pool_memory_map_mark_tables_written(const char **table_names, int num_tables)
+{
+ TableMutationHashTable *map;
+ struct timeval now;
+ int i;
+
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return;
+
+ if (num_tables <= 0 || table_names == NULL)
+ return;
+
+ map = memory_map_shmem->table_map;
+ get_current_time(&now);
+
+ spin_lock(&map->lock);
+
+ /* Purge expired entries once the map is more than 75% full */
+ if (map->num_entries > map->max_entries * 3 / 4)
+ {
+ table_map_cleanup_expired(map, memory_map_shmem->state.current_ttl_us);
+ }
+
+ for (i = 0; i < num_tables; i++)
+ {
+ uint32 hash;
+
+ if (table_names[i] != NULL && table_names[i][0] != '\0')
+ {
+ hash = fnv1a_hash_string(table_names[i]);
+ table_map_insert(map, table_names[i], hash, &now);
+ }
+ }
+
+ spin_unlock(&map->lock);
+}
+
+/*
+ * Convenience function to mark a single table as written
+ */
+void
+pool_memory_map_mark_table_written(const char *table_name)
+{
+ if (table_name != NULL)
+ {
+ const char *tables[1] = { table_name };
+ pool_memory_map_mark_tables_written(tables, 1);
+ }
+}
+
+void
+pool_memory_map_update_ttl(uint64 delay_us)
+{
+ uint64 new_ttl;
+
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return;
+
+ /* Calculate new TTL: delay * factor, with minimum of default TTL */
+ new_ttl = (uint64)(delay_us * pool_config->memory_map_ttl_factor);
+ if (new_ttl < MEMORY_MAP_DEFAULT_TTL_US)
+ new_ttl = MEMORY_MAP_DEFAULT_TTL_US;
+
+ /* Maximum TTL of 1 hour */
+ if (new_ttl > 3600ULL * 1000000ULL)
+ new_ttl = 3600ULL * 1000000ULL;
+
+ memory_map_shmem->state.current_ttl_us = new_ttl;
+ get_current_time(&memory_map_shmem->state.ttl_last_updated);
+
+ ereport(DEBUG1,
+ (errmsg("memory_map: updated TTL to %lu us (delay=%lu us, factor=%.1f)",
+ (unsigned long)new_ttl, (unsigned long)delay_us,
+ pool_config->memory_map_ttl_factor)));
+}
+
+bool
+pool_memory_map_get_cached_parse(uint64 hash, bool *is_write,
+ char table_names[][MEMORY_MAP_TABLE_NAME_LEN],
+ int *num_tables)
+{
+ QueryParseCache *cache;
+ int idx;
+ bool found = false;
+
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return false;
+
+ cache = memory_map_shmem->query_cache;
+
+ spin_lock(&cache->lock);
+
+ idx = query_cache_lookup(cache, hash);
+ if (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+ int i;
+
+ *is_write = entries[idx].is_write;
+ *num_tables = entries[idx].num_tables;
+
+ for (i = 0; i < entries[idx].num_tables && i < MEMORY_MAP_MAX_TABLES_PER_QUERY; i++)
+ {
+ strlcpy(table_names[i], entries[idx].table_names[i], MEMORY_MAP_TABLE_NAME_LEN);
+ }
+
+ /* Move to front of LRU */
+ query_cache_lru_touch(cache, idx);
+ found = true;
+ }
+
+ spin_unlock(&cache->lock);
+
+ return found;
+}
+
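+/*
+ * Store a parse result in the query cache. Does nothing if the hash is
+ * already cached; otherwise allocates an entry, evicting the least
+ * recently used one when the cache is full.
+ */
+void
+pool_memory_map_cache_parse(uint64 hash, bool is_write,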
+ const char table_names[][MEMORY_MAP_TABLE_NAME_LEN],
+ int num_tables)
+{
+ QueryParseCache *cache;
+ int *buckets;
+ QueryParseEntry *entries;
+ int idx;
+ int bucket;
+
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return;
+
+ cache = memory_map_shmem->query_cache;
+
+ spin_lock(&cache->lock);
+
+ /* Check if already exists */
+ idx = query_cache_lookup(cache, hash);
+ if (idx != MEMORY_MAP_INVALID_INDEX)
+ {
+ spin_unlock(&cache->lock);
+ return;
+ }
+
+ /* Allocate new entry (may evict LRU) */
+ idx = query_cache_alloc_entry(cache);
+ if (idx == MEMORY_MAP_INVALID_INDEX)
+ {
+ spin_unlock(&cache->lock);
+ ereport(WARNING,
+ (errmsg("memory_map: failed to allocate query cache entry")));
+ return;
+ }
+
+ entries = QUERY_CACHE_ENTRIES(cache);
+ buckets = QUERY_CACHE_BUCKETS(cache);
+
+ /* Fill in entry */
+ entries[idx].query_hash = hash;
+ entries[idx].is_write = is_write;
+ entries[idx].num_tables = (num_tables > MEMORY_MAP_MAX_TABLES_PER_QUERY) ?
+ MEMORY_MAP_MAX_TABLES_PER_QUERY : num_tables;
+
+ {
+ int i;
+ for (i = 0; i < entries[idx].num_tables; i++)
+ {
+ strlcpy(entries[idx].table_names[i], table_names[i], MEMORY_MAP_TABLE_NAME_LEN);
+ }
+ }
+
+ /* Insert into hash bucket */
+ bucket = hash % cache->num_buckets;
+ entries[idx].next = buckets[bucket];
+ buckets[bucket] = idx;
+
+ /* Add to LRU list */
+ query_cache_lru_add(cache, idx);
+
+ spin_unlock(&cache->lock);
+}
+
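The query parse cache above combines chained hash buckets with an LRU list over a fixed array of entries. A minimal standalone sketch of that structure follows, assuming index-linked lists as in the patch (`next`, bucket heads); `Cache`, `cache_get()` and `cache_put()` are hypothetical names, only the write flag is cached, and the patch's `query_cache_alloc_entry()` / `query_cache_lru_touch()` may differ in detail.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define CAP 4   /* hypothetical; cf. memory_map_query_cache_size */
#define NB  8   /* hypothetical; cf. memory_map_query_buckets */
#define NIL (-1)

typedef struct {
    uint64_t hash;
    bool     is_write;
    int      next;               /* next entry in the same hash bucket */
    int      prev_lru, next_lru; /* LRU list; head = most recently used */
} Entry;

typedef struct {
    int   buckets[NB];
    Entry e[CAP];
    int   head, tail, used;
} Cache;

static void cache_init(Cache *c)
{
    for (int b = 0; b < NB; b++)
        c->buckets[b] = NIL;
    c->head = c->tail = NIL;
    c->used = 0;
}

static void lru_unlink(Cache *c, int i)
{
    Entry *x = &c->e[i];
    if (x->prev_lru != NIL) c->e[x->prev_lru].next_lru = x->next_lru;
    else c->head = x->next_lru;
    if (x->next_lru != NIL) c->e[x->next_lru].prev_lru = x->prev_lru;
    else c->tail = x->prev_lru;
}

static void lru_push_front(Cache *c, int i)
{
    c->e[i].prev_lru = NIL;
    c->e[i].next_lru = c->head;
    if (c->head != NIL) c->e[c->head].prev_lru = i;
    c->head = i;
    if (c->tail == NIL) c->tail = i;
}

static int cache_find(const Cache *c, uint64_t hash)
{
    for (int i = c->buckets[hash % NB]; i != NIL; i = c->e[i].next)
        if (c->e[i].hash == hash)
            return i;
    return NIL;
}

/* On a hit, report the cached flag and move the entry to the LRU front. */
static bool cache_get(Cache *c, uint64_t hash, bool *is_write)
{
    int i = cache_find(c, hash);
    if (i == NIL)
        return false;
    *is_write = c->e[i].is_write;
    lru_unlink(c, i);
    lru_push_front(c, i);
    return true;
}

/* Insert unless already present; when full, evict the LRU tail. */
static void cache_put(Cache *c, uint64_t hash, bool is_write)
{
    int i;
    if (cache_find(c, hash) != NIL)
        return;
    if (c->used < CAP) {
        i = c->used++;
    } else {
        assert(c->tail != NIL);
        i = c->tail;
        lru_unlink(c, i);
        /* unlink the victim from its hash chain */
        int *p = &c->buckets[c->e[i].hash % NB];
        while (*p != i)
            p = &c->e[*p].next;
        *p = c->e[i].next;
    }
    c->e[i].hash = hash;
    c->e[i].is_write = is_write;
    c->e[i].next = c->buckets[hash % NB];
    c->buckets[hash % NB] = i;
    lru_push_front(c, i);
}
```

Keeping both lists as array indices rather than pointers is what makes this layout usable in shared memory, where each process may map the segment at a different address.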
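+/*
+ * Normalize a query string and return its 64-bit FNV-1a hash, used as the
+ * query parse cache key. Returns 0 for NULL or empty input, or when
+ * normalization yields nothing.
+ */
+uint64
+pool_memory_map_normalize_and_hash(const char *query)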
+{
+ char normalized[8192];
+ size_t len;
+
+ if (query == NULL || query[0] == '\0')
+ return 0;
+
+ len = normalize_query(query, normalized, sizeof(normalized));
+ if (len == 0)
+ return 0;
+
+ return fnv1a_hash_64(normalized, len);
+}
+
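The cache key is a 64-bit FNV-1a hash of the normalized query text. Since `normalize_query()` and `fnv1a_hash_64()` are not shown in this excerpt, the sketch below uses a hypothetical normalizer that trims and collapses whitespace outside single-quoted literals; the FNV-1a constants are the standard 64-bit offset basis and prime. As a design choice of this sketch, an over-long query returns 0 (uncacheable) instead of being truncated, so two long queries sharing an 8 KiB prefix cannot end up with the same key.

```c
#include <assert.h>
#include <ctype.h>
#include <stddef.h>
#include <stdint.h>

/* 64-bit FNV-1a: offset basis 0xcbf29ce484222325, prime 0x100000001b3 */
static uint64_t fnv1a_64(const char *data, size_t len)
{
    uint64_t h = 0xcbf29ce484222325ULL;
    for (size_t i = 0; i < len; i++) {
        h ^= (unsigned char) data[i];
        h *= 0x100000001b3ULL;
    }
    return h;
}

/* Hypothetical normalizer: trim, and collapse whitespace runs outside
 * single-quoted literals to one space. Returns the output length, or 0
 * if the result would not fit in out. */
static size_t normalize(const char *q, char *out, size_t outsz)
{
    size_t n = 0;
    int in_quote = 0, pending_space = 0;

    assert(outsz > 0);
    for (; *q != '\0'; q++) {
        unsigned char ch = (unsigned char) *q;
        if (!in_quote && isspace(ch)) {
            pending_space = (n > 0);   /* drop leading whitespace */
            continue;
        }
        if (pending_space) {
            if (n + 1 >= outsz) return 0;
            out[n++] = ' ';
            pending_space = 0;
        }
        if (ch == '\'')
            in_quote = !in_quote;
        if (n + 1 >= outsz) return 0;
        out[n++] = (char) ch;
    }
    out[n] = '\0';                     /* trailing whitespace never emitted */
    return n;
}

static uint64_t normalize_and_hash(const char *q)
{
    char buf[8192];
    size_t len;

    if (q == NULL || q[0] == '\0')
        return 0;
    len = normalize(q, buf, sizeof buf);
    return len == 0 ? 0 : fnv1a_64(buf, len);
}
```

With this normalizer, `"SELECT *\n  FROM t "` and `"SELECT * FROM t"` share one cache entry, while literals such as `'a  b'` and `'a b'` stay distinct.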
+uint64
+pool_memory_map_get_ttl(void)
+{
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ return MEMORY_MAP_DEFAULT_TTL_US;
+
+ return memory_map_shmem->state.current_ttl_us;
+}
+
+void
+pool_memory_map_get_stats(uint32 *queries_checked,
+ uint32 *forced_primary,
+ uint32 *allowed_replica,
+ uint64 *current_ttl_us)
+{
+ if (!pool_config->memory_map_enabled || memory_map_shmem == NULL)
+ {
+ *queries_checked = 0;
+ *forced_primary = 0;
+ *allowed_replica = 0;
+ *current_ttl_us = 0;
+ return;
+ }
+
+ *queries_checked = memory_map_shmem->state.stats_queries_checked;
+ *forced_primary = memory_map_shmem->state.stats_forced_primary;
+ *allowed_replica = memory_map_shmem->state.stats_allowed_replica;
+ *current_ttl_us = memory_map_shmem->state.current_ttl_us;
+}
--
2.52.0