public inbox for [email protected]
From: Ashutosh Sharma <[email protected]>
To: shveta malik <[email protected]>
Cc: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Amit Kapila <[email protected]>
Cc: Ajin Cherian <[email protected]>
Cc: SATYANARAYANA NARLAPURAM <[email protected]>
Cc: PostgreSQL-development <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Subject: Re: synchronized_standby_slots behavior inconsistent with quorum-based synchronous replication
Date: Mon, 8 Jun 2026 14:23:28 +0530
Message-ID: <CAE9k0PnQ5XNkDwt3uLmHuZOuiZmHn-bRPWn1p7nHQnurE8QwxQ@mail.gmail.com>
In-Reply-To: <CAE9k0PmOURa4Yi3sYFb6PG7PuJGi9zBd9nAcJ3A5-_N_qn8xew@mail.gmail.com>
References: <CAJpy0uBT8JbEGE0xvm-Wxh1g_VVgC=whKqChZo-uB+VOa_YCTw@mail.gmail.com>
<CAE9k0Pkk6q72X3Rc3MUo7PxU46UcCzLfMhM02PGDUmAue9cDGg@mail.gmail.com>
<TY4PR01MB17718104B91F2945BE727467694102@TY4PR01MB17718.jpnprd01.prod.outlook.com>
<CAE9k0P=dgCEaKE6+vSCQp8TgrYOi_RqQkDTScdWzFSECsPQn9w@mail.gmail.com>
<CAJpy0uAdBxGpc4wtj-LcTGMNkVCYu4eMbDr27snEO_SrN2cV4A@mail.gmail.com>
<CAE9k0PmOURa4Yi3sYFb6PG7PuJGi9zBd9nAcJ3A5-_N_qn8xew@mail.gmail.com>
Hi,
On Mon, Jun 8, 2026 at 11:38 AM Ashutosh Sharma <[email protected]> wrote:
>
> Hi,
>
> On Fri, Jun 5, 2026 at 3:32 PM shveta malik <[email protected]> wrote:
> >
> > Thanks for the patches. I have attached a patch (txt file) with a few
> > trivial changes, take it if you find the changes acceptable.
> >
>
> Thanks, I will review it and share my feedback.
>
PFA the patches that incorporate the changes from the top-up patch
shared by Shveta in [1]. These changes primarily consist of
documentation updates, comment improvements, and indentation fixes at
a few places.
[1] - https://www.postgresql.org/message-id/CAJpy0uAdBxGpc4wtj-LcTGMNkVCYu4eMbDr27snEO_SrN2cV4A%40mail.gma...
--
With Regards,
Ashutosh Sharma.
Attachments:
[application/octet-stream] 0003-Add-FIRST-N-and-N-.-priority-syntax-to-synchronized_.patch (23.3K, 2-0003-Add-FIRST-N-and-N-.-priority-syntax-to-synchronized_.patch)
From 93ea69cfd769d553f156d5ab6596c1e6c46454f6 Mon Sep 17 00:00:00 2001
From: Ashutosh Sharma <[email protected]>
Date: Thu, 4 Jun 2026 07:16:02 +0000
Subject: [PATCH 3/3] Add FIRST N and N (...) priority syntax to
synchronized_standby_slots
Extend synchronized_standby_slots to support explicit priority
forms aligned with synchronous_standby_names.
- FIRST N (slot1, slot2, ...)
- N (slot1, slot2, ...) as shorthand for FIRST N
Implementation details:
- Use the SYNC_REP_DEFAULT parser distinction from the earlier
refactor so plain-list syntax remains separate from priority
syntax.
- Extend StandbySlotsHaveCaughtup() priority handling.
- Select slots in list order.
- Skip missing, logical, invalidated, and inactive lagging slots.
- Wait for active lagging higher-priority slots.
- Clarify duplicate handling for priority syntax in the
synchronized_standby_slots documentation.
- Simplify caught-up comments and clarify standby confirmation wait
comments to match the final control flow.
Tests and docs:
- Add coverage for FIRST behavior and shorthand N (...) behavior.
- Add plain-list disambiguation with first-prefixed slot names.
- Add FIRST duplicate-entry recovery coverage to show duplicates do
not create extra priority positions.
- Update docs for FIRST and shorthand priority syntax semantics.
- Clarify that duplicate slot names are ignored in priority-based
forms and preserve first-occurrence order.
Author: Satya Narlapuram <[email protected]>
Author: Ashutosh Sharma <[email protected]>
Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Ajin Cherian <[email protected]>
Reviewed-by: Hou, Zhijie <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Reviewed-by: Surya Poondla <[email protected]>
Reviewed-by: Japin Li <[email protected]>
---
doc/src/sgml/config.sgml | 45 ++-
src/backend/replication/slot.c | 47 ++--
.../053_synchronized_standby_slots_quorum.pl | 262 ++++++++++++++++--
3 files changed, 306 insertions(+), 48 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 1f176bd48f4..473f2641b90 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5194,6 +5194,7 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
sender processes must wait on before delivering decoded changes. This
parameter uses the following syntax:
<synopsis>
+ [FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
<replaceable class="parameter">slot_name</replaceable> [, ...]
</synopsis>
@@ -5205,9 +5206,24 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<replaceable class="parameter">num_sync</replaceable>
must be an integer value greater than zero and must not exceed the
number of listed slots.
- Other forms supported by
- <xref linkend="guc-synchronous-standby-names"/>, such as priority
- syntax, are not supported.
+ </para>
+ <para>
+ The keyword <literal>FIRST</literal>, coupled with
+ <replaceable class="parameter">num_sync</replaceable>, specifies
+ priority-based semantics. Logical decoding will wait for the first
+ <replaceable class="parameter">num_sync</replaceable> available
+ physical slots in priority order (the order they appear in the list).
+ Missing, logical, or invalidated slots are skipped. Inactive slots are
+ skipped only while they are lagging. However, if a slot exists and is
+ valid and active but has not yet caught up, the system will wait for it
+ rather than skipping to lower-priority slots. If, after skipping
+ unusable slots, fewer than
+ <replaceable class="parameter">num_sync</replaceable> usable slots
+ remain, logical decoding waits until enough slots become usable and
+ caught up, or until the configuration is changed. The keyword
+ <literal>FIRST</literal> is optional in this form, so
+ <literal>2 (slot1, slot2, slot3)</literal> and
+ <literal>FIRST 2 (slot1, slot2, slot3)</literal> are equivalent.
</para>
<para>
A plain comma-separated list without a keyword specifies that
@@ -5244,19 +5260,26 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
duplicate entries are ignored and only the first occurrence is used.
The semantics of <varname>synchronized_standby_slots</varname> are
therefore based on the unique set of listed slot names, preserving the
- original order of first occurrence. This means that
- <literal>ANY 2 (slot1, slot1, slot2, slot3)</literal> is treated the
- same as <literal>ANY 2 (slot1, slot2, slot3)</literal>, and a plain
- list such as <literal>(slot1, slot1, slot2)</literal> is treated the
- same as <literal>(slot1, slot2)</literal>. In particular,
+ original order of first occurrence. This means that, in
+ priority-based forms, duplicates do not create additional priority
+ positions: for example,
+ <literal>FIRST 2 (slot1, slot1, slot2, slot3)</literal> is treated the
+ same as <literal>FIRST 2 (slot1, slot2, slot3)</literal>.
+ Likewise, <literal>ANY 2 (slot1, slot1, slot2, slot3)</literal> is
+ treated the same as <literal>ANY 2 (slot1, slot2, slot3)</literal>,
+ and a plain list such as <literal>(slot1, slot1, slot2)</literal>
+ is treated the same as <literal>(slot1, slot2)</literal>. In particular,
<replaceable class="parameter">num_sync</replaceable> must not exceed
the number of unique listed slots. Such a configuration results in an
error to prevent indefinite waits in WAL sender processes due to a
misconfigured <varname>synchronized_standby_slots</varname> setting.
</para>
- <para>
- <literal>ANY</literal> is case-insensitive.
- </para>
+ <para>
+ <literal>FIRST</literal> and <literal>ANY</literal> are case-insensitive.
+ If these keywords are used as the name of a replication slot,
+ the <replaceable class="parameter">slot_name</replaceable> must
+ be double-quoted.
+ </para>
<para>
The use of <varname>synchronized_standby_slots</varname> guarantees
that logical replication
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 93797fcdde4..2c9d30de280 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -3068,6 +3068,8 @@ CompactSyncRepConfigMemberNames(SyncRepConfigData *config)
*
* slot1, slot2 -- wait for ALL listed slots
* ANY N (slot1, slot2, ...) -- wait for any N-of-M (quorum)
+ * FIRST N (slot1, slot2, ...) -- wait for first N in priority order
+ * N (slot1, slot2, ...) -- shorthand for FIRST N
*
* Note: Simple list syntax is interpreted as "wait for ALL" for this GUC,
* unlike synchronous_standby_names where it means "FIRST 1".
@@ -3108,14 +3110,6 @@ check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
return false;
}
- if (syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY)
- {
- GUC_check_errcode(ERRCODE_INVALID_PARAMETER_VALUE);
- GUC_check_errmsg("priority syntax is not supported for parameter \"%s\"",
- "synchronized_standby_slots");
- return false;
- }
-
if (syncrep_parse_result->num_sync <= 0)
{
GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero",
@@ -3333,6 +3327,12 @@ ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo * slot_states,
* Simple list (e.g., "slot1, slot2"):
* ALL slots must have caught up. Returns false otherwise.
*
+ * FIRST N (e.g., "FIRST 2 (slot1, slot2, slot3)"):
+ * Wait for the first N eligible slots in priority order. Skips missing,
+ * invalid, logical, and inactive-lagging slots to find N eligible slots.
+ * If an active slot is lagging, waits for it (does not skip to lower
+ * priority slots).
+ *
* ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"):
* Wait for any N eligible slots. Skips missing, invalid, logical, and
* lagging slots (inactive or active) to find N slots that have caught up.
@@ -3383,14 +3383,18 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
* first slot that is missing/invalid/logical, or the first slot that is
* lagging (inactive or active).
*
- * wait_for_all = false means we select N from M candidates (ANY N syntax).
- * In this mode, slots already caught up are counted even if inactive, and
- * lagging slots are skipped until enough slots have caught up.
- * Duplicate configured slot names do not appear here because the check hook
- * compacts them out of the parsed configuration.
+ * wait_for_all = false means we select N from M candidates (FIRST N or
+ * ANY N syntax). In this mode, slots already caught up are counted even if
+ * inactive. In FIRST N mode, we skip missing/invalid/logical slots and
+ * lagging inactive slots, but wait for an active lagging slot with higher
+ * priority. In ANY N mode, we skip lagging slots (inactive or active) to
+ * find any N that have caught up. Duplicate configured slot names do not
+ * appear here because the check hook compacts them out of the parsed
+ * configuration.
*/
required = synchronized_standby_slots_config->num_sync;
- wait_for_all = (required == synchronized_standby_slots_config->nslotnames);
+ wait_for_all =
+ (synchronized_standby_slots_config->syncrep_method == SYNC_REP_DEFAULT);
/*
* Allocate array to track slot states. Size it to the total number of
@@ -3470,8 +3474,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
* If a slot is inactive and lagging, report it as inactive. If it
* is active and lagging, report it as lagging.
*
- * In ALL mode: must wait for it. In ANY N (quorum) mode: skip and
- * use another slot.
+ * In ALL mode: must wait for it. In FIRST N (priority) mode:
+ * lagging active slots block, while inactive slots can be
+ * skipped. In ANY N (quorum) mode: skip and use another slot.
*/
slot_states[num_slot_states].slot_name = name;
slot_states[num_slot_states].state =
@@ -3479,7 +3484,9 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
slot_states[num_slot_states].restart_lsn = restart_lsn;
num_slot_states++;
- if (wait_for_all)
+ if (wait_for_all ||
+ (!inactive &&
+ synchronized_standby_slots_config->syncrep_method == SYNC_REP_PRIORITY))
break;
goto next_slot;
}
@@ -3492,7 +3499,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
caught_up_slot_num++;
- /* Stop processing if the required number of slots have caught up. */
+ /* Stop once the required number of slots have caught up. */
if (caught_up_slot_num >= required)
break;
@@ -3507,8 +3514,8 @@ next_slot:
* problem states and return false.
*
* We only emit messages when the requirement is not met to avoid
- * misleading messages in quorum mode where other slots may have satisfied
- * the condition despite some slots having issues.
+ * misleading messages in quorum/priority mode where other slots may have
+ * satisfied the condition despite some slots having issues.
*/
if (caught_up_slot_num < required)
{
diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
index d387e0c7e7e..a4153a44b37 100644
--- a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
+++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
@@ -4,6 +4,7 @@
# Test synchronized_standby_slots with different syntax modes:
# - Plain list (ALL mode): slot1, slot2
# - ANY N (quorum mode): ANY N (slot1, slot2, ...)
+# - FIRST N (priority mode): FIRST N (slot1, slot2, ...)
#
# Setup: a 3-node cluster with one primary, two physical standbys, and a
# logical decoding client using a failover-enabled slot.
@@ -200,16 +201,168 @@ is($decoded_bc, '1',
'plain list: works when all standbys are up');
##################################################
-# PART D: ANY 2 waits on an active lagging slot
+# PART D: Verify FIRST N priority semantics
##################################################
-# Stop standby1 so sb1_slot can be controlled by a raw replication connection
-# that keeps the slot active while lagging.
+# FIRST N should:
+# 1. Select first N slots in priority order (list order)
+# 2. Skip missing/invalid/logical slots and inactive lagging slots to find
+# N caught-up slots
+# 3. Wait for active lagging slots (not skip to lower priority)
+
+# Test FIRST 2 (sb1_slot, sb2_slot) with both up; should wait for both.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'FIRST 2 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'first_2_both_up');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_e2 = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%first_2_both_up%';});
+is($decoded_e2, '1',
+ 'FIRST 2: decoding works when all required slots are up');
+
+# Test FIRST 1 (sb1_slot, sb2_slot) with sb1_slot unavailable.
$standby1->stop;
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'FIRST 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_skip_unavailable');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# FIRST 1 should skip sb1_slot (unavailable) and use sb2_slot.
+my $decoded_e1 = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%first_1_skip_unavailable%';});
+is($decoded_e1, '1',
+ 'FIRST 1: skips unavailable first slot, uses second slot');
+
+# Test shorthand priority syntax: N (...) means FIRST N (...).
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'num_1_shorthand_priority');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_num1 = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%num_1_shorthand_priority%';});
+is($decoded_num1, '1',
+ '1 (...): shorthand priority syntax behaves like FIRST 1');
+
+##################################################
+# PART E: FIRST 1 and ANY 2 wait on an active lagging slot
+##################################################
+
+# Bring standby1 back so sb1_slot is active and caught up.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# To test the active-but-lagging slot path deterministically, we open a raw
+# replication connection to sb1_slot starting from a deliberately old LSN.
+# psql in replication mode never sends Standby Status Update messages, so
+# the walsender keeps sb1_slot's active_pid set but restart_lsn never
+# advances.
+
+# Stop standby1 so its walsender releases sb1_slot, allowing our replication
+# connection below to acquire it.
+$standby1->stop;
+
+# Capture a safely old LSN to stream from, before the test WAL record.
my $old_lsn = $primary->safe_psql('postgres',
"SELECT pg_current_wal_lsn();");
+# FIRST 1 must wait for the highest-priority slot when it is active but lagging.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'FIRST 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+my $first_lag_lsn = $primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'first_1_lagging_blocks');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# Open a raw replication connection to sb1_slot starting from $old_lsn.
+# This activates the slot (active_pid IS NOT NULL) while keeping restart_lsn
+# frozen below $first_lag_lsn for the lifetime of the connection.
+my $repl_first = $primary->background_psql(
+ 'postgres',
+ replication => 'database',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$repl_first->query_until(
+ qr/^$/,
+ "START_REPLICATION SLOT sb1_slot PHYSICAL $old_lsn;\n");
+
+# Wait until sb1_slot shows active_pid, confirming the walsender is live.
+$primary->poll_query_until('postgres', q{
+ SELECT active_pid IS NOT NULL
+ FROM pg_replication_slots
+ WHERE slot_name = 'sb1_slot'
+}) or die "replication connection did not activate sb1_slot";
+
+# sb1_slot is now active and its restart_lsn is behind $first_lag_lsn.
+# Start logical decoding in the background; it must block.
+my $bg_first = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg_first->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+ok( $primary->poll_query_until(
+ 'postgres', q{
+SELECT EXISTS (
+ SELECT 1
+ FROM pg_stat_activity
+ WHERE wait_event = 'WaitForStandbyConfirmation'
+ AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%'
+);
+}),
+ 'FIRST 1: decoding waits for active lagging higher-priority slot');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg_first->quit;
+$repl_first->quit;
+
+# Ensure the previous replication connection has fully released sb1_slot
+# before reusing it in the next subtest.
+$primary->poll_query_until('postgres', q{
+ SELECT active_pid IS NULL
+ FROM pg_replication_slots
+ WHERE slot_name = 'sb1_slot'
+}) or die "replication connection did not release sb1_slot";
+
+# Consume the change so the slot is clean for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+# ANY 2 must also wait when only one of two required slots has caught up.
+# Reuse the same technique: open a raw replication connection to sb1_slot
+# from $old_lsn so it is active but its restart_lsn stays behind the target.
+
+# Capture another old LSN baseline before the next test WAL record.
+$old_lsn = $primary->safe_psql('postgres',
+ "SELECT pg_current_wal_lsn();");
+
$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
"'ANY 2 (sb1_slot, sb2_slot)'");
$primary->reload;
@@ -272,7 +425,54 @@ $primary->wait_for_replay_catchup($standby1);
##################################################
-# PART E: Duplicate entries are ignored for quorum counting
+# PART F: Plain list with first-prefixed slot name still means ALL mode
+##################################################
+
+# Create a slot name starting with "first_" for parser disambiguation checks.
+$primary->safe_psql('postgres',
+ "SELECT pg_create_physical_replication_slot('first_slot');");
+
+# If simple-list syntax starts with a slot name like "first_slot", it must
+# still be treated as ALL mode (not as explicit FIRST N syntax).
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'first_slot, sb2_slot'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'first_prefix_all_mode_blocks');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+$log_offset = -s $primary->logfile;
+
+$bg = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+# Plain list must require all listed slots; first_slot is intentionally inactive.
+$primary->wait_for_log(
+ qr/replication slot \"first_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
+ $log_offset);
+
+pass('plain list with first-prefixed slot name blocks in ALL mode');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg->quit;
+
+# Consume the change for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+##################################################
+# PART G: Duplicate entries are ignored for quorum counting
##################################################
# Stop standby2 so only sb1_slot can catch up.
@@ -313,6 +513,46 @@ $primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
$primary->reload;
$bg_dup->quit;
+# Consume the change for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+# FIRST duplicates must also not create extra priority positions.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'FIRST 2 (sb1_slot, sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'first_duplicate_entries_ignored');"
+);
+$primary->wait_for_replay_catchup($standby1);
+
+my $bg_first_dup = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg_first_dup->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+ok( $primary->poll_query_until(
+ 'postgres', q{
+SELECT EXISTS (
+ SELECT 1
+ FROM pg_stat_activity
+ WHERE wait_event = 'WaitForStandbyConfirmation'
+ AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%'
+);
+}),
+ 'FIRST duplicates are ignored when counting priority slots');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg_first_dup->quit;
+
# Consume the change for the next test.
$primary->safe_psql('postgres',
q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
@@ -323,7 +563,7 @@ $primary->wait_for_replay_catchup($standby2);
##################################################
-# PART F: Verify GUC validation rejects bad values
+# PART H: Verify GUC validation rejects bad values
##################################################
my ($result, $stdout, $stderr);
@@ -340,18 +580,6 @@ like($stderr, qr/ERROR/,
like($stderr, qr/ERROR/,
'GUC rejects malformed ANY syntax');
-# Priority syntax is not supported by synchronized_standby_slots yet
-($result, $stdout, $stderr) = $primary->psql('postgres',
- "ALTER SYSTEM SET synchronized_standby_slots = 'FIRST 1 (sb1_slot, sb2_slot)';");
-like($stderr, qr/priority syntax is not supported/,
- 'GUC rejects FIRST syntax');
-
-# Legacy priority syntax is not supported by synchronized_standby_slots yet
-($result, $stdout, $stderr) = $primary->psql('postgres',
- "ALTER SYSTEM SET synchronized_standby_slots = '1 (sb1_slot, sb2_slot)';");
-like($stderr, qr/priority syntax is not supported/,
- 'GUC rejects legacy priority syntax');
-
# Invalid slot name
($result, $stdout, $stderr) = $primary->psql('postgres',
"ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';");
--
2.43.0
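The per-slot rules that patch 0003 adds to StandbySlotsHaveCaughtup() can be summarized with a small standalone C model. The model also includes the duplicate compaction done by the check hook. It is a simplified sketch, not the slot.c code, and it makes three assumptions:

- The names SlotInfo, SlotState, WaitMode, compact_slots(), and slots_have_caught_up() are hypothetical.
- Each slot is reduced to one of four states instead of reading restart_lsn and active_pid from shared memory.
- A plain list requires every listed slot, as the documentation describes.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/*
 * Hypothetical, simplified model of the synchronized_standby_slots wait
 * rules; these are not the PostgreSQL data structures.
 */
typedef enum
{
    MODE_ALL,                   /* plain list: slot1, slot2 */
    MODE_FIRST,                 /* FIRST N (...) or N (...) */
    MODE_ANY                    /* ANY N (...) */
} WaitMode;

typedef enum
{
    SLOT_UNUSABLE,              /* missing, logical, or invalidated */
    SLOT_CAUGHT_UP,             /* reached the target LSN, active or not */
    SLOT_LAGGING_INACTIVE,      /* behind the target LSN, no walsender */
    SLOT_LAGGING_ACTIVE         /* behind the target LSN, walsender attached */
} SlotState;

typedef struct
{
    const char *name;
    SlotState   state;
} SlotInfo;

/*
 * Drop repeated slot names in place, keeping the first occurrence and the
 * original order.  Returns the number of unique entries.
 */
static int
compact_slots(SlotInfo *slots, int n)
{
    int         kept = 0;

    for (int i = 0; i < n; i++)
    {
        bool        seen = false;

        for (int j = 0; j < kept; j++)
        {
            if (strcmp(slots[j].name, slots[i].name) == 0)
            {
                seen = true;
                break;
            }
        }
        if (!seen)
            slots[kept++] = slots[i];
    }
    return kept;
}

/*
 * Return true if logical decoding may proceed.  In MODE_ALL the caller's
 * "required" is ignored and every listed slot must have caught up.
 */
static bool
slots_have_caught_up(const SlotInfo *slots, int n, WaitMode mode,
                     int required)
{
    int         caught_up = 0;

    if (mode == MODE_ALL)
        required = n;

    /* Assumed check-hook guarantee: 0 < num_sync <= unique slot count. */
    assert(required > 0 && required <= n);

    /* Walk in list order; for FIRST this is the priority order. */
    for (int i = 0; i < n; i++)
    {
        switch (slots[i].state)
        {
            case SLOT_CAUGHT_UP:
                caught_up++;
                break;
            case SLOT_UNUSABLE:
            case SLOT_LAGGING_INACTIVE:
                if (mode == MODE_ALL)
                    return false;   /* ALL waits on any problem slot */
                continue;           /* FIRST and ANY skip it */
            case SLOT_LAGGING_ACTIVE:
                if (mode != MODE_ANY)
                    return false;   /* ALL and FIRST wait for it */
                continue;           /* ANY skips it */
        }

        if (caught_up >= required)
            return true;
    }
    return caught_up >= required;
}
```

Consider sb1_slot active but lagging, with sb2_slot caught up. In this model, FIRST 1 (sb1_slot, sb2_slot) is not satisfied but ANY 1 (sb1_slot, sb2_slot) is. That is the distinction PART E of the TAP test exercises.

Now consider FIRST 2 (sb1_slot, sb1_slot, sb2_slot). Compaction reduces it to two entries. If sb2_slot is then inactive and lagging, only one slot has caught up, so the requirement is not met. Without compaction, the repeated sb1_slot would wrongly fill the second priority position.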
[application/octet-stream] 0001-Refactor-syncrep-parsing-to-represent-bare-standby-l.patch (3.1K, 3-0001-Refactor-syncrep-parsing-to-represent-bare-standby-l.patch)
From 789018468cf3a503436450922a75e6d4085f727f Mon Sep 17 00:00:00 2001
From: Ashutosh Sharma <[email protected]>
Date: Wed, 13 May 2026 06:59:58 +0000
Subject: [PATCH 1/3] Refactor syncrep parsing to represent bare standby lists
explicitly
The syncrep parser currently reduces a simple list form to FIRST 1
(SYNC_REP_PRIORITY). That is acceptable for synchronous_standby_names,
but it loses information about whether FIRST was explicitly written.
Introduce SYNC_REP_DEFAULT to represent the bare list form parsed
from standby_list. This allows callers to distinguish:
- explicit priority syntax (FIRST N (...) or N (...))
- quorum syntax (ANY N (...))
- simple list syntax without FIRST/ANY
With this change:
- syncrep grammar emits SYNC_REP_DEFAULT for bare standby lists
- check_synchronous_standby_names() maps SYNC_REP_DEFAULT to
SYNC_REP_PRIORITY, preserving existing synchronous_standby_names
behavior
This is a preparatory patch for future synchronized_standby_slots
changes, where callers can directly interpret SYNC_REP_DEFAULT as
plain-list semantics, while keeping existing synchronous_standby_names
semantics intact.
Per suggestion from Zhijie Hou <[email protected]>
---
src/backend/replication/syncrep.c | 4 ++++
src/backend/replication/syncrep_gram.y | 2 +-
src/include/replication/syncrep.h | 1 +
3 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index e0e30579c59..ae8ecfa0711 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -1100,6 +1100,10 @@ check_synchronous_standby_names(char **newval, void **extra, GucSource source)
return false;
}
+ /* Default to FIRST 1 (name ...) priority method if not specified */
+ if (syncrep_parse_result->syncrep_method == SYNC_REP_DEFAULT)
+ syncrep_parse_result->syncrep_method = SYNC_REP_PRIORITY;
+
/* GUC extra value must be guc_malloc'd, not palloc'd */
pconf = (SyncRepConfigData *)
guc_malloc(LOG, syncrep_parse_result->config_size);
diff --git a/src/backend/replication/syncrep_gram.y b/src/backend/replication/syncrep_gram.y
index 1b9d7b2edc4..f1550e109ef 100644
--- a/src/backend/replication/syncrep_gram.y
+++ b/src/backend/replication/syncrep_gram.y
@@ -65,7 +65,7 @@ result:
;
standby_config:
- standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_PRIORITY); }
+ standby_list { $$ = create_syncrep_config("1", $1, SYNC_REP_DEFAULT); }
| NUM '(' standby_list ')' { $$ = create_syncrep_config($1, $3, SYNC_REP_PRIORITY); }
| ANY NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_QUORUM); }
| FIRST NUM '(' standby_list ')' { $$ = create_syncrep_config($2, $4, SYNC_REP_PRIORITY); }
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index b42b5862a70..130c7f6f242 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -34,6 +34,7 @@
/* syncrep_method of SyncRepConfigData */
#define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM 1
+#define SYNC_REP_DEFAULT 2
/*
* SyncRepGetCandidateStandbys returns an array of these structs,
--
2.43.0
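The following minimal C sketch shows how the two GUCs could consume the same parse result once SYNC_REP_DEFAULT exists. Only the three SYNC_REP_* values come from the patch. ParsedConfig, resolve_for_standby_names(), and required_slots_for_standby_slots() are hypothetical. The all-slots rule for a bare list is taken from the synchronized_standby_slots documentation, not from slot.c code shown here.

```c
#include <assert.h>

/* syncrep_method values, as defined in syncrep.h after patch 0001. */
#define SYNC_REP_PRIORITY 0
#define SYNC_REP_QUORUM   1
#define SYNC_REP_DEFAULT  2

/* Hypothetical, trimmed-down stand-in for SyncRepConfigData. */
typedef struct
{
    int         num_sync;       /* "1" for a bare list, as the grammar emits */
    int         syncrep_method; /* one of the SYNC_REP_* values above */
    int         nmembers;       /* number of listed names */
} ParsedConfig;

/*
 * synchronous_standby_names: a bare list keeps its historical meaning,
 * FIRST 1, mirroring the hunk added to check_synchronous_standby_names().
 */
static void
resolve_for_standby_names(ParsedConfig *cfg)
{
    if (cfg->syncrep_method == SYNC_REP_DEFAULT)
        cfg->syncrep_method = SYNC_REP_PRIORITY;
}

/*
 * synchronized_standby_slots (hypothetical helper): a bare list requires
 * every listed slot, while FIRST N, N (...), and ANY N require num_sync.
 */
static int
required_slots_for_standby_slots(const ParsedConfig *cfg)
{
    /* Assumed check-hook validation: 0 < num_sync <= listed names. */
    assert(cfg->num_sync > 0 && cfg->num_sync <= cfg->nmembers);

    if (cfg->syncrep_method == SYNC_REP_DEFAULT)
        return cfg->nmembers;
    return cfg->num_sync;
}
```

For example, 'a, b, c' parses to SYNC_REP_DEFAULT with num_sync 1. synchronous_standby_names then treats it as FIRST 1, while synchronized_standby_slots requires all three slots. Without the new value, 'a, b' and 'FIRST 1 (a, b)' would produce identical parse results, and the slot GUC could not tell them apart.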
[application/octet-stream] 0002-Add-ANY-N-semantics-to-synchronized_standby_slots.patch (43.9K, 4-0002-Add-ANY-N-semantics-to-synchronized_standby_slots.patch)
From e1a4cab301522a2347a3677e12cd7bc970c70335 Mon Sep 17 00:00:00 2001
From: Ashutosh Sharma <[email protected]>
Date: Mon, 8 Jun 2026 08:26:40 +0000
Subject: [PATCH 2/3] Add ANY N semantics to synchronized_standby_slots
Extend synchronized_standby_slots with quorum syntax for logical
failover slot synchronization:
- ANY N (slot1, slot2, ...)
Plain-list semantics are preserved as-is:
- slot1, slot2 continues to mean all listed slots are required
Implementation details:
- Reuse syncrep parser infrastructure in the GUC check hook and
map parsed output into synchronized_standby_slots semantics.
- Consume SYNC_REP_DEFAULT from the preparatory parser refactor to
distinguish plain-list syntax from explicit parser modes.
- In StandbySlotsHaveCaughtup(), enforce mode-specific behavior for:
- existing all-listed-slots semantics (plain list)
- quorum N-of-M behavior (ANY N)
- Validation rejects configurations where N exceeds the number of
listed slots.
- Ignore duplicate synchronized_standby_slots entries, preserving the
first occurrence and applying semantics to the resulting unique list.
- Clarify synchronized_standby_slots comments and lagging restart_lsn
reporting to match the implemented behavior.
Tests and docs:
- Add recovery coverage for plain-list behavior and ANY quorum
behavior, including lagging-slot and validation-error scenarios.
- Add duplicate-entry recovery coverage for synchronized_standby_slots.
- Document ANY syntax and clarify plain-list behavior for this GUC.
- Document that duplicate slot names are ignored and counted only once.
Author: Satya Narlapuram <[email protected]>
Author: Ashutosh Sharma <[email protected]>
Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Ajin Cherian <[email protected]>
Reviewed-by: Hou, Zhijie <[email protected]>
Reviewed-by: Dilip Kumar <[email protected]>
Reviewed-by: Surya Poondla <[email protected]>
Reviewed-by: Japin Li <[email protected]>
---
doc/src/sgml/config.sgml | 87 ++-
src/backend/replication/slot.c | 515 ++++++++++++++----
.../053_synchronized_standby_slots_quorum.pl | 367 +++++++++++++
3 files changed, 843 insertions(+), 126 deletions(-)
create mode 100644 src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index fa566c9e553..1f176bd48f4 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5190,17 +5190,84 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</term>
<listitem>
<para>
- A comma-separated list of streaming replication standby server slot names
- that logical WAL sender processes will wait for. Logical WAL sender processes
- will send decoded changes to plugins only after the specified replication
- slots confirm receiving WAL. This guarantees that logical replication
+ Specifies the streaming replication standby slots that logical WAL
+ sender processes must wait on before delivering decoded changes. This
+ parameter uses the following syntax:
+<synopsis>
+ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
+<replaceable class="parameter">slot_name</replaceable> [, ...]
+</synopsis>
+ where <replaceable class="parameter">num_sync</replaceable> is
+ the number of physical replication slots that must confirm WAL
+ receipt before logical decoding proceeds,
+ and <replaceable class="parameter">slot_name</replaceable>
+ is the name of a physical replication slot.
+ <replaceable class="parameter">num_sync</replaceable>
+ must be an integer value greater than zero and must not exceed the
+ number of listed slots.
+ Other forms supported by
+ <xref linkend="guc-synchronous-standby-names"/>, such as priority
+ syntax, are not supported.
+ </para>
+ <para>
+ A plain comma-separated list without a keyword specifies that
+ <emphasis>all</emphasis> listed physical slots must confirm WAL
+ receipt. This differs from <xref linkend="guc-synchronous-standby-names"/>,
+ where a simple list means <literal>FIRST 1</literal>. For
+ <varname>synchronized_standby_slots</varname>, requiring all slots
+ provides safer failover semantics by default.
+ </para>
+ <para>
+ The keyword <literal>ANY</literal>, coupled with
+ <replaceable class="parameter">num_sync</replaceable>, specifies
+ quorum-based semantics. Logical decoding proceeds once at least
+ <replaceable class="parameter">num_sync</replaceable> of the listed
+ slots have caught up. Missing, logical, and invalidated slots are
+ skipped when determining candidates. Lagging slots (inactive or
+ active) simply do not count toward the required number until they
+ catch up.
+ If fewer than <replaceable class="parameter">num_sync</replaceable>
+ slots have caught up at a given moment, logical decoding waits until
+ that threshold is reached. Any of the listed slots may count toward
+ the quorum, i.e., there is no priority ordering.
+ For example, a setting of <literal>ANY 1 (sb1_slot, sb2_slot)</literal>
+ allows logical decoding to proceed as soon as either physical slot has
+ confirmed WAL receipt. If none of the slots are available or have
+ caught up, logical decoding waits until at least one slot meets the
+ required condition. This is useful in conjunction with
+ quorum-based synchronous replication
+ (<literal>synchronous_standby_names = 'ANY ...'</literal>), so that
+ logical decoding availability matches the commit durability guarantee.
+ </para>
+ <para>
+ If the same physical replication slot name appears more than once,
+ duplicate entries are ignored and only the first occurrence is used.
+ The semantics of <varname>synchronized_standby_slots</varname> are
+ therefore based on the unique set of listed slot names, preserving the
+ original order of first occurrence. This means that
+ <literal>ANY 2 (slot1, slot1, slot2, slot3)</literal> is treated the
+ same as <literal>ANY 2 (slot1, slot2, slot3)</literal>, and a plain
+ list such as <literal>slot1, slot1, slot2</literal> is treated the
+ same as <literal>slot1, slot2</literal>. In particular,
+ <replaceable class="parameter">num_sync</replaceable> must not exceed
+ the number of unique listed slots; a setting that violates this is
+ rejected with an error, to prevent WAL sender processes from waiting
+ indefinitely because of a misconfigured <varname>synchronized_standby_slots</varname>.
+ </para>
+ <para>
+ <literal>ANY</literal> is case-insensitive.
+ </para>
+ <para>
+ The use of <varname>synchronized_standby_slots</varname> guarantees
+ that logical replication
failover slots do not consume changes until those changes are received
- and flushed to corresponding physical standbys. If a
+ and flushed to the required physical standbys. If a
logical replication connection is meant to switch to a physical standby
after the standby is promoted, the physical replication slot for the
standby should be listed here. Note that logical replication will not
- proceed if the slots specified in the
- <varname>synchronized_standby_slots</varname> do not exist or are invalidated.
+ proceed unless at least the required number of physical slots listed in
+ <varname>synchronized_standby_slots</varname> exist and have not been
+ invalidated.
Additionally, the replication management functions
<link linkend="pg-replication-slot-advance">
<function>pg_replication_slot_advance</function></link>,
@@ -5208,9 +5275,9 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<function>pg_logical_slot_get_changes</function></link>, and
<link linkend="pg-logical-slot-peek-changes">
<function>pg_logical_slot_peek_changes</function></link>,
- when used with logical failover slots, will block until all
- physical slots specified in <varname>synchronized_standby_slots</varname> have
- confirmed WAL receipt.
+ when used with logical failover slots, will block until the required
+ number of physical slots specified in <varname>synchronized_standby_slots</varname>
+ have confirmed WAL receipt.
</para>
<para>
The standbys corresponding to the physical replication slots in
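The catch-up rule documented above can be modelled in a few lines of C. This is a simplified, hypothetical sketch, not the code in slot.c: `DemoSlot`, `DemoSlotState` and `demo_requirement_met` are illustrative names, LSNs are plain `uint64_t` values with 0 standing in for an invalid LSN, and the slot list is assumed to be de-duplicated already, as the check hook does. A plain list corresponds to `num_sync == n`; `ANY k (...)` corresponds to `num_sync == k`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, simplified slot states covering the cases the docs describe. */
typedef enum
{
	DEMO_SLOT_NOT_FOUND,		/* missing: never counts */
	DEMO_SLOT_LOGICAL,			/* wrong slot type: never counts */
	DEMO_SLOT_INVALIDATED,		/* unusable: never counts */
	DEMO_SLOT_VALID				/* physical and valid: counts once caught up */
} DemoSlotState;

typedef struct
{
	DemoSlotState state;
	uint64_t	restart_lsn;	/* 0 stands in for an invalid LSN */
} DemoSlot;

/*
 * Return true if at least num_sync of the n (already de-duplicated) slots
 * have confirmed WAL up to wait_for_lsn.
 */
static bool
demo_requirement_met(const DemoSlot *slots, int n, int num_sync,
					 uint64_t wait_for_lsn)
{
	int			caught_up = 0;

	/* The check hook rejects settings outside this range. */
	assert(num_sync > 0 && num_sync <= n);

	for (int i = 0; i < n; i++)
	{
		/* Missing, logical and invalidated slots are skipped. */
		if (slots[i].state != DEMO_SLOT_VALID)
			continue;

		/* Lagging slots, including ones with no LSN yet, do not count. */
		if (slots[i].restart_lsn == 0 || slots[i].restart_lsn < wait_for_lsn)
			continue;

		/* Stop as soon as enough slots have caught up. */
		if (++caught_up >= num_sync)
			return true;
	}
	return false;
}
```

With two valid slots at LSNs 100 and 250 and a required LSN of 200, `ANY 1` is satisfied by the second slot alone, while the plain list (`num_sync == 2`) is not. When every listed slot is required, the patch additionally stops scanning at the first unusable slot; that shortcut does not change the outcome.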
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index d7fb9f5a67f..93797fcdde4 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -50,6 +50,7 @@
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -91,11 +92,19 @@ typedef struct ReplicationSlotOnDisk
* Note: this must be a flat representation that can be held in a single chunk
* of guc_malloc'd memory, so that it can be stored as the "extra" data for the
* synchronized_standby_slots GUC.
+ *
+ * The layout mirrors SyncRepConfigData so that the same quorum and priority
+ * semantics can be expressed. The syncrep_method field uses the
+ * SYNC_REP_DEFAULT, SYNC_REP_PRIORITY, and SYNC_REP_QUORUM constants from
+ * syncrep.h.
*/
typedef struct
{
- /* Number of slot names in the slot_names[] */
- int nslotnames;
+ int config_size; /* total size of this struct, in bytes */
+ int num_sync; /* number of slots that must confirm WAL
+ * receipt before logical decoding proceeds */
+ uint8 syncrep_method; /* SYNC_REP_* method */
+ int nslotnames; /* number of slot names that follow */
/*
* slot_names contains 'nslotnames' consecutive null-terminated C strings.
@@ -103,6 +112,29 @@ typedef struct
char slot_names[FLEXIBLE_ARRAY_MEMBER];
} SyncStandbySlotsConfigData;
+/*
+ * State of a replication slot specified in synchronized_standby_slots GUC.
+ */
+typedef enum
+{
+ SS_SLOT_NOT_FOUND, /* slot does not exist */
+ SS_SLOT_LOGICAL, /* slot is logical, not physical */
+ SS_SLOT_INVALIDATED, /* slot has been invalidated */
+ SS_SLOT_INACTIVE_LAGGING, /* slot is inactive and behind wait_for_lsn */
+ SS_SLOT_ACTIVE_LAGGING, /* slot is active and behind wait_for_lsn */
+} SyncStandbySlotsState;
+
+/*
+ * Information about a synchronized standby slot's state.
+ */
+typedef struct
+{
+ const char *slot_name; /* name of the slot */
+ SyncStandbySlotsState state; /* state of the slot */
+ XLogRecPtr restart_lsn; /* current restart_lsn (valid for lagging
+ * states) */
+} SyncStandbySlotsStateInfo;
+
/*
* Lookup table for slot invalidation causes.
*/
@@ -2963,94 +2995,203 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
}
/*
- * A helper function to validate slots specified in GUC synchronized_standby_slots.
+ * Remove duplicate member names from a SyncRepConfigData object.
*
- * The rawname will be parsed, and the result will be saved into *elemlist.
+ * The member_names array of SyncRepConfigData is compacted in place so
+ * that only the first occurrence of each member name is retained. The
+ * original ordering of retained names is preserved, and nmembers and
+ * config_size are updated to describe only the compacted portion of
+ * the array.
*/
-static bool
-validate_sync_standby_slots(char *rawname, List **elemlist)
+static void
+CompactSyncRepConfigMemberNames(SyncRepConfigData *config)
{
- /* Verify syntax and parse string into a list of identifiers */
- if (!SplitIdentifierString(rawname, ',', elemlist))
- {
- GUC_check_errdetail("List syntax is invalid.");
- return false;
- }
+ char *src_name;
+ char *dst_name;
+ int nunique_members = 0;
+ Size unique_size = offsetof(SyncRepConfigData, member_names);
- /* Iterate the list to validate each slot name */
- foreach_ptr(char, name, *elemlist)
+ src_name = config->member_names;
+ dst_name = config->member_names;
+
+ for (int i = 0; i < config->nmembers; i++)
{
- int err_code;
- char *err_msg = NULL;
- char *err_hint = NULL;
+ char *unique_name;
+ size_t name_size;
+ bool duplicate = false;
+
+ name_size = strlen(src_name) + 1;
- if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
- &err_msg, &err_hint))
+ /*
+ * Check whether src_name matches any previously retained unique name.
+ * Only the first nunique_members entries in member_names need to be
+ * examined for this.
+ */
+ unique_name = config->member_names;
+ for (int j = 0; j < nunique_members; j++)
{
- GUC_check_errcode(err_code);
- GUC_check_errdetail("%s", err_msg);
- if (err_hint != NULL)
- GUC_check_errhint("%s", err_hint);
- return false;
+ if (strcmp(unique_name, src_name) == 0)
+ {
+ duplicate = true;
+ break;
+ }
+
+ unique_name += strlen(unique_name) + 1;
+ }
+
+ if (!duplicate)
+ {
+ /*
+ * This src_name is a new unique name. Copy it immediately after the
+ * unique names retained so far.
+ */
+ if (dst_name != src_name)
+ memmove(dst_name, src_name, name_size);
+
+ dst_name += name_size;
+ nunique_members++;
+ unique_size += name_size;
}
+
+ src_name += name_size;
}
- return true;
+ config->nmembers = nunique_members;
+ config->config_size = (int) unique_size;
}
/*
* GUC check_hook for synchronized_standby_slots
+ *
+ * This reuses the syncrep_yyparse/syncrep_scanner infrastructure that is
+ * also used for synchronous_standby_names, and accepts these forms:
+ *
+ * slot1, slot2 -- wait for ALL listed slots
+ * ANY N (slot1, slot2, ...) -- wait for any N-of-M (quorum)
+ *
+ * Note: Simple list syntax is interpreted as "wait for ALL" for this GUC,
+ * unlike synchronous_standby_names where it means "FIRST 1".
+ *
+ * After parsing, we validate every name as a legal replication slot name,
+ * omit duplicate entries while preserving first-occurrence order, and then
+ * apply the resulting unique list to the configured semantics.
*/
bool
check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
{
- char *rawname;
- char *ptr;
- List *elemlist;
- int size;
- bool ok;
- SyncStandbySlotsConfigData *config;
-
- if ((*newval)[0] == '\0')
- return true;
+ if (*newval != NULL && (*newval)[0] != '\0')
+ {
+ yyscan_t scanner;
+ int parse_rc;
+ SyncStandbySlotsConfigData *config;
+ const char *mname;
+
+ /* Result of parsing is returned in one of these two variables */
+ SyncRepConfigData *syncrep_parse_result = NULL;
+ char *syncrep_parse_error_msg = NULL;
+
+ /* Parse the synchronized standby slots configuration */
+ syncrep_scanner_init(*newval, &scanner);
+ parse_rc = syncrep_yyparse(&syncrep_parse_result,
+ &syncrep_parse_error_msg,
+ scanner);
+ syncrep_scanner_finish(scanner);
+
+ if (parse_rc != 0 || syncrep_parse_result == NULL)
+ {
+ GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
+ if (syncrep_parse_error_msg)
+ GUC_check_errdetail("%s", syncrep_parse_error_msg);
+ else
+ GUC_check_errdetail("\"%s\" parser failed.",
+ "synchronized_standby_slots");
+ return false;
+ }
- /* Need a modifiable copy of the GUC string */
- rawname = pstrdup(*newval);
+ if (syncrep_parse_result->syncrep_method == SYNC_REP_PRIORITY)
+ {
+ GUC_check_errcode(ERRCODE_INVALID_PARAMETER_VALUE);
+ GUC_check_errmsg("priority syntax is not supported for parameter \"%s\"",
+ "synchronized_standby_slots");
+ return false;
+ }
- /* Now verify if the specified slots exist and have correct type */
- ok = validate_sync_standby_slots(rawname, &elemlist);
+ if (syncrep_parse_result->num_sync <= 0)
+ {
+ GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero",
+ syncrep_parse_result->num_sync);
+ return false;
+ }
- if (!ok || elemlist == NIL)
- {
- pfree(rawname);
- list_free(elemlist);
- return ok;
- }
+ /* Validate every member name as a replication slot name. */
+ mname = syncrep_parse_result->member_names;
- /* Compute the size required for the SyncStandbySlotsConfigData struct */
- size = offsetof(SyncStandbySlotsConfigData, slot_names);
- foreach_ptr(char, slot_name, elemlist)
- size += strlen(slot_name) + 1;
+ for (int i = 0; i < syncrep_parse_result->nmembers; i++)
+ {
+ int err_code;
+ char *err_msg = NULL;
+ char *err_hint = NULL;
- /* GUC extra value must be guc_malloc'd, not palloc'd */
- config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
- if (!config)
- return false;
+ if (!ReplicationSlotValidateNameInternal(mname, false, &err_code,
+ &err_msg, &err_hint))
+ {
+ GUC_check_errcode(err_code);
+ GUC_check_errdetail("%s", err_msg);
+ if (err_hint != NULL)
+ GUC_check_errhint("%s", err_hint);
+ return false;
+ }
- /* Transform the data into SyncStandbySlotsConfigData */
- config->nslotnames = list_length(elemlist);
+ mname += strlen(mname) + 1;
+ }
- ptr = config->slot_names;
- foreach_ptr(char, slot_name, elemlist)
- {
- strcpy(ptr, slot_name);
- ptr += strlen(slot_name) + 1;
- }
+ /* Omit duplicate slot names to ensure each slot is considered only once. */
+ CompactSyncRepConfigMemberNames(syncrep_parse_result);
- *extra = config;
+ /*
+ * For synchronized_standby_slots, a comma-separated list means all
+ * listed slots are required. The syncrep parser preserves this shape
+ * as SYNC_REP_DEFAULT, so map num_sync to nmembers to enforce
+ * all-mode semantics after removing duplicate names.
+ */
+ if (syncrep_parse_result->syncrep_method == SYNC_REP_DEFAULT)
+ syncrep_parse_result->num_sync = syncrep_parse_result->nmembers;
+
+ /* Reject num_sync > nmembers after duplicates have been omitted. */
+ if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers)
+ {
+ GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of unique listed slots (%d)",
+ syncrep_parse_result->num_sync,
+ syncrep_parse_result->nmembers);
+ return false;
+ }
+
+ /*
+ * Build SyncStandbySlotsConfigData from the parsed SyncRepConfigData.
+ * Since the structures have identical layout, we can use the same
+ * config_size.
+ */
+ config = (SyncStandbySlotsConfigData *)
+ guc_malloc(LOG, syncrep_parse_result->config_size);
+ if (!config)
+ return false;
+
+ config->config_size = syncrep_parse_result->config_size;
+ config->num_sync = syncrep_parse_result->num_sync;
+ config->syncrep_method = syncrep_parse_result->syncrep_method;
+ config->nslotnames = syncrep_parse_result->nmembers;
+
+ /* Copy all slot names in one operation */
+ memcpy(config->slot_names,
+ syncrep_parse_result->member_names,
+ syncrep_parse_result->config_size -
+ offsetof(SyncRepConfigData, member_names));
+
+ *extra = config;
+ }
+ else
+ *extra = NULL;
- pfree(rawname);
- list_free(elemlist);
return true;
}
@@ -3099,18 +3240,117 @@ SlotExistsInSyncStandbySlots(const char *slot_name)
}
/*
- * Return true if the slots specified in synchronized_standby_slots have caught up to
- * the given WAL location, false otherwise.
+ * Report problem states for synchronized standby slots that prevented the
+ * catch-up requirement from being met.
+ */
+static void
+ReportUnavailableSyncStandbySlots(SyncStandbySlotsStateInfo *slot_states,
+ int num_slot_states, int elevel,
+ XLogRecPtr wait_for_lsn)
+{
+ for (int i = 0; i < num_slot_states; i++)
+ {
+ const char *slot_name = slot_states[i].slot_name;
+
+ switch (slot_states[i].state)
+ {
+ case SS_SLOT_NOT_FOUND:
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
+ slot_name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+ slot_name),
+ errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
+ slot_name, "synchronized_standby_slots"));
+ break;
+
+ case SS_SLOT_LOGICAL:
+ ereport(elevel,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
+ slot_name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
+ slot_name),
+ errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
+ slot_name, "synchronized_standby_slots"));
+ break;
+
+ case SS_SLOT_INVALIDATED:
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
+ slot_name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+ slot_name),
+ errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
+ slot_name, "synchronized_standby_slots"));
+ break;
+
+ case SS_SLOT_INACTIVE_LAGGING:
+ ereport(elevel,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
+ slot_name, "synchronized_standby_slots"),
+ errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
+ slot_name),
+ errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
+ slot_name, "synchronized_standby_slots"));
+ break;
+
+ case SS_SLOT_ACTIVE_LAGGING:
+ if (!XLogRecPtrIsValid(slot_states[i].restart_lsn))
+ ereport(DEBUG1,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up",
+ slot_name, "synchronized_standby_slots"),
+ errdetail("The slot's restart_lsn is not yet set; required LSN is %X/%08X.",
+ LSN_FORMAT_ARGS(wait_for_lsn)));
+ else
+ ereport(DEBUG1,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication slot \"%s\" specified in parameter \"%s\" has not caught up",
+ slot_name, "synchronized_standby_slots"),
+ errdetail("The slot's restart_lsn %X/%08X is behind the required %X/%08X.",
+ LSN_FORMAT_ARGS(slot_states[i].restart_lsn),
+ LSN_FORMAT_ARGS(wait_for_lsn)));
+ break;
+
+ default:
+ /* Should not happen */
+ Assert(false);
+ break;
+ }
+ }
+}
+
+/*
+ * Return true if the required standby slots have caught up to the given WAL
+ * location, false otherwise.
+ *
+ * The behavior depends on the synchronized_standby_slots configuration:
+ *
+ * Simple list (e.g., "slot1, slot2"):
+ * ALL slots must have caught up. Returns false otherwise.
*
- * The elevel parameter specifies the error level used for logging messages
- * related to slots that do not exist, are invalidated, or are inactive.
+ * ANY N (e.g., "ANY 2 (slot1, slot2, slot3)"):
+ * Wait for any N eligible slots. Skips missing, invalid, logical, and
+ * lagging slots (inactive or active) to find N slots that have caught up.
+ *
+ * The elevel parameter specifies the error level used for reporting issues
+ * related to the slots specified in synchronized_standby_slots when the
+ * catch-up requirement is not met.
*/
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
const char *name;
int caught_up_slot_num = 0;
+ int required;
XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
+ bool wait_for_all;
+ SyncStandbySlotsStateInfo *slot_states;
+ int num_slot_states = 0;
/*
* Don't need to wait for the standbys to catch up if there is no value in
@@ -3134,12 +3374,44 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
ss_oldest_flush_lsn >= wait_for_lsn)
return true;
+ /*
+ * Determine how many slots are required and whether we're in "wait for
+ * ALL" mode versus "wait for N-of-M" mode.
+ *
+ * wait_for_all = true means we need ALL slots to be ready (simple list
+ * syntax like "slot1, slot2"). In this mode, we stop checking on the
+ * first slot that is missing/invalid/logical, or the first slot that is
+ * lagging (inactive or active).
+ *
+ * wait_for_all = false means we select N from M candidates (ANY N syntax).
+ * In this mode, slots already caught up are counted even if inactive, and
+ * lagging slots are skipped until enough slots have caught up.
+ * Duplicate configured slot names do not appear here because the check hook
+ * compacts them out of the parsed configuration.
+ */
+ required = synchronized_standby_slots_config->num_sync;
+ wait_for_all = (required == synchronized_standby_slots_config->nslotnames);
+
+ /*
+ * Allocate array to track slot states. Size it to the total number of
+ * configured slots since in the worst case all could have problem states.
+ */
+ slot_states = palloc_array(SyncStandbySlotsStateInfo,
+ synchronized_standby_slots_config->nslotnames);
+
/*
* To prevent concurrent slot dropping and creation while filtering the
* slots, take the ReplicationSlotControlLock outside of the loop.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ /*
+ * Iterate through configured slots, checking their state and tracking how
+ * many have caught up. Problem states are recorded for deferred
+ * reporting: missing/logical/invalidated slots, and lagging slots
+ * (inactive or active). Messages are only emitted if the catch-up
+ * requirement isn't met.
+ */
name = synchronized_standby_slots_config->slot_names;
for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
{
@@ -3150,35 +3422,28 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
slot = SearchNamedReplicationSlot(name, false);
- /*
- * If a slot name provided in synchronized_standby_slots does not
- * exist, report a message and exit the loop.
- */
if (!slot)
{
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
- name),
- errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
- name, "synchronized_standby_slots"));
- break;
+ /* Record the slot state for deferred reporting. */
+ slot_states[num_slot_states].slot_name = name;
+ slot_states[num_slot_states].state = SS_SLOT_NOT_FOUND;
+ num_slot_states++;
+
+ if (wait_for_all)
+ break;
+ goto next_slot;
}
- /* Same as above: if a slot is not physical, exit the loop. */
if (SlotIsLogical(slot))
{
- ereport(elevel,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
- name),
- errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
- name, "synchronized_standby_slots"));
- break;
+ /* Record the slot state for deferred reporting. */
+ slot_states[num_slot_states].slot_name = name;
+ slot_states[num_slot_states].state = SS_SLOT_LOGICAL;
+ num_slot_states++;
+
+ if (wait_for_all)
+ break;
+ goto next_slot;
}
SpinLockAcquire(&slot->mutex);
@@ -3189,33 +3454,34 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
if (invalidated)
{
- /* Specified physical slot has been invalidated */
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
- name),
- errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
- name, "synchronized_standby_slots"));
- break;
+ /* Record the slot state for deferred reporting. */
+ slot_states[num_slot_states].slot_name = name;
+ slot_states[num_slot_states].state = SS_SLOT_INVALIDATED;
+ num_slot_states++;
+
+ if (wait_for_all)
+ break;
+ goto next_slot;
}
if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
{
- /* Log a message if no active_pid for this physical slot */
- if (inactive)
- ereport(elevel,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
- name, "synchronized_standby_slots"),
- errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
- name),
- errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
- name, "synchronized_standby_slots"));
+ /*
+ * If a slot is inactive and lagging, report it as inactive. If it
+ * is active and lagging, report it as lagging.
+ *
+ * In ALL mode: must wait for it. In ANY N (quorum) mode: skip and
+ * use another slot.
+ */
+ slot_states[num_slot_states].slot_name = name;
+ slot_states[num_slot_states].state =
+ inactive ? SS_SLOT_INACTIVE_LAGGING : SS_SLOT_ACTIVE_LAGGING;
+ slot_states[num_slot_states].restart_lsn = restart_lsn;
+ num_slot_states++;
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (wait_for_all)
+ break;
+ goto next_slot;
}
Assert(restart_lsn >= wait_for_lsn);
@@ -3226,17 +3492,30 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
caught_up_slot_num++;
+ /* Stop processing if the required number of slots have caught up. */
+ if (caught_up_slot_num >= required)
+ break;
+
+next_slot:
name += strlen(name) + 1;
}
LWLockRelease(ReplicationSlotControlLock);
/*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
+ * If the required number of slots have not caught up, report any recorded
+ * problem states and return false.
+ *
+ * We only emit messages when the requirement is not met to avoid
+ * misleading messages in quorum mode where other slots may have satisfied
+ * the condition despite some slots having issues.
*/
- if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
+ if (caught_up_slot_num < required)
+ {
+ ReportUnavailableSyncStandbySlots(slot_states, num_slot_states, elevel, wait_for_lsn);
+ pfree(slot_states);
return false;
+ }
/* The ss_oldest_flush_lsn must not retreat. */
Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
@@ -3244,6 +3523,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
ss_oldest_flush_lsn = min_restart_lsn;
+ pfree(slot_states);
return true;
}
@@ -3276,7 +3556,10 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
ProcessConfigFile(PGC_SIGHUP);
}
- /* Exit if done waiting for every slot. */
+ /*
+ * Exit once the configured synchronized_standby_slots requirement is
+ * met.
+ */
if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
break;
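The in-place de-duplication done by `CompactSyncRepConfigMemberNames()` can be tried outside the server with the following standalone sketch. It assumes the same packed layout (n consecutive NUL-terminated strings in one buffer). `demo_compact_names` is a hypothetical name, and the byte count it reports covers only the names, whereas the patch also adds the struct header when it updates `config_size`.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/*
 * Compact n consecutive NUL-terminated strings stored in buf so that only
 * the first occurrence of each string remains, in original order.  Returns
 * the number of unique strings; *used_bytes receives the bytes they occupy.
 */
static int
demo_compact_names(char *buf, int n, size_t *used_bytes)
{
	char	   *src = buf;		/* next input name */
	char	   *dst = buf;		/* end of the retained unique names */
	int			nunique = 0;

	assert(buf != NULL && used_bytes != NULL);

	for (int i = 0; i < n; i++)
	{
		size_t		len = strlen(src) + 1;	/* include the terminator */
		const char *seen = buf;
		bool		duplicate = false;

		/* Compare against the unique names already packed at the front. */
		for (int j = 0; j < nunique; j++)
		{
			if (strcmp(seen, src) == 0)
			{
				duplicate = true;
				break;
			}
			seen += strlen(seen) + 1;
		}

		if (!duplicate)
		{
			/* dst never passes src, but the ranges may overlap: use memmove. */
			if (dst != src)
				memmove(dst, src, len);
			dst += len;
			nunique++;
		}
		src += len;
	}

	*used_bytes = (size_t) (dst - buf);
	return nunique;
}
```

For the buffer `slot1\0slot1\0slot2\0slot3`, the function keeps three names in 18 bytes, matching the documented equivalence of `ANY 2 (slot1, slot1, slot2, slot3)` and `ANY 2 (slot1, slot2, slot3)`. The duplicate scan is quadratic in the number of names, which seems acceptable for a list that usually holds only a handful of slots.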
diff --git a/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
new file mode 100644
index 00000000000..d387e0c7e7e
--- /dev/null
+++ b/src/test/recovery/t/053_synchronized_standby_slots_quorum.pl
@@ -0,0 +1,367 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test synchronized_standby_slots with different syntax modes:
+# - Plain list (ALL mode): slot1, slot2
+# - ANY N (quorum mode): ANY N (slot1, slot2, ...)
+#
+# Setup: a 3-node cluster with one primary, two physical standbys, and a
+# logical decoding client using a failover-enabled slot.
+#
+# | ----> standby1 (primary_slot_name = sb1_slot)
+# primary ------|
+# | ----> standby2 (primary_slot_name = sb2_slot)
+#
+# synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+#
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ---------------------------------------------------------------------------
+# 1. Create a primary with wal_level = logical and autovacuum off
+# ---------------------------------------------------------------------------
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+$primary->append_conf(
+ 'postgresql.conf', qq{
+autovacuum = off
+});
+$primary->start;
+
+# Physical replication slots for the two standbys
+$primary->safe_psql('postgres',
+ "SELECT pg_create_physical_replication_slot('sb1_slot');");
+$primary->safe_psql('postgres',
+ "SELECT pg_create_physical_replication_slot('sb2_slot');");
+
+# ---------------------------------------------------------------------------
+# 2. Create standby1 and standby2 from a fresh backup
+# ---------------------------------------------------------------------------
+my $backup_name = 'base_backup';
+$primary->backup($backup_name);
+
+my $connstr = $primary->connstr;
+
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$standby1->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$standby2->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb2_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+$standby1->start;
+$standby2->start;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 3. Create a logical failover slot on the primary
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+ "SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);"
+);
+
+# ---------------------------------------------------------------------------
+# 4. Configure quorum sync rep with ALL-mode synchronized_standby_slots
+# ---------------------------------------------------------------------------
+$primary->append_conf(
+ 'postgresql.conf', qq{
+synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+synchronized_standby_slots = 'sb1_slot, sb2_slot'
+});
+$primary->reload;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 5. Confirm that quorum sync rep is active for both standbys
+# ---------------------------------------------------------------------------
+is( $primary->safe_psql(
+ 'postgres',
+ q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';}
+ ),
+ '2',
+ 'both standbys are in quorum sync state');
+
+##################################################
+# PART A: Plain list (ALL mode) blocks when any slot is unavailable
+##################################################
+
+$standby1->stop;
+
+# Commit succeeds since standby2 satisfies the quorum.
+my $emit_lsn = $primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');"
+);
+like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/,
+ 'synchronous commit succeeds with quorum (standby2 alive)');
+
+$primary->wait_for_replay_catchup($standby2);
+
+my $log_offset = -s $primary->logfile;
+
+my $bg = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+# Wait for the primary to log a warning about sb1_slot not being active.
+$primary->wait_for_log(
+ qr/replication slot "sb1_slot" specified in parameter "synchronized_standby_slots" does not have active_pid/,
+ $log_offset);
+
+pass('plain list (ALL mode): logical decoding blocked by unavailable sb1_slot');
+
+# Unblock by clearing synchronized_standby_slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg->quit;
+
+# Consume the change so the slot is clean for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+##################################################
+# PART B: ANY mode (quorum): logical decoding proceeds with N-of-M slots
+##################################################
+
+# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'ANY 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+# standby1 is still down; standby2 is up.
+
+# Emit another transactional message; the commit succeeds via quorum.
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# In quorum mode, logical decoding should NOT block because sb2_slot has
+# caught up and 1-of-2 is sufficient.
+my $decoded = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%quorum_mode_works%';});
+is($decoded, '1',
+ 'ANY mode: logical decoding proceeds with only sb2_slot caught up');
+
+##################################################
+# PART C: Plain list (ALL mode) proceeds when both standbys are up
+##################################################
+
+# Bring standby1 back.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# Switch to plain list (ALL mode) with both slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'sb1_slot, sb2_slot'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_bc = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%both_caught_up%';});
+is($decoded_bc, '1',
+ 'plain list: works when all standbys are up');
+
+##################################################
+# PART D: ANY 2 waits on an active lagging slot
+##################################################
+
+# Stop standby1 so sb1_slot can be controlled by a raw replication connection
+# that keeps the slot active while lagging.
+$standby1->stop;
+
+my $old_lsn = $primary->safe_psql('postgres',
+ "SELECT pg_current_wal_lsn();");
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'ANY 2 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+my $any2_lag_lsn = $primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'any_2_lagging_blocks');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+my $repl_any2 = $primary->background_psql(
+ 'postgres',
+ replication => 'database',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$repl_any2->query_until(
+ qr/^$/,
+ "START_REPLICATION SLOT sb1_slot PHYSICAL $old_lsn;\n");
+
+$primary->poll_query_until('postgres', q{
+ SELECT active_pid IS NOT NULL
+ FROM pg_replication_slots
+ WHERE slot_name = 'sb1_slot'
+}) or die "replication connection did not activate sb1_slot";
+
+my $bg_any2 = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg_any2->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+ok( $primary->poll_query_until(
+ 'postgres', q{
+SELECT EXISTS (
+ SELECT 1
+ FROM pg_stat_activity
+ WHERE wait_event = 'WaitForStandbyConfirmation'
+ AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%'
+);
+}),
+ 'ANY 2: decoding waits when only one slot has caught up');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg_any2->quit;
+$repl_any2->quit;
+
+# Consume the change for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+# Bring standby1 back up for the remaining tests.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+
+##################################################
+# PART E: Duplicate entries are ignored for quorum counting
+##################################################
+
+# Stop standby2 so only sb1_slot can catch up.
+$standby2->stop;
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'ANY 2 (sb1_slot, sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'duplicate_entries_ignored');"
+);
+$primary->wait_for_replay_catchup($standby1);
+
+my $bg_dup = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg_dup->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+ok( $primary->poll_query_until(
+ 'postgres', q{
+SELECT EXISTS (
+ SELECT 1
+ FROM pg_stat_activity
+ WHERE wait_event = 'WaitForStandbyConfirmation'
+ AND query LIKE '%pg_logical_slot_peek_changes(''logical_failover''%'
+);
+}),
+ 'duplicate entries are ignored when counting quorum slots');
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg_dup->quit;
+
+# Consume the change for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+# Bring standby2 back up for validation tests.
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+
+##################################################
+# PART F: Verify GUC validation rejects bad values
+##################################################
+
+my ($result, $stdout, $stderr);
+
+# N exceeds the number of listed slots
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';");
+like($stderr, qr/ERROR/,
+ 'GUC rejects ANY N when N > number of listed slots');
+
+# Missing closing parenthesis
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';");
+like($stderr, qr/ERROR/,
+ 'GUC rejects malformed ANY syntax');
+
+# Priority syntax is not supported by synchronized_standby_slots yet
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'FIRST 1 (sb1_slot, sb2_slot)';");
+like($stderr, qr/priority syntax is not supported/,
+ 'GUC rejects FIRST syntax');
+
+# Legacy priority syntax is not supported by synchronized_standby_slots yet
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = '1 (sb1_slot, sb2_slot)';");
+like($stderr, qr/priority syntax is not supported/,
+ 'GUC rejects legacy priority syntax');
+
+# Invalid slot name (slot names may not contain uppercase letters)
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';");
+like($stderr, qr/ERROR/,
+ 'GUC rejects invalid slot name in ANY syntax');
+
+# ---------------------------------------------------------------------------
+# Cleanup
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+ "SELECT pg_drop_replication_slot('logical_failover');");
+
+done_testing();
--
2.43.0
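The quorum rule that PARTs B, D and E expect can be modeled in a few lines of plain Perl. This is a hypothetical sketch, not the patch's C implementation: `quorum_satisfied` and its arguments are illustrative names, and the sketch assumes only what the tests assert, namely that `ANY N (...)` is satisfied once N distinct listed slots have caught up, with duplicate entries counted once.

```perl
use strict;
use warnings;

# Hypothetical model of 'ANY N (slot, ...)' as exercised by the tests above.
# $n         - required number of caught-up slots
# $slots     - array ref of listed slot names (may contain duplicates)
# $caught_up - hash ref whose true-valued keys are slots that have caught up
# Returns 1 when at least $n distinct listed slots have caught up, else 0.
sub quorum_satisfied {
    my ($n, $slots, $caught_up) = @_;
    my %seen;
    # Count each slot name only on its first occurrence (duplicates ignored),
    # and only if it has caught up.
    my $count = grep { !$seen{$_}++ && $caught_up->{$_} } @$slots;
    return $count >= $n ? 1 : 0;
}
```

For example, PART E's setting `ANY 2 (sb1_slot, sb1_slot, sb2_slot)` with only sb1_slot caught up corresponds to `quorum_satisfied(2, [qw(sb1_slot sb1_slot sb2_slot)], { sb1_slot => 1 })`, which returns 0, matching the wait the test expects. PART B's `ANY 1 (sb1_slot, sb2_slot)` with only sb2_slot caught up returns 1.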