From 2565d66b441c82e1e4a70b146aa8bfbda71e1471 Mon Sep 17 00:00:00 2001 From: Nadav Shatz Date: Mon, 27 Oct 2025 16:22:31 +0200 Subject: [PATCH 1/2] feat: external replication delay injection via external command Implementation: - Add replication_delay_source_cmd configuration string option - Remove replication_delay_source enum (no magic values) - Command receives replica node identifiers in host:port format - Primary node omitted from command arguments and output - Handle -1 for down nodes (log without triggering failover) - Command outputs one delay value (ms) per replica - Falls back to builtin queries if command not configured - Timeout handling with replication_delay_source_timeout --- src/config/pool_config_variables.c | 21 ++ src/include/pool_config.h | 3 +- src/sample/pgpool.conf.sample-stream | 14 + src/streaming_replication/pool_worker_child.c | 340 +++++++++++++++++- 4 files changed, 376 insertions(+), 2 deletions(-) diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index 5bbe46d3a..efac0d866 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -980,6 +980,16 @@ static struct config_string ConfigureNamesString[] = NULL, NULL, NULL, NULL }, + { + {"replication_delay_source_cmd", CFGCXT_RELOAD, STREAMING_REPLICATION_CONFIG, + "External command to retrieve replication delay information.", + CONFIG_VAR_TYPE_STRING, false, 0 + }, + &g_pool_config.replication_delay_source_cmd, + "", + NULL, NULL, NULL, NULL + }, + { {"failback_command", CFGCXT_RELOAD, FAILOVER_CONFIG, "Command to execute when backend node is attached.", @@ -2323,6 +2333,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"replication_delay_source_timeout", CFGCXT_RELOAD, STREAMING_REPLICATION_CONFIG, + "Timeout for external replication delay command execution in seconds.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.replication_delay_source_timeout, + 10, + 1, 3600, + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_INT }; diff --git a/src/include/pool_config.h b/src/include/pool_config.h index be82750e5..67275559f 100644 --- a/src/include/pool_config.h +++ b/src/include/pool_config.h @@ -94,7 +94,6 @@ typedef enum LogStandbyDelayModes LSD_NONE } LogStandbyDelayModes; - typedef enum MemCacheMethod { SHMEM_CACHE = 1, @@ -371,6 +370,8 @@ typedef struct char *sr_check_password; /* password for sr_check_user */ char *sr_check_database; /* PostgreSQL database name for streaming * replication check */ + char *replication_delay_source_cmd; /* external command for replication delay */ + int replication_delay_source_timeout; /* timeout for external command in seconds */ char *failover_command; /* execute command when failover happens */ char *follow_primary_command; /* execute command when failover is * ended */ diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream index a7eb594c9..2ccc907ea 100644 --- a/src/sample/pgpool.conf.sample-stream +++ b/src/sample/pgpool.conf.sample-stream @@ -519,6 +519,20 @@ backend_clustering_mode = streaming_replication #sr_check_database = 'postgres' # Database name for streaming replication check + +#replication_delay_source_cmd = '' + # External command to retrieve replication delay information + # If set, pgpool uses this command instead of built-in queries + # Command receives replica node identifiers (host:port) as arguments + # Primary node is omitted from arguments + # Command should output one delay value (in ms) per replica + # Use -1 to indicate a replica that is down but not yet detected + # Format: "25 100" for 2 replicas (e.g., 3-node cluster with 1 primary) + # Command runs as the pgpool process user +#replication_delay_source_timeout = 10 + # Timeout for external command execution in seconds + # Range: 1-3600 seconds (default: 10) + #delay_threshold = 0 # Threshold before not dispatching query to standby node # Unit is in bytes diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c index 4f8f823a3..07559ba45 100644 --- a/src/streaming_replication/pool_worker_child.c +++ b/src/streaming_replication/pool_worker_child.c @@ -76,6 +76,8 @@ static volatile sig_atomic_t restart_request = 0; static void establish_persistent_connection(void); static void discard_persistent_connection(void); static void check_replication_time_lag(void); +static void check_replication_time_lag_with_cmd(void); +static char *build_instance_identifier_for_node(int node_id); static void CheckReplicationTimeLagErrorCb(void *arg); static unsigned long long int text_to_lsn(char *text); static RETSIGTYPE my_signal_handler(int sig); @@ -259,7 +261,12 @@ do_worker_child(void) POOL_NODE_STATUS *node_status; int i; - /* Do replication time lag checking */ + /* Do replication time lag checking */ + /* Use external command if replication_delay_source_cmd is configured */ + if (pool_config->replication_delay_source_cmd && + strlen(pool_config->replication_delay_source_cmd) > 0) + check_replication_time_lag_with_cmd(); + else check_replication_time_lag(); /* Check node status */ @@ -659,6 +666,337 @@ check_replication_time_lag(void) error_context_stack = callback.previous; } +#define MAX_CMD_OUTPUT 4096 +#define MAX_REASONABLE_DELAY_MS 3600000.0 /* 1 hour in milliseconds */ + +/* Global variable to track command timeout */ +static volatile sig_atomic_t command_timeout_occurred = 0; + +/* + * Signal handler for command timeou + */ +static void +command_timeout_handler(int sig) +{ + command_timeout_occurred = 1; +} + + + +/* + * Check replication time lag using external command + * + * The external command receives only replica (standby) node identifiers as arguments, + * omitting the primary node. It returns delay values in milliseconds for each replica. + * A value of -1 indicates a node that is down but not yet detected by pgpool's health checks. + */ +static void +check_replication_time_lag_with_cmd(void) +{ + FILE *fp; + char *command; + char *line; + char *token; + char *saveptr; + int replica_idx; + int num_replicas; + double delay_ms; + uint64 delay; + int token_count = 0; + BackendInfo *bkinfo; + ErrorContextCallback callback; + + if (NUM_BACKENDS <= 1) + { + /* If there's only one node, there's no point to do checking */ + return; + } + + if (REAL_PRIMARY_NODE_ID < 0) + { + /* No need to check if there's no primary */ + return; + } + + if (!VALID_BACKEND(REAL_PRIMARY_NODE_ID)) + { + /* No need to check replication delay if primary is down */ + return; + } + + if (!pool_config->replication_delay_source_cmd || + strlen(pool_config->replication_delay_source_cmd) == 0) + { + ereport(WARNING, + (errmsg("replication_delay_source_cmd is not configured"), + errhint("Set replication_delay_source_cmd to use external command mode"))); + /* Fall back to builtin method */ + check_replication_time_lag(); + return; + } + + /* Allocate buffer for command output */ + line = palloc(MAX_CMD_OUTPUT); + fp = NULL; + + /* + * Register a error context callback to throw proper context message + */ + callback.callback = CheckReplicationTimeLagErrorCb; + callback.arg = NULL; + callback.previous = error_context_stack; + error_context_stack = &callback; + + /* Execute command as current process user */ + PG_TRY(); + { + const char *base_command = pool_config->replication_delay_source_cmd; + size_t total_len = strlen(base_command) + 1; /* +1 for NUL */ + + /* Build command with replica-only arguments (omit primary) */ + /* Calculate total command length including space-separated replica identifiers */ + for (int i = 0; i < NUM_BACKENDS; i++) + { + if (i == REAL_PRIMARY_NODE_ID) + continue; /* Skip primary node */ + + char *ident = build_instance_identifier_for_node(i); + total_len += 1 /* space */ + strlen(ident); + pfree(ident); + } + + command = palloc(total_len); + strlcpy(command, base_command, total_len); + + /* Append replica identifiers */ + for (int i = 0; i < NUM_BACKENDS; i++) + { + if (i == REAL_PRIMARY_NODE_ID) + continue; /* Skip primary node */ + + char *ident = build_instance_identifier_for_node(i); + strlcat(command, " ", total_len); + strlcat(command, ident, total_len); + pfree(ident); + } + + ereport(DEBUG1, + (errmsg("executing replication delay command: %s", command))); + + /* Set up timeout for command execution */ + command_timeout_occurred = 0; + signal(SIGALRM, command_timeout_handler); + alarm(pool_config->replication_delay_source_timeout); + + fp = popen(command, "r"); + if (fp == NULL) + { + alarm(0); /* Cancel alarm */ + signal(SIGALRM, SIG_DFL); + ereport(ERROR, + (errmsg("failed to execute replication delay command: %s", command), + errdetail("popen failed: %m"))); + } + + if (fgets(line, MAX_CMD_OUTPUT, fp) == NULL) + { + int pclose_result = pclose(fp); + fp = NULL; + alarm(0); /* Cancel alarm */ + signal(SIGALRM, SIG_DFL); + + if (command_timeout_occurred) + { + ereport(ERROR, + (errmsg("replication delay command timed out after %d seconds: %s", + pool_config->replication_delay_source_timeout, command), + errhint("Consider increasing replication_delay_source_timeout or optimizing the command"))); + } + else + { + ereport(ERROR, + (errmsg("failed to read output from replication delay command: %s", command), + errdetail("command exit status: %d", pclose_result))); + } + } + + alarm(0); /* Cancel alarm */ + signal(SIGALRM, SIG_DFL); + + /* Check if output was truncated */ + if (strlen(line) == MAX_CMD_OUTPUT - 1 && line[MAX_CMD_OUTPUT - 2] != '\n') + { + ereport(WARNING, + (errmsg("replication delay command output may have been truncated"))); + } + + pclose(fp); + fp = NULL; + pfree(command); + command = NULL; + + /* Set primary node delay to 0 */ + bkinfo = pool_get_node_info(REAL_PRIMARY_NODE_ID); + bkinfo->standby_delay = 0; + bkinfo->standby_delay_by_time = true; + + /* Count expected replicas */ + num_replicas = NUM_BACKENDS - 1; /* Total nodes minus primary */ + + /* Count tokens in output for validation */ + char *line_copy = pstrdup(line); + char *temp_token = strtok(line_copy, " \t\n"); + while (temp_token != NULL) + { + token_count++; + temp_token = strtok(NULL, " \t\n"); + } + pfree(line_copy); + + if (token_count != num_replicas) + { + ereport(WARNING, + (errmsg("replication delay command returned %d values, expected %d (one per replica, excluding primary)", + token_count, num_replicas), + errhint("Command should output one delay value per replica node"))); + } + + /* Parse the output - one delay value per replica in order */ + token = strtok_r(line, " \t\n", &saveptr); + replica_idx = 0; + + for (int i = 0; i < NUM_BACKENDS && token != NULL; i++) + { + if (i == REAL_PRIMARY_NODE_ID) + continue; /* Skip primary - it's not in the output */ + + if (!VALID_BACKEND(i)) + { + /* Skip invalid backend but consume token */ + token = strtok_r(NULL, " \t\n", &saveptr); + replica_idx++; + continue; + } + + char *endptr; + delay_ms = strtod(token, &endptr); + + /* Validate the conversion */ + if (*endptr != '\0') + { + ereport(WARNING, + (errmsg("invalid delay value '%s' for node %d, treating as 0", + token, i))); + delay_ms = 0; + } + + bkinfo = pool_get_node_info(i); + + /* Handle -1 for down nodes */ + if (delay_ms == -1.0) + { + ereport(LOG, + (errmsg("node %d reported as down by external command (delay -1), relying on health check for failover decision", + i))); + /* Keep previous delay value, don't trigger failover */ + token = strtok_r(NULL, " \t\n", &saveptr); + replica_idx++; + continue; + } + + /* Validate delay value range */ + if (delay_ms < 0) + { + ereport(WARNING, + (errmsg("negative delay value %.3f for node %d (other than -1), treating as 0", + delay_ms, i))); + delay_ms = 0; + } + else if (delay_ms > MAX_REASONABLE_DELAY_MS) + { + ereport(WARNING, + (errmsg("extremely large delay value %.3f for node %d", + delay_ms, i))); + } + + /* Convert delay from milliseconds to microseconds for internal storage */ + delay = (uint64)(delay_ms * 1000); + bkinfo->standby_delay = delay; + bkinfo->standby_delay_by_time = true; + + /* Log delay if necessary */ + uint64 delay_threshold_by_time = pool_config->delay_threshold_by_time * 1000; /* threshold is in milliseconds, convert to microseconds */ + + if ((pool_config->log_standby_delay == LSD_ALWAYS && delay_ms > 0) || + (pool_config->log_standby_delay == LSD_OVER_THRESHOLD && + bkinfo->standby_delay > delay_threshold_by_time)) + { + ereport(LOG, + (errmsg("Replication of node: %d is behind %.3f second(s) from the primary server (node: %d) [external command]", + i, delay_ms / 1000, REAL_PRIMARY_NODE_ID))); + } + + token = strtok_r(NULL, " \t\n", &saveptr); + replica_idx++; + } + + } + PG_CATCH(); + { + /* Cleanup in case of error */ + alarm(0); /* Cancel any pending alarm */ + signal(SIGALRM, SIG_DFL); + if (fp) + { + pclose(fp); + fp = NULL; + } + if (line) + pfree(line); + if (command) + pfree(command); + error_context_stack = callback.previous; + PG_RE_THROW(); + } + PG_END_TRY(); + + /* Normal cleanup */ + if (line) + pfree(line); + + error_context_stack = callback.previous; +} + +/* + * build_instance_identifier_for_node + * Build an identifier string for a backend node for passing to external commands. + * Format: ":" + */ +static char * +build_instance_identifier_for_node(int node_id) +{ + BackendInfo *bi = pool_get_node_info(node_id); + size_t hlen; + size_t out_len; + char *out; + + if (!bi || bi->backend_hostname[0] == '\0' || bi->backend_port <= 0) + { + /* Fallback if hostname or port is not set */ + out = palloc(32); + snprintf(out, 32, "unknown_node_%d", node_id); + return out; + } + + /* Use hostname:port format */ + hlen = strlen(bi->backend_hostname); + /* max port chars ~5, plus colon and NUL */ + out_len = hlen + 1 + 5 + 1; + out = palloc(out_len); + snprintf(out, out_len, "%s:%d", bi->backend_hostname, bi->backend_port); + return out; +} + static void CheckReplicationTimeLagErrorCb(void *arg) { -- 2.51.1