From: SATYANARAYANA NARLAPURAM <[email protected]>
To: PostgreSQL-development <[email protected]>
To: PostgreSQL Hackers <[email protected]>
Subject: synchronized_standby_slots behavior inconsistent with quorum-based synchronous replication
Date: Tue, 24 Feb 2026 14:08:37 -0800
Message-ID: <CAHg+QDfU7rOebrLDESPpHSgdiadKbpCOmBokcbmM6Gr+A5VobQ@mail.gmail.com>
Hi hackers,
synchronized_standby_slots requires that every physical slot listed in the
GUC has caught up before a logical failover slot is allowed to proceed with
decoding. This is an ALL-of-N semantic. It does not align with the quorum
semantics of synchronous_standby_names, which can be configured for quorum
commit (ANY M of N).
In a typical three-node HA deployment with quorum sync rep:
Primary, standby1 (corresponds to sb1_slot), standby2 (corresponds to
sb2_slot)
synchronized_standby_slots = 'sb1_slot, sb2_slot'
synchronous_standby_names = 'ANY 1 (standby1, standby2)'
If standby1 goes down, synchronous commits still succeed because standby2
satisfies the quorum. However, logical decoding blocks indefinitely in
WaitForStandbyConfirmation(), waiting for sb1_slot (standby1's slot) to
catch up, even though the transaction is already safely committed on a
quorum of synchronous standbys. This stalls logical decoding consumers and
is inconsistent with the availability guarantee the DBA intended by
choosing quorum commit. The attached patch reproduces this scenario in a
TAP test (052_synchronized_standby_slots_quorum.pl).
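The mismatch can be modeled in a few lines of standalone C. This is only an
illustrative sketch, not code from the patch or from PostgreSQL: the
StandbyModel struct and both function names are hypothetical, and "caught
up" is reduced to a boolean per standby.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of one standby; not a PostgreSQL structure. */
typedef struct
{
    const char *slot_name;
    bool        caught_up;  /* has this standby confirmed the target LSN? */
} StandbyModel;

/* Quorum commit (ANY num_sync): enough standbys have confirmed. */
static bool
quorum_commit_ok(const StandbyModel *standbys, int n, int num_sync)
{
    int confirmed = 0;

    assert(num_sync >= 1 && num_sync <= n);
    for (int i = 0; i < n; i++)
    {
        if (standbys[i].caught_up)
            confirmed++;
    }
    return confirmed >= num_sync;
}

/* Current synchronized_standby_slots rule: every listed slot must catch up. */
static bool
all_slots_caught_up(const StandbyModel *standbys, int n)
{
    for (int i = 0; i < n; i++)
    {
        if (!standbys[i].caught_up)
            return false;   /* one lagging slot blocks logical decoding */
    }
    return true;
}
```

With sb1_slot marked as not caught up and sb2_slot as caught up,
quorum_commit_ok(standbys, 2, 1) returns true while
all_slots_caught_up(standbys, 2) returns false: the commit goes through but
decoding is held back.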
*Proposal:*
Make synchronized_standby_slots quorum-aware, i.e., extend the GUC to
accept an ANY M (slot1, slot2, ...) syntax similar to
synchronous_standby_names, so that StandbySlotsHaveCaughtup() can return
true when M of N slots (where 1 <= M <= N) have caught up. I still prefer
keeping two separate GUCs, because the list of slots to wait for can differ
from the list of synchronous standbys (for example, a DBA may want a geo
standby to be in sync before the logical decoding client is allowed to read
the changes). I kept the synchronized_standby_slots parsing logic similar
to that of synchronous_standby_names to keep things simple. The default
behavior of synchronized_standby_slots is unchanged.
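To make the proposed rule concrete, here is a minimal standalone sketch in
C. It is not taken from the patch: SlotWaitMode and the three function
names are hypothetical stand-ins (the patch itself reuses SYNC_REP_PRIORITY
and SYNC_REP_QUORUM), and each slot's state is reduced to a boolean.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-ins for the plain-list/FIRST mode and the ANY mode. */
typedef enum
{
    MODE_ALL,   /* plain list or FIRST: every listed slot is required */
    MODE_ANY    /* ANY M (...): M of the N listed slots suffice */
} SlotWaitMode;

/* A setting is acceptable only when 1 <= num_sync <= nslots. */
static bool
slot_setting_is_valid(int num_sync, int nslots)
{
    return num_sync >= 1 && num_sync <= nslots;
}

/* How many caught-up slots are needed before decoding may proceed. */
static int
required_slots(SlotWaitMode mode, int num_sync, int nslots)
{
    return (mode == MODE_ANY) ? num_sync : nslots;
}

/* Count the caught-up slots and compare against the requirement. */
static bool
slots_have_caught_up(SlotWaitMode mode, int num_sync,
                     const bool *caught_up, int nslots)
{
    int count = 0;

    assert(slot_setting_is_valid(num_sync, nslots));
    for (int i = 0; i < nslots; i++)
    {
        if (caught_up[i])
            count++;    /* lagging slots are skipped, not fatal */
    }
    return count >= required_slots(mode, num_sync, nslots);
}
```

In this model, with one of two slots lagging, MODE_ANY with num_sync = 1
lets decoding proceed, while MODE_ALL still waits, which matches the
unchanged default.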
I have attached a draft patch (AI-assisted). Please let me know your thoughts.
Thanks,
Satya
Attachments:
[application/octet-stream] 0001-Add-quorum-support-to-synchronized_standby_slots.patch (29.8K)
From 7f164ce216857e0a0bfb135990f98c86ccea9c61 Mon Sep 17 00:00:00 2001
From: Satya <[email protected]>
Date: Tue, 24 Feb 2026 13:28:20 -0800
Subject: [PATCH] Add quorum support to synchronized_standby_slots
Previously, synchronized_standby_slots required ALL listed physical slots
to confirm WAL receipt before logical decoding could proceed. This created
an availability problem when used with quorum-based synchronous replication
(synchronous_standby_names = 'ANY N (...)'): a transaction could be safely
committed via quorum, yet logical decoding consumers would stall waiting
for a standby that wasn't needed for the durability guarantee.
This patch extends synchronized_standby_slots to accept the same syntax as
synchronous_standby_names, reusing the syncrep_yyparse/scanner grammar:
slot1, slot2 -- ALL mode (backward-compatible)
ANY N (slot1, slot2, ...) -- quorum: N-of-M slots suffice
FIRST N (slot1, slot2, ...) -- priority: ALL slots required
Key changes:
- SyncStandbySlotsConfigData now mirrors SyncRepConfigData with num_sync,
syncrep_method (SYNC_REP_PRIORITY / SYNC_REP_QUORUM), and nslotnames.
- check_synchronized_standby_slots() reuses syncrep_yyparse() and
syncrep_scanner_init/finish() for parsing. After parsing, it validates
each member name as a legal replication slot name and rejects
num_sync > nmembers.
- StandbySlotsHaveCaughtup() in QUORUM mode continues past lagging slots
to count caught-up ones, returning true once num_sync are satisfied.
In PRIORITY mode (the default), it preserves the original
break-on-first-lagging-slot behavior requiring all slots.
- Documentation updated with formal synopsis, FIRST/ANY/plain-list
explanations, examples, and cross-reference to synchronous_standby_names.
- New TAP test (052_synchronized_standby_slots_quorum.pl) with 11 subtests
covering ALL-mode blocking, ANY-mode quorum, FIRST N, backward compat,
recovery scenarios, and GUC validation of invalid values.
---
doc/src/sgml/config.sgml | 67 +++-
src/backend/replication/slot.c | 240 +++++++++-----
.../052_synchronized_standby_slots_quorum.pl | 303 ++++++++++++++++++
3 files changed, 519 insertions(+), 91 deletions(-)
create mode 100644 src/test/recovery/t/052_synchronized_standby_slots_quorum.pl
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index f670e2d4c31..9917d5ce70d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4949,17 +4949,64 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
</term>
<listitem>
<para>
- A comma-separated list of streaming replication standby server slot names
- that logical WAL sender processes will wait for. Logical WAL sender processes
- will send decoded changes to plugins only after the specified replication
- slots confirm receiving WAL. This guarantees that logical replication
+ Specifies which streaming replication standby server slots logical WAL
+ sender processes must wait for before delivering decoded changes. This
+ parameter uses the same syntax as
+ <xref linkend="guc-synchronous-standby-names"/>:
+<synopsis>
+[FIRST] <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
+ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="parameter">slot_name</replaceable> [, ...] )
+<replaceable class="parameter">slot_name</replaceable> [, ...]
+</synopsis>
+ where <replaceable class="parameter">num_sync</replaceable> is
+ the number of physical replication slots that must confirm WAL
+ receipt before logical decoding proceeds,
+ and <replaceable class="parameter">slot_name</replaceable>
+ is the name of a physical replication slot.
+ <replaceable class="parameter">num_sync</replaceable>
+ must be an integer value greater than zero and must not exceed the
+ number of listed slots.
+ </para>
+ <para>
+ The keyword <literal>FIRST</literal>, coupled with
+ <replaceable class="parameter">num_sync</replaceable>, specifies
+ priority-based semantics. Logical decoding will wait until
+ <emphasis>all</emphasis> listed physical slots have confirmed WAL
+ receipt regardless of
+ <replaceable class="parameter">num_sync</replaceable>.
+ A plain comma-separated list without a keyword is equivalent to
+ <literal>FIRST 1 (...)</literal> and therefore also requires all
+ listed slots.
+ </para>
+ <para>
+ The keyword <literal>ANY</literal>, coupled with
+ <replaceable class="parameter">num_sync</replaceable>, specifies
+ quorum-based semantics. Logical decoding proceeds once at least
+ <replaceable class="parameter">num_sync</replaceable> of the listed
+ slots have caught up.
+ For example, a setting of <literal>ANY 1 (sb1_slot, sb2_slot)</literal>
+ will allow logical decoding to proceed as soon as either physical slot
+ has confirmed WAL receipt. This is useful in conjunction with
+ quorum-based synchronous replication
+ (<literal>synchronous_standby_names = 'ANY ...'</literal>), so that
+ logical decoding availability matches the commit durability guarantee.
+ </para>
+ <para>
+ <literal>FIRST</literal> and <literal>ANY</literal> are case-insensitive.
+ If these keywords are used as the name of a replication slot,
+ the <replaceable class="parameter">slot_name</replaceable> must
+ be double-quoted.
+ </para>
+ <para>
+ This guarantees that logical replication
failover slots do not consume changes until those changes are received
- and flushed to corresponding physical standbys. If a
+ and flushed to the required physical standbys. If a
logical replication connection is meant to switch to a physical standby
after the standby is promoted, the physical replication slot for the
standby should be listed here. Note that logical replication will not
- proceed if the slots specified in the
- <varname>synchronized_standby_slots</varname> do not exist or are invalidated.
+ proceed if the slots specified in
+ <varname>synchronized_standby_slots</varname> do not exist or are
+ invalidated.
Additionally, the replication management functions
<link linkend="pg-replication-slot-advance">
<function>pg_replication_slot_advance</function></link>,
@@ -4967,9 +5014,9 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<function>pg_logical_slot_get_changes</function></link>, and
<link linkend="pg-logical-slot-peek-changes">
<function>pg_logical_slot_peek_changes</function></link>,
- when used with logical failover slots, will block until all
- physical slots specified in <varname>synchronized_standby_slots</varname> have
- confirmed WAL receipt.
+ when used with logical failover slots, will block until the required
+ physical slots specified in <varname>synchronized_standby_slots</varname>
+ have confirmed WAL receipt.
</para>
<para>
The standbys corresponding to the physical replication slots in
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 28c7019402b..5509e15586a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -50,6 +50,7 @@
#include "replication/logicallauncher.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
+#include "replication/syncrep.h"
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -89,11 +90,18 @@ typedef struct ReplicationSlotOnDisk
* Note: this must be a flat representation that can be held in a single chunk
* of guc_malloc'd memory, so that it can be stored as the "extra" data for the
* synchronized_standby_slots GUC.
+ *
+ * The layout mirrors SyncRepConfigData so that the same quorum / priority
+ * semantics can be expressed. The syncrep_method field uses the
+ * SYNC_REP_PRIORITY and SYNC_REP_QUORUM constants from syncrep.h.
*/
typedef struct
{
- /* Number of slot names in the slot_names[] */
- int nslotnames;
+ int config_size; /* total size of this struct, in bytes */
+ int num_sync; /* number of slots that must confirm WAL
+ * receipt before logical decoding proceeds */
+ uint8 syncrep_method; /* SYNC_REP_PRIORITY or SYNC_REP_QUORUM */
+ int nslotnames; /* number of slot names that follow */
/*
* slot_names contains 'nslotnames' consecutive null-terminated C strings.
@@ -2946,94 +2954,131 @@ GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
}
/*
- * A helper function to validate slots specified in GUC synchronized_standby_slots.
+ * GUC check_hook for synchronized_standby_slots
+ *
+ * This reuses the syncrep_yyparse / syncrep_scanner infrastructure that is
+ * also used for synchronous_standby_names, so the same syntax is accepted:
*
- * The rawname will be parsed, and the result will be saved into *elemlist.
+ * slot1, slot2 -- wait for ALL listed slots (FIRST 1)
+ * ANY N (slot1, slot2, ...) -- wait for N-of-M (quorum)
+ * FIRST N (slot1, slot2, ...) -- wait for ALL listed slots (N is ignored)
+ *
+ * After parsing, we additionally validate every name as a legal replication
+ * slot name.
*/
-static bool
-validate_sync_standby_slots(char *rawname, List **elemlist)
+bool
+check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
{
- /* Verify syntax and parse string into a list of identifiers */
- if (!SplitIdentifierString(rawname, ',', elemlist))
- {
- GUC_check_errdetail("List syntax is invalid.");
- return false;
- }
+ char *ptr;
+ int size;
+ SyncStandbySlotsConfigData *config;
+
+ if ((*newval)[0] == '\0')
+ return true;
- /* Iterate the list to validate each slot name */
- foreach_ptr(char, name, *elemlist)
+ /* parse with the syncrep grammar */
{
- int err_code;
- char *err_msg = NULL;
- char *err_hint = NULL;
+ yyscan_t scanner;
+ int parse_rc;
+ SyncRepConfigData *syncrep_parse_result = NULL;
+ char *syncrep_parse_error_msg = NULL;
+
+ syncrep_scanner_init(*newval, &scanner);
+ parse_rc = syncrep_yyparse(&syncrep_parse_result,
+ &syncrep_parse_error_msg,
+ scanner);
+ syncrep_scanner_finish(scanner);
+
+ if (parse_rc != 0 || syncrep_parse_result == NULL)
+ {
+ GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
+ if (syncrep_parse_error_msg)
+ GUC_check_errdetail("%s", syncrep_parse_error_msg);
+ else
+ GUC_check_errdetail("\"%s\" parser failed.",
+ "synchronized_standby_slots");
+ return false;
+ }
- if (!ReplicationSlotValidateNameInternal(name, false, &err_code,
- &err_msg, &err_hint))
+ if (syncrep_parse_result->num_sync <= 0)
{
- GUC_check_errcode(err_code);
- GUC_check_errdetail("%s", err_msg);
- if (err_hint != NULL)
- GUC_check_errhint("%s", err_hint);
+ GUC_check_errmsg("number of synchronized standby slots (%d) must be greater than zero",
+ syncrep_parse_result->num_sync);
return false;
}
- }
- return true;
-}
+ /* Reject num_sync > nmembers — it can never be satisfied */
+ if (syncrep_parse_result->num_sync > syncrep_parse_result->nmembers)
+ {
+ GUC_check_errmsg("number of synchronized standby slots (%d) must not exceed the number of listed slots (%d)",
+ syncrep_parse_result->num_sync,
+ syncrep_parse_result->nmembers);
+ return false;
+ }
-/*
- * GUC check_hook for synchronized_standby_slots
- */
-bool
-check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
-{
- char *rawname;
- char *ptr;
- List *elemlist;
- int size;
- bool ok;
- SyncStandbySlotsConfigData *config;
+ /* validate every member name as a slot name */
+ {
+ const char *mname = syncrep_parse_result->member_names;
- if ((*newval)[0] == '\0')
- return true;
+ for (int i = 0; i < syncrep_parse_result->nmembers; i++)
+ {
+ int err_code;
+ char *err_msg = NULL;
+ char *err_hint = NULL;
+
+ if (!ReplicationSlotValidateNameInternal(mname, false, &err_code,
+ &err_msg, &err_hint))
+ {
+ GUC_check_errcode(err_code);
+ GUC_check_errdetail("%s", err_msg);
+ if (err_hint != NULL)
+ GUC_check_errhint("%s", err_hint);
+ return false;
+ }
+
+ mname += strlen(mname) + 1;
+ }
+ }
- /* Need a modifiable copy of the GUC string */
- rawname = pstrdup(*newval);
+ /* build SyncStandbySlotsConfigData from the parsed SyncRepConfigData */
+ size = offsetof(SyncStandbySlotsConfigData, slot_names);
+ {
+ const char *mname = syncrep_parse_result->member_names;
- /* Now verify if the specified slots exist and have correct type */
- ok = validate_sync_standby_slots(rawname, &elemlist);
+ for (int i = 0; i < syncrep_parse_result->nmembers; i++)
+ {
+ size += strlen(mname) + 1;
+ mname += strlen(mname) + 1;
+ }
+ }
- if (!ok || elemlist == NIL)
- {
- pfree(rawname);
- list_free(elemlist);
- return ok;
- }
+ /* GUC extra value must be guc_malloc'd, not palloc'd */
+ config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
+ if (!config)
+ return false;
- /* Compute the size required for the SyncStandbySlotsConfigData struct */
- size = offsetof(SyncStandbySlotsConfigData, slot_names);
- foreach_ptr(char, slot_name, elemlist)
- size += strlen(slot_name) + 1;
+ config->config_size = size;
+ config->num_sync = syncrep_parse_result->num_sync;
+ config->syncrep_method = syncrep_parse_result->syncrep_method;
+ config->nslotnames = syncrep_parse_result->nmembers;
- /* GUC extra value must be guc_malloc'd, not palloc'd */
- config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
- if (!config)
- return false;
+ /* Copy member names into the flat slot_names array */
+ {
+ const char *mname = syncrep_parse_result->member_names;
- /* Transform the data into SyncStandbySlotsConfigData */
- config->nslotnames = list_length(elemlist);
+ ptr = config->slot_names;
+ for (int i = 0; i < syncrep_parse_result->nmembers; i++)
+ {
+ int len = strlen(mname) + 1;
- ptr = config->slot_names;
- foreach_ptr(char, slot_name, elemlist)
- {
- strcpy(ptr, slot_name);
- ptr += strlen(slot_name) + 1;
+ memcpy(ptr, mname, len);
+ ptr += len;
+ mname += len;
+ }
+ }
}
*extra = config;
-
- pfree(rawname);
- list_free(elemlist);
return true;
}
@@ -3094,6 +3139,7 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
const char *name;
int caught_up_slot_num = 0;
XLogRecPtr min_restart_lsn = InvalidXLogRecPtr;
+ bool quorum_mode;
/*
* Don't need to wait for the standbys to catch up if there is no value in
@@ -3121,6 +3167,15 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
* To prevent concurrent slot dropping and creation while filtering the
* slots, take the ReplicationSlotControlLock outside of the loop.
*/
+ /*
+ * Determine whether we are in quorum mode. In quorum mode we must visit
+ * every slot (skipping lagging ones) until the quorum requirement is met,
+ * so that we can count the number that have caught up.
+ * In the default ALL mode the original
+ * break-on-first-lagging-slot behaviour is preserved.
+ */
+ quorum_mode = (synchronized_standby_slots_config->syncrep_method == SYNC_REP_QUORUM);
+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
name = synchronized_standby_slots_config->slot_names;
@@ -3129,13 +3184,15 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
XLogRecPtr restart_lsn;
bool invalidated;
bool inactive;
+ bool slot_ok = true;
ReplicationSlot *slot;
slot = SearchNamedReplicationSlot(name, false);
/*
* If a slot name provided in synchronized_standby_slots does not
- * exist, report a message and exit the loop.
+ * exist, report a message. In ALL mode, break immediately;
+ * in quorum mode, skip and continue.
*/
if (!slot)
{
@@ -3147,10 +3204,13 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
name),
errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
name, "synchronized_standby_slots"));
- break;
+ if (!quorum_mode)
+ break;
+ name += strlen(name) + 1;
+ continue;
}
- /* Same as above: if a slot is not physical, exit the loop. */
+ /* Same as above: if a slot is not physical, skip/break. */
if (SlotIsLogical(slot))
{
ereport(elevel,
@@ -3161,7 +3221,10 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
name),
errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
name, "synchronized_standby_slots"));
- break;
+ if (!quorum_mode)
+ break;
+ name += strlen(name) + 1;
+ continue;
}
SpinLockAcquire(&slot->mutex);
@@ -3181,10 +3244,11 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
name),
errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
name, "synchronized_standby_slots"));
- break;
+ slot_ok = false;
}
- if (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn)
+ if (slot_ok &&
+ (!XLogRecPtrIsValid(restart_lsn) || restart_lsn < wait_for_lsn))
{
/* Log a message if no active_pid for this physical slot */
if (inactive)
@@ -3196,9 +3260,16 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
name),
errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
name, "synchronized_standby_slots"));
+ slot_ok = false;
+ }
- /* Continue if the current slot hasn't caught up. */
- break;
+ if (!slot_ok)
+ {
+ /* In ALL mode, stop on the first lagging slot */
+ if (!quorum_mode)
+ break;
+ name += strlen(name) + 1;
+ continue;
}
Assert(restart_lsn >= wait_for_lsn);
@@ -3215,11 +3286,18 @@ StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
LWLockRelease(ReplicationSlotControlLock);
/*
- * Return false if not all the standbys have caught up to the specified
- * WAL location.
- */
- if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
- return false;
+ /* In quorum mode, we only need num_sync of the listed slots to have caught up. */
+ {
+ int required;
+
+ if (synchronized_standby_slots_config->syncrep_method == SYNC_REP_QUORUM)
+ required = synchronized_standby_slots_config->num_sync;
+ else
+ required = synchronized_standby_slots_config->nslotnames;
+
+ if (caught_up_slot_num < required)
+ return false;
+ }
/* The ss_oldest_flush_lsn must not retreat. */
Assert(!XLogRecPtrIsValid(ss_oldest_flush_lsn) ||
diff --git a/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl b/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl
new file mode 100644
index 00000000000..42b12f3b439
--- /dev/null
+++ b/src/test/recovery/t/052_synchronized_standby_slots_quorum.pl
@@ -0,0 +1,303 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test that synchronized_standby_slots supports quorum-based syntax
+# (ANY N (slot1, slot2, ...)) so that logical decoding availability matches
+# the commit durability guarantee of synchronous_standby_names = 'ANY ...'.
+#
+# Setup: a 3-node cluster with one primary, two physical standbys, and a
+# logical decoding client using a failover-enabled slot.
+#
+# | ----> standby1 (primary_slot_name = sb1_slot)
+# primary ------|
+# | ----> standby2 (primary_slot_name = sb2_slot)
+#
+# synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+#
+# We test two scenarios:
+#
+# A) synchronized_standby_slots = 'sb1_slot, sb2_slot' (ALL mode)
+# With standby1 down, logical decoding BLOCKS despite the quorum commit
+# having succeeded — this demonstrates the original limitation.
+#
+# B) synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot)' (quorum mode)
+# With standby1 down, logical decoding proceeds because sb2_slot alone
+# satisfies the quorum — this is the fix.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ---------------------------------------------------------------------------
+# 1. Create a primary with logical replication level, autovacuum off
+# ---------------------------------------------------------------------------
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 'logical');
+$primary->append_conf(
+ 'postgresql.conf', qq{
+autovacuum = off
+});
+$primary->start;
+
+# Physical replication slots for the two standbys
+$primary->safe_psql('postgres',
+ "SELECT pg_create_physical_replication_slot('sb1_slot');");
+$primary->safe_psql('postgres',
+ "SELECT pg_create_physical_replication_slot('sb2_slot');");
+
+# ---------------------------------------------------------------------------
+# 2. Create standby1 and standby2 from a fresh backup
+# ---------------------------------------------------------------------------
+my $backup_name = 'base_backup';
+$primary->backup($backup_name);
+
+my $connstr = $primary->connstr;
+
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$standby1->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb1_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+ $primary, $backup_name,
+ has_streaming => 1,
+ has_restoring => 1);
+$standby2->append_conf(
+ 'postgresql.conf', qq(
+hot_standby_feedback = on
+primary_slot_name = 'sb2_slot'
+primary_conninfo = '$connstr dbname=postgres'
+));
+
+$standby1->start;
+$standby2->start;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 3. Create a logical failover slot on the primary
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+ "SELECT pg_create_logical_replication_slot('logical_failover', 'test_decoding', false, false, true);"
+);
+
+# ---------------------------------------------------------------------------
+# 4. Configure quorum sync rep with ALL-mode synchronized_standby_slots
+# ---------------------------------------------------------------------------
+$primary->append_conf(
+ 'postgresql.conf', qq{
+synchronous_standby_names = 'ANY 1 (standby1, standby2)'
+synchronized_standby_slots = 'sb1_slot, sb2_slot'
+});
+$primary->reload;
+
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+# ---------------------------------------------------------------------------
+# 5. Confirm that quorum sync rep is active for both standbys
+# ---------------------------------------------------------------------------
+is( $primary->safe_psql(
+ 'postgres',
+ q{SELECT count(*) FROM pg_stat_replication WHERE sync_state = 'quorum';}
+ ),
+ '2',
+ 'both standbys are in quorum sync state');
+
+##################################################
+# PART A: ALL-mode blocks even when quorum is met
+##################################################
+
+$standby1->stop;
+
+# Commit succeeds since standby2 satisfies the quorum.
+my $emit_lsn = $primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'all_mode_blocks');"
+);
+like($emit_lsn, qr/^[0-9A-F]+\/[0-9A-F]+$/,
+ 'synchronous commit succeeds with quorum (standby2 alive)');
+
+$primary->wait_for_replay_catchup($standby2);
+
+my $log_offset = -s $primary->logfile;
+
+my $bg = $primary->background_psql(
+ 'postgres',
+ on_error_stop => 0,
+ timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+$bg->query_until(
+ qr/decode_start/, q(
+ \echo decode_start
+ SELECT pg_logical_slot_peek_changes('logical_failover', NULL, NULL);
+));
+
+# Wait for the primary to log a warning about sb1_slot not being active.
+$primary->wait_for_log(
+ qr/replication slot \"sb1_slot\" specified in parameter "synchronized_standby_slots" does not have active_pid/,
+ $log_offset);
+
+pass('ALL mode: logical decoding blocked by sb1_slot even though quorum commit succeeded');
+
+# Unblock by clearing synchronized_standby_slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
+$bg->quit;
+
+# Consume the change so the slot is clean for the next test.
+$primary->safe_psql('postgres',
+ q{SELECT pg_logical_slot_get_changes('logical_failover', NULL, NULL);});
+
+##################################################
+# PART B: ANY (quorum) mode — logical decoding proceeds
+##################################################
+
+# Switch synchronized_standby_slots to quorum mode: need only 1 of 2 slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'ANY 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+# standby1 is still down; standby2 is up.
+
+# Emit another transactional message — commits via quorum.
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'quorum_mode_works');"
+);
+$primary->wait_for_replay_catchup($standby2);
+
+# In quorum mode, logical decoding should NOT block because sb2_slot has
+# caught up and 1-of-2 is sufficient.
+my $decoded = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%quorum_mode_works%';});
+is($decoded, '1',
+ 'ANY mode: logical decoding proceeds with only sb2_slot caught up');
+
+##################################################
+# PART C: Verify backward-compat — plain list still requires ALL
+##################################################
+
+# Bring standby1 back.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+# Switch to plain list (ALL mode) with both slots.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'sb1_slot, sb2_slot'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'both_caught_up');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_bc = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%both_caught_up%';});
+is($decoded_bc, '1',
+ 'backward-compat: plain list works when all standbys are up');
+
+##################################################
+# PART D: ANY mode — bringing standby1 back also works
+##################################################
+
+# Stop standby1 again, switch to ANY 1.
+$standby1->stop;
+
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'ANY 1 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'standby1_recovery');"
+);
+
+$primary->wait_for_replay_catchup($standby2);
+
+# Decoding proceeds via quorum.
+my $decoded_d1 = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%standby1_recovery%';});
+is($decoded_d1, '1',
+ 'ANY mode: decoding works while standby1 is down');
+
+# Bring standby1 back and verify decoding still works.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'after_recovery');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_d2 = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%after_recovery%';});
+is($decoded_d2, '1',
+ 'ANY mode: decoding works after standby1 recovers');
+
+##################################################
+# PART E: Verify FIRST N syntax (treated as ALL mode)
+##################################################
+
+# FIRST 2 (sb1_slot, sb2_slot) — both standbys up, should work like ALL.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots',
+ "'FIRST 2 (sb1_slot, sb2_slot)'");
+$primary->reload;
+
+$primary->safe_psql('postgres',
+ "SELECT pg_logical_emit_message(true, 'qtest', 'first_n_test');"
+);
+$primary->wait_for_replay_catchup($standby1);
+$primary->wait_for_replay_catchup($standby2);
+
+my $decoded_e = $primary->safe_psql('postgres',
+ q{SELECT count(*) FROM pg_logical_slot_get_changes('logical_failover', NULL, NULL)
+ WHERE data LIKE '%first_n_test%';});
+is($decoded_e, '1',
+ 'FIRST N mode: decoding works when all standbys are up');
+
+##################################################
+# PART F: Verify GUC validation rejects bad values
+##################################################
+
+my ($result, $stdout, $stderr);
+
+# N exceeds number of listed slots
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 3 (sb1_slot, sb2_slot)';");
+like($stderr, qr/ERROR/,
+ 'GUC rejects ANY N when N > number of listed slots');
+
+# Missing closing parenthesis
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (sb1_slot, sb2_slot';");
+like($stderr, qr/ERROR/,
+ 'GUC rejects malformed ANY syntax');
+
+# Invalid slot name
+($result, $stdout, $stderr) = $primary->psql('postgres',
+ "ALTER SYSTEM SET synchronized_standby_slots = 'ANY 1 (INVALID_UPPER)';");
+like($stderr, qr/ERROR/,
+ 'GUC rejects invalid slot name in ANY syntax');
+
+# ---------------------------------------------------------------------------
+# Cleanup
+# ---------------------------------------------------------------------------
+$primary->safe_psql('postgres',
+ "SELECT pg_drop_replication_slot('logical_failover');");
+
+done_testing();
--
2.34.1