From 03c27743219240ea9050f604f5e7fcae74992c2c Mon Sep 17 00:00:00 2001 From: Nadav Shatz Date: Mon, 8 Sep 2025 10:33:49 +0300 Subject: [PATCH 1/3] feat: add external command replication delay source feature - Add replication_delay_source_cmd configuration option - Support external commands for replication delay detection - Pass ordered instance identifiers to external commands - Update configuration files and samples --- src/config/pool_config_variables.c | 38 ++ src/include/pool_config.h | 9 + src/sample/pgpool.conf.sample-stream | 16 + src/streaming_replication/pool_worker_child.c | 363 +++++++++++++++++- 4 files changed, 424 insertions(+), 2 deletions(-) diff --git a/src/config/pool_config_variables.c b/src/config/pool_config_variables.c index 5bbe46d3a..d2be434e0 100644 --- a/src/config/pool_config_variables.c +++ b/src/config/pool_config_variables.c @@ -310,6 +310,12 @@ static const struct config_enum_entry check_temp_table_options[] = { {NULL, 0, false} }; +static const struct config_enum_entry replication_delay_source_options[] = { + {"builtin", REPLICATION_DELAY_BUILTIN, false}, + {"cmd", REPLICATION_DELAY_CMD, false}, + {NULL, 0, false} +}; + static const struct config_enum_entry log_backend_messages_options[] = { {"none", BGMSG_NONE, false}, /* turn off logging */ {"terse", BGMSG_TERSE, false}, /* terse logging (repeated messages are @@ -980,6 +986,16 @@ static struct config_string ConfigureNamesString[] = NULL, NULL, NULL, NULL }, + { + {"replication_delay_source_cmd", CFGCXT_RELOAD, STREAMING_REPLICATION_CONFIG, + "External command to retrieve replication delay information.", + CONFIG_VAR_TYPE_STRING, false, 0 + }, + &g_pool_config.replication_delay_source_cmd, + "", + NULL, NULL, NULL, NULL + }, + { {"failback_command", CFGCXT_RELOAD, FAILOVER_CONFIG, "Command to execute when backend node is attached.", @@ -2323,6 +2339,17 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"replication_delay_source_timeout", CFGCXT_RELOAD, 
STREAMING_REPLICATION_CONFIG, + "Timeout for external replication delay command execution in seconds.", + CONFIG_VAR_TYPE_INT, false, 0 + }, + &g_pool_config.replication_delay_source_timeout, + 10, + 1, 3600, + NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_INT }; @@ -2485,6 +2512,17 @@ static struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL, NULL }, + { + {"replication_delay_source", CFGCXT_RELOAD, STREAMING_REPLICATION_CONFIG, + "Source of replication delay information.", + CONFIG_VAR_TYPE_ENUM, false, 0 + }, + (int *) &g_pool_config.replication_delay_source, + REPLICATION_DELAY_BUILTIN, + replication_delay_source_options, + NULL, NULL, NULL, NULL + }, + /* End-of-list marker */ EMPTY_CONFIG_ENUM }; diff --git a/src/include/pool_config.h b/src/include/pool_config.h index be82750e5..1a8262dd7 100644 --- a/src/include/pool_config.h +++ b/src/include/pool_config.h @@ -94,6 +94,12 @@ typedef enum LogStandbyDelayModes LSD_NONE } LogStandbyDelayModes; +typedef enum ReplicationDelaySourceModes +{ + REPLICATION_DELAY_BUILTIN = 1, + REPLICATION_DELAY_CMD +} ReplicationDelaySourceModes; + typedef enum MemCacheMethod { @@ -371,6 +377,9 @@ typedef struct char *sr_check_password; /* password for sr_check_user */ char *sr_check_database; /* PostgreSQL database name for streaming * replication check */ + int replication_delay_source; /* replication delay source: builtin or cmd */ + char *replication_delay_source_cmd; /* external command for replication delay */ + int replication_delay_source_timeout; /* timeout for external command in seconds */ char *failover_command; /* execute command when failover happens */ char *follow_primary_command; /* execute command when failover is * ended */ diff --git a/src/sample/pgpool.conf.sample-stream b/src/sample/pgpool.conf.sample-stream index a7eb594c9..662e767a6 100644 --- a/src/sample/pgpool.conf.sample-stream +++ b/src/sample/pgpool.conf.sample-stream @@ -519,6 +519,22 @@ backend_clustering_mode = 
streaming_replication #sr_check_database = 'postgres' # Database name for streaming replication check + +#replication_delay_source = 'builtin' + # Source of replication delay information + # 'builtin': use built-in database queries (default) + # 'cmd': use external command +#replication_delay_source_cmd = '' + # External command to retrieve replication delay information + # Only used when replication_delay_source = 'cmd' + # Command should output delay values in milliseconds + # Format: "0 20 10" (node0 node1 node2 delays) + # Command runs as the pgpool process user +#replication_delay_source_timeout = 10 + # Timeout for external command execution in seconds + # Only used when replication_delay_source = 'cmd' + # Range: 1-3600 seconds (default: 10) + #delay_threshold = 0 # Threshold before not dispatching query to standby node # Unit is in bytes diff --git a/src/streaming_replication/pool_worker_child.c b/src/streaming_replication/pool_worker_child.c index 4f8f823a3..d19bb1773 100644 --- a/src/streaming_replication/pool_worker_child.c +++ b/src/streaming_replication/pool_worker_child.c @@ -76,6 +76,9 @@ static volatile sig_atomic_t restart_request = 0; static void establish_persistent_connection(void); static void discard_persistent_connection(void); static void check_replication_time_lag(void); +static void check_replication_time_lag_with_cmd(void); +static char *shell_single_quote(const char *src); +static char *get_instance_identifier_for_node(int node_id); static void CheckReplicationTimeLagErrorCb(void *arg); static unsigned long long int text_to_lsn(char *text); static RETSIGTYPE my_signal_handler(int sig); @@ -259,7 +262,10 @@ do_worker_child(void) POOL_NODE_STATUS *node_status; int i; - /* Do replication time lag checking */ + /* Do replication time lag checking */ + if (pool_config->replication_delay_source == REPLICATION_DELAY_CMD) + check_replication_time_lag_with_cmd(); + else check_replication_time_lag(); /* Check node status */ @@ -659,10 +665,363 @@ 
check_replication_time_lag(void)
 	error_context_stack = callback.previous;
 }
 
+#define MAX_CMD_OUTPUT 4096
+#define MAX_REASONABLE_DELAY_MS 3600000.0	/* 1 hour in milliseconds */
+
+/* Set to 1 by command_timeout_handler() when SIGALRM is delivered */
+static volatile sig_atomic_t command_timeout_occurred = 0;
+
+/*
+ * Signal handler for command timeout.
+ *
+ * Only records that the alarm fired; the caller inspects
+ * command_timeout_occurred after the interrupted read returns.
+ */
+static void
+command_timeout_handler(int sig)
+{
+	command_timeout_occurred = 1;
+}
+
+/*
+ * Install command_timeout_handler for SIGALRM and start an alarm of
+ * "seconds" seconds.  The previous SIGALRM disposition is stored in *saved
+ * so that disarm_command_timeout() can put it back.
+ *
+ * sigaction() is used without SA_RESTART on purpose: signal() may install
+ * the handler with restart semantics, in which case the blocking read
+ * behind fgets() is resumed after the alarm and the timeout never takes
+ * effect.
+ *
+ * Returns true if the handler was installed and the alarm started.
+ */
+static bool
+arm_command_timeout(int seconds, struct sigaction *saved)
+{
+	struct sigaction act;
+
+	memset(&act, 0, sizeof(act));
+	act.sa_handler = command_timeout_handler;
+	sigemptyset(&act.sa_mask);
+	act.sa_flags = 0;
+
+	command_timeout_occurred = 0;
+	if (sigaction(SIGALRM, &act, saved) != 0)
+		return false;
+
+	alarm((unsigned int) seconds);
+	return true;
+}
+
+/*
+ * Cancel a pending alarm and restore the SIGALRM disposition that was in
+ * place before arm_command_timeout() was called.
+ */
+static void
+disarm_command_timeout(const struct sigaction *saved)
+{
+	alarm(0);
+	(void) sigaction(SIGALRM, saved, NULL);
+}
+
+/*
+ * Build the shell command line: base_command followed by one shell-quoted
+ * instance identifier per backend, in node id order.
+ *
+ * Returns a palloc'd string; the caller is responsible for pfree'ing it.
+ */
+static char *
+build_replication_delay_command(const char *base_command)
+{
+	char	  **quoted = palloc(sizeof(char *) * NUM_BACKENDS);
+	size_t		total_len = strlen(base_command) + 1;	/* +1 for NUL */
+	char	   *result;
+
+	/* Quote every identifier once, remembering it for the second pass */
+	for (int i = 0; i < NUM_BACKENDS; i++)
+	{
+		char	   *ident = get_instance_identifier_for_node(i);
+
+		quoted[i] = shell_single_quote(ident);
+		pfree(ident);
+		total_len += 1 /* space */ + strlen(quoted[i]);
+	}
+
+	result = palloc(total_len);
+	strlcpy(result, base_command, total_len);
+	for (int i = 0; i < NUM_BACKENDS; i++)
+	{
+		strlcat(result, " ", total_len);
+		strlcat(result, quoted[i], total_len);
+		pfree(quoted[i]);
+	}
+	pfree(quoted);
+
+	return result;
+}
+
+/*
+ * Check replication time lag using external command
+ *
+ * Runs replication_delay_source_cmd through popen(), passing one quoted
+ * instance identifier per backend as arguments, and reads a single line of
+ * whitespace-separated delay values in milliseconds (one per backend, in
+ * node id order).  Each value is stored, converted to microseconds, in the
+ * matching BackendInfo's standby_delay; the primary is always stored as 0.
+ *
+ * Falls back to check_replication_time_lag() when no command is configured.
+ * Raises ERROR when the command cannot be started, produces no output, or
+ * the read is interrupted by the timeout alarm.
+ */
+static void
+check_replication_time_lag_with_cmd(void)
+{
+	/*
+	 * These are assigned inside PG_TRY() and read inside PG_CATCH().  They
+	 * are initialized here so the catch block never reads an indeterminate
+	 * value, and declared volatile on the assumption that PG_TRY is
+	 * setjmp-based as in PostgreSQL -- TODO confirm.
+	 */
+	FILE	   *volatile fp = NULL;
+	char	   *volatile command = NULL;
+	char	   *volatile line = NULL;
+	volatile bool timeout_armed = false;
+	struct sigaction saved_alarm_action;
+	ErrorContextCallback callback;
+
+	if (NUM_BACKENDS <= 1)
+	{
+		/* If there's only one node, there's no point to do checking */
+		return;
+	}
+
+	if (REAL_PRIMARY_NODE_ID < 0)
+	{
+		/* No need to check if there's no primary */
+		return;
+	}
+
+	if (!VALID_BACKEND(REAL_PRIMARY_NODE_ID))
+	{
+		/* No need to check replication delay if primary is down */
+		return;
+	}
+
+	if (!pool_config->replication_delay_source_cmd ||
+		strlen(pool_config->replication_delay_source_cmd) == 0)
+	{
+		ereport(WARNING,
+				(errmsg("replication_delay_source is set to 'cmd' but replication_delay_source_cmd is not configured"),
+				 errhint("Set replication_delay_source_cmd or change replication_delay_source to 'builtin'")));
+		/* Fall back to builtin method */
+		check_replication_time_lag();
+		return;
+	}
+
+	/* Allocate buffer for command output */
+	line = palloc(MAX_CMD_OUTPUT);
+
+	/*
+	 * Register a error context callback to throw proper context message
+	 */
+	callback.callback = CheckReplicationTimeLagErrorCb;
+	callback.arg = NULL;
+	callback.previous = error_context_stack;
+	error_context_stack = &callback;
+
+	/* Execute command as current process user */
+	PG_TRY();
+	{
+		char	   *line_copy;
+		char	   *count_saveptr;
+		char	   *temp_token;
+		char	   *token;
+		char	   *saveptr;
+		int			token_count = 0;
+		int			node_id = 0;
+
+		command = build_replication_delay_command(pool_config->replication_delay_source_cmd);
+
+		ereport(DEBUG1,
+				(errmsg("executing replication delay command: %s", command)));
+
+		/* Set up timeout for command execution */
+		timeout_armed = arm_command_timeout(pool_config->replication_delay_source_timeout,
+											&saved_alarm_action);
+		if (!timeout_armed)
+			ereport(WARNING,
+					(errmsg("could not install SIGALRM handler for replication delay command: %m")));
+
+		fp = popen(command, "r");
+		if (fp == NULL)
+		{
+			/*
+			 * Report immediately so %m still reflects popen()'s errno; the
+			 * PG_CATCH block below cancels the alarm.
+			 */
+			ereport(ERROR,
+					(errmsg("failed to execute replication delay command: %s", command),
+					 errdetail("popen failed: %m")));
+		}
+
+		if (fgets(line, MAX_CMD_OUTPUT, fp) == NULL)
+		{
+			int			pclose_result;
+
+			if (timeout_armed)
+			{
+				disarm_command_timeout(&saved_alarm_action);
+				timeout_armed = false;
+			}
+
+			/*
+			 * NOTE(review): pclose() waits for the command to exit, so a
+			 * command that never terminates still blocks here after the
+			 * timeout fired -- confirm whether the child has to be killed
+			 * explicitly.
+			 */
+			pclose_result = pclose(fp);
+			fp = NULL;
+
+			if (command_timeout_occurred)
+			{
+				ereport(ERROR,
+						(errmsg("replication delay command timed out after %d seconds: %s",
+								pool_config->replication_delay_source_timeout, command),
+						 errhint("Consider increasing replication_delay_source_timeout or optimizing the command")));
+			}
+			else
+			{
+				ereport(ERROR,
+						(errmsg("failed to read output from replication delay command: %s", command),
+						 errdetail("command exit status: %d", pclose_result)));
+			}
+		}
+
+		if (timeout_armed)
+		{
+			disarm_command_timeout(&saved_alarm_action);
+			timeout_armed = false;
+		}
+
+		/* Check if output was truncated */
+		if (strlen(line) == MAX_CMD_OUTPUT - 1 && line[MAX_CMD_OUTPUT - 2] != '\n')
+		{
+			ereport(WARNING,
+					(errmsg("replication delay command output may have been truncated")));
+		}
+
+		pclose(fp);
+		fp = NULL;
+
+		/*
+		 * Parse the output format "0 20 10" where each number is delay in
+		 * milliseconds for nodes 0, 1, 2 etc.  Count the tokens first, on a
+		 * copy, because strtok_r() modifies the string it scans.
+		 */
+		line_copy = pstrdup(line);
+		for (temp_token = strtok_r(line_copy, " \t\n", &count_saveptr);
+			 temp_token != NULL;
+			 temp_token = strtok_r(NULL, " \t\n", &count_saveptr))
+		{
+			token_count++;
+		}
+		pfree(line_copy);
+
+		if (token_count != NUM_BACKENDS)
+		{
+			ereport(WARNING,
+					(errmsg("replication delay command returned %d values, expected %d",
+							token_count, NUM_BACKENDS),
+					 errhint("Command should output one delay value per backend node")));
+		}
+
+		/* Now parse the actual tokens; token N belongs to node N */
+		for (token = strtok_r(line, " \t\n", &saveptr);
+			 token != NULL && node_id < NUM_BACKENDS;
+			 token = strtok_r(NULL, " \t\n", &saveptr), node_id++)
+		{
+			BackendInfo *bkinfo;
+			char	   *endptr;
+			double		delay_ms;
+
+			/* The value for a node that is not valid is consumed but ignored */
+			if (!VALID_BACKEND(node_id))
+				continue;
+
+			delay_ms = strtod(token, &endptr);
+
+			/*
+			 * Validate the conversion.  strtod() also accepts "nan", which
+			 * would pass the range checks below, so it is rejected here
+			 * (NaN is the only value that compares unequal to itself).
+			 */
+			if (*endptr != '\0' || delay_ms != delay_ms)
+			{
+				ereport(WARNING,
+						(errmsg("invalid delay value '%s' for node %d, treating as 0",
+								token, node_id)));
+				delay_ms = 0;
+			}
+
+			/* Validate delay value range */
+			if (delay_ms < 0)
+			{
+				ereport(WARNING,
+						(errmsg("negative delay value %.3f for node %d, treating as 0",
+								delay_ms, node_id)));
+				delay_ms = 0;
+			}
+			else if (delay_ms > MAX_REASONABLE_DELAY_MS)
+			{
+				ereport(WARNING,
+						(errmsg("extremely large delay value %.3f for node %d",
+								delay_ms, node_id)));
+			}
+
+			bkinfo = pool_get_node_info(node_id);
+
+			if (PRIMARY_NODE_ID == node_id)
+			{
+				/* Primary node should always have 0 delay */
+				bkinfo->standby_delay = 0;
+				if (delay_ms > 0)
+				{
+					ereport(DEBUG1,
+							(errmsg("primary node %d reported non-zero delay %.3f, setting to 0",
+									node_id, delay_ms)));
+				}
+			}
+			else
+			{
+				uint64		delay;
+				uint64		delay_threshold_by_time;
+				double		delay_us = delay_ms * 1000;
+
+				/*
+				 * Convert delay from milliseconds to microseconds for
+				 * internal storage.  Converting a double that does not fit
+				 * the integer type is undefined, so "inf" and other
+				 * oversized inputs are clamped to the largest uint64.
+				 */
+				if (delay_us >= (double) (~(uint64) 0))
+					delay = ~(uint64) 0;
+				else
+					delay = (uint64) delay_us;
+
+				bkinfo->standby_delay = delay;
+				bkinfo->standby_delay_by_time = true;
+
+				/* threshold is in milliseconds, convert to microseconds */
+				delay_threshold_by_time = pool_config->delay_threshold_by_time * 1000;
+
+				/* Log delay if necessary */
+				if ((pool_config->log_standby_delay == LSD_ALWAYS && delay_ms > 0) ||
+					(pool_config->log_standby_delay == LSD_OVER_THRESHOLD &&
+					 bkinfo->standby_delay > delay_threshold_by_time))
+				{
+					ereport(LOG,
+							(errmsg("Replication of node: %d is behind %.3f second(s) from the primary server (node: %d) [external command]",
+									node_id, delay_ms / 1000, PRIMARY_NODE_ID)));
+				}
+			}
+		}
+	}
+	PG_CATCH();
+	{
+		/* Cleanup in case of error */
+		if (timeout_armed)
+			disarm_command_timeout(&saved_alarm_action);
+		if (fp)
+		{
+			pclose(fp);
+			fp = NULL;
+		}
+		if (line)
+			pfree(line);
+		if (command)
+			pfree(command);
+		error_context_stack = callback.previous;
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	/* Normal cleanup */
+	if (line)
+		pfree(line);
+	if (command)
+		pfree(command);
+
+	error_context_stack = callback.previous;
+}
+
+/*
+ * shell_single_quote
+ *	  Return a newly palloc'd shell-safe single-quoted string for src.
+ *	  Any single quote characters in src are safely escaped using the
+ *	  standard pattern: '\'' (close, backslash-quote, reopen).
+ */
+static char *
+shell_single_quote(const char *src)
+{
+	size_t		len = strlen(src);
+	size_t		squotes = 0;
+	size_t		out_len;
+	char	   *out;
+	char	   *p;
+
+	/* Count embedded single quotes so the result can be sized exactly */
+	for (size_t i = 0; i < len; i++)
+	{
+		if (src[i] == '\'')
+			squotes++;
+	}
+
+	/*
+	 * Two surrounding quotes + the input itself + 3 extra bytes for every
+	 * embedded quote (each ' is written as the 4-byte sequence '\'') + NUL.
+	 */
+	out_len = 2 + len + (squotes * 3) + 1;
+	out = palloc(out_len);
+
+	p = out;
+	*p++ = '\'';
+	for (size_t i = 0; i < len; i++)
+	{
+		if (src[i] == '\'')
+		{
+			/* close the quoted run, emit an escaped quote, reopen */
+			*p++ = '\'';
+			*p++ = '\\';
+			*p++ = '\'';
+			*p++ = '\'';
+		}
+		else
+		{
+			*p++ = src[i];
+		}
+	}
+	*p++ = '\'';
+	*p = '\0';
+
+	return out;
+}
+
+/*
+ * get_instance_identifier_for_node
+ *	  Build an identifier string for a backend node suitable for passing
+ *	  to external scripts. Preference order:
+ *	    - backend_application_name if it is not empty
+ *	    - "<backend_hostname>:<backend_port>" if the hostname is not empty
+ *	      and the port is greater than zero
+ *	    - "node<node_id>"
+ *
+ *	  The result is always a newly palloc'd string; the caller pfree's it.
+ */
+static char *
+get_instance_identifier_for_node(int node_id)
+{
+	BackendInfo *bi = pool_get_node_info(node_id);
+	char	   *out;
+
+	/* Use application_name if available */
+	if (bi && bi->backend_application_name[0] != '\0')
+	{
+		return pstrdup(bi->backend_application_name);
+	}
+
+	/* Otherwise use hostname:port if hostname is set */
+	if (bi && bi->backend_hostname[0] != '\0' && bi->backend_port > 0)
+	{
+		/*
+		 * Ask snprintf() for the exact length instead of assuming the port
+		 * has at most five digits, so the identifier is never truncated.
+		 */
+		int			needed = snprintf(NULL, 0, "%s:%d",
+									  bi->backend_hostname, bi->backend_port);
+
+		if (needed >= 0)
+		{
+			size_t		out_len = (size_t) needed + 1;
+
+			out = palloc(out_len);
+			snprintf(out, out_len, "%s:%d", bi->backend_hostname, bi->backend_port);
+			return out;
+		}
+	}
+
+	/* Fallback to node<node_id>; 16 bytes hold "node" plus any int and NUL */
+	out = palloc(16);
+	snprintf(out, 16, "node%d", node_id);
+	return out;
+}
+
 static void
 CheckReplicationTimeLagErrorCb(void *arg)
 {
 	errcontext("while checking replication time lag");
 }
 
 /*
-- 
2.51.0