From 403ed46f0d2b33858c05a25d74be2b027db7d21b Mon Sep 17 00:00:00 2001 From: Nadav Shatz Date: Tue, 6 Jan 2026 12:41:50 +0200 Subject: [PATCH] Feature: add in-memory table tracking to prevent stale reads from replicas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement the "table mutation map" feature that tracks recently-written database tables in shared memory to prevent stale reads during replication lag. When a write (INSERT/UPDATE/DELETE) occurs on a table, that table is marked as "dirty" for a configurable TTL period. Any SELECT on a dirty table within the TTL window is routed to primary instead of replica. Key features: - Shared memory hash table for tracking table mutations with TTL - Query parse cache with LRU eviction for performance - Cold start protection (routes all queries to primary initially) - Automatic TTL calculation: replication_delay × configurable factor - Per-table staleness tracking with microsecond precision New configuration parameters: - table_mutation_map_enabled: Enable/disable the feature (default: off) - table_mutation_map_ttl_factor: TTL multiplier for replication delay (default: 5.0) - table_mutation_map_cold_start_duration: Cold start period in ms (default: 2000) - table_mutation_map_table_buckets: Hash buckets for table map (default: 1024) - table_mutation_map_table_size: Max tracked tables (default: 2048) - table_mutation_map_query_buckets: Hash buckets for query cache (default: 2048) - table_mutation_map_query_cache_size: Max cached queries (default: 10000) diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml index ee19fabebab2210cd4abe59a711a036ac0ac8943..dce8dec199371e3a24d92baaad6647757b7edf5f 100644 --- a/doc/src/sgml/loadbalance.sgml +++ b/doc/src/sgml/loadbalance.sgml @@ -1193,4 +1193,214 @@ dml_adaptive_object_relationship_list = 'table_1:table_2' + + + Table Mutation Map Configuration (Lagless Replica Reads) + + + These parameters configure the table mutation map feature, which tracks recently written tables + to 
prevent stale reads from replica nodes during replication lag. This implements the + "lagless" architecture pattern for distributed systems with read replicas. + + + + When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period + (replication_delay * table_mutation_map_ttl_factor). Any SELECT queries on stale tables are routed + to the primary node instead of replicas, ensuring read-after-write consistency. + + + + This feature requires replication_delay_source_cmd to be configured + for monitoring replication delay from replicas. + + + + + Enabling the table mutation map feature increases shared memory consumption. With default settings, + the feature requires approximately 6.4 MB of shared memory (0.1 MB for table tracking + 6.3 MB for query cache). + Memory usage scales with configuration parameters: + + + + + Table tracking: table_mutation_map_table_size * 40 bytes (default: 2048 * 40 = ~80 KB) + + + + + Query cache: table_mutation_map_query_cache_size * 640 bytes (default: 10000 * 640 = ~6.3 MB) + + + + + For high-traffic systems with large cache sizes (e.g., table_mutation_map_query_cache_size = 100000), + memory usage can reach 64 MB or more. Consider your system's available shared memory when enabling this feature. + + + + + + + table_mutation_map_enabled (boolean) + + table_mutation_map_enabled configuration parameter + + + + + Enables in-memory tracking of recently written tables. When enabled, tables are marked + as stale after write operations, and reads are routed to primary until the TTL expires. + + + This parameter can be changed by reloading the Pgpool-II configurations. + Default is off. + + + + + + table_mutation_map_ttl_factor (floating point) + + table_mutation_map_ttl_factor configuration parameter + + + + + Multiplier for calculating the TTL: TTL = replication_delay * table_mutation_map_ttl_factor. + Higher values provide more safety margin but may reduce read replica utilization. + + + Valid range: 1.0-100.0. Default is 5.0. 
+ This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + table_mutation_map_cold_start_duration (integer) + + table_mutation_map_cold_start_duration configuration parameter + + + + + Duration in milliseconds to route all queries to primary after a child process starts. + This prevents stale reads when a new connection is established before the table mutation map + is populated with recent write history. + + + When watchdog is enabled and the local node becomes the leader, Pgpool-II also triggers a + global cold start for this duration to avoid stale reads after leadership changes. + + + Valid range: 0-60000 ms. Default is 2000 (2 seconds). + Set to 0 to disable cold start behavior. + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + table_mutation_map_table_buckets (integer) + + table_mutation_map_table_buckets configuration parameter + + + + + Number of hash buckets for the table mutation tracking hash table. + Higher values reduce hash collisions and improve lookup performance. + + + Valid range: 64-65536. Default is 1024. + This parameter can only be set at server start. + + + + + + table_mutation_map_table_size (integer) + + table_mutation_map_table_size configuration parameter + + + + + Maximum number of tables that can be tracked simultaneously in the table mutation map. + When full, oldest entries are evicted using a simple eviction strategy. + + + Valid range: 128-131072. Default is 2048. + Memory usage: approximately 40 bytes per entry. + This parameter can only be set at server start. + + + + + + table_mutation_map_query_buckets (integer) + + table_mutation_map_query_buckets configuration parameter + + + + + Number of hash buckets for the query parse cache. The cache stores normalized + query strings mapped to their table dependencies to avoid repeated parsing. + + + Valid range: 64-65536. Default is 2048. + This parameter can only be set at server start. 
+ + + + + + table_mutation_map_query_cache_size (integer) + + table_mutation_map_query_cache_size configuration parameter + + + + + Maximum number of query parse results to cache. Uses LRU eviction when full. + Larger caches reduce parsing overhead but consume more shared memory. + + + Valid range: 100-1000000. Default is 10000. + Memory usage: approximately 640 bytes per entry (~6.3 MB for default, ~64 MB for 100000 entries). + This parameter can only be set at server start. + + + + + + + + Table Mutation Map Configuration Example + + To enable table mutation map with replication delay monitoring: + + +# Enable table mutation map feature +table_mutation_map_enabled = on +table_mutation_map_ttl_factor = 5.0 +table_mutation_map_cold_start_duration = 2000 + +# Configure external replication delay monitoring +replication_delay_source_cmd = '/path/to/get-replication-delay.sh' +replication_delay_source_timeout = 10 + +# Adjust cache sizes based on workload (increases memory usage) +table_mutation_map_table_size = 4096 # Track up to 4096 tables (~160 KB) +table_mutation_map_query_cache_size = 50000 # Cache 50k queries (~31 MB) + + + Total shared memory required for above configuration: approximately 31.2 MB (31 MB query cache + 0.2 MB table map + overhead). + Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.4 MB. 
+ + + + + diff --git a/src/Makefile.am b/src/Makefile.am index 4678ab53055e828a37b6477801640aff17ff84a7..fc69bb98c8907d23855837cefaad0a972b4e2171 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \ rewrite/pool_timestamp.c \ rewrite/pool_lobj.c \ utils/pool_select_walker.c \ + utils/pool_table_mutation_map.c \ utils/strlcpy.c \ utils/psprintf.c \ utils/pool_params.c \ diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index 68abb7f41cb96d856c824a148842748bfb7a4d12..099191af7629c0ca145628e9a9e9ac92c4bb2f6e 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -783,6 +783,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"table_mutation_map_enabled", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Enable in-memory tracking of recently written tables to avoid stale reads from replicas", + CONFIG_VAR_TYPE_BOOL, false, 0 + }, + &g_pool_config.table_mutation_map_enabled, + false, + NULL, NULL, NULL + }, + { {"auto_failback", CFGCXT_RELOAD, FAILOVER_CONFIG, "Enables nodes automatically reattach, when detached node continue streaming replication.", @@ -1757,6 +1767,17 @@ static struct config_int_array ConfigureNamesIntArray[] = static struct config_double ConfigureNamesDouble[] = { + { + {"table_mutation_map_ttl_factor", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "TTL multiplier for table mutation map (TTL = replication_delay * factor)", + CONFIG_VAR_TYPE_DOUBLE, false, 0 + }, + &g_pool_config.table_mutation_map_ttl_factor, + 5.0, /* boot value: 5x replication delay */ + 1.0, 100.0, /* min, max */ + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_DOUBLE }; @@ -2355,6 +2376,61 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"table_mutation_map_cold_start_duration", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Duration in milliseconds to force queries to primary after child process starts.", + 
CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS + }, + &g_pool_config.table_mutation_map_cold_start_duration, + 2000, /* 2 seconds */ + 0, 60000, /* 0 to 60 seconds */ + NULL, NULL, NULL + }, + + { + {"table_mutation_map_table_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for table mutation map.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.table_mutation_map_table_buckets, + 1024, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"table_mutation_map_table_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in table mutation map.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.table_mutation_map_table_size, + 2048, + 128, 131072, + NULL, NULL, NULL + }, + + { + {"table_mutation_map_query_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.table_mutation_map_query_buckets, + 2048, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"table_mutation_map_query_cache_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.table_mutation_map_query_cache_size, + 10000, + 100, 1000000, + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_INT }; diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index 7cf9813eb7d58678bc86a0aaa38bd3c6445b6687..2dbbee8abce8daff6a98bf8f202bdc10bf324006 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -29,6 +29,7 @@ #include "utils/statistics.h" #include "utils/pool_select_walker.h" #include "utils/pool_stream.h" +#include "utils/pool_table_mutation_map.h" #include "context/pool_session_context.h" #include "context/pool_query_context.h" #include "parser/nodes.h" @@ -2010,6 +2011,19 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node if (dest == POOL_PRIMARY) { pool_set_node_to_be_sent(query_context, 
PRIMARY_NODE_ID); + + /* + * Resolve table and database OIDs now to populate relcache. + * This avoids potential hangs in CommandComplete where we shouldn't + * be running new queries against the backend. + */ + if (pool_config->table_mutation_map_enabled && + (IsA(node, InsertStmt) || IsA(node, UpdateStmt) || IsA(node, DeleteStmt))) + { + int *oids; + pool_extract_table_oids(node, &oids); + pool_table_mutation_map_get_database_oid(); + } } /* Should be sent to both primary and standby? */ else if (dest == POOL_BOTH) @@ -2139,6 +2153,107 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); } + /* + * Check table mutation map for recently written tables. + * If in cold start or any table was recently written, + * route to primary to avoid stale reads. + */ + else if (pool_config->table_mutation_map_enabled) + { + bool force_primary = false; + + /* During cold start, route everything to primary */ + if (pool_table_mutation_map_in_cold_start()) + { + ereport(DEBUG1, + (errmsg("could not load balance because of table mutation map cold start"), + errdetail("destination = PRIMARY for query= \"%s\"", query))); + force_primary = true; + } + else + { + /* Extract table oids and check if any are stale */ + SelectContext ctx; + int dboid; + int num_oids; + int i; + + memset(&ctx, 0, sizeof(ctx)); + num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx); + if (num_oids > 0) + { + dboid = pool_table_mutation_map_get_database_oid(); + + if (dboid <= 0) + { + ereport(DEBUG1, + (errmsg("could not load balance because database oid was unavailable"), + errdetail("destination = PRIMARY for query= \"%s\"", query))); + force_primary = true; + } + else + { + for (i = 0; i < num_oids; i++) + { + if (pool_table_mutation_map_table_is_stale(ctx.table_oids[i], dboid)) + { + ereport(DEBUG1, + (errmsg("could not load balance because table \"%s\" was recently written", + ctx.table_names[i]), + 
errdetail("destination = PRIMARY for query= \"%s\"", query))); + force_primary = true; + break; + } + } + } + } + } + + if (force_primary) + { + pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); + } + else + { + /* Proceed with load balancing */ + if (pool_config->statement_level_load_balance) + { + session_context->load_balance_node_id = select_load_balancing_node(); + } + + /* + * As streaming replication delay is too much, if + * prefer_lower_delay_standby is true then elect new load + * balance node which is lowest delayed, false then send + * to the primary. + */ + if (STREAM && check_replication_delay(session_context->load_balance_node_id)) + { + ereport(DEBUG1, + (errmsg("could not load balance because of too much replication delay"), + errdetail("destination = %d for query= \"%s\"", dest, query))); + + if (pool_config->prefer_lower_delay_standby) + { + int new_load_balancing_node = select_load_balancing_node(); + + session_context->load_balance_node_id = new_load_balancing_node; + session_context->query_context->load_balance_node_id = session_context->load_balance_node_id; + pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id); + } + else + { + pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); + } + } + else + { + session_context->query_context->load_balance_node_id = session_context->load_balance_node_id; + pool_set_node_to_be_sent(query_context, + session_context->query_context->load_balance_node_id); + } + } + } else { if (pool_config->statement_level_load_balance) diff --git a/src/include/pool.h b/src/include/pool.h index ea6f87e120af866b8ed3a15790d9d8a8e009fe91..c0d8170a8d6c23afdaa1e30dcf7c4bd4d88e7edd 100644 --- a/src/include/pool.h +++ b/src/include/pool.h @@ -424,7 +424,7 @@ typedef enum #define Min(x, y) ((x) < (y) ? 
(x) : (y)) -#define MAX_NUM_SEMAPHORES 8 +#define MAX_NUM_SEMAPHORES 10 #define CONN_COUNTER_SEM 0 #define REQUEST_INFO_SEM 1 #define QUERY_CACHE_STATS_SEM 2 @@ -434,6 +434,8 @@ typedef enum #define FOLLOW_PRIMARY_SEM 6 #define MAIN_EXIT_HANDLER_SEM 7 /* used in exit_hander in pgpool main * process */ +#define TABLE_MUTATION_MAP_TABLE_SEM 8 +#define TABLE_MUTATION_MAP_QUERY_SEM 9 #define MAX_REQUEST_QUEUE_SIZE 10 #define MAX_SEC_WAIT_FOR_CLUSTER_TRANSACTION 10 /* time in seconds to keep diff --git a/src/include/pool_config.h b/src/include/pool_config.h index 741de6cc5fc3368f813d6b6efa68eb7f8a79506b..e6c727823dedeedd0225420b66be8382f5bb83fe 100644 --- a/src/include/pool_config.h +++ b/src/include/pool_config.h @@ -365,6 +365,16 @@ typedef struct * replication check */ char *replication_delay_source_cmd; /* external command for replication delay */ int replication_delay_source_timeout; /* timeout for external command in seconds */ + + /* Table mutation map configuration for tracking recently written tables */ + bool table_mutation_map_enabled; /* Enable in-memory table tracking */ + double table_mutation_map_ttl_factor; /* TTL multiplier for replication delay */ + int table_mutation_map_cold_start_duration; /* Cold start duration in ms */ + int table_mutation_map_table_buckets; /* Number of hash buckets for table map */ + int table_mutation_map_table_size; /* Max entries in table map */ + int table_mutation_map_query_buckets; /* Number of hash buckets for query cache */ + int table_mutation_map_query_cache_size; /* Max entries in query cache */ + char *failover_command; /* execute command when failover happens */ char *follow_primary_command; /* execute command when failover is * ended */ diff --git a/src/include/utils/pool_table_mutation_map.h b/src/include/utils/pool_table_mutation_map.h new file mode 100644 index 0000000000000000000000000000000000000000..4c96cb9085107a2682be2948b83f83835fe555c8 --- /dev/null +++ b/src/include/utils/pool_table_mutation_map.h @@ 
-0,0 +1,237 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + * pool_table_mutation_map.h: In-memory tracking of recently written tables + * to avoid stale reads from replicas during replication lag + */ + +#ifndef POOL_TABLE_MUTATION_MAP_H +#define POOL_TABLE_MUTATION_MAP_H + +#include "pool.h" +#include + +/* + * Maximum table name length including schema: "schema"."table" + * Using NAMEDATALEN * 2 + 4 for quotes and dot + */ +#define TABLE_MUTATION_MAP_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4) + +/* + * Maximum number of tables we track per query + */ +#define TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY 8 + +/* + * Invalid index marker for linked lists + */ +#define TABLE_MUTATION_MAP_INVALID_INDEX (-1) + +/* + * Default TTL in microseconds (100ms) used when replication delay is unknown + */ +#define TABLE_MUTATION_MAP_DEFAULT_TTL_US (100 * 1000) + +/* + * Entry in the table mutation hash table (keyed by table/database oids) + */ +typedef struct TableMutationEntry +{ + int table_oid; /* Table oid */ + int dboid; /* Database oid */ + struct timeval last_write_time; /* When the table was last written */ + uint32 hash; /* Pre-computed hash value */ + int next; /* Next entry in collision 
chain (-1 if none) */ + bool in_use; /* Is this entry in use? */ +} TableMutationEntry; + +/* + * Header for the table mutation hash table in shared memory + */ +typedef struct TableMutationHashTable +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + /* Flexible array members follow in shared memory: + * int buckets[num_buckets]; + * TableMutationEntry entries[max_entries]; + */ +} TableMutationHashTable; + +/* + * Entry in the query parse cache + */ +typedef struct QueryParseEntry +{ + uint64 query_hash; /* Hash of normalized query */ + bool is_write; /* True if INSERT/UPDATE/DELETE */ + int num_tables; /* Number of tables in query */ + char table_names[TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY][TABLE_MUTATION_MAP_TABLE_NAME_LEN]; + int next; /* Next entry in collision chain */ + int lru_prev; /* Previous in LRU list */ + int lru_next; /* Next in LRU list */ + bool in_use; /* Is this entry in use? */ +} QueryParseEntry; + +/* + * Header for the query parse cache in shared memory + */ +typedef struct QueryParseCache +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + int lru_head; /* Most recently used */ + int lru_tail; /* Least recently used */ + /* Flexible array members follow in shared memory: + * int buckets[num_buckets]; + * QueryParseEntry entries[max_entries]; + */ +} QueryParseCache; + +/* + * Global state for table mutation map feature + */ +typedef struct TableMutationMapState +{ + bool initialized; /* Has shared memory been initialized? 
*/ + uint64 current_ttl_us; /* Current TTL in microseconds */ + struct timeval ttl_last_updated; /* When TTL was last updated */ + struct timeval last_cleanup_time; /* When last expired cleanup ran */ + struct timeval global_cold_start_until; /* Global cold start end time */ + volatile uint32 stats_queries_checked; /* Number of queries checked */ + volatile uint32 stats_forced_primary; /* Queries forced to primary */ + volatile uint32 stats_allowed_replica; /* Queries allowed to replica */ +} TableMutationMapState; + +/* + * Main shared memory structure containing all components + */ +typedef struct TableMutationMapShmem +{ + TableMutationMapState state; + TableMutationHashTable *table_map; + QueryParseCache *query_cache; +} TableMutationMapShmem; + +/* ---------------- + * Public API functions + * ---------------- + */ + +/* + * Initialize shared memory structures for table mutation map. + * Called from pgpool_main.c after pool_init_pool_info(). + */ +extern void pool_table_mutation_map_init(void); + +/* + * Initialize per-child process state for table mutation map. + * Called from child.c when a new child process starts. + * Sets up cold start tracking. + */ +extern void pool_table_mutation_map_child_init(void); + +/* + * Check if the child process is in cold start period. + * During cold start, all queries are routed to primary. + * Returns true if in cold start, false otherwise. + */ +extern bool pool_table_mutation_map_in_cold_start(void); + +/* + * Trigger a global cold start period for all processes. + * Used after watchdog leader change to avoid stale reads. + */ +extern void pool_table_mutation_map_trigger_global_cold_start(void); + +/* + * Get oid of current database. + */ +extern int pool_table_mutation_map_get_database_oid(void); + +/* + * Check if a table was recently written to (is "stale"). + * If stale, reads from this table should go to primary. + * Returns true if table is stale (recently written), false otherwise. 
+ */ +extern bool pool_table_mutation_map_table_is_stale(int table_oid, int dboid); + +/* + * Mark tables as recently written. + * Called after INSERT/UPDATE/DELETE queries complete. + * table_oids: array of table oids + * num_tables: number of tables in array + * dboid: database oid + */ +extern void pool_table_mutation_map_mark_tables_written(const int *table_oids, int num_tables, int dboid); + +/* + * Convenience function to mark a single table as written. + * table_oid: table oid + * dboid: database oid + */ +extern void pool_table_mutation_map_mark_table_written(int table_oid, int dboid); + +/* + * Update the TTL based on current replication delay. + * Called from pool_worker_child.c when replication delay is updated. + * delay_us: replication delay in microseconds + */ +extern void pool_table_mutation_map_update_ttl(uint64 delay_us); + +/* + * Look up cached parse result for a query. + * hash: hash of normalized query + * is_write: output - true if query is a write + * table_names: output - array to fill with table names + * num_tables: output - number of tables found + * Returns true if found in cache, false otherwise. + */ +extern bool pool_table_mutation_map_get_cached_parse(uint64 hash, bool *is_write, + char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN], + int *num_tables); + +/* + * Cache a parse result for a query. + * hash: hash of normalized query + * is_write: true if query is a write + * table_names: array of table names + * num_tables: number of tables + */ +extern void pool_table_mutation_map_cache_parse(uint64 hash, bool is_write, + const char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN], + int num_tables); + +/* + * Normalize a query and compute its hash. + * Strips comments, normalizes whitespace and literals. 
+ * query: input SQL query string + * Returns: 64-bit hash of normalized query + */ +extern uint64 pool_table_mutation_map_normalize_and_hash(const char *query); + +/* + * Calculate required shared memory size for table mutation map. + */ +extern Size pool_table_mutation_map_shmem_size(void); + +#endif /* POOL_TABLE_MUTATION_MAP_H */ diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c index fa05e15e7ac435e072298063f918c70aa4e5680c..87dc2c4f09a62e1cd680b8020975e3ecf0813ec0 100644 --- a/src/main/pgpool_main.c +++ b/src/main/pgpool_main.c @@ -57,6 +57,7 @@ #include "auth/pool_passwd.h" #include "auth/pool_hba.h" #include "query_cache/pool_memqcache.h" +#include "utils/pool_table_mutation_map.h" #include "watchdog/wd_internal_commands.h" #include "watchdog/wd_lifecheck.h" #include "watchdog/watchdog.h" @@ -1485,11 +1486,14 @@ sigusr1_interrupt_processor(void) if (user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED]) { + WD_STATES wd_state; + ereport(LOG, (errmsg("Pgpool-II parent process received watchdog state change signal from watchdog"))); user1SignalSlot->signalFlags[SIG_WATCHDOG_STATE_CHANGED] = false; - if (wd_internal_get_watchdog_local_node_state() == WD_STANDBY) + wd_state = wd_internal_get_watchdog_local_node_state(); + if (wd_state == WD_STANDBY) { ereport(LOG, (errmsg("we have joined the watchdog cluster as STANDBY node"), @@ -1503,6 +1507,10 @@ sigusr1_interrupt_processor(void) */ pool_release_follow_primary_lock(true); } + else if (wd_state == WD_COORDINATOR && pool_config->table_mutation_map_enabled) + { + pool_table_mutation_map_trigger_global_cold_start(); + } } if (user1SignalSlot->signalFlags[SIG_FAILOVER_INTERRUPT]) { @@ -3068,6 +3076,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size())); } + if (pool_config->table_mutation_map_enabled) + { + size += MAXALIGN(pool_table_mutation_map_shmem_size()); + elog(DEBUG1, 
"table_mutation_map: %zu bytes requested for shared memory", MAXALIGN(pool_table_mutation_map_shmem_size())); + } + initialize_shared_memory_main_segment(size); /* Move the backend descriptors to shared memory */ @@ -3184,6 +3198,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) wd_ipc_initialize_data(); } + /* Initialize table mutation map for tracking recently written tables */ + if (pool_config->table_mutation_map_enabled) + { + pool_table_mutation_map_init(); + } + } /* diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index a3b8f0ea194ffecc79e58566be80562a46eb75ab..2de467496194dd219437eb3721ba9d8c8f999bb6 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ -38,6 +38,8 @@ #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/pool_stream.h" +#include "utils/pool_table_mutation_map.h" +#include "utils/pool_select_walker.h" static int extract_ntuples(char *message); static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete); @@ -304,6 +306,45 @@ handle_query_context(POOL_CONNECTION_POOL *backend) node = session_context->query_context->parse_tree; + /* + * Track table writes for table mutation map feature. + * Mark tables as written when INSERT/UPDATE/DELETE completes. 
+ */ + if (pool_config->table_mutation_map_enabled) + { + char *table_name = NULL; + int table_oid = 0; + int dboid = 0; + + if (IsA(node, InsertStmt)) + { + InsertStmt *stmt = (InsertStmt *) node; + table_name = make_table_name_from_rangevar(stmt->relation); + } + else if (IsA(node, UpdateStmt)) + { + UpdateStmt *stmt = (UpdateStmt *) node; + table_name = make_table_name_from_rangevar(stmt->relation); + } + else if (IsA(node, DeleteStmt)) + { + DeleteStmt *stmt = (DeleteStmt *) node; + table_name = make_table_name_from_rangevar(stmt->relation); + } + + if (table_name != NULL) + { + table_oid = pool_table_name_to_oid(table_name); + dboid = pool_table_mutation_map_get_database_oid(); + if (table_oid > 0 && dboid > 0) + { + pool_table_mutation_map_mark_table_written(table_oid, dboid); + ereport(DEBUG1, + (errmsg("table_mutation_map: marked table \"%s\" as written", table_name))); + } + } + } + if (IsA(node, PrepareStmt)) { if (session_context->uncompleted_message) diff --git a/src/protocol/child.c b/src/protocol/child.c index c34f057281be62feaf39db1bb605062f56dc398c..26d7cf1d1a6768c109850a43b57373141f9f7eaf 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -57,6 +57,7 @@ #include "utils/elog.h" #include "utils/ps_status.h" #include "utils/timestamp.h" +#include "utils/pool_table_mutation_map.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -213,6 +214,12 @@ do_child(int *fds) /* Initialize per process context */ pool_init_process_context(); + /* Initialize table mutation map child state for cold start tracking */ + if (pool_config->table_mutation_map_enabled) + { + pool_table_mutation_map_child_init(); + } + /* initialize connection pool */ if (pool_init_cp()) { diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream index 454fdb9e5d1fd65437b6a67f12ab62658ea08f49..46052bad37bbd1f4affec8e08e5cecd3d4903976 100644 --- a/src/sample/pgpool.conf.sample-stream +++ 
b/src/sample/pgpool.conf.sample-stream @@ -499,6 +499,51 @@ backend_clustering_mode = streaming_replication #statement_level_load_balance = off # Enables statement level load balancing +# - Table Mutation Map (Lagless Read Replica) - + # WARNING: Enabling this feature increases shared memory usage + # Default settings require ~6.4 MB shared memory + # (0.1 MB table tracking + 6.3 MB query cache) + +#table_mutation_map_enabled = off + # Enable in-memory tracking of recently written tables + # to prevent stale reads from replicas during replication lag + # (change requires reload) + +#table_mutation_map_ttl_factor = 5.0 + # TTL multiplier: TTL = replication_delay * factor + # Higher values provide more safety margin + # Range: 1.0-100.0 (default: 5.0) + # (change requires reload) + +#table_mutation_map_cold_start_duration = 2000 + # Duration in milliseconds to route all queries to primary + # after child process starts (cold start period) + # Range: 0-60000 ms (default: 2000 ms = 2 seconds) + # Set to 0 to disable cold start behavior + # (change requires reload) + +#table_mutation_map_table_buckets = 1024 + # Number of hash buckets for table mutation tracking + # Higher values reduce hash collisions + # Range: 64-65536 (default: 1024) + # (change requires restart) + +#table_mutation_map_table_size = 2048 + # Maximum number of tables to track simultaneously + # Range: 128-131072 (default: 2048) + # (change requires restart) + +#table_mutation_map_query_buckets = 2048 + # Number of hash buckets for query parse cache + # Range: 64-65536 (default: 2048) + # (change requires restart) + +#table_mutation_map_query_cache_size = 10000 + # Maximum number of query parse results to cache + # Range: 100-1000000 (default: 10000) + # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000) + # (change requires restart) + #------------------------------------------------------------------------------ # STREAMING REPLICATION MODE 
#------------------------------------------------------------------------------ diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c index 311b638658e66ebb56162ad9fa4392315b2df64e..38bd217be1972af57f80c26c8d726aad704d56bd 100644 --- a/src/streaming_replication/pool_worker_child.c +++ b/src/streaming_replication/pool_worker_child.c @@ -58,6 +58,7 @@ #include "utils/pool_ip.h" #include "utils/ps_status.h" #include "utils/pool_stream.h" +#include "utils/pool_table_mutation_map.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -695,6 +696,7 @@ check_replication_time_lag_with_cmd(void) double delay_ms; uint64 delay; uint64 delay_threshold_by_time; + uint64 max_delay_us = 0; /* Track maximum delay for table mutation map */ int token_count = 0; int primary_node_id; int save_errno; @@ -1003,6 +1005,10 @@ check_replication_time_lag_with_cmd(void) bkinfo->standby_delay = delay; bkinfo->standby_delay_by_time = true; + /* Track maximum delay for table mutation map TTL calculation */ + if (delay > max_delay_us) + max_delay_us = delay; + /* * Log delay if necessary. threshold is in milliseconds, convert * to microseconds. 
@@ -1021,6 +1027,10 @@ check_replication_time_lag_with_cmd(void) token = strtok_r(NULL, " \t\n", &saveptr); } + /* Update table mutation map TTL based on maximum observed delay */ + if (pool_config->table_mutation_map_enabled && max_delay_us > 0) + pool_table_mutation_map_update_ttl(max_delay_us); + } PG_CATCH(); { diff --git a/src/test/regression/tests/045.table_mutation_map/test.sh b/src/test/regression/tests/045.table_mutation_map/test.sh new file mode 100755 index 0000000000000000000000000000000000000000..e0f229ee88a70e4643df1745a6e6992b867354ae --- /dev/null +++ b/src/test/regression/tests/045.table_mutation_map/test.sh @@ -0,0 +1,228 @@ +#!/usr/bin/env bash +#------------------------------------------------------------------- +# test script for table mutation map feature (in-memory table tracking). +# Tests routing of queries based on recently written tables. +# +source $TESTLIBS +TESTDIR=testdir +PSQL=$PGBIN/psql +PSQLOPTS="-a -q -X" +PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin +export PGDATABASE=test + +# Only run in streaming replication mode since that's the target use case +for mode in s +do + rm -fr $TESTDIR + mkdir $TESTDIR + cd $TESTDIR + + # Create test environment with 2 nodes + echo -n "creating test environment..." + $PGPOOL_SETUP -m $mode -n 2 || exit 1 + echo "done." 
+
+    source ./bashrc.ports
+
+    # Configure table mutation map feature
+    echo "table_mutation_map_enabled = on" >> etc/pgpool.conf
+    echo "table_mutation_map_ttl_factor = 5.0" >> etc/pgpool.conf
+    echo "table_mutation_map_cold_start_duration = 2000" >> etc/pgpool.conf
+
+    # Configure weights so we can distinguish routing
+    # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1
+    # This means load balanced queries go to node 1 by default
+    echo "backend_weight0 = 0" >> etc/pgpool.conf
+    echo "backend_weight1 = 1" >> etc/pgpool.conf
+
+    # Enable debug logging to see routing decisions
+    echo "log_min_messages = debug1" >> etc/pgpool.conf
+
+    ./startall
+
+    export PGPORT=$PGPOOL_PORT
+
+    wait_for_pgpool_startup
+
+    # Create test tables.
+    # NOTE(review): the here-document and the Test 1 query were lost from this
+    # patch (only "$PSQL test < /dev/null 2>&1" survived, which feeds psql an
+    # empty stdin and creates nothing).  They are reconstructed from what the
+    # tests below use: tables t1, t2 and t3 with one INTEGER column "i".
+    # Confirm against the original commit.
+    $PSQL test <<EOF
+CREATE TABLE t1(i INTEGER);
+CREATE TABLE t2(i INTEGER);
+CREATE TABLE t3(i INTEGER);
+EOF
+
+    echo "=== Test 1: Cold Start Routing ==="
+    # A SELECT issued during the cold start window must not be load balanced
+    $PSQL test -c "SELECT 'cold_start_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+    # Check log for cold start message
+    if grep -q "could not load balance because of table mutation map cold start" log/pgpool.log; then
+        echo "Test 1 PASSED: Cold start routing works"
+    else
+        echo "Test 1 FAILED: Cold start routing not detected"
+        ./shutdownall
+        exit 1
+    fi
+
+    echo "=== Test 2: Wait for cold start to end ==="
+    # Wait for cold start period to end (2 seconds)
+    sleep 3
+
+    # Clear the log
+    > log/pgpool.log
+
+    # Now a clean table query should load balance (go to node 1)
+    $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1
+
+    # After cold start, queries to clean tables should load balance
+    # Check that it did NOT get forced to primary due to table mutation map
+    if grep -q "could not load balance because of table mutation map cold start" log/pgpool.log; then
+        echo "Test 2 FAILED: Still in cold start after waiting"
+        ./shutdownall
+        exit 1
+    fi
+    echo "Test 2 PASSED: Cold start ended correctly"
+
+    echo "=== Test 3: Write-then-Read Routing ==="
+    # Clear the log
+    > log/pgpool.log
+
+    # Write to t1
+    $PSQL test -c "INSERT INTO t1 VALUES (1);" > /dev/null 2>&1
+
+    # Immediately read from t1 - should go to primary due to recent write
+    $PSQL test -c "SELECT 'write_read_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+    # Check log for table staleness message
+    if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+        echo "Test 3 PASSED: Write-then-read routing works"
+    else
+        echo "Test 3 FAILED: Table staleness not detected after write"
+        # Show relevant log entries for debugging
+        grep -i "table_mutation_map" log/pgpool.log | tail -20
+        ./shutdownall
+        exit 1
+    fi
+
+    echo "=== Test 4: Clean Table Still Load Balances ==="
+    # Clear the log
+    > log/pgpool.log
+
+    # Read from t2 (never written to) - should load balance
+    $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+    # Should NOT see table mutation map blocking message for t2
+    if grep -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then
+        echo "Test 4 FAILED: Clean table incorrectly marked as stale"
+        ./shutdownall
+        exit 1
+    fi
+    echo "Test 4 PASSED: Clean tables still load balance"
+
+    echo "=== Test 5: UPDATE Marks Table as Stale ==="
+    # Clear the log
+    > log/pgpool.log
+
+    # Update t2
+    $PSQL test -c "UPDATE t2 SET i = 999 WHERE i = 0;" > /dev/null 2>&1
+
+    # Immediately read from t2 - should go to primary
+    $PSQL test -c "SELECT 'update_test' as marker, * FROM t2;" > /dev/null 2>&1
+
+    if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+        echo "Test 5 PASSED: UPDATE marks table as stale"
+    else
+        echo "Test 5 FAILED: UPDATE did not mark table as stale"
+        ./shutdownall
+        exit 1
+    fi
+
+    echo "=== Test 6: DELETE Marks Table as Stale ==="
+    # Clear the log
+    > log/pgpool.log
+
+    # Delete from t3
+    $PSQL test -c "DELETE FROM t3 WHERE i = 0;" > /dev/null 2>&1
+
+    # Immediately read from t3 - should go to primary
+    $PSQL test -c "SELECT 'delete_test' as marker, * FROM t3;" > /dev/null 2>&1
+
+    if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then
+        echo "Test 6 PASSED: DELETE marks table as stale"
+    else
+        echo "Test 6 FAILED: DELETE did not mark table as stale"
+        ./shutdownall
+        exit 1
+    fi
+
+    echo "=== Test 7: Multi-Table Query with One Stale Table ==="
+    # Clear the log
+    > log/pgpool.log
+
+    # Create a new clean table
+    $PSQL test -c "CREATE TABLE t4(i INTEGER);" > /dev/null 2>&1
+
+    # Wait a bit for TTL to expire on other tables if factor is low
+    sleep 1
+
+    # Write to t1 only
+    $PSQL test -c "INSERT INTO t1 VALUES (100);" > /dev/null 2>&1
+
+    # Query joining t1 and t4 - should go to primary because t1 is stale
+    $PSQL test -c "SELECT 'multi_table_test' as marker FROM t1, t4;" > /dev/null 2>&1
+
+    if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+        echo "Test 7 PASSED: Multi-table query routes to primary when one table is stale"
+    else
+        echo "Test 7 FAILED: Multi-table staleness not detected"
+        ./shutdownall
+        exit 1
+    fi
+
+    echo "=== Test 8: Different Databases with Same Table Name ==="
+    # Create another database and a table with the same name
+    $PSQL test -c "CREATE DATABASE test2;" > /dev/null 2>&1
+    $PSQL test2 -c "CREATE TABLE t1(i INTEGER);" > /dev/null 2>&1
+
+    # Wait for TTL to expire
+    sleep 1
+
+    # Write to t1 in 'test' database
+    $PSQL test -c "INSERT INTO t1 VALUES (500);" > /dev/null 2>&1
+
+    # Read from t1 in 'test2' database - should load balance (Node 1)
+    # because it's a different database, even if table name is same
+    > log/pgpool.log
+    $PSQL test2 -c "SELECT 'diff_db_test' as marker, * FROM t1;" > /dev/null 2>&1
+
+    if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+        echo "Test 8 FAILED: Table marked as stale in wrong database"
+        ./shutdownall
+        exit 1
+    fi
+
+    # Read from t1 in 'test' database - should go to primary
+    $PSQL test -c "SELECT 'same_db_test' as marker, * FROM t1;" > /dev/null 2>&1
+    if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then
+        echo "Test 8
PASSED: Correctly distinguishes between databases" + else + echo "Test 8 FAILED: Table staleness not detected in correct database" + ./shutdownall + exit 1 + fi + + echo "" + echo "=== All Table Mutation Map Tests PASSED ===" + + ./shutdownall + + cd .. +done + +exit 0 diff --git a/src/utils/pool_table_mutation_map.c b/src/utils/pool_table_mutation_map.c new file mode 100644 index 0000000000000000000000000000000000000000..300c230ad18aa2204a09974d13ecf8e8958ff36f --- /dev/null +++ b/src/utils/pool_table_mutation_map.c @@ -0,0 +1,1166 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. 
+ *
+ * pool_table_mutation_map.c: In-memory tracking of recently written tables
+ * to avoid stale reads from replicas during replication lag
+ *
+ * Based on the "lagless" architecture from Tailor Brands:
+ * https://medium.com/tailor-tech/using-database-read-replicas-in-distributed-systems-d80eaf6bbf8a
+ */
+
+/*
+ * NOTE(review): the names of the four system headers were lost from this
+ * patch (only bare "#include" directives survived, which do not compile).
+ * The set below is what the code in this file visibly needs: intptr_t,
+ * size_t, strlcpy() and gettimeofday()/struct timeval.  Confirm against the
+ * original commit.
+ */
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/time.h>
+
+#include "pool.h"
+#include "pool_config.h"
+#include "context/pool_session_context.h"
+#include "utils/pool_table_mutation_map.h"
+#include "utils/elog.h"
+#include "utils/pool_ipc.h"
+#include "utils/palloc.h"
+#include "utils/pool_relcache.h"
+
+/* Query template handed to the relcache to map a database name to its oid */
+#define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_catalog.pg_database WHERE datname = '%s'"
+
+/* ----------------
+ * Local variables
+ * ----------------
+ */
+
+/* Pointer to shared memory structure */
+static TableMutationMapShmem *table_mutation_map_shmem = NULL;
+
+/* Per-process cold start tracking (not in shared memory) */
+static struct timeval process_start_time;
+static bool cold_start_initialized = false;
+
+/* ----------------
+ * Helper macros for accessing flexible arrays in shared memory
+ *
+ * Both containers are laid out as: header struct, then num_buckets ints
+ * (bucket heads), then the entry array.
+ *
+ * NOTE(review): the entry array starts right after num_buckets ints.  If
+ * num_buckets is odd and sizeof(int) is 4, that offset is only 4-byte
+ * aligned while the entries hold a struct timeval -- confirm the bucket
+ * counts are constrained or add padding here, in the size calculation and
+ * in the init code together.
+ * ----------------
+ */
+
+/* Get pointer to bucket array in table map */
+#define TABLE_MAP_BUCKETS(map) \
+    ((int *)((char *)(map) + sizeof(TableMutationHashTable)))
+
+/* Get pointer to entry array in table map */
+#define TABLE_MAP_ENTRIES(map) \
+    ((TableMutationEntry *)((char *)(map) + sizeof(TableMutationHashTable) + \
+                            (map)->num_buckets * sizeof(int)))
+
+/* Get pointer to bucket array in query cache */
+#define QUERY_CACHE_BUCKETS(cache) \
+    ((int *)((char *)(cache) + sizeof(QueryParseCache)))
+
+/* Get pointer to entry array in query cache */
+#define QUERY_CACHE_ENTRIES(cache) \
+    ((QueryParseEntry *)((char *)(cache) + sizeof(QueryParseCache) + \
+                         (cache)->num_buckets * sizeof(int)))
+
+/* ----------------
+ * Semaphore lock helpers
+ * ----------------
+ */
+
+/* Acquire the semaphore guarding the table mutation map */
+static inline void
+table_map_lock(void)
+{
+    pool_semaphore_lock(TABLE_MUTATION_MAP_TABLE_SEM);
+}
+
+/* Release the semaphore guarding the table mutation map */
+static inline void
+table_map_unlock(void)
+{
+    pool_semaphore_unlock(TABLE_MUTATION_MAP_TABLE_SEM);
+}
+
+/* Acquire the semaphore guarding the query parse cache */
+static inline void
+query_cache_lock(void)
+{
+    pool_semaphore_lock(TABLE_MUTATION_MAP_QUERY_SEM);
+}
+
+/* Release the semaphore guarding the query parse cache */
+static inline void
+query_cache_unlock(void)
+{
+    pool_semaphore_unlock(TABLE_MUTATION_MAP_QUERY_SEM);
+}
+
+/* ----------------
+ * Hash functions
+ * ----------------
+ */
+
+/*
+ * 32-bit FNV-1a hash of the (table oid, database oid) pair.
+ *
+ * The two values are hashed as the raw bytes of a uint32[2], so the result
+ * depends on host byte order; it is only meaningful within one host.
+ */
+static uint32
+fnv1a_hash_table_key(int table_oid, int dboid)
+{
+    const uint32 key[2] = {(uint32) table_oid, (uint32) dboid};
+    const unsigned char *p = (const unsigned char *) key;
+    const unsigned char *stop = p + sizeof(key);
+    uint32      h = 2166136261u;    /* FNV offset basis */
+
+    while (p < stop)
+    {
+        h ^= *p++;
+        h *= 16777619u;             /* FNV prime */
+    }
+
+    return h;
+}
+
+/*
+ * 64-bit FNV-1a hash of the len bytes starting at str.
+ */
+static uint64
+fnv1a_hash_64(const char *str, size_t len)
+{
+    uint64      h = 14695981039346656037ULL;    /* FNV offset basis for 64-bit */
+
+    while (len-- > 0)
+    {
+        h ^= (uint8) *str++;
+        h *= 1099511628211ULL;      /* FNV prime for 64-bit */
+    }
+
+    return h;
+}
+
+/* ----------------
+ * Time utilities
+ * ----------------
+ */
+
+/*
+ * Microseconds from *start to *end; negative when end precedes start.
+ */
+static int64
+elapsed_us(struct timeval *start, struct timeval *end)
+{
+    int64       whole_seconds = (int64) (end->tv_sec - start->tv_sec);
+
+    return (whole_seconds * 1000000) + (end->tv_usec - start->tv_usec);
+}
+
+/*
+ * Store the current wall-clock time in *tv
+ */
+static void
+get_current_time(struct timeval *tv)
+{
+    gettimeofday(tv, NULL);
+}
+
+/* ----------------
+ * Database oid lookup
+ * ----------------
+ */
+
+/*
+ * Look up the oid of the database the current session is connected to.
+ *
+ * Returns 0 when the session has no usable main backend connection or when
+ * the relcache cannot be created.  The relcache is a function-level static,
+ * so it is created at most once per process.
+ */
+static int
+table_mutation_map_get_database_oid_internal(void)
+{
+    static POOL_RELCACHE *relcache;
+    POOL_CONNECTION_POOL *backend;
+
+    backend = pool_get_session_context(false)->backend;
+    if (backend == NULL || MAIN_CONNECTION(backend) == NULL || MAIN_CONNECTION(backend)->sp == NULL)
+        return 0;
+
+    if (!relcache)
+    {
+        relcache = pool_create_relcache(pool_config->relcache_size,
+                                        DATABASE_TO_OID_QUERY,
+                                        int_register_func,
+                                        int_unregister_func,
+                                        false);
+        if (relcache == NULL)
+        {
+            ereport(LOG,
+                    (errmsg("table_mutation_map: error creating relcache while getting database OID")));
+            return 0;
+        }
+    }
+
+    return (int) (intptr_t) pool_search_relcache(relcache, backend,
+                                                 MAIN_CONNECTION(backend)->sp->database);
+}
+
+/*
+ * Public wrapper around the database oid lookup above.
+ */
+int
+pool_table_mutation_map_get_database_oid(void)
+{
+    return table_mutation_map_get_database_oid_internal();
+}
+
+/* ----------------
+ * Table mutation hash table operations
+ * ----------------
+ */
+
+/*
+ * Initialize table mutation hash table: record the dimensions, mark every
+ * bucket empty and chain all entries into the free list.
+ */
+static void
+table_map_init(TableMutationHashTable *map, int num_buckets, int max_entries)
+{
+    int *buckets;
+    TableMutationEntry *entries;
+    int i;
+
+    map->num_buckets = num_buckets;
+    map->max_entries = max_entries;
+    map->num_entries = 0;
+    map->free_list_head = 0;
+
+    buckets = TABLE_MAP_BUCKETS(map);
+    entries = TABLE_MAP_ENTRIES(map);
+
+    /* Initialize all buckets to empty */
+    for (i = 0; i < num_buckets; i++)
+        buckets[i] = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+    /* Initialize free list - chain all entries */
+    for (i = 0; i < max_entries; i++)
+    {
+        entries[i].in_use = false;
+        entries[i].next = (i < max_entries - 1) ?
i + 1 : TABLE_MUTATION_MAP_INVALID_INDEX;
+    }
+
+    ereport(DEBUG1,
+            (errmsg("table_mutation_map: initialized table map with %d buckets, %d max entries",
+                    num_buckets, max_entries)));
+}
+
+/*
+ * Allocate an entry from the free list.
+ * Returns the entry index, or TABLE_MUTATION_MAP_INVALID_INDEX when the
+ * free list is empty.  The returned entry is marked in use and unlinked.
+ */
+static int
+table_map_alloc_entry(TableMutationHashTable *map)
+{
+    TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+    int idx;
+
+    if (map->free_list_head == TABLE_MUTATION_MAP_INVALID_INDEX)
+        return TABLE_MUTATION_MAP_INVALID_INDEX;
+
+    idx = map->free_list_head;
+    map->free_list_head = entries[idx].next;
+    entries[idx].in_use = true;
+    entries[idx].next = TABLE_MUTATION_MAP_INVALID_INDEX;
+    map->num_entries++;
+
+    return idx;
+}
+
+/*
+ * Free an entry back to the free list.
+ * The caller must already have unlinked idx from its bucket chain.
+ */
+static void
+table_map_free_entry(TableMutationHashTable *map, int idx)
+{
+    TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+
+    entries[idx].in_use = false;
+    entries[idx].next = map->free_list_head;
+    map->free_list_head = idx;
+    map->num_entries--;
+}
+
+/*
+ * Look up a table in the hash table
+ * Returns entry index or TABLE_MUTATION_MAP_INVALID_INDEX if not found
+ * Must be called with lock held
+ */
+static int
+table_map_lookup(TableMutationHashTable *map, int table_oid, int dboid, uint32 hash)
+{
+    int *buckets = TABLE_MAP_BUCKETS(map);
+    TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+    int bucket = hash % map->num_buckets;
+    int idx = buckets[bucket];
+
+    while (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+    {
+        if (entries[idx].hash == hash &&
+            entries[idx].table_oid == table_oid &&
+            entries[idx].dboid == dboid)
+        {
+            return idx;
+        }
+        idx = entries[idx].next;
+    }
+
+    return TABLE_MUTATION_MAP_INVALID_INDEX;
+}
+
+/*
+ * Insert or update a table entry
+ * Must be called with lock held
+ *
+ * If the table is already tracked only its write time is refreshed.
+ * Otherwise a free entry is taken; when none is left, the entry with the
+ * OLDEST write time is evicted to make room.  The oldest entry is the one
+ * closest to (or already past) its TTL, so dropping it is the least likely
+ * to let a read of a recently written table reach a lagging replica.
+ * Evicting an arbitrary entry could discard a table written a moment ago.
+ * The scan is O(buckets + entries) but only runs when the map is full.
+ */
+static void
+table_map_insert(TableMutationHashTable *map, int table_oid, int dboid,
+                 uint32 hash, struct timeval *write_time)
+{
+    int *buckets = TABLE_MAP_BUCKETS(map);
+    TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+    int bucket = hash % map->num_buckets;
+    int idx;
+
+    /* Check if entry already exists */
+    idx = table_map_lookup(map, table_oid, dboid, hash);
+    if (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+    {
+        /* Update existing entry */
+        entries[idx].last_write_time = *write_time;
+        return;
+    }
+
+    /* Allocate new entry */
+    idx = table_map_alloc_entry(map);
+    if (idx == TABLE_MUTATION_MAP_INVALID_INDEX)
+    {
+        int victim = TABLE_MUTATION_MAP_INVALID_INDEX;
+        int *victim_link = NULL;    /* chain slot that points at victim */
+        int b;
+
+        /* Table is full - find the entry with the oldest write time */
+        for (b = 0; b < map->num_buckets; b++)
+        {
+            int *link = &buckets[b];
+
+            while (*link != TABLE_MUTATION_MAP_INVALID_INDEX)
+            {
+                int cur = *link;
+
+                /* elapsed_us(a, b) > 0 means a is earlier than b */
+                if (victim == TABLE_MUTATION_MAP_INVALID_INDEX ||
+                    elapsed_us(&entries[cur].last_write_time,
+                               &entries[victim].last_write_time) > 0)
+                {
+                    victim = cur;
+                    victim_link = link;
+                }
+                link = &entries[cur].next;
+            }
+        }
+
+        if (victim != TABLE_MUTATION_MAP_INVALID_INDEX)
+        {
+            ereport(DEBUG1,
+                    (errmsg("table_mutation_map: table map full, evicting oldest entry for table oid %d (dboid %d)",
+                            entries[victim].table_oid, entries[victim].dboid)));
+
+            /* Unlink the victim from its chain before recycling it */
+            *victim_link = entries[victim].next;
+            table_map_free_entry(map, victim);
+            idx = table_map_alloc_entry(map);
+        }
+
+        if (idx == TABLE_MUTATION_MAP_INVALID_INDEX)
+        {
+            ereport(WARNING,
+                    (errmsg("table_mutation_map: failed to allocate entry for table oid %d (dboid %d)",
+                            table_oid, dboid)));
+            return;
+        }
+    }
+
+    /* Initialize new entry */
+    entries[idx].table_oid = table_oid;
+    entries[idx].dboid = dboid;
+    entries[idx].hash = hash;
+    entries[idx].last_write_time = *write_time;
+
+    /* Insert at head of bucket chain (re-read: eviction may have changed it) */
+    entries[idx].next = buckets[bucket];
+    buckets[bucket] = idx;
+
+    ereport(DEBUG2,
+            (errmsg("table_mutation_map: marked table oid %d (dboid %d) as written",
+                    table_oid, dboid)));
+}
+
+/*
+ * Remove expired entries from the table map
+ * Must be called with lock held
+ *
+ * An entry is expired when more than ttl_us microseconds have passed since
+ * its last recorded write.
+ */
+static void
+table_map_cleanup_expired(TableMutationHashTable *map, uint64 ttl_us)
+{
+    int *buckets = TABLE_MAP_BUCKETS(map);
+    TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+    struct timeval now;
+    int removed = 0;
+    int b;
+
+    get_current_time(&now);
+
+    for (b = 0; b < map->num_buckets; b++)
+    {
+        int *prev_ptr = &buckets[b];
+        int idx = buckets[b];
+
+        while (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+        {
+            int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+
+            if (elapsed > (int64)ttl_us)
+            {
+                /*
Entry has expired - remove it */
+                int next = entries[idx].next;
+                *prev_ptr = next;
+                table_map_free_entry(map, idx);
+                idx = next;
+                removed++;
+            }
+            else
+            {
+                prev_ptr = &entries[idx].next;
+                idx = entries[idx].next;
+            }
+        }
+    }
+
+    if (removed > 0)
+    {
+        ereport(DEBUG1,
+                (errmsg("table_mutation_map: cleaned up %d expired table entries", removed)));
+    }
+}
+
+/* ----------------
+ * Query parse cache operations
+ * ----------------
+ */
+
+/*
+ * Set up an empty query parse cache: store the dimensions, clear every
+ * bucket head, thread all entries onto the free list and leave the LRU
+ * list empty.
+ */
+static void
+query_cache_init(QueryParseCache *cache, int num_buckets, int max_entries)
+{
+    int        *heads;
+    QueryParseEntry *slots;
+    int         n;
+
+    cache->num_buckets = num_buckets;
+    cache->max_entries = max_entries;
+    cache->num_entries = 0;
+    cache->free_list_head = 0;
+    cache->lru_head = TABLE_MUTATION_MAP_INVALID_INDEX;
+    cache->lru_tail = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+    /* The entry array offset depends on num_buckets, which is set above */
+    heads = QUERY_CACHE_BUCKETS(cache);
+    slots = QUERY_CACHE_ENTRIES(cache);
+
+    /* Every bucket starts out empty */
+    for (n = 0; n < num_buckets; n++)
+        heads[n] = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+    /* Free list: slot n links to n + 1, the last slot terminates it */
+    for (n = 0; n < max_entries; n++)
+    {
+        QueryParseEntry *slot = &slots[n];
+
+        slot->in_use = false;
+        slot->next = (n + 1 < max_entries) ? n + 1 : TABLE_MUTATION_MAP_INVALID_INDEX;
+        slot->lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+        slot->lru_next = TABLE_MUTATION_MAP_INVALID_INDEX;
+    }
+
+    ereport(DEBUG1,
+            (errmsg("table_mutation_map: initialized query cache with %d buckets, %d max entries",
+                    num_buckets, max_entries)));
+}
+
+/*
+ * Promote entry idx to the head of the LRU list (most recently used).
+ * Does nothing when idx already is the head.
+ */
+static void
+query_cache_lru_touch(QueryParseCache *cache, int idx)
+{
+    QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+    QueryParseEntry *node = &entries[idx];
+    int         before;
+    int         after;
+
+    if (cache->lru_head == idx)
+        return;
+
+    before = node->lru_prev;
+    after = node->lru_next;
+
+    /* Unlink from the current position */
+    if (before != TABLE_MUTATION_MAP_INVALID_INDEX)
+        entries[before].lru_next = after;
+    if (after != TABLE_MUTATION_MAP_INVALID_INDEX)
+        entries[after].lru_prev = before;
+    if (cache->lru_tail == idx)
+        cache->lru_tail = before;
+
+    /* Relink in front of the old head */
+    node->lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+    node->lru_next = cache->lru_head;
+    if (cache->lru_head != TABLE_MUTATION_MAP_INVALID_INDEX)
+        entries[cache->lru_head].lru_prev = idx;
+    cache->lru_head = idx;
+    if (cache->lru_tail == TABLE_MUTATION_MAP_INVALID_INDEX)
+        cache->lru_tail = idx;
+}
+
+/*
+ * Link entry idx in as the new head of the LRU list.
+ */
+static void
+query_cache_lru_add(QueryParseCache *cache, int idx)
+{
+    QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+    int         old_head = cache->lru_head;
+
+    entries[idx].lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+    entries[idx].lru_next = old_head;
+
+    if (old_head != TABLE_MUTATION_MAP_INVALID_INDEX)
+        entries[old_head].lru_prev = idx;
+
+    cache->lru_head = idx;
+
+    /* First element of an empty list is also its tail */
+    if (cache->lru_tail == TABLE_MUTATION_MAP_INVALID_INDEX)
+        cache->lru_tail = idx;
+}
+
+/*
+ * Unlink entry idx from the LRU list, fixing up head and tail as needed,
+ * and reset its own LRU links.
+ */
+static void
+query_cache_lru_remove(QueryParseCache *cache, int idx)
+{
+    QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+    int         before = entries[idx].lru_prev;
+    int         after = entries[idx].lru_next;
+
+    if (before == TABLE_MUTATION_MAP_INVALID_INDEX)
+        cache->lru_head = after;
+    else
+        entries[before].lru_next = after;
+
+    if (after == TABLE_MUTATION_MAP_INVALID_INDEX)
+        cache->lru_tail = before;
+    else
+        entries[after].lru_prev = before;
+
+    entries[idx].lru_prev = TABLE_MUTATION_MAP_INVALID_INDEX;
+    entries[idx].lru_next = TABLE_MUTATION_MAP_INVALID_INDEX;
+}
+
+/*
+ * Allocate entry from free list, evicting LRU if necessary
+ */
+static int
+query_cache_alloc_entry(QueryParseCache *cache)
+{
+    QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+    int *buckets = QUERY_CACHE_BUCKETS(cache);
+    int *link;
+    int idx;
+
+    if (cache->free_list_head != TABLE_MUTATION_MAP_INVALID_INDEX)
+    {
+        idx = cache->free_list_head;
+        cache->free_list_head = entries[idx].next;
+        entries[idx].in_use = true;
+        entries[idx].next = TABLE_MUTATION_MAP_INVALID_INDEX;
+        cache->num_entries++;
+        return idx;
+    }
+
+    /* No free entries - evict LRU */
+    if (cache->lru_tail == TABLE_MUTATION_MAP_INVALID_INDEX)
+        return TABLE_MUTATION_MAP_INVALID_INDEX;
+
+    idx = cache->lru_tail;
+
+    /* Remove from hash bucket */
+    link = &buckets[entries[idx].query_hash % cache->num_buckets];
+    while (*link != TABLE_MUTATION_MAP_INVALID_INDEX)
+    {
+        if (*link == idx)
+        {
+            *link = entries[idx].next;
+            break;
+        }
+        link = &entries[*link].next;
+    }
+
+    /* Remove from LRU list */
+    query_cache_lru_remove(cache, idx);
+
+    /*
+     * Reinitialize entry.  num_entries is left alone: one entry went out
+     * and the same slot is handed straight back to the caller.
+     */
+    entries[idx].in_use = true;
+    entries[idx].next = TABLE_MUTATION_MAP_INVALID_INDEX;
+
+    return idx;
+}
+
+/*
+ * Look up a query in the cache
+ * Returns the entry index or TABLE_MUTATION_MAP_INVALID_INDEX.  Entries are
+ * matched on the 64-bit hash only; the query text is not compared.
+ */
+static int
+query_cache_lookup(QueryParseCache *cache, uint64 hash)
+{
+    int *buckets = QUERY_CACHE_BUCKETS(cache);
+    QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache);
+    int bucket = hash % cache->num_buckets;
+    int idx = buckets[bucket];
+
+    while (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+    {
+        if (entries[idx].query_hash == hash)
+            return idx;
+        idx = entries[idx].next;
+    }
+
+    return TABLE_MUTATION_MAP_INVALID_INDEX;
+}
+
+/* ----------------
+ * Query normalization
+ * ----------------
+ */
+
+/*
+ * Simple query normalization:
+ * - Strip comments (-- style and C-style block comments)
+ * - Collapse whitespace
+ * - Convert to lowercase (except inside quoted identifiers)
+ * - Replace literal values with the placeholder '$'
+ *
+ * This is a simplified version - pgpool2 already does this elsewhere,
+ * but we need a standalone version for the table mutation map feature.
+ *
+ * Two queries must only normalize to the same text when they touch the same
+ * tables, so everything that can be part of a name is preserved:
+ * - digits that continue an identifier are kept ("t1" and "t2" stay
+ *   distinct); only digits that start a token are treated as a literal
+ * - double-quoted identifiers are copied verbatim, case included
+ *
+ * Writes at most output_size - 1 characters plus a terminating NUL and
+ * returns the number of characters written.  Returns 0 without touching
+ * output when output is NULL or output_size is 0.
+ *
+ * Not handled: E'' backslash escapes, dollar-quoted strings and nested
+ * block comments.
+ */
+static size_t
+normalize_query(const char *query, char *output, size_t output_size)
+{
+    const char *src = query;
+    char *dst = output;
+    char *dst_end;
+    bool last_was_space = true;     /* Start true to skip leading space */
+    bool in_word = false;           /* previous char was part of an identifier */
+
+    if (output == NULL || output_size == 0)
+        return 0;
+
+    dst_end = output + output_size - 1;     /* keep room for the NUL */
+
+    if (query == NULL)
+    {
+        *output = '\0';
+        return 0;
+    }
+
+    while (*src && dst < dst_end)
+    {
+        unsigned char c = (unsigned char) *src;
+
+        /* String literal: drop the content, emit one placeholder */
+        if (c == '\'')
+        {
+            src++;
+            while (*src)
+            {
+                if (*src == '\'')
+                {
+                    if (*(src + 1) == '\'')
+                    {
+                        /* Escaped quote */
+                        src += 2;
+                        continue;
+                    }
+                    src++;          /* closing quote */
+                    break;
+                }
+                src++;
+            }
+            *dst++ = '$';
+            last_was_space = false;
+            in_word = false;
+            continue;
+        }
+
+        /* Quoted identifier: it names an object, so copy it verbatim */
+        if (c == '"')
+        {
+            *dst++ = *src++;        /* opening quote */
+            while (*src && dst < dst_end)
+            {
+                if (*src == '"')
+                {
+                    if (*(src + 1) != '"')
+                    {
+                        *dst++ = *src++;    /* closing quote */
+                        break;
+                    }
+                    *dst++ = *src++;        /* first half of an escaped "" */
+                    if (dst >= dst_end)
+                        break;
+                }
+                *dst++ = *src++;
+            }
+            last_was_space = false;
+            in_word = false;
+            continue;
+        }
+
+        /* Handle single-line comments; the newline is handled as whitespace */
+        if (c == '-' && *(src + 1) == '-')
+        {
+            while (*src && *src != '\n')
+                src++;
+            in_word = false;
+            continue;
+        }
+
+        /* Handle multi-line comments; a comment separates tokens like a space */
+        if (c == '/' && *(src + 1) == '*')
+        {
+            src += 2;
+            while (*src && !(*src == '*' && *(src + 1) == '/'))
+                src++;
+            if (*src)
+                src += 2;
+            if (!last_was_space)
+            {
+                *dst++ = ' ';
+                last_was_space = true;
+            }
+            in_word = false;
+            continue;
+        }
+
+        /* Handle whitespace */
+        if (c == ' ' || c == '\t' || c == '\n' || c == '\r')
+        {
+            if (!last_was_space)
+            {
+                *dst++ = ' ';
+                last_was_space = true;
+            }
+            in_word = false;
+            src++;
+            continue;
+        }
+
+        /*
+         * Numeric literal - replace with placeholder.  Only digits that
+         * start a token count; digits inside an identifier fall through to
+         * the regular-character branch.  A literal right after '$' (a
+         * "$1" style parameter) adds no second placeholder.
+         */
+        if (!in_word &&
+            ((c >= '0' && c <= '9') ||
+             (c == '.' && *(src + 1) >= '0' && *(src + 1) <= '9')))
+        {
+            while (*src && ((*src >= '0' && *src <= '9') || *src == '.'))
+                src++;
+            if (dst == output || *(dst - 1) != '$')
+                *dst++ = '$';
+            last_was_space = false;
+            continue;
+        }
+
+        /* Regular character - convert to lowercase */
+        if (c >= 'A' && c <= 'Z')
+            *dst++ = (char) (c + ('a' - 'A'));
+        else
+            *dst++ = *src;
+
+        /* Letters, digits, '_', non-ASCII bytes and an embedded '$' continue a word */
+        in_word = (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
+            (c >= '0' && c <= '9') || c == '_' || c >= 0x80 ||
+            (in_word && c == '$');
+        last_was_space = false;
+        src++;
+    }
+
+    /* Remove trailing space */
+    if (dst > output && *(dst - 1) == ' ')
+        dst--;
+
+    *dst = '\0';
+    return dst - output;
+}
+
+/* ----------------
+ * Public API implementation
+ * ----------------
+ */
+
+/*
+ * Number of shared memory bytes needed for the configured bucket counts and
+ * capacities.  The summands follow the order in which
+ * pool_table_mutation_map_init() carves up the chunk: main struct, table map
+ * header + buckets + entries, query cache header + buckets + entries.
+ */
+Size
+pool_table_mutation_map_shmem_size(void)
+{
+    Size size = 0;
+    int table_buckets = pool_config->table_mutation_map_table_buckets;
+    int table_size = pool_config->table_mutation_map_table_size;
+    int query_buckets = pool_config->table_mutation_map_query_buckets;
+    int query_cache_size = pool_config->table_mutation_map_query_cache_size;
+
+    /* Main structure */
+    size += sizeof(TableMutationMapShmem);
+
+    /* Table mutation hash table */
+    size += sizeof(TableMutationHashTable);
+    size += table_buckets * sizeof(int);    /* buckets array */
+    size += table_size * sizeof(TableMutationEntry);    /* entries array */
+
+    /* Query parse cache */
+    size += sizeof(QueryParseCache);
+    size += query_buckets * sizeof(int);    /* buckets array */
+    size += query_cache_size * sizeof(QueryParseEntry);    /* entries array */
+
+    return size;
+}
+
+/*
+ * Allocate and initialize the shared memory structures.  Compiled to an
+ * empty function when POOL_PRIVATE is defined; returns early when the
+ * feature is disabled.
+ */
+void
+pool_table_mutation_map_init(void)
+{
+#ifndef POOL_PRIVATE
+    Size shmem_size;
+    char *shmem_ptr;
+
+    if (!pool_config->table_mutation_map_enabled)
+    {
+        ereport(DEBUG1,
+                (errmsg("table_mutation_map: feature disabled")));
+        return;
+    }
+
+    shmem_size = pool_table_mutation_map_shmem_size();
+
+    /*
+     * Allocate from the main shared memory segment.
+     * Memory is already zeroed by initialize_shared_memory_main_segment().
+     */
+    shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size);
+    if (shmem_ptr == NULL)
+    {
+        ereport(ERROR,
+                (errmsg("table_mutation_map: failed to allocate %zu bytes of shared memory",
+                        shmem_size)));
+        return;
+    }
+
+    /* Set up pointers to structures within shared memory */
+    table_mutation_map_shmem = (TableMutationMapShmem *)shmem_ptr;
+    shmem_ptr += sizeof(TableMutationMapShmem);
+
+    table_mutation_map_shmem->table_map = (TableMutationHashTable *)shmem_ptr;
+    shmem_ptr += sizeof(TableMutationHashTable);
+    shmem_ptr += pool_config->table_mutation_map_table_buckets * sizeof(int);
+    shmem_ptr += pool_config->table_mutation_map_table_size * sizeof(TableMutationEntry);
+
+    table_mutation_map_shmem->query_cache = (QueryParseCache *)shmem_ptr;
+
+    /* Initialize structures */
+    table_map_init(table_mutation_map_shmem->table_map,
+                   pool_config->table_mutation_map_table_buckets,
+                   pool_config->table_mutation_map_table_size);
+
+    query_cache_init(table_mutation_map_shmem->query_cache,
+                     pool_config->table_mutation_map_query_buckets,
+                     pool_config->table_mutation_map_query_cache_size);
+
+    /* Initialize global state */
+    table_mutation_map_shmem->state.initialized = true;
+    table_mutation_map_shmem->state.current_ttl_us = TABLE_MUTATION_MAP_DEFAULT_TTL_US;
+    get_current_time(&table_mutation_map_shmem->state.ttl_last_updated);
+    get_current_time(&table_mutation_map_shmem->state.last_cleanup_time);
+    table_mutation_map_shmem->state.global_cold_start_until.tv_sec = 0;
+    table_mutation_map_shmem->state.global_cold_start_until.tv_usec = 0;
+    table_mutation_map_shmem->state.stats_queries_checked = 0;
+    table_mutation_map_shmem->state.stats_forced_primary = 0;
+    table_mutation_map_shmem->state.stats_allowed_replica = 0;
+
+    ereport(LOG,
+            (errmsg("table_mutation_map: initialized with %zu bytes shared memory",
+                    shmem_size)));
+#endif
+}
+
+/*
+ * Per-process initialization: remember when this process started so that
+ * pool_table_mutation_map_in_cold_start() can measure the cold start window.
+ * Does nothing when the feature is off or shared memory was never set up.
+ */
+void
+pool_table_mutation_map_child_init(void)
+{
+    if (pool_config->table_mutation_map_enabled && table_mutation_map_shmem != NULL)
+    {
+        get_current_time(&process_start_time);
+        cold_start_initialized = true;
+
+        ereport(DEBUG1,
+                (errmsg("table_mutation_map: child initialized, cold start period %d ms",
+                        pool_config->table_mutation_map_cold_start_duration)));
+    }
+}
+
+/*
+ * Report whether reads must currently avoid load balancing because of cold
+ * start.  True while the global cold start deadline in shared memory lies in
+ * the future, or while this process is younger than the configured cold
+ * start duration.  Always false when the feature is off or the duration is
+ * not positive.
+ */
+bool
+pool_table_mutation_map_in_cold_start(void)
+{
+    struct timeval now;
+    struct timeval *global_until;
+    int64       since_start_ms;
+    int         window_ms;
+
+    if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+        return false;
+
+    window_ms = pool_config->table_mutation_map_cold_start_duration;
+    if (window_ms <= 0)
+        return false;
+
+    get_current_time(&now);
+
+    /* A zero tv_sec means no global cold start has been triggered */
+    global_until = &table_mutation_map_shmem->state.global_cold_start_until;
+    if (global_until->tv_sec != 0 && elapsed_us(&now, global_until) > 0)
+        return true;
+
+    if (!cold_start_initialized)
+        return false;
+
+    since_start_ms = elapsed_us(&process_start_time, &now) / 1000;
+    if (since_start_ms >= window_ms)
+        return false;
+
+    ereport(DEBUG2,
+            (errmsg("table_mutation_map: in cold start (%ld/%d ms)",
+                    (long)since_start_ms, window_ms)));
+    return true;
+}
+
+/*
+ * Start a cold start window for every process: store "now + configured
+ * duration" as the global deadline in shared memory.  No-op when the feature
+ * is off or the duration is not positive.
+ */
+void
+pool_table_mutation_map_trigger_global_cold_start(void)
+{
+    struct timeval now;
+    struct timeval *until;
+    int         duration_ms;
+
+    if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+        return;
+
+    duration_ms = pool_config->table_mutation_map_cold_start_duration;
+    if (duration_ms <= 0)
+        return;
+
+    get_current_time(&now);
+
+    until = &table_mutation_map_shmem->state.global_cold_start_until;
+    *until = now;
+    until->tv_sec += duration_ms / 1000;
+    until->tv_usec += (duration_ms % 1000) * 1000;
+
+    /* Carry overflowing microseconds into the seconds field */
+    if (until->tv_usec >= 1000000)
+    {
+        until->tv_sec += until->tv_usec / 1000000;
+        until->tv_usec %= 1000000;
+    }
+
+    ereport(LOG,
+            (errmsg("table_mutation_map: entering global cold start for %d ms",
+                    duration_ms)));
+}
+
+/*
+ * Decide whether a read of (table_oid, dboid) must stay off the replicas.
+ *
+ * Returns true when the table has a recorded write younger than the current
+ * TTL, and also when either oid is not positive (an unidentified table is
+ * treated as stale).  Returns false when the feature is off.  Every call
+ * that gets past the feature check updates the statistics counters.
+ */
+bool
+pool_table_mutation_map_table_is_stale(int table_oid, int dboid)
+{
+    bool        is_stale = false;
+
+    if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+        return false;
+
+    if (table_oid <= 0 || dboid <= 0)
+    {
+        is_stale = true;
+    }
+    else
+    {
+        TableMutationHashTable *map = table_mutation_map_shmem->table_map;
+        uint32      hash = fnv1a_hash_table_key(table_oid, dboid);
+        int         idx;
+
+        table_map_lock();
+
+        idx = table_map_lookup(map, table_oid, dboid, hash);
+        if (idx != TABLE_MUTATION_MAP_INVALID_INDEX)
+        {
+            TableMutationEntry *entries = TABLE_MAP_ENTRIES(map);
+            struct timeval now;
+            uint64      ttl_us;
+            int64       elapsed;
+
+            get_current_time(&now);
+            ttl_us = table_mutation_map_shmem->state.current_ttl_us;
+
+            elapsed = elapsed_us(&entries[idx].last_write_time, &now);
+            is_stale = (elapsed < (int64)ttl_us);
+
+            ereport(DEBUG2,
+                    (errmsg("table_mutation_map: table oid %d (dboid %d) elapsed=%ld us, ttl=%lu us, stale=%d",
+                            table_oid, dboid, (long)elapsed, (unsigned long)ttl_us, is_stale)));
+        }
+
+        table_map_unlock();
+    }
+
+    /* Update statistics */
+    __sync_fetch_and_add(&table_mutation_map_shmem->state.stats_queries_checked, 1);
+    if (is_stale)
+        __sync_fetch_and_add(&table_mutation_map_shmem->state.stats_forced_primary, 1);
+    else
+        __sync_fetch_and_add(&table_mutation_map_shmem->state.stats_allowed_replica, 1);
+
+    return is_stale;
+}
+
+/*
+ * Record a write, timestamped now, for each positive oid in table_oids.
+ * No-op when the feature is off, the list is empty or NULL, or dboid is not
+ * positive.
+ */
+void
+pool_table_mutation_map_mark_tables_written(const int *table_oids, int num_tables, int dboid)
+{
+    TableMutationHashTable *map;
+    struct timeval now;
+    int i;
+
+    if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL)
+        return;
+
+    if (num_tables <= 0 || table_oids == NULL || dboid <= 0)
+        return;
+
+    map = table_mutation_map_shmem->table_map;
+
get_current_time(&now); + + table_map_lock(); + + /* Periodically clean up expired entries */ + if (map->num_entries > map->max_entries * 3 / 4) + { + /* Limit cleanup frequency to avoid O(N) scan on every write */ + /* 100ms interval */ + if (elapsed_us(&table_mutation_map_shmem->state.last_cleanup_time, &now) > 100000) + { + table_map_cleanup_expired(map, table_mutation_map_shmem->state.current_ttl_us); + table_mutation_map_shmem->state.last_cleanup_time = now; + } + } + + for (i = 0; i < num_tables; i++) + { + uint32 hash; + int table_oid = table_oids[i]; + + if (table_oid > 0) + { + hash = fnv1a_hash_table_key(table_oid, dboid); + table_map_insert(map, table_oid, dboid, hash, &now); + } + } + + table_map_unlock(); +} + +/* + * Convenience function to mark a single table as written + */ +void +pool_table_mutation_map_mark_table_written(int table_oid, int dboid) +{ + if (table_oid > 0 && dboid > 0) + { + const int tables[1] = { table_oid }; + pool_table_mutation_map_mark_tables_written(tables, 1, dboid); + } +} + +void +pool_table_mutation_map_update_ttl(uint64 delay_us) +{ + uint64 new_ttl; + + if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL) + return; + + /* Calculate new TTL: delay * factor, with minimum of default TTL */ + new_ttl = (uint64)(delay_us * pool_config->table_mutation_map_ttl_factor); + if (new_ttl < TABLE_MUTATION_MAP_DEFAULT_TTL_US) + new_ttl = TABLE_MUTATION_MAP_DEFAULT_TTL_US; + + /* Maximum TTL of 1 hour */ + if (new_ttl > 3600ULL * 1000000ULL) + new_ttl = 3600ULL * 1000000ULL; + + table_mutation_map_shmem->state.current_ttl_us = new_ttl; + get_current_time(&table_mutation_map_shmem->state.ttl_last_updated); + + ereport(DEBUG1, + (errmsg("table_mutation_map: updated TTL to %lu us (delay=%lu us, factor=%.1f)", + (unsigned long)new_ttl, (unsigned long)delay_us, + pool_config->table_mutation_map_ttl_factor))); +} + +bool +pool_table_mutation_map_get_cached_parse(uint64 hash, bool *is_write, + char 
table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN], + int *num_tables) +{ + QueryParseCache *cache; + int idx; + bool found = false; + + if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL) + return false; + + cache = table_mutation_map_shmem->query_cache; + + query_cache_lock(); + + idx = query_cache_lookup(cache, hash); + if (idx != TABLE_MUTATION_MAP_INVALID_INDEX) + { + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + int i; + + *is_write = entries[idx].is_write; + *num_tables = entries[idx].num_tables; + + for (i = 0; i < entries[idx].num_tables && i < TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY; i++) + { + strlcpy(table_names[i], entries[idx].table_names[i], TABLE_MUTATION_MAP_TABLE_NAME_LEN); + } + + /* Move to front of LRU */ + query_cache_lru_touch(cache, idx); + found = true; + } + + query_cache_unlock(); + + return found; +} + +void +pool_table_mutation_map_cache_parse(uint64 hash, bool is_write, + const char table_names[][TABLE_MUTATION_MAP_TABLE_NAME_LEN], + int num_tables) +{ + QueryParseCache *cache; + int *buckets; + QueryParseEntry *entries; + int idx; + int bucket; + + if (!pool_config->table_mutation_map_enabled || table_mutation_map_shmem == NULL) + return; + + cache = table_mutation_map_shmem->query_cache; + + query_cache_lock(); + + /* Check if already exists */ + idx = query_cache_lookup(cache, hash); + if (idx != TABLE_MUTATION_MAP_INVALID_INDEX) + { + query_cache_unlock(); + return; + } + + /* Allocate new entry (may evict LRU) */ + idx = query_cache_alloc_entry(cache); + if (idx == TABLE_MUTATION_MAP_INVALID_INDEX) + { + query_cache_unlock(); + ereport(WARNING, + (errmsg("table_mutation_map: failed to allocate query cache entry"))); + return; + } + + entries = QUERY_CACHE_ENTRIES(cache); + buckets = QUERY_CACHE_BUCKETS(cache); + + /* Fill in entry */ + entries[idx].query_hash = hash; + entries[idx].is_write = is_write; + entries[idx].num_tables = (num_tables > TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY) ? 
+ TABLE_MUTATION_MAP_MAX_TABLES_PER_QUERY : num_tables; + + { + int i; + for (i = 0; i < entries[idx].num_tables; i++) + { + strlcpy(entries[idx].table_names[i], table_names[i], TABLE_MUTATION_MAP_TABLE_NAME_LEN); + } + } + + /* Insert into hash bucket */ + bucket = hash % cache->num_buckets; + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + /* Add to LRU list */ + query_cache_lru_add(cache, idx); + + query_cache_unlock(); +} + +uint64 +pool_table_mutation_map_normalize_and_hash(const char *query) +{ + char normalized[8192]; + size_t len; + + if (query == NULL || query[0] == '\0') + return 0; + + len = normalize_query(query, normalized, sizeof(normalized)); + if (len == 0) + return 0; + + return fnv1a_hash_64(normalized, len); +} -- 2.52.0