From 47551f4c1eae9b6275904d4ead9b24d9a83fda4b Mon Sep 17 00:00:00 2001 From: Nadav Shatz Date: Tue, 6 Jan 2026 12:41:50 +0200 Subject: [PATCH] Feature: add in-memory table tracking to prevent stale reads from replicas MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement "memory map" feature that tracks recently-written database tables in shared memory to prevent stale reads during replication lag. When a write (INSERT/UPDATE/DELETE) occurs on a table, that table is marked as "dirty" for a configurable TTL period. Any SELECT on a dirty table within the TTL window is routed to primary instead of replica. Key features: - Shared memory hash table for tracking table mutations with TTL - Query parse cache with LRU eviction for performance - Cold start protection (routes all queries to primary initially) - Automatic TTL calculation: replication_delay × configurable factor - Per-table staleness tracking with microsecond precision New configuration parameters: - memory_map_enabled: Enable/disable the feature (default: off) - memory_map_ttl_factor: TTL multiplier for replication delay (default: 5.0) - memory_map_cold_start_duration: Cold start period in ms (default: 2000) - memory_map_table_buckets: Hash buckets for table map (default: 1024) - memory_map_table_size: Max tracked tables (default: 2048) - memory_map_query_buckets: Hash buckets for query cache (default: 2048) - memory_map_query_cache_size: Max cached queries (default: 10000) diff --git a/doc/src/sgml/loadbalance.sgml b/doc/src/sgml/loadbalance.sgml index ee19fabebab2210cd4abe59a711a036ac0ac8943..bdc929ee55b94899ffdd90880a741cfbac051aa4 100644 --- a/doc/src/sgml/loadbalance.sgml +++ b/doc/src/sgml/loadbalance.sgml @@ -1193,4 +1193,210 @@ dml_adaptive_object_relationship_list = 'table_1:table_2' + + + Memory Map Configuration (Lagless Replica Reads) + + + These parameters configure the memory map feature, which tracks recently written tables + to prevent stale reads from replica nodes during replication lag. This implements the + "lagless" architecture pattern for distributed systems with read replicas. + + + + When a table is modified (INSERT/UPDATE/DELETE), it is marked as "stale" for a TTL period + (replication_delay * memory_map_ttl_factor). Any SELECT queries on stale tables are routed + to the primary node instead of replicas, ensuring read-after-write consistency. + + + + This feature requires to be configured + for monitoring replication delay from replicas. + + + + + Enabling the memory map feature increases shared memory consumption. With default settings, + the feature requires approximately 6.6 MB of shared memory (0.3 MB for table tracking + 6.3 MB for query cache). + Memory usage scales with configuration parameters: + + + + + Table tracking: memory_map_table_size * 160 bytes (default: 2048 * 160 = ~320 KB) + + + + + Query cache: memory_map_query_cache_size * 640 bytes (default: 10000 * 640 = ~6.3 MB) + + + + + For high-traffic systems with large cache sizes (e.g., memory_map_query_cache_size = 100000), + memory usage can reach 64 MB or more. Consider your system's available shared memory when enabling this feature. + + + + + + + memory_map_enabled (boolean) + + memory_map_enabled configuration parameter + + + + + Enables in-memory tracking of recently written tables. When enabled, tables are marked + as stale after write operations, and reads are routed to primary until the TTL expires. + + + This parameter can be changed by reloading the Pgpool-II configurations. + Default is off. + + + + + + memory_map_ttl_factor (floating point) + + memory_map_ttl_factor configuration parameter + + + + + Multiplier for calculating the TTL: TTL = replication_delay * memory_map_ttl_factor. + Higher values provide more safety margin but may reduce read replica utilization. + + + Valid range: 1.0-100.0. Default is 5.0. + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + memory_map_cold_start_duration (integer) + + memory_map_cold_start_duration configuration parameter + + + + + Duration in milliseconds to route all queries to primary after a child process starts. + This prevents stale reads when a new connection is established before the memory map + is populated with recent write history. + + + Valid range: 0-60000 ms. Default is 2000 (2 seconds). + Set to 0 to disable cold start behavior. + This parameter can be changed by reloading the Pgpool-II configurations. + + + + + + memory_map_table_buckets (integer) + + memory_map_table_buckets configuration parameter + + + + + Number of hash buckets for the table mutation tracking hash table. + Higher values reduce hash collisions and improve lookup performance. + + + Valid range: 64-65536. Default is 1024. + This parameter can only be set at server start. + + + + + + memory_map_table_size (integer) + + memory_map_table_size configuration parameter + + + + + Maximum number of tables that can be tracked simultaneously in the memory map. + When full, oldest entries are evicted using a simple eviction strategy. + + + Valid range: 128-131072. Default is 2048. + Memory usage: approximately 160 bytes per entry. + This parameter can only be set at server start. + + + + + + memory_map_query_buckets (integer) + + memory_map_query_buckets configuration parameter + + + + + Number of hash buckets for the query parse cache. The cache stores normalized + query strings mapped to their table dependencies to avoid repeated parsing. + + + Valid range: 64-65536. Default is 2048. + This parameter can only be set at server start. + + + + + + memory_map_query_cache_size (integer) + + memory_map_query_cache_size configuration parameter + + + + + Maximum number of query parse results to cache. Uses LRU eviction when full. + Larger caches reduce parsing overhead but consume more shared memory. + + + Valid range: 100-1000000. Default is 10000. + Memory usage: approximately 640 bytes per entry (~6.3 MB for default, ~64 MB for 100000 entries). + This parameter can only be set at server start. + + + + + + + + Memory Map Configuration Example + + To enable memory map with replication delay monitoring: + + +# Enable memory map feature +memory_map_enabled = on +memory_map_ttl_factor = 5.0 +memory_map_cold_start_duration = 2000 + +# Configure external replication delay monitoring +replication_delay_source_cmd = '/path/to/get-replication-delay.sh' +replication_delay_source_timeout = 10 + +# Adjust cache sizes based on workload (increases memory usage) +memory_map_table_size = 4096 # Track up to 4096 tables (~640 KB) +memory_map_query_cache_size = 50000 # Cache 50k queries (~31 MB) + + + Total shared memory required for above configuration: approximately 32 MB (31 MB query cache + 0.6 MB table map + overhead). + Default configuration (10000 query cache entries, 2048 tables) requires approximately 6.6 MB. + + + + + diff --git a/src/Makefile.am b/src/Makefile.am index 4678ab53055e828a37b6477801640aff17ff84a7..51896ae07771fc00382ab965eaf3807c8b5f3d94 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -35,6 +35,7 @@ pgpool_SOURCES = main/main.c \ rewrite/pool_timestamp.c \ rewrite/pool_lobj.c \ utils/pool_select_walker.c \ + utils/pool_memory_map.c \ utils/strlcpy.c \ utils/psprintf.c \ utils/pool_params.c \ diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index 68abb7f41cb96d856c824a148842748bfb7a4d12..d9a28e7ec3369ff799cb37c37c0cd05075327606 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -783,6 +783,16 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"memory_map_enabled", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Enable in-memory tracking of recently written tables to avoid stale reads from replicas", + CONFIG_VAR_TYPE_BOOL, false, 0 + }, + &g_pool_config.memory_map_enabled, + false, + NULL, NULL, NULL + }, + { {"auto_failback", CFGCXT_RELOAD, FAILOVER_CONFIG, "Enables nodes automatically reattach, when detached node continue streaming replication.", @@ -1757,6 +1767,17 @@ static struct config_int_array ConfigureNamesIntArray[] = static struct config_double ConfigureNamesDouble[] = { + { + {"memory_map_ttl_factor", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "TTL multiplier for memory map (TTL = replication_delay * factor)", + CONFIG_VAR_TYPE_DOUBLE, false, 0 + }, + &g_pool_config.memory_map_ttl_factor, + 5.0, /* boot value: 5x replication delay */ + 1.0, 100.0, /* min, max */ + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_DOUBLE }; @@ -2355,6 +2376,61 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"memory_map_cold_start_duration", CFGCXT_RELOAD, LOAD_BALANCE_CONFIG, + "Duration in milliseconds to force queries to primary after child process starts.", + CONFIG_VAR_TYPE_INT, false, GUC_UNIT_MS + }, + &g_pool_config.memory_map_cold_start_duration, + 2000, /* 2 seconds */ + 0, 60000, /* 0 to 60 seconds */ + NULL, NULL, NULL + }, + + { + {"memory_map_table_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for table mutation map.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.memory_map_table_buckets, + 1024, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"memory_map_table_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in table mutation map.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.memory_map_table_size, + 2048, + 128, 131072, + NULL, NULL, NULL + }, + + { + {"memory_map_query_buckets", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Number of hash buckets for query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.memory_map_query_buckets, + 2048, + 64, 65536, + NULL, NULL, NULL + }, + + { + {"memory_map_query_cache_size", CFGCXT_INIT, LOAD_BALANCE_CONFIG, + "Maximum number of entries in query parse cache.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.memory_map_query_cache_size, + 10000, + 100, 1000000, + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_INT }; diff --git a/src/context/pool_query_context.c b/src/context/pool_query_context.c index 1a13168c6e8d3f0064dfce4ee6e4661eee69304e..47e5f2796f809dcf3208edd7d0a2bcf8dda83260 100644 --- a/src/context/pool_query_context.c +++ b/src/context/pool_query_context.c @@ -29,6 +29,7 @@ #include "utils/statistics.h" #include "utils/pool_select_walker.h" #include "utils/pool_stream.h" +#include "utils/pool_memory_map.h" #include "context/pool_session_context.h" #include "context/pool_query_context.h" #include "parser/nodes.h" @@ -2135,6 +2136,92 @@ where_to_send_main_replica(POOL_QUERY_CONTEXT *query_context, char *query, Node { pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); } + /* + * Check memory map for recently written tables. + * If in cold start or any table was recently written, + * route to primary to avoid stale reads. + */ + else if (pool_config->memory_map_enabled) + { + bool force_primary = false; + + /* During cold start, route everything to primary */ + if (pool_memory_map_in_cold_start()) + { + ereport(DEBUG1, + (errmsg("could not load balance because of memory map cold start"), + errdetail("destination = PRIMARY for query= \"%s\"", query))); + force_primary = true; + } + else + { + /* Extract table names and check if any are stale */ + SelectContext ctx; + int num_oids; + int i; + + memset(&ctx, 0, sizeof(ctx)); + num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx); + + for (i = 0; i < num_oids; i++) + { + if (pool_memory_map_table_is_stale(ctx.table_names[i])) + { + ereport(DEBUG1, + (errmsg("could not load balance because table \"%s\" was recently written", + ctx.table_names[i]), + errdetail("destination = PRIMARY for query= \"%s\"", query))); + force_primary = true; + break; + } + } + } + + if (force_primary) + { + pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); + } + else + { + /* Proceed with load balancing */ + if (pool_config->statement_level_load_balance) + { + session_context->load_balance_node_id = select_load_balancing_node(); + } + + /* + * As streaming replication delay is too much, if + * prefer_lower_delay_standby is true then elect new load + * balance node which is lowest delayed, false then send + * to the primary. + */ + if (STREAM && check_replication_delay(session_context->load_balance_node_id)) + { + ereport(DEBUG1, + (errmsg("could not load balance because of too much replication delay"), + errdetail("destination = %d for query= \"%s\"", dest, query))); + + if (pool_config->prefer_lower_delay_standby) + { + int new_load_balancing_node = select_load_balancing_node(); + + session_context->load_balance_node_id = new_load_balancing_node; + session_context->query_context->load_balance_node_id = session_context->load_balance_node_id; + pool_set_node_to_be_sent(query_context, session_context->query_context->load_balance_node_id); + } + else + { + pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID); + } + } + else + { + session_context->query_context->load_balance_node_id = session_context->load_balance_node_id; + pool_set_node_to_be_sent(query_context, + session_context->query_context->load_balance_node_id); + } + } + } else { if (pool_config->statement_level_load_balance) diff --git a/src/include/pool_config.h b/src/include/pool_config.h index 741de6cc5fc3368f813d6b6efa68eb7f8a79506b..9675c1b65d9bae83c6412c1f1f3399364932221f 100644 --- a/src/include/pool_config.h +++ b/src/include/pool_config.h @@ -365,6 +365,16 @@ typedef struct * replication check */ char *replication_delay_source_cmd; /* external command for replication delay */ int replication_delay_source_timeout; /* timeout for external command in seconds */ + + /* Memory map configuration for tracking recently written tables */ + bool memory_map_enabled; /* Enable in-memory table tracking */ + double memory_map_ttl_factor; /* TTL multiplier for replication delay */ + int memory_map_cold_start_duration; /* Cold start duration in ms */ + int memory_map_table_buckets; /* Number of hash buckets for table map */ + int memory_map_table_size; /* Max entries in table map */ + int memory_map_query_buckets; /* Number of hash buckets for query cache */ + int memory_map_query_cache_size; /* Max entries in query cache */ + char *failover_command; /* execute command when failover happens */ char *follow_primary_command; /* execute command when failover is * ended */ diff --git a/src/include/utils/pool_memory_map.h b/src/include/utils/pool_memory_map.h new file mode 100644 index 0000000000000000000000000000000000000000..511d7a45e7dbd417b1e49b9211fb994f29af1a08 --- /dev/null +++ b/src/include/utils/pool_memory_map.h @@ -0,0 +1,236 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + * pool_memory_map.h: In-memory tracking of recently written tables + * to avoid stale reads from replicas during replication lag + */ + +#ifndef POOL_MEMORY_MAP_H +#define POOL_MEMORY_MAP_H + +#include "pool.h" +#include + +/* + * Maximum table name length including schema: "schema"."table" + * Using NAMEDATALEN * 2 + 4 for quotes and dot + */ +#define MEMORY_MAP_TABLE_NAME_LEN (NAMEDATALEN * 2 + 4) + +/* + * Maximum number of tables we track per query + */ +#define MEMORY_MAP_MAX_TABLES_PER_QUERY 8 + +/* + * Invalid index marker for linked lists + */ +#define MEMORY_MAP_INVALID_INDEX (-1) + +/* + * Default TTL in microseconds (100ms) used when replication delay is unknown + */ +#define MEMORY_MAP_DEFAULT_TTL_US (100 * 1000) + +/* + * Entry in the table mutation hash table + */ +typedef struct TableMutationEntry +{ + char table_name[MEMORY_MAP_TABLE_NAME_LEN]; /* "schema"."table" */ + struct timeval last_write_time; /* When the table was last written */ + uint32 hash; /* Pre-computed hash value */ + int next; /* Next entry in collision chain (-1 if none) */ + bool in_use; /* Is this entry in use? */ +} TableMutationEntry; + +/* + * Header for the table mutation hash table in shared memory + */ +typedef struct TableMutationHashTable +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + volatile uint32 lock; /* Spinlock for thread-safe access */ + /* Flexible array members follow in shared memory: + * int buckets[num_buckets]; + * TableMutationEntry entries[max_entries]; + */ +} TableMutationHashTable; + +/* + * Entry in the query parse cache + */ +typedef struct QueryParseEntry +{ + uint64 query_hash; /* Hash of normalized query */ + bool is_write; /* True if INSERT/UPDATE/DELETE */ + int num_tables; /* Number of tables in query */ + char table_names[MEMORY_MAP_MAX_TABLES_PER_QUERY][MEMORY_MAP_TABLE_NAME_LEN]; + int next; /* Next entry in collision chain */ + int lru_prev; /* Previous in LRU list */ + int lru_next; /* Next in LRU list */ + bool in_use; /* Is this entry in use? */ +} QueryParseEntry; + +/* + * Header for the query parse cache in shared memory + */ +typedef struct QueryParseCache +{ + int num_buckets; /* Number of hash buckets */ + int max_entries; /* Maximum entries allowed */ + int num_entries; /* Current number of entries */ + int free_list_head; /* Head of free entry list */ + int lru_head; /* Most recently used */ + int lru_tail; /* Least recently used */ + volatile uint32 lock; /* Spinlock for thread-safe access */ + /* Flexible array members follow in shared memory: + * int buckets[num_buckets]; + * QueryParseEntry entries[max_entries]; + */ +} QueryParseCache; + +/* + * Global state for memory map feature + */ +typedef struct MemoryMapState +{ + bool initialized; /* Has shared memory been initialized? */ + uint64 current_ttl_us; /* Current TTL in microseconds */ + struct timeval ttl_last_updated; /* When TTL was last updated */ + volatile uint32 stats_queries_checked; /* Number of queries checked */ + volatile uint32 stats_forced_primary; /* Queries forced to primary */ + volatile uint32 stats_allowed_replica; /* Queries allowed to replica */ +} MemoryMapState; + +/* + * Main shared memory structure containing all components + */ +typedef struct MemoryMapShmem +{ + MemoryMapState state; + TableMutationHashTable *table_map; + QueryParseCache *query_cache; +} MemoryMapShmem; + +/* ---------------- + * Public API functions + * ---------------- + */ + +/* + * Initialize shared memory structures for memory map. + * Called from pgpool_main.c after pool_init_pool_info(). + */ +extern void pool_memory_map_init(void); + +/* + * Initialize per-child process state for memory map. + * Called from child.c when a new child process starts. + * Sets up cold start tracking. + */ +extern void pool_memory_map_child_init(void); + +/* + * Check if the child process is in cold start period. + * During cold start, all queries are routed to primary. + * Returns true if in cold start, false otherwise. + */ +extern bool pool_memory_map_in_cold_start(void); + +/* + * Check if a table was recently written to (is "stale"). + * If stale, reads from this table should go to primary. + * Returns true if table is stale (recently written), false otherwise. + */ +extern bool pool_memory_map_table_is_stale(const char *table_name); + +/* + * Mark tables as recently written. + * Called after INSERT/UPDATE/DELETE queries complete. + * table_names: array of table names + * num_tables: number of tables in array + */ +extern void pool_memory_map_mark_tables_written(const char **table_names, int num_tables); + +/* + * Convenience function to mark a single table as written. + * table_name: fully qualified table name + */ +extern void pool_memory_map_mark_table_written(const char *table_name); + +/* + * Update the TTL based on current replication delay. + * Called from pool_worker_child.c when replication delay is updated. + * delay_us: replication delay in microseconds + */ +extern void pool_memory_map_update_ttl(uint64 delay_us); + +/* + * Look up cached parse result for a query. + * hash: hash of normalized query + * is_write: output - true if query is a write + * table_names: output - array to fill with table names + * num_tables: output - number of tables found + * Returns true if found in cache, false otherwise. + */ +extern bool pool_memory_map_get_cached_parse(uint64 hash, bool *is_write, + char table_names[][MEMORY_MAP_TABLE_NAME_LEN], + int *num_tables); + +/* + * Cache a parse result for a query. + * hash: hash of normalized query + * is_write: true if query is a write + * table_names: array of table names + * num_tables: number of tables + */ +extern void pool_memory_map_cache_parse(uint64 hash, bool is_write, + const char table_names[][MEMORY_MAP_TABLE_NAME_LEN], + int num_tables); + +/* + * Normalize a query and compute its hash. + * Strips comments, normalizes whitespace and literals. + * query: input SQL query string + * Returns: 64-bit hash of normalized query + */ +extern uint64 pool_memory_map_normalize_and_hash(const char *query); + +/* + * Get the current TTL in microseconds. + */ +extern uint64 pool_memory_map_get_ttl(void); + +/* + * Calculate required shared memory size for memory map. + */ +extern Size pool_memory_map_shmem_size(void); + +/* + * Get memory map statistics for monitoring. + */ +extern void pool_memory_map_get_stats(uint32 *queries_checked, + uint32 *forced_primary, + uint32 *allowed_replica, + uint64 *current_ttl_us); + +#endif /* POOL_MEMORY_MAP_H */ diff --git a/src/main/pgpool_main.c b/src/main/pgpool_main.c index 4d88c5815ea253471167dfe7e5bf39f0270323ec..f4a14c84db99100fb761168c14e77b2f2b9eff4b 100644 --- a/src/main/pgpool_main.c +++ b/src/main/pgpool_main.c @@ -57,6 +57,7 @@ #include "auth/pool_passwd.h" #include "auth/pool_hba.h" #include "query_cache/pool_memqcache.h" +#include "utils/pool_memory_map.h" #include "watchdog/wd_internal_commands.h" #include "watchdog/wd_lifecheck.h" #include "watchdog/watchdog.h" @@ -3065,6 +3066,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) elog(DEBUG1, "watchdog: %zu bytes requested for shared memory", MAXALIGN(wd_ipc_get_shared_mem_size())); } + if (pool_config->memory_map_enabled) + { + size += MAXALIGN(pool_memory_map_shmem_size()); + elog(DEBUG1, "memory_map: %zu bytes requested for shared memory", MAXALIGN(pool_memory_map_shmem_size())); + } + initialize_shared_memory_main_segment(size); /* Move the backend descriptors to shared memory */ @@ -3181,6 +3188,12 @@ initialize_shared_mem_objects(bool clear_memcache_oidmaps) wd_ipc_initialize_data(); } + /* Initialize memory map for tracking recently written tables */ + if (pool_config->memory_map_enabled) + { + pool_memory_map_init(); + } + } /* diff --git a/src/protocol/CommandComplete.c b/src/protocol/CommandComplete.c index a3b8f0ea194ffecc79e58566be80562a46eb75ab..9b0681ca46ac2602d3f541ad3119770d422fb0c3 100644 --- a/src/protocol/CommandComplete.c +++ b/src/protocol/CommandComplete.c @@ -38,6 +38,8 @@ #include "utils/palloc.h" #include "utils/memutils.h" #include "utils/pool_stream.h" +#include "utils/pool_memory_map.h" +#include "utils/pool_select_walker.h" static int extract_ntuples(char *message); static POOL_STATUS handle_mismatch_tuples(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, char *packet, int packetlen, bool command_complete); @@ -304,6 +306,38 @@ handle_query_context(POOL_CONNECTION_POOL *backend) node = session_context->query_context->parse_tree; + /* + * Track table writes for memory map feature. + * Mark tables as written when INSERT/UPDATE/DELETE completes. + */ + if (pool_config->memory_map_enabled) + { + char *table_name = NULL; + + if (IsA(node, InsertStmt)) + { + InsertStmt *stmt = (InsertStmt *) node; + table_name = make_table_name_from_rangevar(stmt->relation); + } + else if (IsA(node, UpdateStmt)) + { + UpdateStmt *stmt = (UpdateStmt *) node; + table_name = make_table_name_from_rangevar(stmt->relation); + } + else if (IsA(node, DeleteStmt)) + { + DeleteStmt *stmt = (DeleteStmt *) node; + table_name = make_table_name_from_rangevar(stmt->relation); + } + + if (table_name != NULL) + { + pool_memory_map_mark_table_written(table_name); + ereport(DEBUG1, + (errmsg("memory map: marked table \"%s\" as written", table_name))); + } + } + if (IsA(node, PrepareStmt)) { if (session_context->uncompleted_message) diff --git a/src/protocol/child.c b/src/protocol/child.c index c34f057281be62feaf39db1bb605062f56dc398c..07ee58c6a48dcd3ef6d79970e08a6f77b8924e1d 100644 --- a/src/protocol/child.c +++ b/src/protocol/child.c @@ -57,6 +57,7 @@ #include "utils/elog.h" #include "utils/ps_status.h" #include "utils/timestamp.h" +#include "utils/pool_memory_map.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -213,6 +214,12 @@ do_child(int *fds) /* Initialize per process context */ pool_init_process_context(); + /* Initialize memory map child state for cold start tracking */ + if (pool_config->memory_map_enabled) + { + pool_memory_map_child_init(); + } + /* initialize connection pool */ if (pool_init_cp()) { diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream index 454fdb9e5d1fd65437b6a67f12ab62658ea08f49..a245d58bf3339913602143da1b83b964fe5dcaeb 100644 --- a/src/sample/pgpool.conf.sample-stream +++ b/src/sample/pgpool.conf.sample-stream @@ -499,6 +499,51 @@ backend_clustering_mode = streaming_replication #statement_level_load_balance = off # Enables statement level load balancing +# - Memory Map (Lagless Read Replica) - + # WARNING: Enabling this feature increases shared memory usage + # Default settings require ~6.6 MB shared memory + # (0.3 MB table tracking + 6.3 MB query cache) + +#memory_map_enabled = off + # Enable in-memory tracking of recently written tables + # to prevent stale reads from replicas during replication lag + # (change requires reload) + +#memory_map_ttl_factor = 5.0 + # TTL multiplier: TTL = replication_delay * factor + # Higher values provide more safety margin + # Range: 1.0-100.0 (default: 5.0) + # (change requires reload) + +#memory_map_cold_start_duration = 2000 + # Duration in milliseconds to route all queries to primary + # after child process starts (cold start period) + # Range: 0-60000 ms (default: 2000 ms = 2 seconds) + # Set to 0 to disable cold start behavior + # (change requires reload) + +#memory_map_table_buckets = 1024 + # Number of hash buckets for table mutation tracking + # Higher values reduce hash collisions + # Range: 64-65536 (default: 1024) + # (change requires restart) + +#memory_map_table_size = 2048 + # Maximum number of tables to track simultaneously + # Range: 128-131072 (default: 2048) + # (change requires restart) + +#memory_map_query_buckets = 2048 + # Number of hash buckets for query parse cache + # Range: 64-65536 (default: 2048) + # (change requires restart) + +#memory_map_query_cache_size = 10000 + # Maximum number of query parse results to cache + # Range: 100-1000000 (default: 10000) + # Memory usage: ~640 bytes per entry (~6.3 MB default, ~64 MB for 100000) + # (change requires restart) + #------------------------------------------------------------------------------ # STREAMING REPLICATION MODE #------------------------------------------------------------------------------ diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c index 7026f0b1f0de7b9018ac912fac850f91d1c2978b..7dfce4946e268e120471db760440155787f84515 100644 --- a/src/streaming_replication/pool_worker_child.c +++ b/src/streaming_replication/pool_worker_child.c @@ -58,6 +58,7 @@ #include "utils/pool_ip.h" #include "utils/ps_status.h" #include "utils/pool_stream.h" +#include "utils/pool_memory_map.h" #include "context/pool_process_context.h" #include "context/pool_session_context.h" @@ -696,6 +697,7 @@ check_replication_time_lag_with_cmd(void) double delay_ms; uint64 delay; uint64 delay_threshold_by_time; + uint64 max_delay_us = 0; /* Track maximum delay for memory map */ int token_count = 0; int primary_node_id; int save_errno; @@ -1032,6 +1034,10 @@ check_replication_time_lag_with_cmd(void) bkinfo->standby_delay = delay; bkinfo->standby_delay_by_time = true; + /* Track maximum delay for memory map TTL calculation */ + if (delay > max_delay_us) + max_delay_us = delay; + /* Log delay if necessary */ delay_threshold_by_time = pool_config->delay_threshold_by_time * 1000; /* threshold is in * milliseconds, convert @@ -1049,6 +1055,10 @@ check_replication_time_lag_with_cmd(void) token = strtok_r(NULL, " \t\n", &saveptr); } + /* Update memory map TTL based on maximum observed delay */ + if (pool_config->memory_map_enabled && max_delay_us > 0) + pool_memory_map_update_ttl(max_delay_us); + } PG_CATCH(); { diff --git a/src/test/regression/tests/045.memory_map/test.sh b/src/test/regression/tests/045.memory_map/test.sh new file mode 100755 index 0000000000000000000000000000000000000000..ce05418262664e5133e2ffd478c7ae622b062cc7 --- /dev/null +++ b/src/test/regression/tests/045.memory_map/test.sh @@ -0,0 +1,196 @@ +#!/usr/bin/env bash +#------------------------------------------------------------------- +# test script for memory map feature (in-memory table tracking). +# Tests routing of queries based on recently written tables. +# +source $TESTLIBS +TESTDIR=testdir +PSQL=$PGBIN/psql +PSQLOPTS="-a -q -X" +PGPOOLBIN=$PGPOOL_INSTALL_DIR/bin +export PGDATABASE=test + +# Only run in streaming replication mode since that's the target use case +for mode in s +do + rm -fr $TESTDIR + mkdir $TESTDIR + cd $TESTDIR + + # Create test environment with 2 nodes + echo -n "creating test environment..." + $PGPOOL_SETUP -m $mode -n 2 || exit 1 + echo "done." + + source ./bashrc.ports + + # Configure memory map feature + echo "memory_map_enabled = on" >> etc/pgpool.conf + echo "memory_map_ttl_factor = 5.0" >> etc/pgpool.conf + echo "memory_map_cold_start_duration = 2000" >> etc/pgpool.conf + + # Configure weights so we can distinguish routing + # Backend 0 (primary) weight=0, Backend 1 (standby) weight=1 + # This means load balanced queries go to node 1 by default + echo "backend_weight0 = 0" >> etc/pgpool.conf + echo "backend_weight1 = 1" >> etc/pgpool.conf + + # Enable debug logging to see routing decisions + echo "log_min_messages = debug1" >> etc/pgpool.conf + + ./startall + + export PGPORT=$PGPOOL_PORT + + wait_for_pgpool_startup + + # Create test tables + $PSQL test < /dev/null 2>&1 + + # Check log for cold start message + if grep -q "could not load balance because of memory map cold start" log/pgpool.log; then + echo "Test 1 PASSED: Cold start routing works" + else + echo "Test 1 FAILED: Cold start routing not detected" + ./shutdownall + exit 1 + fi + + echo "=== Test 2: Wait for cold start to end ===" + # Wait for cold start period to end (2 seconds) + sleep 3 + + # Clear the log + > log/pgpool.log + + # Now a clean table query should load balance (go to node 1) + $PSQL test -c "SELECT 'after_cold_start' as marker, * FROM t3;" > /dev/null 2>&1 + + # After cold start, queries to clean tables should load balance + # Check that it did NOT get forced to primary due to memory map + if grep -q "could not load balance because of memory map cold start" log/pgpool.log; then + echo "Test 2 FAILED: Still in cold start after waiting" + ./shutdownall + exit 1 + fi + echo "Test 2 PASSED: Cold start ended correctly" + + echo "=== Test 3: Write-then-Read Routing ===" + # Clear the log + > log/pgpool.log + + # Write to t1 + $PSQL test -c "INSERT INTO t1 VALUES (1);" > /dev/null 2>&1 + + # Immediately read from t1 - should go to primary due to recent write + $PSQL test -c "SELECT 'write_read_test' as marker, * FROM t1;" > /dev/null 2>&1 + + # Check log for table staleness message + if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then + echo "Test 3 PASSED: Write-then-read routing works" + else + echo "Test 3 FAILED: Table staleness not detected after write" + # Show relevant log entries for debugging + grep -i "memory" log/pgpool.log | tail -20 + ./shutdownall + exit 1 + fi + + echo "=== Test 4: Clean Table Still Load Balances ===" + # Clear the log + > log/pgpool.log + + # Read from t2 (never written to) - should load balance + $PSQL test -c "SELECT 'clean_table_test' as marker, * FROM t2;" > /dev/null 2>&1 + + # Should NOT see memory map blocking message for t2 + if grep -q "could not load balance because table.*t2.*was recently written" log/pgpool.log; then + echo "Test 4 FAILED: Clean table incorrectly marked as stale" + ./shutdownall + exit 1 + fi + echo "Test 4 PASSED: Clean tables still load balance" + + echo "=== Test 5: UPDATE Marks Table as Stale ===" + # Clear the log + > log/pgpool.log + + # Update t2 + $PSQL test -c "UPDATE t2 SET i = 999 WHERE i = 0;" > /dev/null 2>&1 + + # Immediately read from t2 - should go to primary + $PSQL test -c "SELECT 'update_test' as marker, * FROM t2;" > /dev/null 2>&1 + + if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then + echo "Test 5 PASSED: UPDATE marks table as stale" + else + echo "Test 5 FAILED: UPDATE did not mark table as stale" + ./shutdownall + exit 1 + fi + + echo "=== Test 6: DELETE Marks Table as Stale ===" + # Clear the log + > log/pgpool.log + + # Delete from t3 + $PSQL test -c "DELETE FROM t3 WHERE i = 0;" > /dev/null 2>&1 + + # Immediately read from t3 - should go to primary + $PSQL test -c "SELECT 'delete_test' as marker, * FROM t3;" > /dev/null 2>&1 + + if grep -q "could not load balance because table.*was recently written" log/pgpool.log; then + echo "Test 6 PASSED: DELETE marks table as stale" + else + echo "Test 6 FAILED: DELETE did not mark table as stale" + ./shutdownall + exit 1 + fi + + echo "=== Test 7: Multi-Table Query with One Stale Table ===" + # Clear the log + > log/pgpool.log + + # Create a new clean table + $PSQL test -c "CREATE TABLE t4(i INTEGER);" > /dev/null 2>&1 + + # Wait a bit for TTL to expire on other tables if factor is low + sleep 1 + + # Write to t1 only + $PSQL test -c "INSERT INTO t1 VALUES (100);" > /dev/null 2>&1 + + # Query joining t1 and t4 - should go to primary because t1 is stale + $PSQL test -c "SELECT 'multi_table_test' as marker FROM t1, t4;" > /dev/null 2>&1 + + if grep -q "could not load balance because table.*t1.*was recently written" log/pgpool.log; then + echo "Test 7 PASSED: Multi-table query routes to primary when one table is stale" + else + echo "Test 7 FAILED: Multi-table staleness not detected" + ./shutdownall + exit 1 + fi + + echo "" + echo "=== All Memory Map Tests PASSED ===" + + ./shutdownall + + cd .. +done + +exit 0 diff --git a/src/utils/pool_memory_map.c b/src/utils/pool_memory_map.c new file mode 100644 index 0000000000000000000000000000000000000000..3f00ec1e2afef6518532804391633175fd351811 --- /dev/null +++ b/src/utils/pool_memory_map.c @@ -0,0 +1,1076 @@ +/* -*-pgsql-c-*- */ +/* + * pgpool: a language independent connection pool server for PostgreSQL + * written by Tatsuo Ishii + * + * Copyright (c) 2003-2026 PgPool Global Development Group + * + * Permission to use, copy, modify, and distribute this software and + * its documentation for any purpose and without fee is hereby + * granted, provided that the above copyright notice appear in all + * copies and that both that copyright notice and this permission + * notice appear in supporting documentation, and that the name of the + * author not be used in advertising or publicity pertaining to + * distribution of the software without specific, written prior + * permission. The author makes no representations about the + * suitability of this software for any purpose. It is provided "as + * is" without express or implied warranty. + * + * pool_memory_map.c: In-memory tracking of recently written tables + * to avoid stale reads from replicas during replication lag + * + * Based on the "lagless" architecture from Tailor Brands: + * https://medium.com/tailor-tech/using-database-read-replicas-in-distributed-systems-d80eaf6bbf8a + */ + +#include +#include +#include +#include + +#include "pool.h" +#include "pool_config.h" +#include "utils/pool_memory_map.h" +#include "utils/elog.h" +#include "utils/palloc.h" + +/* ---------------- + * Local variables + * ---------------- + */ + +/* Pointer to shared memory structure */ +static MemoryMapShmem *memory_map_shmem = NULL; + +/* Per-process cold start tracking (not in shared memory) */ +static struct timeval process_start_time; +static bool cold_start_initialized = false; + +/* ---------------- + * Helper macros for accessing flexible arrays in shared memory + * ---------------- + */ + +/* Get pointer to bucket array in table map */ +#define TABLE_MAP_BUCKETS(map) \ + ((int *)((char *)(map) + sizeof(TableMutationHashTable))) + +/* Get pointer to entry array in table map */ +#define TABLE_MAP_ENTRIES(map) \ + ((TableMutationEntry *)((char *)(map) + sizeof(TableMutationHashTable) + \ + (map)->num_buckets * sizeof(int))) + +/* Get pointer to bucket array in query cache */ +#define QUERY_CACHE_BUCKETS(cache) \ + ((int *)((char *)(cache) + sizeof(QueryParseCache))) + +/* Get pointer to entry array in query cache */ +#define QUERY_CACHE_ENTRIES(cache) \ + ((QueryParseEntry *)((char *)(cache) + sizeof(QueryParseCache) + \ + (cache)->num_buckets * sizeof(int))) + +/* ---------------- + * Spinlock operations using atomic compare-and-swap + * ---------------- + */ + +static inline void +spin_lock(volatile uint32 *lock) +{ + while (__sync_lock_test_and_set(lock, 1)) + { + /* Spin until we acquire the lock */ + while (*lock) + ; + } +} + +static inline void +spin_unlock(volatile uint32 *lock) +{ + __sync_lock_release(lock); +} + +/* ---------------- + * Hash functions + * ---------------- + */ + +/* + * FNV-1a hash for strings + */ +static uint32 +fnv1a_hash_string(const char *str) +{ + uint32 hash = 2166136261u; /* FNV offset basis */ + + while (*str) + { + hash ^= (uint8)*str++; + hash *= 16777619u; /* FNV prime */ + } + + return hash; +} + +/* + * FNV-1a hash for 64-bit value + */ +static uint64 +fnv1a_hash_64(const char *str, size_t len) +{ + uint64 hash = 14695981039346656037ULL; /* FNV offset basis for 64-bit */ + size_t i; + + for (i = 0; i < len; i++) + { + hash ^= (uint8)str[i]; + hash *= 1099511628211ULL; /* FNV prime for 64-bit */ + } + + return hash; +} + +/* ---------------- + * Time utilities + * ---------------- + */ + +/* + * Get elapsed time in microseconds between two timevals + */ +static int64 +elapsed_us(struct timeval *start, struct timeval *end) +{ + return ((int64)(end->tv_sec - start->tv_sec) * 1000000) + + (end->tv_usec - start->tv_usec); +} + +/* + * Get current time + */ +static void +get_current_time(struct timeval *tv) +{ + gettimeofday(tv, NULL); +} + +/* ---------------- + * Table mutation hash table operations + * ---------------- + */ + +/* + * Initialize table mutation hash table + */ +static void +table_map_init(TableMutationHashTable *map, int num_buckets, int max_entries) +{ + int *buckets; + TableMutationEntry *entries; + int i; + + map->num_buckets = num_buckets; + map->max_entries = max_entries; + map->num_entries = 0; + map->free_list_head = 0; + map->lock = 0; + + buckets = TABLE_MAP_BUCKETS(map); + entries = TABLE_MAP_ENTRIES(map); + + /* Initialize all buckets to empty */ + for (i = 0; i < num_buckets; i++) + buckets[i] = MEMORY_MAP_INVALID_INDEX; + + /* Initialize free list - chain all entries */ + for (i = 0; i < max_entries; i++) + { + entries[i].in_use = false; + entries[i].next = (i < max_entries - 1) ? i + 1 : MEMORY_MAP_INVALID_INDEX; + } + + ereport(DEBUG1, + (errmsg("memory_map: initialized table map with %d buckets, %d max entries", + num_buckets, max_entries))); +} + +/* + * Allocate an entry from the free list + */ +static int +table_map_alloc_entry(TableMutationHashTable *map) +{ + TableMutationEntry *entries = TABLE_MAP_ENTRIES(map); + int idx; + + if (map->free_list_head == MEMORY_MAP_INVALID_INDEX) + return MEMORY_MAP_INVALID_INDEX; + + idx = map->free_list_head; + map->free_list_head = entries[idx].next; + entries[idx].in_use = true; + entries[idx].next = MEMORY_MAP_INVALID_INDEX; + map->num_entries++; + + return idx; +} + +/* + * Free an entry back to the free list + */ +static void +table_map_free_entry(TableMutationHashTable *map, int idx) +{ + TableMutationEntry *entries = TABLE_MAP_ENTRIES(map); + + entries[idx].in_use = false; + entries[idx].next = map->free_list_head; + map->free_list_head = idx; + map->num_entries--; +} + +/* + * Look up a table in the hash table + * Returns entry index or MEMORY_MAP_INVALID_INDEX if not found + * Must be called with lock held + */ +static int +table_map_lookup(TableMutationHashTable *map, const char *table_name, uint32 hash) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TableMutationEntry *entries = TABLE_MAP_ENTRIES(map); + int bucket = hash % map->num_buckets; + int idx = buckets[bucket]; + + while (idx != MEMORY_MAP_INVALID_INDEX) + { + if (entries[idx].hash == hash && + strcmp(entries[idx].table_name, table_name) == 0) + { + return idx; + } + idx = entries[idx].next; + } + + return MEMORY_MAP_INVALID_INDEX; +} + +/* + * Insert or update a table entry + * Must be called with lock held + */ +static void +table_map_insert(TableMutationHashTable *map, const char *table_name, + uint32 hash, struct timeval *write_time) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TableMutationEntry *entries = TABLE_MAP_ENTRIES(map); + int bucket = hash % map->num_buckets; + int idx; + + /* Check if entry already exists */ + idx = table_map_lookup(map, table_name, hash); + if (idx != MEMORY_MAP_INVALID_INDEX) + { + /* Update existing entry */ + entries[idx].last_write_time = *write_time; + return; + } + + /* Allocate new entry */ + idx = table_map_alloc_entry(map); + if (idx == MEMORY_MAP_INVALID_INDEX) + { + int b; + /* Table is full - evict oldest entry */ + /* For simplicity, just use the first entry in first non-empty bucket */ + for (b = 0; b < map->num_buckets; b++) + { + if (buckets[b] != MEMORY_MAP_INVALID_INDEX) + { + int victim = buckets[b]; + buckets[b] = entries[victim].next; + table_map_free_entry(map, victim); + idx = table_map_alloc_entry(map); + break; + } + } + + if (idx == MEMORY_MAP_INVALID_INDEX) + { + ereport(WARNING, + (errmsg("memory_map: failed to allocate entry for table %s", table_name))); + return; + } + } + + /* Initialize new entry */ + strlcpy(entries[idx].table_name, table_name, MEMORY_MAP_TABLE_NAME_LEN); + entries[idx].hash = hash; + entries[idx].last_write_time = *write_time; + + /* Insert at head of bucket chain */ + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + ereport(DEBUG2, + (errmsg("memory_map: marked table '%s' as written", table_name))); +} + +/* + * Remove expired entries from the table map + * Must be called with lock held + */ +static void +table_map_cleanup_expired(TableMutationHashTable *map, uint64 ttl_us) +{ + int *buckets = TABLE_MAP_BUCKETS(map); + TableMutationEntry *entries = TABLE_MAP_ENTRIES(map); + struct timeval now; + int removed = 0; + int b; + + get_current_time(&now); + + for (b = 0; b < map->num_buckets; b++) + { + int *prev_ptr = &buckets[b]; + int idx = buckets[b]; + + while (idx != MEMORY_MAP_INVALID_INDEX) + { + int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now); + + if (elapsed > (int64)ttl_us) + { + /* Entry has expired - remove it */ + int next = entries[idx].next; + *prev_ptr = next; + table_map_free_entry(map, idx); + idx = next; + removed++; + } + else + { + prev_ptr = &entries[idx].next; + idx = entries[idx].next; + } + } + } + + if (removed > 0) + { + ereport(DEBUG1, + (errmsg("memory_map: cleaned up %d expired table entries", removed))); + } +} + +/* ---------------- + * Query parse cache operations + * ---------------- + */ + +/* + * Initialize query parse cache + */ +static void +query_cache_init(QueryParseCache *cache, int num_buckets, int max_entries) +{ + int *buckets; + QueryParseEntry *entries; + int i; + + cache->num_buckets = num_buckets; + cache->max_entries = max_entries; + cache->num_entries = 0; + cache->free_list_head = 0; + cache->lru_head = MEMORY_MAP_INVALID_INDEX; + cache->lru_tail = MEMORY_MAP_INVALID_INDEX; + cache->lock = 0; + + buckets = QUERY_CACHE_BUCKETS(cache); + entries = QUERY_CACHE_ENTRIES(cache); + + /* Initialize all buckets to empty */ + for (i = 0; i < num_buckets; i++) + buckets[i] = MEMORY_MAP_INVALID_INDEX; + + /* Initialize free list */ + for (i = 0; i < max_entries; i++) + { + entries[i].in_use = false; + entries[i].next = (i < max_entries - 1) ? i + 1 : MEMORY_MAP_INVALID_INDEX; + entries[i].lru_prev = MEMORY_MAP_INVALID_INDEX; + entries[i].lru_next = MEMORY_MAP_INVALID_INDEX; + } + + ereport(DEBUG1, + (errmsg("memory_map: initialized query cache with %d buckets, %d max entries", + num_buckets, max_entries))); +} + +/* + * Move entry to front of LRU list (most recently used) + */ +static void +query_cache_lru_touch(QueryParseCache *cache, int idx) +{ + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + + /* Already at head? */ + if (cache->lru_head == idx) + return; + + /* Remove from current position */ + if (entries[idx].lru_prev != MEMORY_MAP_INVALID_INDEX) + entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next; + if (entries[idx].lru_next != MEMORY_MAP_INVALID_INDEX) + entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev; + if (cache->lru_tail == idx) + cache->lru_tail = entries[idx].lru_prev; + + /* Insert at head */ + entries[idx].lru_prev = MEMORY_MAP_INVALID_INDEX; + entries[idx].lru_next = cache->lru_head; + if (cache->lru_head != MEMORY_MAP_INVALID_INDEX) + entries[cache->lru_head].lru_prev = idx; + cache->lru_head = idx; + if (cache->lru_tail == MEMORY_MAP_INVALID_INDEX) + cache->lru_tail = idx; +} + +/* + * Add entry to LRU list (at head) + */ +static void +query_cache_lru_add(QueryParseCache *cache, int idx) +{ + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + + entries[idx].lru_prev = MEMORY_MAP_INVALID_INDEX; + entries[idx].lru_next = cache->lru_head; + + if (cache->lru_head != MEMORY_MAP_INVALID_INDEX) + entries[cache->lru_head].lru_prev = idx; + + cache->lru_head = idx; + + if (cache->lru_tail == MEMORY_MAP_INVALID_INDEX) + cache->lru_tail = idx; +} + +/* + * Remove entry from LRU list + */ +static void +query_cache_lru_remove(QueryParseCache *cache, int idx) +{ + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + + if (entries[idx].lru_prev != MEMORY_MAP_INVALID_INDEX) + entries[entries[idx].lru_prev].lru_next = entries[idx].lru_next; + else + cache->lru_head = entries[idx].lru_next; + + if (entries[idx].lru_next != MEMORY_MAP_INVALID_INDEX) + entries[entries[idx].lru_next].lru_prev = entries[idx].lru_prev; + else + cache->lru_tail = entries[idx].lru_prev; + + entries[idx].lru_prev = MEMORY_MAP_INVALID_INDEX; + entries[idx].lru_next = MEMORY_MAP_INVALID_INDEX; +} + +/* + * Allocate entry from free list, evicting LRU if necessary + */ +static int +query_cache_alloc_entry(QueryParseCache *cache) +{ + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + int *buckets = QUERY_CACHE_BUCKETS(cache); + int idx; + + if (cache->free_list_head != MEMORY_MAP_INVALID_INDEX) + { + idx = cache->free_list_head; + cache->free_list_head = entries[idx].next; + entries[idx].in_use = true; + entries[idx].next = MEMORY_MAP_INVALID_INDEX; + cache->num_entries++; + return idx; + } + + /* No free entries - evict LRU */ + if (cache->lru_tail == MEMORY_MAP_INVALID_INDEX) + return MEMORY_MAP_INVALID_INDEX; + + idx = cache->lru_tail; + + /* Remove from hash bucket */ + int bucket = entries[idx].query_hash % cache->num_buckets; + int *prev_ptr = &buckets[bucket]; + int curr = buckets[bucket]; + + while (curr != MEMORY_MAP_INVALID_INDEX) + { + if (curr == idx) + { + *prev_ptr = entries[curr].next; + break; + } + prev_ptr = &entries[curr].next; + curr = entries[curr].next; + } + + /* Remove from LRU list */ + query_cache_lru_remove(cache, idx); + + /* Reinitialize entry */ + entries[idx].in_use = true; + entries[idx].next = MEMORY_MAP_INVALID_INDEX; + + return idx; +} + +/* + * Look up a query in the cache + */ +static int +query_cache_lookup(QueryParseCache *cache, uint64 hash) +{ + int *buckets = QUERY_CACHE_BUCKETS(cache); + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + int bucket = hash % cache->num_buckets; + int idx = buckets[bucket]; + + while (idx != MEMORY_MAP_INVALID_INDEX) + { + if (entries[idx].query_hash == hash) + return idx; + idx = entries[idx].next; + } + + return MEMORY_MAP_INVALID_INDEX; +} + +/* ---------------- + * Query normalization + * ---------------- + */ + +/* + * Simple query normalization: + * - Strip comments (-- style and C-style block comments) + * - Collapse whitespace + * - Convert to lowercase (except inside strings) + * - Replace literal values with placeholders + * + * This is a simplified version - pgpool2 already does this elsewhere, + * but we need a standalone version for the memory map feature. + */ +static size_t +normalize_query(const char *query, char *output, size_t output_size) +{ + const char *src = query; + char *dst = output; + char *dst_end = output + output_size - 1; + bool in_string = false; + char string_char = 0; + bool last_was_space = true; /* Start true to skip leading space */ + + while (*src && dst < dst_end) + { + /* Handle string literals */ + if (in_string) + { + if (*src == string_char) + { + if (*(src + 1) == string_char) + { + /* Escaped quote */ + src += 2; + continue; + } + in_string = false; + *dst++ = '$'; /* Replace string content with placeholder */ + } + src++; + continue; + } + + /* Check for string start */ + if (*src == '\'' || *src == '"') + { + in_string = true; + string_char = *src; + src++; + continue; + } + + /* Handle single-line comments */ + if (*src == '-' && *(src + 1) == '-') + { + while (*src && *src != '\n') + src++; + continue; + } + + /* Handle multi-line comments */ + if (*src == '/' && *(src + 1) == '*') + { + src += 2; + while (*src && !(*src == '*' && *(src + 1) == '/')) + src++; + if (*src) + src += 2; + continue; + } + + /* Handle whitespace */ + if (*src == ' ' || *src == '\t' || *src == '\n' || *src == '\r') + { + if (!last_was_space) + { + *dst++ = ' '; + last_was_space = true; + } + src++; + continue; + } + + /* Handle numbers - replace with placeholder */ + if ((*src >= '0' && *src <= '9') || + (*src == '.' && *(src + 1) >= '0' && *(src + 1) <= '9')) + { + while (*src && ((*src >= '0' && *src <= '9') || *src == '.')) + src++; + if (!last_was_space && dst > output && *(dst - 1) != '$') + *dst++ = '$'; + last_was_space = false; + continue; + } + + /* Regular character - convert to lowercase */ + if (*src >= 'A' && *src <= 'Z') + *dst++ = *src + 32; + else + *dst++ = *src; + + last_was_space = false; + src++; + } + + /* Remove trailing space */ + if (dst > output && *(dst - 1) == ' ') + dst--; + + *dst = '\0'; + return dst - output; +} + +/* ---------------- + * Public API implementation + * ---------------- + */ + +Size +pool_memory_map_shmem_size(void) +{ + Size size = 0; + int table_buckets = pool_config->memory_map_table_buckets; + int table_size = pool_config->memory_map_table_size; + int query_buckets = pool_config->memory_map_query_buckets; + int query_cache_size = pool_config->memory_map_query_cache_size; + + /* Main structure */ + size += sizeof(MemoryMapShmem); + + /* Table mutation hash table */ + size += sizeof(TableMutationHashTable); + size += table_buckets * sizeof(int); /* buckets array */ + size += table_size * sizeof(TableMutationEntry); /* entries array */ + + /* Query parse cache */ + size += sizeof(QueryParseCache); + size += query_buckets * sizeof(int); /* buckets array */ + size += query_cache_size * sizeof(QueryParseEntry); /* entries array */ + + return size; +} + +void +pool_memory_map_init(void) +{ +#ifndef POOL_PRIVATE + Size shmem_size; + char *shmem_ptr; + + if (!pool_config->memory_map_enabled) + { + ereport(DEBUG1, + (errmsg("memory_map: feature disabled"))); + return; + } + + shmem_size = pool_memory_map_shmem_size(); + + /* + * Allocate from the main shared memory segment. + * Memory is already zeroed by initialize_shared_memory_main_segment(). + */ + shmem_ptr = pool_shared_memory_segment_get_chunk(shmem_size); + if (shmem_ptr == NULL) + { + ereport(ERROR, + (errmsg("memory_map: failed to allocate %zu bytes of shared memory", + shmem_size))); + return; + } + + /* Set up pointers to structures within shared memory */ + memory_map_shmem = (MemoryMapShmem *)shmem_ptr; + shmem_ptr += sizeof(MemoryMapShmem); + + memory_map_shmem->table_map = (TableMutationHashTable *)shmem_ptr; + shmem_ptr += sizeof(TableMutationHashTable); + shmem_ptr += pool_config->memory_map_table_buckets * sizeof(int); + shmem_ptr += pool_config->memory_map_table_size * sizeof(TableMutationEntry); + + memory_map_shmem->query_cache = (QueryParseCache *)shmem_ptr; + + /* Initialize structures */ + table_map_init(memory_map_shmem->table_map, + pool_config->memory_map_table_buckets, + pool_config->memory_map_table_size); + + query_cache_init(memory_map_shmem->query_cache, + pool_config->memory_map_query_buckets, + pool_config->memory_map_query_cache_size); + + /* Initialize global state */ + memory_map_shmem->state.initialized = true; + memory_map_shmem->state.current_ttl_us = MEMORY_MAP_DEFAULT_TTL_US; + get_current_time(&memory_map_shmem->state.ttl_last_updated); + memory_map_shmem->state.stats_queries_checked = 0; + memory_map_shmem->state.stats_forced_primary = 0; + memory_map_shmem->state.stats_allowed_replica = 0; + + ereport(LOG, + (errmsg("memory_map: initialized with %zu bytes shared memory", + shmem_size))); +#endif +} + +void +pool_memory_map_child_init(void) +{ + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return; + + get_current_time(&process_start_time); + cold_start_initialized = true; + + ereport(DEBUG1, + (errmsg("memory_map: child initialized, cold start period %d ms", + pool_config->memory_map_cold_start_duration))); +} + +bool +pool_memory_map_in_cold_start(void) +{ + struct timeval now; + int64 elapsed_ms; + + if (!pool_config->memory_map_enabled || !cold_start_initialized) + return false; + + if (pool_config->memory_map_cold_start_duration <= 0) + return false; + + get_current_time(&now); + elapsed_ms = elapsed_us(&process_start_time, &now) / 1000; + + if (elapsed_ms < pool_config->memory_map_cold_start_duration) + { + ereport(DEBUG2, + (errmsg("memory_map: in cold start (%ld/%d ms)", + (long)elapsed_ms, pool_config->memory_map_cold_start_duration))); + return true; + } + + return false; +} + +bool +pool_memory_map_table_is_stale(const char *table_name) +{ + TableMutationHashTable *map; + struct timeval now; + uint64 ttl_us; + uint32 hash; + int idx; + bool is_stale = false; + + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return false; + + map = memory_map_shmem->table_map; + hash = fnv1a_hash_string(table_name); + + spin_lock(&map->lock); + + idx = table_map_lookup(map, table_name, hash); + if (idx != MEMORY_MAP_INVALID_INDEX) + { + TableMutationEntry *entries = TABLE_MAP_ENTRIES(map); + get_current_time(&now); + ttl_us = memory_map_shmem->state.current_ttl_us; + + int64 elapsed = elapsed_us(&entries[idx].last_write_time, &now); + is_stale = (elapsed < (int64)ttl_us); + + ereport(DEBUG2, + (errmsg("memory_map: table '%s' elapsed=%ld us, ttl=%lu us, stale=%d", + table_name, (long)elapsed, (unsigned long)ttl_us, is_stale))); + } + + spin_unlock(&map->lock); + + /* Update statistics */ + __sync_fetch_and_add(&memory_map_shmem->state.stats_queries_checked, 1); + if (is_stale) + __sync_fetch_and_add(&memory_map_shmem->state.stats_forced_primary, 1); + else + __sync_fetch_and_add(&memory_map_shmem->state.stats_allowed_replica, 1); + + return is_stale; +} + +void +pool_memory_map_mark_tables_written(const char **table_names, int num_tables) +{ + TableMutationHashTable *map; + struct timeval now; + int i; + + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return; + + if (num_tables <= 0 || table_names == NULL) + return; + + map = memory_map_shmem->table_map; + get_current_time(&now); + + spin_lock(&map->lock); + + /* Periodically clean up expired entries */ + if (map->num_entries > map->max_entries * 3 / 4) + { + table_map_cleanup_expired(map, memory_map_shmem->state.current_ttl_us); + } + + for (i = 0; i < num_tables; i++) + { + uint32 hash; + + if (table_names[i] != NULL && table_names[i][0] != '\0') + { + hash = fnv1a_hash_string(table_names[i]); + table_map_insert(map, table_names[i], hash, &now); + } + } + + spin_unlock(&map->lock); +} + +/* + * Convenience function to mark a single table as written + */ +void +pool_memory_map_mark_table_written(const char *table_name) +{ + if (table_name != NULL) + { + const char *tables[1] = { table_name }; + pool_memory_map_mark_tables_written(tables, 1); + } +} + +void +pool_memory_map_update_ttl(uint64 delay_us) +{ + uint64 new_ttl; + + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return; + + /* Calculate new TTL: delay * factor, with minimum of default TTL */ + new_ttl = (uint64)(delay_us * pool_config->memory_map_ttl_factor); + if (new_ttl < MEMORY_MAP_DEFAULT_TTL_US) + new_ttl = MEMORY_MAP_DEFAULT_TTL_US; + + /* Maximum TTL of 1 hour */ + if (new_ttl > 3600ULL * 1000000ULL) + new_ttl = 3600ULL * 1000000ULL; + + memory_map_shmem->state.current_ttl_us = new_ttl; + get_current_time(&memory_map_shmem->state.ttl_last_updated); + + ereport(DEBUG1, + (errmsg("memory_map: updated TTL to %lu us (delay=%lu us, factor=%.1f)", + (unsigned long)new_ttl, (unsigned long)delay_us, + pool_config->memory_map_ttl_factor))); +} + +bool +pool_memory_map_get_cached_parse(uint64 hash, bool *is_write, + char table_names[][MEMORY_MAP_TABLE_NAME_LEN], + int *num_tables) +{ + QueryParseCache *cache; + int idx; + bool found = false; + + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return false; + + cache = memory_map_shmem->query_cache; + + spin_lock(&cache->lock); + + idx = query_cache_lookup(cache, hash); + if (idx != MEMORY_MAP_INVALID_INDEX) + { + QueryParseEntry *entries = QUERY_CACHE_ENTRIES(cache); + int i; + + *is_write = entries[idx].is_write; + *num_tables = entries[idx].num_tables; + + for (i = 0; i < entries[idx].num_tables && i < MEMORY_MAP_MAX_TABLES_PER_QUERY; i++) + { + strlcpy(table_names[i], entries[idx].table_names[i], MEMORY_MAP_TABLE_NAME_LEN); + } + + /* Move to front of LRU */ + query_cache_lru_touch(cache, idx); + found = true; + } + + spin_unlock(&cache->lock); + + return found; +} + +void +pool_memory_map_cache_parse(uint64 hash, bool is_write, + const char table_names[][MEMORY_MAP_TABLE_NAME_LEN], + int num_tables) +{ + QueryParseCache *cache; + int *buckets; + QueryParseEntry *entries; + int idx; + int bucket; + + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return; + + cache = memory_map_shmem->query_cache; + + spin_lock(&cache->lock); + + /* Check if already exists */ + idx = query_cache_lookup(cache, hash); + if (idx != MEMORY_MAP_INVALID_INDEX) + { + spin_unlock(&cache->lock); + return; + } + + /* Allocate new entry (may evict LRU) */ + idx = query_cache_alloc_entry(cache); + if (idx == MEMORY_MAP_INVALID_INDEX) + { + spin_unlock(&cache->lock); + ereport(WARNING, + (errmsg("memory_map: failed to allocate query cache entry"))); + return; + } + + entries = QUERY_CACHE_ENTRIES(cache); + buckets = QUERY_CACHE_BUCKETS(cache); + + /* Fill in entry */ + entries[idx].query_hash = hash; + entries[idx].is_write = is_write; + entries[idx].num_tables = (num_tables > MEMORY_MAP_MAX_TABLES_PER_QUERY) ? + MEMORY_MAP_MAX_TABLES_PER_QUERY : num_tables; + + { + int i; + for (i = 0; i < entries[idx].num_tables; i++) + { + strlcpy(entries[idx].table_names[i], table_names[i], MEMORY_MAP_TABLE_NAME_LEN); + } + } + + /* Insert into hash bucket */ + bucket = hash % cache->num_buckets; + entries[idx].next = buckets[bucket]; + buckets[bucket] = idx; + + /* Add to LRU list */ + query_cache_lru_add(cache, idx); + + spin_unlock(&cache->lock); +} + +uint64 +pool_memory_map_normalize_and_hash(const char *query) +{ + char normalized[8192]; + size_t len; + + if (query == NULL || query[0] == '\0') + return 0; + + len = normalize_query(query, normalized, sizeof(normalized)); + if (len == 0) + return 0; + + return fnv1a_hash_64(normalized, len); +} + +uint64 +pool_memory_map_get_ttl(void) +{ + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + return MEMORY_MAP_DEFAULT_TTL_US; + + return memory_map_shmem->state.current_ttl_us; +} + +void +pool_memory_map_get_stats(uint32 *queries_checked, + uint32 *forced_primary, + uint32 *allowed_replica, + uint64 *current_ttl_us) +{ + if (!pool_config->memory_map_enabled || memory_map_shmem == NULL) + { + *queries_checked = 0; + *forced_primary = 0; + *allowed_replica = 0; + *current_ttl_us = 0; + return; + } + + *queries_checked = memory_map_shmem->state.stats_queries_checked; + *forced_primary = memory_map_shmem->state.stats_forced_primary; + *allowed_replica = memory_map_shmem->state.stats_allowed_replica; + *current_ttl_us = memory_map_shmem->state.current_ttl_us; +} -- 2.52.0