From 7f164ce216857e0a0bfb135990f98c86ccea9c61 Mon Sep 17 00:00:00 2001 From: Satya Date: Tue, 24 Feb 2026 13:28:20 -0800 Subject: [PATCH] Add quorum support to synchronized_standby_slots Previously, synchronized_standby_slots required ALL listed physical slots to confirm WAL receipt before logical decoding could proceed. This created an availability problem when used with quorum-based synchronous replication (synchronous_standby_names = 'ANY N (...)'): a transaction could be safely committed via quorum, yet logical decoding consumers would stall waiting for a standby that wasn't needed for the durability guarantee. This patch extends synchronized_standby_slots to accept the same syntax as synchronous_standby_names, reusing the syncrep_yyparse/scanner grammar: slot1, slot2 -- ALL mode (backward-compatible) ANY N (slot1, slot2, ...) -- quorum: N-of-M slots suffice FIRST N (slot1, slot2, ...) -- priority: ALL slots required Key changes: - SyncStandbySlotsConfigData now mirrors SyncRepConfigData with num_sync, syncrep_method (SYNC_REP_PRIORITY / SYNC_REP_QUORUM), and nslotnames. - check_synchronized_standby_slots() reuses syncrep_yyparse() and syncrep_scanner_init/finish() for parsing. After parsing, it validates each member name as a legal replication slot name and rejects num_sync > nmembers. - StandbySlotsHaveCaughtup() in QUORUM mode continues past lagging slots to count caught-up ones, returning true once num_sync are satisfied. In PRIORITY mode (the default), it preserves the original break-on-first-lagging-slot behavior requiring all slots. - Documentation updated with formal synopsis, FIRST/ANY/plain-list explanations, examples, and cross-reference to synchronous_standby_names. - New TAP test (052_synchronized_standby_slots_quorum.pl) with 11 subtests covering ALL-mode blocking, ANY-mode quorum, FIRST N, backward compat, recovery scenarios, and GUC validation of invalid values. --- doc/src/sgml/config.sgml | 67 +++- src/backend/replication/slot.c | 240 +++++++++----- .../052_synchronized_standby_slots_quorum.pl | 303 ++++++++++++++++++ 3 files changed, 519 insertions(+), 91 deletions(-) create mode 100644 src/test/recovery/t/052_synchronized_standby_slots_quorum.pl diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index f670e2d4c31..9917d5ce70d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4949,17 +4949,64 @@ ANY num_sync ( : + +[FIRST] num_sync ( slot_name [, ...] ) +ANY num_sync ( slot_name [, ...] ) +slot_name [, ...] + + where num_sync is + the number of physical replication slots that must confirm WAL + receipt before logical decoding proceeds, + and slot_name + is the name of a physical replication slot. + num_sync + must be an integer value greater than zero and must not exceed the + number of listed slots. + + + The keyword FIRST, coupled with + num_sync, specifies + priority-based semantics. Logical decoding will wait until + all listed physical slots have confirmed WAL + receipt regardless of + num_sync. + A plain comma-separated list without a keyword is equivalent to + FIRST 1 (...) and therefore also requires all + listed slots. + + + The keyword ANY, coupled with + num_sync, specifies + quorum-based semantics. Logical decoding proceeds once at least + num_sync of the listed + slots have caught up. + For example, a setting of ANY 1 (sb1_slot, sb2_slot) + will allow logical decoding to proceed as soon as either physical slot + has confirmed WAL receipt. This is useful in conjunction with + quorum-based synchronous replication + (synchronous_standby_names = 'ANY ...'), so that + logical decoding availability matches the commit durability guarantee. + + + FIRST and ANY are case-insensitive. + If these keywords are used as the name of a replication slot, + the slot_name must + be double-quoted. + + + This guarantees that logical replication failover slots do not consume changes until those changes are received - and flushed to corresponding physical standbys. If a + and flushed to the required physical standbys. If a logical replication connection is meant to switch to a physical standby after the standby is promoted, the physical replication slot for the standby should be listed here. Note that logical replication will not - proceed if the slots specified in the - synchronized_standby_slots do not exist or are invalidated. + proceed if the slots specified in + synchronized_standby_slots do not exist or are + invalidated. Additionally, the replication management functions pg_replication_slot_advance, @@ -4967,9 +5014,9 @@ ANY num_sync ( pg_logical_slot_peek_changes, - when used with logical failover slots, will block until all - physical slots specified in synchronized_standby_slots have - confirmed WAL receipt. + when used with logical failover slots, will block until the required + physical slots specified in synchronized_standby_slots + have confirmed WAL receipt. The standbys corresponding to the physical replication slots in diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 28c7019402b..5509e15586a 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -50,6 +50,7 @@ #include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" +#include "replication/syncrep.h" #include "replication/walsender_private.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -89,11 +90,18 @@ typedef struct ReplicationSlotOnDisk * Note: this must be a flat representation that can be held in a single chunk * of guc_malloc'd memory, so that it can be stored as the "extra" data for the * synchronized_standby_slots GUC. + * + * The layout mirrors SyncRepConfigData so that the same quorum / priority + * semantics can be expressed. The syncrep_method field uses the + * SYNC_REP_PRIORITY and SYNC_REP_QUORUM constants from syncrep.h. */ typedef struct { - /* Number of slot names in the slot_names[] */ - int nslotnames; + int config_size; /* total size of this struct, in bytes */ + int num_sync; /* number of slots that must confirm WAL + * receipt before logical decoding proceeds */ + uint8 syncrep_method; /* SYNC_REP_PRIORITY or SYNC_REP_QUORUM */ + int nslotnames; /* number of slot names that follow */ /* * slot_names contains 'nslotnames' consecutive null-terminated C strings. @@ -2946,94 +2954,131 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) } /* - * A helper function to validate slots specified in GUC synchronized_standby_slots. + * GUC check_hook for synchronized_standby_slots + * + * This reuses the syncrep_yyparse / syncrep_scanner infrastructure that is + * also used for synchronous_standby_names, so the same syntax is accepted: * - * The rawname will be parsed, and the result will be saved into *elemlist. + * slot1, slot2 -- wait for ALL listed slots (FIRST 1) + * ANY N (slot1, slot2, ...) -- wait for N-of-M (quorum) + * FIRST N (slot1, slot2, ...) -- wait for first N in priority order + * + * After parsing, we additionally validate every name as a legal replication + * slot name. */ -static bool -validate_sync_standby_slots(char *rawname, List **elemlist) +bool +check_synchronized_standby_slots(char **newval, void **extra, GucSource source) { - /* Verify syntax and parse string into a list of identifiers */ - if (!SplitIdentifierString(rawname, ',', elemlist)) - { - GUC_check_errdetail("List syntax is invalid."); - return false; - } + char *ptr; + int size; + SyncStandbySlotsConfigData *config; + + if ((*newval)[0] == '\0') + return true; - /* Iterate the list to validate each slot name */ - foreach_ptr(char, name, *elemlist) + /* parse with the syncrep grammar */ { - int err_code; - char *err_msg = NULL; - char *err_hint = NULL; + yyscan_t scanner; + int parse_rc; + SyncRepConfigData *syncrep_parse_result = NULL; + char *syncrep_parse_error_msg = NULL; + + syncrep_scanner_init(*newval, &scanner); + parse_rc = syncrep_yyparse(&syncrep_parse_result, + &syncrep_parse_error_msg, + scanner); + syncrep_scanner_finish(scanner); + + if (parse_rc != 0 || syncrep_parse_result == NULL) + { + GUC_check_errcode(ERRCODE_SYNTAX_ERROR); + if (syncrep_parse_error_msg) + GUC_check_errdetail("%s", syncrep_parse_error_msg); + else + GUC_check_errdetail("\"%s\" parser failed.", + "synchronized_standby_slots"); + return false; + } - if (!ReplicationSlotValidateNameInternal(name, false, &err_code, - &err_msg, &err_hint)) + if (syncrep_parse_result->num_sync <= 0) { - GUC_check_errcode(err_code); - GUC_check_errdetail("%s", err_msg); - if (err_hint != NULL) - GUC_check_errhint("%s", err_hint); + GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero", + syncrep_parse_result->num_sync); return false; } - } - return true; -} + /* Reject num_sync > nmembers — it can never be satisfied */ + if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers) + { + GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of listed slots (%d)", + syncrep_parse_result->num_sync, + syncrep_parse_result->nmembers); + return false; + } -/* - * GUC check_hook for synchronized_standby_slots - */ -bool -check_synchronized_standby_slots(char **newval, void **extra, GucSource source) -{ - char *rawname; - char *ptr; - List *elemlist; - int size; - bool ok; - SyncStandbySlotsConfigData *config; + /* validate every member name as a slot name */ + { + const char *mname = syncrep_parse_result->member_names; - if ((*newval)[0] == '\0') - return true; + for (int i = 0; i < syncrep_parse_result->nmembers; i++) + { + int err_code; + char *err_msg = NULL; + char *err_hint = NULL; + + if (!ReplicationSlotValidateNameInternal(mname, false, &err_code, + &err_msg, &err_hint)) + { + GUC_check_errcode(err_code); + GUC_check_errdetail("%s", err_msg); + if (err_hint != NULL) + GUC_check_errhint("%s", err_hint); + return false; + } + + mname += strlen(mname) + 1; + } + } - /* Need a modifiable copy of the GUC string */ - rawname = pstrdup(*newval); + /* build SyncStandbySlotsConfigData from the parsed SyncRepConfigData */ + size = offsetof(SyncStandbySlotsConfigData, slot_names); + { + const char *mname = syncrep_parse_result->member_names; - /* Now verify if the specified slots exist and have correct type */ - ok = validate_sync_standby_slots(rawname, &elemlist); + for (int i = 0; i < syncrep_parse_result->nmembers; i++) + { + size += strlen(mname) + 1; + mname += strlen(mname) + 1; + } + } - if (!ok || elemlist == NIL) - { - pfree(rawname); - list_free(elemlist); - return ok; - } + /* GUC extra value must be guc_malloc'd, not palloc'd */ + config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size); + if (!config) + return false; - /* Compute the size required for the SyncStandbySlotsConfigData struct */ - size = offsetof(SyncStandbySlotsConfigData, slot_names); - foreach_ptr(char, slot_name, elemlist) - size += strlen(slot_name) + 1; + config->config_size = size; + config->num_sync = syncrep_parse_result->num_sync; + config->syncrep_method = syncrep_parse_result->syncrep_method; + config->nslotnames = syncrep_parse_result->nmembers; - /* GUC extra value must be guc_malloc'd, not palloc'd */ - config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size); - if (!config) - return false; + /* Copy member names into the flat slot_names array */ + { + const char *mname = syncrep_parse_result->member_names; - /* Transform the data into SyncStandbySlotsConfigData */ - config->nslotnames = list_length(elemlist); + ptr = config->slot_names; + for (int i = 0; i < syncrep_parse_result->nmembers; i++) + { + int len = strlen(mname) + 1; - ptr = config->slot_names; - foreach_ptr(char, slot_name, elemlist) - { - strcpy(ptr, slot_name); - ptr += strlen(slot_name) + 1; + memcpy(ptr, mname, len); + ptr += len; + mname += len; + } + } } *extra = config; - - pfree(rawname); - list_free(elemlist); return true; } @@ -3094,6 +3139,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) const char *name; int caught_up_slot_num = 0; XLogRecPtr min_restart_lsn = InvalidXLogRecPtr; + bool quorum_mode; /* * Don't need to wait for the standbys to catch up if there is no value in @@ -3121,6 +3167,15 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) * To prevent concurrent slot dropping and creation while filtering the * slots, take the ReplicationSlotControlLock outside of the loop. */ + /* + * Determine whether we are in quorum mode. In quorum mode we must visit + * every slot (skipping lagging ones) until quorum requirements met + * so we can count the total number that have caught up. + * In the default ALL mode the original + * break-on-first-lagging-slot behaviour is preserved. + */ + quorum_mode = (synchronized_standby_slots_config->syncrep_method == SYNC_REP_QUORUM); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); name = synchronized_standby_slots_config->slot_names; @@ -3129,13 +3184,15 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) XLogRecPtr restart_lsn; bool invalidated; bool inactive; + bool slot_ok = true; ReplicationSlot *slot; slot = SearchNamedReplicationSlot(name, false); /* * If a slot name provided in synchronized_standby_slots does not - * exist, report a message and exit the loop. + * exist, report a message. In ALL mode, break immediately; + * in quorum mode, skip and continue. */ if (!slot) { @@ -3147,10 +3204,13 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) name), errhint("Create the replication slot \"%s\" or amend parameter \"%s\".", name, "synchronized_standby_slots")); - break; + if (!quorum_mode) + break; + name += strlen(name) + 1; + continue; } - /* Same as above: if a slot is not physical, exit the loop. */ + /* Same as above: if a slot is not physical, skip/break. */ if (SlotIsLogical(slot)) { ereport(elevel, @@ -3161,7 +3221,10 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) name), errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".", name, "synchronized_standby_slots")); - break; + if (!quorum_mode) + break; + name += strlen(name) + 1; + continue; } SpinLockAcquire(&slot->mutex); @@ -3181,10 +3244,11 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) name), errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".", name, "synchronized_standby_slots")); - break; + slot_ok = false; } - if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn) + if (slot_ok && + (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)) { /* Log a message if no active_pid for this physical slot */ if (inactive) @@ -3196,9 +3260,16 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) name), errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".", name, "synchronized_standby_slots")); + slot_ok = false; + } - /* Continue if the current slot hasn't caught up. */ - break; + if (!slot_ok) + { + /* In ALL mode, stop on the first lagging slot */ + if (!quorum_mode) + break; + name += strlen(name) + 1; + continue; } Assert(restart_lsn >= wait_for_lsn); @@ -3215,11 +3286,18 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel) LWLockRelease(ReplicationSlotControlLock); /* - * Return false if not all the standbys have caught up to the specified - * WAL location. - */ - if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames) - return false; + /* In quorum mode, we only need num_sync of the listed slots to have caught up. */ + { + int required; + + if (synchronized_standby_slots_config->syncrep_method == SYNC_REP_QUORUM) + required = synchronized_standby_slots_config->num_sync; + else + required = synchronized_standby_slots_config->nslotnames; + + if (caught_up_slot_num < required) + return false; + } /* The ss_oldest_flush_lsn must not retreat. */ Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) || diff --git a/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl new file mode 100644 index 00000000000..42b12f3b439 --- /dev/null +++ b/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl @@ -0,0 +1,303 @@ + +# Copyright (c) 2024-2026, PostgreSQL Global Development Group + +# Test that synchronized_standby_slots supports quorum-based syntax +# (ANY N (slot1, slot2, ...)) so that logical decoding availability matches +# the commit durability guarantee of synchronous_standby_names = 'ANY ...'. +# +# Setup: a 3-node cluster with one primary, two physical standbys, and a +# logical decoding client using a failover-enabled slot. +# +# | ----> standby1 (primary_slot_name = sb1_slot) +# primary ------| +# | ----> standby2 (primary_slot_name = sb2_slot) +# +# synchronous_standby_names = 'ANY 1 (standby1, standby2)' +# +# We test two scenarios: +# +# A) synchronized_standby_slots = 'sb1_slot, sb2_slot' (ALL mode) +# With standby1 down, logical decoding BLOCKS despite the quorum commit +# having succeeded — this demonstrates the original limitation. +# +# B) synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot)' (quorum mode) +# With standby1 down, logical decoding proceeds because sb2_slot alone +# satisfies the quorum — this is the fix. + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# --------------------------------------------------------------------------- +# 1. Create a primary with logical replication level, autovacuum off +# --------------------------------------------------------------------------- +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); +$primary->append_conf( + 'postgresql.conf', qq{ +autovacuum = off +}); +$primary->start; + +# Physical replication slots for the two standbys +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('sb1_slot');"); +$primary->safe_psql('postgres', + "SELECT pg_create_physical_replication_slot('sb2_slot');"); + +# --------------------------------------------------------------------------- +# 2. Create standby1 and standby2 from a fresh backup +# --------------------------------------------------------------------------- +my $backup_name = 'base_backup'; +$primary->backup($backup_name); + +my $connstr = $primary->connstr; + +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb2_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +$standby1->start; +$standby2->start; + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +# --------------------------------------------------------------------------- +# 3. Create a logical failover slot on the primary +# --------------------------------------------------------------------------- +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);" +); + +# --------------------------------------------------------------------------- +# 4. Configure quorum sync rep with ALL-mode synchronized_standby_slots +# --------------------------------------------------------------------------- +$primary->append_conf( + 'postgresql.conf', qq{ +synchronous_standby_names = 'ANY 1 (standby1, standby2)' +synchronized_standby_slots = 'sb1_slot, sb2_slot' +}); +$primary->reload; + +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +# --------------------------------------------------------------------------- +# 5. Confirm that quorum sync rep is active for both standbys +# --------------------------------------------------------------------------- +is( $primary->safe_psql( + 'postgres', + q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';} + ), + '2', + 'both standbys are in quorum sync state'); + +################################################## +# PART A: ALL-mode blocks even when quorum is met +################################################## + +$standby1->stop; + +# Commit succeeds since standby2 satisfies the quorum. +my $emit_lsn = $primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');" +); +like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/, + 'synchronous commit succeeds with quorum (standby2 alive)'); + +$primary->wait_for_replay_catchup($standby2); + +my $log_offset = -s $primary->logfile; + +my $bg = $primary->background_psql( + 'postgres', + on_error_stop => 0, + timeout => $PostgreSQL::Test::Utils::timeout_default); + +$bg->query_until( + qr/decode_start/, q( + \echo decode_start + SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL); +)); + +# Wait for the primary to log a warning about sb1_slot not being active. +$primary->wait_for_log( + qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/, + $log_offset); + +pass('ALL mode: logical decoding blocked by sb1_slot even though quorum commit succeeded'); + +# Unblock by clearing synchronized_standby_slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; +$bg->quit; + +# Consume the change so the slot is clean for the next test. +$primary->safe_psql('postgres', + q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);}); + +################################################## +# PART B: ANY (quorum) mode — logical decoding proceeds +################################################## + +# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +# standby1 is still down; standby2 is up. + +# Emit another transactional message — commits via quorum. +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');" +); +$primary->wait_for_replay_catchup($standby2); + +# In quorum mode, logical decoding should NOT block because sb2_slot has +# caught up and 1-of-2 is sufficient. +my $decoded = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%quorum_mode_works%';}); +is($decoded, '1', + 'ANY mode: logical decoding proceeds with only sb2_slot caught up'); + +################################################## +# PART C: Verify backward-compat — plain list still requires ALL +################################################## + +# Bring standby1 back. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Switch to plain list (ALL mode) with both slots. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'sb1_slot, sb2_slot'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_bc = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%both_caught_up%';}); +is($decoded_bc, '1', + 'backward-compat: plain list works when all standbys are up'); + +################################################## +# PART D: ANY mode — bringing standby1 back also works +################################################## + +# Stop standby1 again, switch to ANY 1. +$standby1->stop; + +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'ANY 1 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'standby1_recovery');" +); + +$primary->wait_for_replay_catchup($standby2); + +# Decoding proceeds via quorum. +my $decoded_d1 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%standby1_recovery%';}); +is($decoded_d1, '1', + 'ANY mode: decoding works while standby1 is down'); + +# Bring standby1 back and verify decoding still works. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'after_recovery');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_d2 = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%after_recovery%';}); +is($decoded_d2, '1', + 'ANY mode: decoding works after standby1 recovers'); + +################################################## +# PART E: Verify FIRST N syntax (treated as ALL mode) +################################################## + +# FIRST 2 (sb1_slot, sb2_slot) — both standbys up, should work like ALL. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', + "'FIRST 2 (sb1_slot, sb2_slot)'"); +$primary->reload; + +$primary->safe_psql('postgres', + "SELECT pg_logical_emit_message(true, 'qtest', 'first_n_test');" +); +$primary->wait_for_replay_catchup($standby1); +$primary->wait_for_replay_catchup($standby2); + +my $decoded_e = $primary->safe_psql('postgres', + q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL) + WHERE data LIKE '%first_n_test%';}); +is($decoded_e, '1', + 'FIRST N mode: decoding works when all standbys are up'); + +################################################## +# PART F: Verify GUC validation rejects bad values +################################################## + +my ($result, $stdout, $stderr); + +# N exceeds number of listed slots +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';"); +like($stderr, qr/ERROR/, + 'GUC rejects ANY N when N > number of listed slots'); + +# Missing closing parenthesis +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';"); +like($stderr, qr/ERROR/, + 'GUC rejects malformed ANY syntax'); + +# Invalid slot name +($result, $stdout, $stderr) = $primary->psql('postgres', + "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';"); +like($stderr, qr/ERROR/, + 'GUC rejects invalid slot name in ANY syntax'); + +# --------------------------------------------------------------------------- +# Cleanup +# --------------------------------------------------------------------------- +$primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot('logical_failover');"); + +done_testing(); -- 2.34.1