public inbox for [email protected]  
From: Alexander Korotkov <[email protected]>
To: Xuneng Zhou <[email protected]>
Cc: Álvaro Herrera <[email protected]>
Cc: Chao Li <[email protected]>
Cc: pgsql-hackers <[email protected]>
Cc: Andres Freund <[email protected]>
Cc: Michael Paquier <[email protected]>
Cc: jian he <[email protected]>
Cc: Tomas Vondra <[email protected]>
Cc: Yura Sokolov <[email protected]>
Subject: Re: Implement waiting for wal lsn replay: reloaded
Date: Sat, 3 Jan 2026 00:53:56 +0200
Message-ID: <CAPpHfds-KiZRuCruc0jHxLSxLqzKcHJGwOFFA0b_RgaJvtUOEQ@mail.gmail.com>
In-Reply-To: <CABPTF7UkwQZGx5ub731Q+=+rU8yx4ruqMdDt__L_dm9_32LsMw@mail.gmail.com>
References: <CABPTF7Xs-64GQNjmbimZNhj2YSKbBny+evz6=cp3X2fkJS+vMQ@mail.gmail.com>
	<[email protected]>
	<CAPpHfduDVNo4VXgnQFZUg9=2yHQJfUUqjokbi3qVxuJiKNfcwg@mail.gmail.com>
	<CABPTF7UkwQZGx5ub731Q+=+rU8yx4ruqMdDt__L_dm9_32LsMw@mail.gmail.com>

Hi, Xuneng!

On Fri, Jan 2, 2026 at 11:17 AM Xuneng Zhou <[email protected]> wrote:
> On Fri, Jan 2, 2026 at 7:42 AM Alexander Korotkov <[email protected]> wrote:
> >
> > On Thu, Jan 1, 2026 at 7:16 PM Álvaro Herrera <[email protected]> wrote:
> > > In 0002 you have this kind of thing:
> > >
> > > >                               ereport(ERROR,
> > > >                                               errcode(ERRCODE_QUERY_CANCELED),
> > > > -                                             errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
> > > > +                                             errmsg("timed out while waiting for target LSN %X/%08X to be %s; current %s LSN %X/%08X",
> > > >                                                          LSN_FORMAT_ARGS(lsn),
> > > > -                                                        LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
> > > > +                                                        desc->verb,
> > > > +                                                        desc->noun,
> > > > +                                                        LSN_FORMAT_ARGS(currentLSN)));
> > > > +                     }
> > >
> > >
> > > I'm afraid this technique doesn't work, for translatability reasons.
> > > Your whole design of having a struct with ->verb and ->noun is not
> > > workable (which is a pity, but you can't really fight this.) You need to
> > > spell out the whole messages for each case, something like
> > >
> > > if (lsntype == replay)
> > >    ereport(ERROR,
> > >            errcode(ERRCODE_QUERY_CANCELED),
> > >            errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
> > > else if (lsntype == flush)
> > >     ereport( ... )
> > >
> > > and so on.  This means four separate messages for translation for each
> > > message your patch is adding, which is IMO the correct approach.
> >
> > +1
> > Thank you for catching this, Alvaro.  Yes, I think we need to get rid
> > of WaitLSNTypeDesc.  It's nice idea, but we support too many languages
> > to have something like this.
> >
>
> Thanks for pointing this out. This approach doesn’t scale to multiple
> languages. While switch statements are more verbose, the extra clarity
> is justified to preserve proper internationalization. Please check the
> updated v12.

I've corrected the patchset, mostly just comments, formatting, etc.
I'm going to push it if there are no objections.
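Álvaro's objection is that a sentence assembled from `desc->verb` and `desc->noun` fragments never reaches translators as a whole. Below is a minimal standalone sketch of the per-case alternative, under stated assumptions. `_()` is a hypothetical identity stand-in for gettext, and `format_timeout_message()` is an illustrative name, not something in the patchset. Only the replay wording comes from the quoted hunk; the write and flush wordings are guesses.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in for gettext(): the identity here, but in a real
 * build every literal wrapped in _() is extracted as one complete msgid.
 */
#define _(msgid) (msgid)

typedef enum WaitLSNType
{
	WAIT_LSN_TYPE_STANDBY_REPLAY,
	WAIT_LSN_TYPE_STANDBY_WRITE,
	WAIT_LSN_TYPE_STANDBY_FLUSH,
	WAIT_LSN_TYPE_PRIMARY_FLUSH,
} WaitLSNType;

/* High and low halves of a 64-bit LSN, printed as %X/%08X. */
#define LSN_HI(lsn) ((unsigned int) ((lsn) >> 32))
#define LSN_LO(lsn) ((unsigned int) ((lsn) & 0xFFFFFFFFu))

/*
 * Write the timeout message for 'type' into buf.  Each branch spells out
 * the entire sentence; no word is spliced in through %s.
 */
static void
format_timeout_message(char *buf, size_t len, WaitLSNType type,
					   uint64_t target, uint64_t current)
{
	switch (type)
	{
		case WAIT_LSN_TYPE_STANDBY_REPLAY:
			snprintf(buf, len,
					 _("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X"),
					 LSN_HI(target), LSN_LO(target),
					 LSN_HI(current), LSN_LO(current));
			return;
		case WAIT_LSN_TYPE_STANDBY_WRITE:
			snprintf(buf, len,
					 _("timed out while waiting for target LSN %X/%08X to be written; current write LSN %X/%08X"),
					 LSN_HI(target), LSN_LO(target),
					 LSN_HI(current), LSN_LO(current));
			return;
		case WAIT_LSN_TYPE_STANDBY_FLUSH:
			snprintf(buf, len,
					 _("timed out while waiting for target LSN %X/%08X to be flushed; current flush LSN %X/%08X"),
					 LSN_HI(target), LSN_LO(target),
					 LSN_HI(current), LSN_LO(current));
			return;
		case WAIT_LSN_TYPE_PRIMARY_FLUSH:
			snprintf(buf, len,
					 _("timed out while waiting for target LSN %X/%08X to be flushed on the primary; current flush LSN %X/%08X"),
					 LSN_HI(target), LSN_LO(target),
					 LSN_HI(current), LSN_LO(current));
			return;
	}
	assert(!"invalid LSN wait type");
	if (len > 0)
		buf[0] = '\0';
}
```

The price is one near-duplicate msgid per wait type for every message, which is the trade-off Álvaro calls the correct approach.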

------
Regards,
Alexander Korotkov
Supabase


Attachments:

  [application/octet-stream] v13-0001-Extend-xlogwait-infrastructure-with-write-and-fl.patch (11.8K, 2-v13-0001-Extend-xlogwait-infrastructure-with-write-and-fl.patch)
From 3f7b1deaae59f45d8e049cf3b95ac7716ab38471 Mon Sep 17 00:00:00 2001
From: alterego655 <[email protected]>
Date: Tue, 16 Dec 2025 10:21:36 +0800
Subject: [PATCH v13 1/4] Extend xlogwait infrastructure with write and flush
 wait types

Add support for waiting on WAL write and flush LSNs in addition to the
existing replay LSN wait type. This provides the foundation for
extending the WAIT FOR command with MODE parameter.

Key changes are the following:
- Add WAIT_LSN_TYPE_STANDBY_WRITE and WAIT_LSN_TYPE_STANDBY_FLUSH to
  WaitLSNType.
- Add GetCurrentLSNForWaitType() to retrieve the current LSN for each wait
  type.
- Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and
  WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility.
- Update WaitForLSN() to use GetCurrentLSNForWaitType() internally.

Discussion: https://postgr.es/m/CABPTF7UiArgW-sXj9CNwRzUhYOQrevLzkYcgBydmX5oDes1sjg%40mail.gmail.com
Author: Xuneng Zhou <[email protected]>
Reviewed-by: Alexander Korotkov <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Alvaro Herrera <[email protected]>
---
 src/backend/access/transam/xlog.c             |  2 +-
 src/backend/access/transam/xlogrecovery.c     |  4 +-
 src/backend/access/transam/xlogwait.c         | 96 +++++++++++++++----
 src/backend/commands/wait.c                   |  2 +-
 .../utils/activity/wait_event_names.txt       |  3 +-
 src/include/access/xlogwait.h                 | 14 ++-
 6 files changed, 93 insertions(+), 28 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index e71b6e21123..05ac7c5f7f8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6280,7 +6280,7 @@ StartupXLOG(void)
 	 * Wake up all waiters for replay LSN.  They need to report an error that
 	 * recovery was ended before reaching the target LSN.
 	 */
-	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index a21ac48c9fe..0b5f871abe7 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1856,8 +1856,8 @@ PerformWalRecovery(void)
 			 */
 			if (waitLSNState &&
 				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
-				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
-				WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_REPLAY])))
+				WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
 
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 6c2bda763e2..5020ae1e52d 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -12,25 +12,30 @@
  *		This file implements waiting for WAL operations to reach specific LSNs
  *		on both physical standby and primary servers. The core idea is simple:
  *		every process that wants to wait publishes the LSN it needs to the
- *		shared memory, and the appropriate process (startup on standby, or
- *		WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *		shared memory, and the appropriate process (startup on standby,
+ *		walreceiver on standby, or WAL writer/backend on primary) wakes it
+ *		once that LSN has been reached.
  *
  *		The shared memory used by this module comprises a procInfos
  *		per-backend array with the information of the awaited LSN for each
  *		of the backend processes.  The elements of that array are organized
- *		into a pairing heap waitersHeap, which allows for very fast finding
- *		of the least awaited LSN.
+ *		into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ *		allows for very fast finding of the least awaited LSN for each type.
  *
- *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
- *		waiter process publishes information about itself to the shared
- *		memory and waits on the latch until it is woken up by the appropriate
- *		process, standby is promoted, or the postmaster	dies.  Then, it cleans
- *		information about itself in the shared memory.
+ *		In addition, the least-awaited LSN for each type is cached in the
+ *		minWaitedLSN array.  The waiter process publishes information about
+ *		itself to the shared memory and waits on the latch until it is woken
+ *		up by the appropriate process, standby is promoted, or the postmaster
+ *		dies.  Then, it cleans information about itself in the shared memory.
  *
- *		On standby servers: After replaying a WAL record, the startup process
- *		first performs a fast path check minWaitedLSN > replayLSN.  If this
- *		check is negative, it checks waitersHeap and wakes up the backend
- *		whose awaited LSNs are reached.
+ *		On standby servers:
+ *		- After replaying a WAL record, the startup process performs a fast
+ *		  path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
+ *		  negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ *		  whose awaited LSNs are reached.
+ *		- After receiving WAL, the walreceiver process performs similar checks
+ *		  against the flush and write LSNs, waking up waiters in the FLUSH
+ *		  and WRITE heaps, respectively.
  *
  *		On primary servers: After flushing WAL, the WAL writer or backend
  *		process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
 #include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -62,6 +68,47 @@ static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
 
 struct WaitLSNState *waitLSNState = NULL;
 
+/*
+ * Wait event for each WaitLSNType, used with WaitLatch() to report
+ * the wait in pg_stat_activity.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+	[WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+	[WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+	[WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+	[WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+				 "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Get the current LSN for the specified wait type.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+	Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
+
+	switch (lsnType)
+	{
+		case WAIT_LSN_TYPE_STANDBY_REPLAY:
+			return GetXLogReplayRecPtr(NULL);
+
+		case WAIT_LSN_TYPE_STANDBY_WRITE:
+			return GetWalRcvWriteRecPtr();
+
+		case WAIT_LSN_TYPE_STANDBY_FLUSH:
+			return GetWalRcvFlushRecPtr(NULL, NULL);
+
+		case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+			return GetFlushRecPtr(NULL);
+	}
+
+	elog(ERROR, "invalid LSN wait type: %d", lsnType);
+	pg_unreachable();
+}
+
 /* Report the amount of shared memory space needed for WaitLSNState. */
 Size
 WaitLSNShmemSize(void)
@@ -302,6 +349,19 @@ WaitLSNCleanup(void)
 	}
 }
 
+/*
+ * Check if the given LSN type requires recovery to be in progress.
+ * Standby wait types (replay, write, flush) require recovery;
+ * primary wait types (flush) do not.
+ */
+static inline bool
+WaitLSNTypeRequiresRecovery(WaitLSNType t)
+{
+	return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
+		t == WAIT_LSN_TYPE_STANDBY_WRITE ||
+		t == WAIT_LSN_TYPE_STANDBY_FLUSH;
+}
+
 /*
  * Wait using MyLatch till the given LSN is reached, the replica gets
  * promoted, or the postmaster dies.
@@ -341,13 +401,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		int			rc;
 		long		delay_ms = -1;
 
-		if (lsnType == WAIT_LSN_TYPE_REPLAY)
-			currentLSN = GetXLogReplayRecPtr(NULL);
-		else
-			currentLSN = GetFlushRecPtr(NULL);
+		/* Get current LSN for the wait type */
+		currentLSN = GetCurrentLSNForWaitType(lsnType);
 
 		/* Check that recovery is still in-progress */
-		if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+		if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
 		{
 			/*
 			 * Recovery was ended, but check if target LSN was already
@@ -376,7 +434,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		CHECK_FOR_INTERRUPTS();
 
 		rc = WaitLatch(MyLatch, wake_events, delay_ms,
-					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+					   WaitLSNWaitEvents[lsnType]);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index d43dfd642d6..4867f59691e 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 43d870dbcf1..3299de23bb3 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
-WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary or standby."
 WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
+WAIT_FOR_WAL_WRITE	"Waiting for WAL write to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index b5fd3e74f1c..d12531d32b8 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -1,7 +1,7 @@
 /*-------------------------------------------------------------------------
  *
  * xlogwait.h
- *	  Declarations for LSN replay waiting routines.
+ *	  Declarations for WAL flush, write, and replay waiting routines.
  *
  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
  *
@@ -35,11 +35,16 @@ typedef enum
  */
 typedef enum WaitLSNType
 {
-	WAIT_LSN_TYPE_REPLAY,		/* Waiting for replay on standby */
-	WAIT_LSN_TYPE_FLUSH,		/* Waiting for flush on primary */
+	/* Standby wait types (walreceiver/startup wakes) */
+	WAIT_LSN_TYPE_STANDBY_REPLAY,
+	WAIT_LSN_TYPE_STANDBY_WRITE,
+	WAIT_LSN_TYPE_STANDBY_FLUSH,
+
+	/* Primary wait types (WAL writer/backends wake) */
+	WAIT_LSN_TYPE_PRIMARY_FLUSH,
 } WaitLSNType;
 
-#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_FLUSH + 1)
+#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_PRIMARY_FLUSH + 1)
 
 /*
  * WaitLSNProcInfo - the shared memory structure representing information
@@ -97,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
 extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
-- 
2.39.5 (Apple Git-154)

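The revised header comment in xlogwait.c keeps one waiter set per WaitLSNType. It also caches each set's smallest awaited LSN in minWaitedLSN, so the startup process, walreceiver, or WAL writer can skip the set entirely when nobody can be woken yet. The sketch below illustrates only that fast-path check, for a single type. It assumes a fixed-size array instead of the shared-memory pairing heap and a single thread instead of atomics and latches. The names `WaitSet`, `waitset_add()`, and `waitset_wakeup()` are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;

#define MAX_WAITERS 8
#define NO_WAITER UINT64_MAX	/* sentinel: nobody is waiting */

/* Hypothetical waiter set for one wait type; a pairing heap in the patch. */
typedef struct WaitSet
{
	XLogRecPtr	awaited[MAX_WAITERS];	/* LSN each slot waits for */
	bool		active[MAX_WAITERS];	/* is the slot in use? */
	XLogRecPtr	minWaitedLSN;	/* cached minimum over active slots */
} WaitSet;

static void
waitset_init(WaitSet *ws)
{
	for (int i = 0; i < MAX_WAITERS; i++)
		ws->active[i] = false;
	ws->minWaitedLSN = NO_WAITER;
}

/* Recompute the cached minimum after the set has changed. */
static void
waitset_update_min(WaitSet *ws)
{
	ws->minWaitedLSN = NO_WAITER;
	for (int i = 0; i < MAX_WAITERS; i++)
		if (ws->active[i] && ws->awaited[i] < ws->minWaitedLSN)
			ws->minWaitedLSN = ws->awaited[i];
}

/* Register a waiter in the first free slot; returns the slot or -1. */
static int
waitset_add(WaitSet *ws, XLogRecPtr lsn)
{
	assert(lsn != NO_WAITER);	/* the sentinel cannot be awaited */
	for (int i = 0; i < MAX_WAITERS; i++)
	{
		if (!ws->active[i])
		{
			ws->active[i] = true;
			ws->awaited[i] = lsn;
			waitset_update_min(ws);
			return i;
		}
	}
	return -1;
}

/*
 * Called once 'current' has been replayed, written, or flushed.  Fast path:
 * if even the smallest awaited LSN lies beyond 'current', return without
 * scanning.  Otherwise release every waiter whose LSN has been reached.
 * Returns the number of waiters released.
 */
static int
waitset_wakeup(WaitSet *ws, XLogRecPtr current)
{
	int			woken = 0;

	if (ws->minWaitedLSN > current)
		return 0;

	for (int i = 0; i < MAX_WAITERS; i++)
	{
		if (ws->active[i] && ws->awaited[i] <= current)
		{
			ws->active[i] = false;	/* the real code would SetLatch() */
			woken++;
		}
	}
	waitset_update_min(ws);
	return woken;
}
```

A per-type layout would then be an array of such sets indexed by WaitLSNType, with each waking process calling `waitset_wakeup()` only on the sets it is responsible for.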


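Patch 0001 makes two related changes. It replaces the `lsnType == WAIT_LSN_TYPE_REPLAY` ternaries with an enum-indexed table checked by StaticAssertDecl, and it gates the recovery check on WaitLSNTypeRequiresRecovery(). The self-contained sketch below reproduces the same pattern in plain C11, with some assumptions. The table holds the wait-event names from wait_event_names.txt as strings rather than PostgreSQL's numeric event codes. `check_wait_progress()` is a hypothetical helper that mirrors the early exits of the WaitForLSN() loop. Its success-after-promotion branch assumes the behavior suggested by the hunk comment "Recovery was ended, but check if target LSN was already ...".

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

typedef uint64_t XLogRecPtr;

typedef enum WaitLSNType
{
	WAIT_LSN_TYPE_STANDBY_REPLAY,
	WAIT_LSN_TYPE_STANDBY_WRITE,
	WAIT_LSN_TYPE_STANDBY_FLUSH,
	WAIT_LSN_TYPE_PRIMARY_FLUSH,
} WaitLSNType;

#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_PRIMARY_FLUSH + 1)

/* Designated initializers tie each entry to its enum member. */
static const char *const WaitEventNames[] = {
	[WAIT_LSN_TYPE_STANDBY_REPLAY] = "WAIT_FOR_WAL_REPLAY",
	[WAIT_LSN_TYPE_STANDBY_WRITE] = "WAIT_FOR_WAL_WRITE",
	[WAIT_LSN_TYPE_STANDBY_FLUSH] = "WAIT_FOR_WAL_FLUSH",
	[WAIT_LSN_TYPE_PRIMARY_FLUSH] = "WAIT_FOR_WAL_FLUSH",
};

/* Compile-time guard: fails if the table and the enum count disagree. */
_Static_assert(sizeof(WaitEventNames) / sizeof(WaitEventNames[0]) ==
			   WAIT_LSN_TYPE_COUNT,
			   "WaitEventNames must match WaitLSNType");

static const char *
wait_event_name(WaitLSNType t)
{
	assert((int) t >= 0 && (int) t < WAIT_LSN_TYPE_COUNT);
	return WaitEventNames[t];
}

/* Standby types need recovery in progress; the primary type does not. */
static bool
type_requires_recovery(WaitLSNType t)
{
	return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
		t == WAIT_LSN_TYPE_STANDBY_WRITE ||
		t == WAIT_LSN_TYPE_STANDBY_FLUSH;
}

typedef enum WaitProgress
{
	WAIT_PROGRESS_SUCCESS,
	WAIT_PROGRESS_NOT_IN_RECOVERY,
	WAIT_PROGRESS_KEEP_WAITING,
} WaitProgress;

/* Hypothetical: decide what one iteration of the wait loop should do. */
static WaitProgress
check_wait_progress(WaitLSNType t, XLogRecPtr target, XLogRecPtr current,
					bool in_recovery)
{
	assert((int) t >= 0 && (int) t < WAIT_LSN_TYPE_COUNT);

	/* Recovery ended: succeed only if the target was already reached. */
	if (type_requires_recovery(t) && !in_recovery)
		return (target <= current) ? WAIT_PROGRESS_SUCCESS
			: WAIT_PROGRESS_NOT_IN_RECOVERY;

	if (target <= current)
		return WAIT_PROGRESS_SUCCESS;

	return WAIT_PROGRESS_KEEP_WAITING;
}
```

Compared with the old ternaries, adding a wait type now means adding one table entry and one `case`, and the static assertion catches a table that falls out of step with WAIT_LSN_TYPE_COUNT.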
  [application/octet-stream] v13-0004-Use-WAIT-FOR-LSN-in-PostgreSQL-Test-Cluster-wait.patch (4.9K, 3-v13-0004-Use-WAIT-FOR-LSN-in-PostgreSQL-Test-Cluster-wait.patch)
From 5827bd2d978757e910f2bf00f8e2006abc563d24 Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sat, 3 Jan 2026 00:49:10 +0200
Subject: [PATCH v13 4/4] Use WAIT FOR LSN in
 PostgreSQL::Test::Cluster::wait_for_catchup()

When the standby is passed as a PostgreSQL::Test::Cluster instance,
use the WAIT FOR LSN command on the standby server to implement
wait_for_catchup() for replay, write, and flush modes.  This is more
efficient than polling pg_stat_replication on the upstream, as the
WAIT FOR LSN command uses a latch-based wakeup mechanism.

The optimization applies when:
- The standby is passed as a Cluster object (not just a name string)
- The mode is 'replay', 'write', or 'flush' (not 'sent')
- The standby is in recovery

The function falls back to the original polling-based approach using
pg_stat_replication on the upstream in 'sent' mode, when the standby is
passed as a string (e.g., a subscription name for logical replication),
or when the standby has been promoted.

Discussion: https://postgr.es/m/CABPTF7UiArgW-sXj9CNwRzUhYOQrevLzkYcgBydmX5oDes1sjg%40mail.gmail.com
Author: Xuneng Zhou <[email protected]>
Reviewed-by: Alexander Korotkov <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Alvaro Herrera <[email protected]>
---
 src/test/perl/PostgreSQL/Test/Cluster.pm | 59 +++++++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/src/test/perl/PostgreSQL/Test/Cluster.pm b/src/test/perl/PostgreSQL/Test/Cluster.pm
index 955dfc0e7f8..a28ea89aa10 100644
--- a/src/test/perl/PostgreSQL/Test/Cluster.pm
+++ b/src/test/perl/PostgreSQL/Test/Cluster.pm
@@ -3320,6 +3320,13 @@ If you pass an explicit value of target_lsn, it should almost always be
 the primary's write LSN; so this parameter is seldom needed except when
 querying some intermediate replication node rather than the primary.
 
+When the standby is passed as a PostgreSQL::Test::Cluster instance and is
+in recovery, this function uses the WAIT FOR LSN command on the standby
+for modes replay, write, and flush.  This is more efficient than polling
+pg_stat_replication on the upstream, as WAIT FOR LSN uses a latch-based
+wakeup mechanism.  For 'sent' mode, or when the standby is passed as a
+string (e.g., a subscription name), it falls back to polling.
+
 If there is no active replication connection from this peer, waits until
 poll_query_until timeout.
 
@@ -3339,10 +3346,13 @@ sub wait_for_catchup
 	  . join(', ', keys(%valid_modes))
 	  unless exists($valid_modes{$mode});
 
-	# Allow passing of a PostgreSQL::Test::Cluster instance as shorthand
+	# Keep a reference to the standby node if passed as an object, so we can
+	# use WAIT FOR LSN on it later.
+	my $standby_node;
 	if (blessed($standby_name)
 		&& $standby_name->isa("PostgreSQL::Test::Cluster"))
 	{
+		$standby_node = $standby_name;
 		$standby_name = $standby_name->name;
 	}
 	if (!defined($target_lsn))
@@ -3367,6 +3377,53 @@ sub wait_for_catchup
 	  . $self->name . "\n";
 	# Before release 12 walreceiver just set the application name to
 	# "walreceiver"
+
+	# Use WAIT FOR LSN on the standby when:
+	# - The standby was passed as a Cluster object (so we can connect to it)
+	# - The mode is replay, write, or flush (not 'sent')
+	# - The standby is in recovery
+	# This is more efficient than polling pg_stat_replication on the upstream,
+	# as WAIT FOR LSN uses a latch-based wakeup mechanism.
+	if (defined($standby_node) && ($mode ne 'sent'))
+	{
+		my $standby_in_recovery =
+		  $standby_node->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+		chomp($standby_in_recovery);
+
+		if ($standby_in_recovery eq 't')
+		{
+			# Map mode names to WAIT FOR LSN mode names
+			my %mode_map = (
+				'replay' => 'standby_replay',
+				'write' => 'standby_write',
+				'flush' => 'standby_flush',);
+			my $wait_mode = $mode_map{$mode};
+			my $timeout = $PostgreSQL::Test::Utils::timeout_default;
+			my $wait_query =
+			  qq[WAIT FOR LSN '${target_lsn}' WITH (MODE '${wait_mode}', timeout '${timeout}s', no_throw);];
+			my $output = $standby_node->safe_psql('postgres', $wait_query);
+			chomp($output);
+
+			if ($output ne 'success')
+			{
+				# Fetch additional detail for debugging purposes
+				my $details = $self->safe_psql('postgres',
+					"SELECT * FROM pg_catalog.pg_stat_replication");
+				diag qq(WAIT FOR LSN failed with status:
+	${output});
+				diag qq(Last pg_stat_replication contents:
+	${details});
+				croak "failed waiting for catchup";
+			}
+			print "done\n";
+			return;
+		}
+	}
+
+	# Fall back to polling pg_stat_replication on the upstream for:
+	# - 'sent' mode (no corresponding WAIT FOR LSN mode)
+	# - When standby_name is a string (e.g., subscription name)
+	# - When the standby is no longer in recovery (was promoted)
 	my $query = qq[SELECT '$target_lsn' <= ${mode}_lsn AND state = 'streaming'
          FROM pg_catalog.pg_stat_replication
          WHERE application_name IN ('$standby_name', 'walreceiver')];
-- 
2.39.5 (Apple Git-154)

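Stripped of the Perl plumbing, the new branch in wait_for_catchup() is a mode mapping plus a fallback. The C sketch below restates it under some assumptions. `catchup_mode_to_wait_mode()` and `build_wait_query()` are hypothetical helpers, and the `pg_is_in_recovery()` check is left to the caller. The LSN is interpolated verbatim exactly as in the Perl code, which is acceptable for test-generated values but not for untrusted input.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Map a wait_for_catchup() mode to a WAIT FOR LSN mode, or return NULL
 * when there is no equivalent and the caller must poll instead.
 */
static const char *
catchup_mode_to_wait_mode(const char *mode)
{
	static const struct
	{
		const char *catchup;	/* mode accepted by wait_for_catchup() */
		const char *wait;		/* corresponding WAIT FOR LSN mode */
	}			map[] = {
		{"replay", "standby_replay"},
		{"write", "standby_write"},
		{"flush", "standby_flush"},
	};

	for (size_t i = 0; i < sizeof(map) / sizeof(map[0]); i++)
	{
		if (strcmp(mode, map[i].catchup) == 0)
			return map[i].wait;
	}
	return NULL;				/* e.g. 'sent' */
}

/*
 * Build the statement sent to the standby.  Returns 0 on success, or -1 if
 * the mode needs the polling fallback or buf is too small.
 */
static int
build_wait_query(char *buf, size_t len, const char *target_lsn,
				 const char *mode, int timeout_s)
{
	const char *wait_mode;
	int			n;

	assert(target_lsn != NULL && mode != NULL);
	wait_mode = catchup_mode_to_wait_mode(mode);
	if (wait_mode == NULL)
		return -1;

	/* target_lsn is interpolated verbatim, as in the Perl code. */
	n = snprintf(buf, len,
				 "WAIT FOR LSN '%s' WITH (MODE '%s', timeout '%ds', no_throw);",
				 target_lsn, wait_mode, timeout_s);
	return (n < 0 || (size_t) n >= len) ? -1 : 0;
}
```

Using `no_throw` lets the caller inspect the returned status string and print its own diagnostics before failing, which is what the Perl branch does with `diag` and `croak`.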


  [application/octet-stream] v13-0003-Add-tab-completion-for-the-WAIT-FOR-LSN-MODE-opt.patch (3.0K, 4-v13-0003-Add-tab-completion-for-the-WAIT-FOR-LSN-MODE-opt.patch)
From 1d1ee0be7975a94e0f062a96e77be11322f335fa Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sat, 3 Jan 2026 00:42:32 +0200
Subject: [PATCH v13 3/4] Add tab completion for the WAIT FOR LSN MODE option

Update psql tab completion to support the optional MODE option in the
WAIT FOR LSN command.  After specifying an LSN value, completion now offers
both MODE and WITH keywords.  The MODE option specifies which LSN type to wait
for.  In particular, it controls whether the wait is evaluated from the
standby or primary perspective.

When MODE is specified, the completion suggests the valid mode values:
standby_replay, standby_write, standby_flush, and primary_flush.

Discussion: https://postgr.es/m/CABPTF7UiArgW-sXj9CNwRzUhYOQrevLzkYcgBydmX5oDes1sjg%40mail.gmail.com
Author: Xuneng Zhou <[email protected]>
Reviewed-by: Alexander Korotkov <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Alvaro Herrera <[email protected]>
---
 src/bin/psql/tab-complete.in.c | 28 +++++++++++++++++-----------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index d81f2fcdbe6..06edea98f06 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -5355,8 +5355,11 @@ match_previous_words(int pattern_id,
 /*
  * WAIT FOR LSN '<lsn>' [ WITH ( option [, ...] ) ]
  * where option can be:
+ *   MODE '<mode>'
  *   TIMEOUT '<timeout>'
  *   NO_THROW
+ * and mode can be:
+ *   standby_replay | standby_write | standby_flush | primary_flush
  */
 	else if (Matches("WAIT"))
 		COMPLETE_WITH("FOR");
@@ -5369,21 +5372,24 @@ match_previous_words(int pattern_id,
 		COMPLETE_WITH("WITH");
 	else if (Matches("WAIT", "FOR", "LSN", MatchAny, "WITH"))
 		COMPLETE_WITH("(");
+
+	/*
+	 * Handle parenthesized option list.  This fires when we're in an
+	 * unfinished parenthesized option list.  get_previous_words treats a
+	 * completed parenthesized option list as one word, so the above test is
+	 * correct.
+	 *
+	 * 'mode' takes a string value (one of those listed above), 'timeout' takes
+	 * a string value, and 'no_throw' takes no value.  We do not offer
+	 * completions for the *values* of 'timeout' or 'no_throw'.
+	 */
 	else if (HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*") &&
 			 !HeadMatches("WAIT", "FOR", "LSN", MatchAny, "WITH", "(*)"))
 	{
-		/*
-		 * This fires if we're in an unfinished parenthesized option list.
-		 * get_previous_words treats a completed parenthesized option list as
-		 * one word, so the above test is correct.
-		 */
 		if (ends_with(prev_wd, '(') || ends_with(prev_wd, ','))
-			COMPLETE_WITH("timeout", "no_throw");
-
-		/*
-		 * timeout takes a string value, no_throw takes no value. We don't
-		 * offer completions for these values.
-		 */
+			COMPLETE_WITH("mode", "timeout", "no_throw");
+		else if (TailMatches("mode"))
+			COMPLETE_WITH("'standby_replay'", "'standby_write'", "'standby_flush'", "'primary_flush'");
 	}
 
 /* WITH [RECURSIVE] */
-- 
2.39.5 (Apple Git-154)

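The four mode values offered by this tab completion are the ones defined by the MODE option in patch 0002. Below is a minimal sketch of how such a value could be resolved and checked against the server's recovery state. It makes three assumptions. First, it uses exact lowercase matching. Second, it reports a standby mode on a primary as the `not in recovery` outcome described in the WAIT FOR documentation. Third, it treats `primary_flush` during recovery as an error. `resolve_wait_mode()` and `ModeCheck` are hypothetical, and where the patch actually performs these checks is not visible in this excerpt.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

typedef enum WaitLSNType
{
	WAIT_LSN_TYPE_STANDBY_REPLAY,
	WAIT_LSN_TYPE_STANDBY_WRITE,
	WAIT_LSN_TYPE_STANDBY_FLUSH,
	WAIT_LSN_TYPE_PRIMARY_FLUSH,
} WaitLSNType;

typedef enum ModeCheck
{
	MODE_OK,
	MODE_UNRECOGNIZED,			/* not one of the four documented names */
	MODE_NOT_IN_RECOVERY,		/* standby_* mode used on a primary */
	MODE_NEEDS_PRIMARY,			/* primary_flush used during recovery */
} ModeCheck;

static const struct
{
	const char *name;
	WaitLSNType type;
}			ModeNames[] = {
	{"standby_replay", WAIT_LSN_TYPE_STANDBY_REPLAY},
	{"standby_write", WAIT_LSN_TYPE_STANDBY_WRITE},
	{"standby_flush", WAIT_LSN_TYPE_STANDBY_FLUSH},
	{"primary_flush", WAIT_LSN_TYPE_PRIMARY_FLUSH},
};

#define N_MODE_NAMES (sizeof(ModeNames) / sizeof(ModeNames[0]))

/*
 * Resolve the MODE option into *out.  A NULL mode means the option was
 * omitted and selects the default, standby_replay.
 */
static ModeCheck
resolve_wait_mode(const char *mode, bool in_recovery, WaitLSNType *out)
{
	WaitLSNType type = WAIT_LSN_TYPE_STANDBY_REPLAY;

	assert(out != NULL);

	if (mode != NULL)
	{
		size_t		i;

		for (i = 0; i < N_MODE_NAMES; i++)
		{
			if (strcmp(mode, ModeNames[i].name) == 0)
				break;
		}
		if (i == N_MODE_NAMES)
			return MODE_UNRECOGNIZED;
		type = ModeNames[i].type;
	}

	/* Standby modes need recovery; primary_flush needs its absence. */
	if (type == WAIT_LSN_TYPE_PRIMARY_FLUSH)
	{
		if (in_recovery)
			return MODE_NEEDS_PRIMARY;
	}
	else if (!in_recovery)
		return MODE_NOT_IN_RECOVERY;

	*out = type;
	return MODE_OK;
}
```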


  [application/octet-stream] v13-0002-Add-the-MODE-option-to-the-WAIT-FOR-LSN-command.patch (44.8K, 5-v13-0002-Add-the-MODE-option-to-the-WAIT-FOR-LSN-command.patch)
From 4ff3736d2becbff9931ec571098f8ca44081b18c Mon Sep 17 00:00:00 2001
From: Alexander Korotkov <[email protected]>
Date: Sat, 3 Jan 2026 00:38:47 +0200
Subject: [PATCH v13 2/4] Add the MODE option to the WAIT FOR LSN command

This commit extends the WAIT FOR LSN command with an optional MODE option in
the WITH clause that specifies which LSN type to wait for:

  WAIT FOR LSN '<lsn>' [WITH (MODE '<mode>', ...)]

where mode can be:
 - 'standby_replay' (default): Wait for WAL to be replayed to the specified
   LSN,
 - 'standby_write': Wait for WAL to be written (received) to the specified
   LSN,
 - 'standby_flush': Wait for WAL to be flushed to disk at the specified LSN,
 - 'primary_flush': Wait for WAL to be flushed to disk on the primary server.

The default mode is 'standby_replay', matching the original behavior when MODE
is not specified. This follows the pattern used by COPY and EXPLAIN
commands, where options are specified as string values in the WITH clause.

Modes are explicitly named to distinguish between primary and standby
operations:
- Standby modes ('standby_replay', 'standby_write', 'standby_flush') can only
  be used during recovery (on a standby server),
- Primary mode ('primary_flush') can only be used on a primary server.

The 'standby_write' and 'standby_flush' modes are useful for scenarios where
applications need to ensure WAL has been received or persisted on the standby
without necessarily waiting for replay to complete. The 'primary_flush' mode
allows waiting for WAL to be flushed on the primary server.

This commit also includes:
- Documentation updates for the new syntax and mode descriptions,
- Test coverage for all four modes, including error cases and concurrent
  waiters,
- Wakeup logic in walreceiver for standby write/flush waiters,
- Wakeup logic in WAL writer for primary flush waiters.

Discussion: https://postgr.es/m/CABPTF7UiArgW-sXj9CNwRzUhYOQrevLzkYcgBydmX5oDes1sjg%40mail.gmail.com
Author: Xuneng Zhou <[email protected]>
Reviewed-by: Alexander Korotkov <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Alvaro Herrera <[email protected]>
---
 doc/src/sgml/ref/wait_for.sgml          | 213 +++++++++---
 src/backend/access/transam/xlog.c       |  22 +-
 src/backend/commands/wait.c             | 174 ++++++++--
 src/backend/replication/walreceiver.c   |  18 ++
 src/test/recovery/t/049_wait_for_lsn.pl | 411 ++++++++++++++++++++++--
 5 files changed, 741 insertions(+), 97 deletions(-)

diff --git a/doc/src/sgml/ref/wait_for.sgml b/doc/src/sgml/ref/wait_for.sgml
index 3b8e842d1de..df72b3327c8 100644
--- a/doc/src/sgml/ref/wait_for.sgml
+++ b/doc/src/sgml/ref/wait_for.sgml
@@ -16,17 +16,23 @@ PostgreSQL documentation
 
  <refnamediv>
   <refname>WAIT FOR</refname>
-  <refpurpose>wait for target <acronym>LSN</acronym> to be replayed, optionally with a timeout</refpurpose>
+  <refpurpose>wait for WAL to reach a target <acronym>LSN</acronym></refpurpose>
  </refnamediv>
 
  <refsynopsisdiv>
 <synopsis>
-WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
+WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>'
+    [ WITH ( <replaceable class="parameter">option</replaceable> [, ...] ) ]
 
 <phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
 
+    MODE '<replaceable class="parameter">mode</replaceable>'
     TIMEOUT '<replaceable class="parameter">timeout</replaceable>'
     NO_THROW
+
+<phrase>and <replaceable class="parameter">mode</replaceable> can be:</phrase>
+
+    standby_replay | standby_write | standby_flush | primary_flush
 </synopsis>
  </refsynopsisdiv>
 
@@ -34,20 +40,27 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Description</title>
 
   <para>
-    Waits until recovery replays <parameter>lsn</parameter>.
-    If no <parameter>timeout</parameter> is specified or it is set to
-    zero, this command waits indefinitely for the
-    <parameter>lsn</parameter>.
-    On timeout, or if the server is promoted before
-    <parameter>lsn</parameter> is reached, an error is emitted,
-    unless <literal>NO_THROW</literal> is specified in the WITH clause.
-    If <parameter>NO_THROW</parameter> is specified, then the command
-    doesn't throw errors.
+   Waits until the specified <parameter>lsn</parameter> is reached
+   according to the specified <parameter>mode</parameter>,
+   which determines whether to wait for WAL to be written, flushed, or replayed.
+   If no <parameter>timeout</parameter> is specified or it is set to
+   zero, this command waits indefinitely for the
+   <parameter>lsn</parameter>.
+  </para>
+
+  <para>
+   On timeout, an error is emitted unless <literal>NO_THROW</literal>
+   is specified in the WITH clause. For standby modes
+   (<literal>standby_replay</literal>, <literal>standby_write</literal>,
+   <literal>standby_flush</literal>), an error is also emitted if the
+   server is promoted before the <parameter>lsn</parameter> is reached.
+   If <parameter>NO_THROW</parameter> is specified, the command returns
+   a status string instead of throwing errors.
   </para>
 
   <para>
-    The possible return values are <literal>success</literal>,
-    <literal>timeout</literal>, and <literal>not in recovery</literal>.
+   The possible return values are <literal>success</literal>,
+   <literal>timeout</literal>, and <literal>not in recovery</literal>.
   </para>
  </refsect1>
 
@@ -72,6 +85,65 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
       The following parameters are supported:
 
       <variablelist>
+       <varlistentry>
+        <term><literal>MODE</literal> '<replaceable class="parameter">mode</replaceable>'</term>
+        <listitem>
+         <para>
+          Specifies the type of LSN processing to wait for. If not specified,
+          the default is <literal>standby_replay</literal>. The valid modes are:
+         </para>
+         <itemizedlist>
+          <listitem>
+           <para>
+            <literal>standby_replay</literal>: Wait for the LSN to be replayed
+            (applied to the database) on a standby server. After successful
+            completion, <function>pg_last_wal_replay_lsn()</function> will
+            return a value greater than or equal to the target LSN. This mode
+            can only be used during recovery.
+           </para>
+          </listitem>
+          <listitem>
+           <para>
+            <literal>standby_write</literal>: Wait for the WAL containing the
+            LSN to be received from the primary and written to the operating
+            system on a standby server, without waiting for it to be flushed
+            to disk. This is faster than <literal>standby_flush</literal> but
+            provides weaker durability guarantees, since the data may still
+            be in operating system buffers. After successful completion, the
+            <structfield>written_lsn</structfield> column in
+            <link linkend="monitoring-pg-stat-wal-receiver-view">
+            <structname>pg_stat_wal_receiver</structname></link> will show
+            a value greater than or equal to the target LSN. This mode can
+            only be used during recovery.
+           </para>
+          </listitem>
+          <listitem>
+           <para>
+            <literal>standby_flush</literal>: Wait for the WAL containing the
+            LSN to be received from the primary and flushed to disk on a
+            standby server. This provides a durability guarantee without
+            waiting for the WAL to be applied. After successful completion,
+            <function>pg_last_wal_receive_lsn()</function> will return a
+            value greater than or equal to the target LSN. This value is
+            also available as the <structfield>flushed_lsn</structfield>
+            column in <link linkend="monitoring-pg-stat-wal-receiver-view">
+            <structname>pg_stat_wal_receiver</structname></link>. This mode
+            can only be used during recovery.
+           </para>
+          </listitem>
+          <listitem>
+           <para>
+            <literal>primary_flush</literal>: Wait for the WAL containing the
+            LSN to be flushed to disk on a primary server. After successful
+            completion, <function>pg_current_wal_flush_lsn()</function> will
+            return a value greater than or equal to the target LSN. This mode
+            can only be used on a primary server (not during recovery).
+           </para>
+          </listitem>
+         </itemizedlist>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><literal>TIMEOUT</literal> '<replaceable class="parameter">timeout</replaceable>'</term>
         <listitem>
@@ -135,9 +207,12 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
     <listitem>
      <para>
       This return value denotes that the database server is not in a recovery
-      state.  This might mean either the database server was not in recovery
-      at the moment of receiving the command, or it was promoted before
-      reaching the target <parameter>lsn</parameter>.
+      state. This might mean either the database server was not in recovery
+      at the moment of receiving the command (i.e., executed on a primary),
+      or it was promoted before reaching the target <parameter>lsn</parameter>.
+      In the promotion case, this status indicates that a timeline change
+      occurred, and the application should re-evaluate whether the target
+      LSN is still relevant.
      </para>
     </listitem>
    </varlistentry>
@@ -148,25 +223,34 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Notes</title>
 
   <para>
-    <command>WAIT FOR</command> command waits till
-    <parameter>lsn</parameter> to be replayed on standby.
-    That is, after this command execution, the value returned by
-    <function>pg_last_wal_replay_lsn</function> should be greater or equal
-    to the <parameter>lsn</parameter> value.  This is useful to achieve
-    read-your-writes-consistency, while using async replica for reads and
-    primary for writes.  In that case, the <acronym>lsn</acronym> of the last
-    modification should be stored on the client application side or the
-    connection pooler side.
+   <command>WAIT FOR</command> waits until the specified
+   <parameter>lsn</parameter> is reached according to the specified
+   <parameter>mode</parameter>. The <literal>standby_replay</literal> mode
+   waits for the LSN to be replayed (applied to the database), which is
+   useful to achieve read-your-writes consistency while using an
+   asynchronous replica for reads and the primary for writes. The
+   <literal>standby_flush</literal> mode waits for the WAL to be flushed
+   to durable storage on the replica, providing a durability guarantee
+   without waiting for replay. The <literal>standby_write</literal> mode
+   waits for the WAL to be written to the operating system, which is
+   faster than flush but provides weaker durability guarantees. The
+   <literal>primary_flush</literal> mode waits for WAL to be flushed on
+   a primary server. In all cases, the <acronym>LSN</acronym> of the last
+   modification should be stored on the client application side or the
+   connection pooler side.
   </para>
 
   <para>
-    <command>WAIT FOR</command> command should be called on standby.
-    If a user runs <command>WAIT FOR</command> on primary, it
-    will error out unless <parameter>NO_THROW</parameter> is specified in the WITH clause.
-    However, if <command>WAIT FOR</command> is
-    called on primary promoted from standby and <literal>lsn</literal>
-    was already replayed, then the <command>WAIT FOR</command> command just
-    exits immediately.
+   The standby modes (<literal>standby_replay</literal>,
+   <literal>standby_write</literal>, <literal>standby_flush</literal>)
+   can only be used during recovery, and <literal>primary_flush</literal>
+   can only be used on a primary server. If a standby mode is used on a
+   primary, or if a standby is promoted while waiting, the command returns
+   <literal>not in recovery</literal> when <literal>NO_THROW</literal> is
+   specified and throws an error otherwise. Using
+   <literal>primary_flush</literal> during recovery always throws an error,
+   even with <literal>NO_THROW</literal>. Promotion creates a new timeline,
+   and the LSN being waited for may refer to WAL from the old timeline.
   </para>
 
 </refsect1>
@@ -175,21 +259,21 @@ WAIT FOR LSN '<replaceable class="parameter">lsn</replaceable>' [ WITH ( <replac
   <title>Examples</title>
 
   <para>
-    You can use <command>WAIT FOR</command> command to wait for
-    the <type>pg_lsn</type> value.  For example, an application could update
-    the <literal>movie</literal> table and get the <acronym>lsn</acronym> after
-    changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
-    on primary server to get the <acronym>lsn</acronym> given that
-    <varname>synchronous_commit</varname> could be set to
-    <literal>off</literal>.
+   You can use the <command>WAIT FOR</command> command to wait for
+   a <type>pg_lsn</type> value.  For example, an application could update
+   the <literal>movie</literal> table and get the <acronym>LSN</acronym> after
+   the changes just made.  This example uses <function>pg_current_wal_insert_lsn</function>
+   on the primary server to get the <acronym>LSN</acronym>, given that
+   <varname>synchronous_commit</varname> could be set to
+   <literal>off</literal>.
 
    <programlisting>
 postgres=# UPDATE movie SET genre = 'Dramatic' WHERE genre = 'Drama';
 UPDATE 100
 postgres=# SELECT pg_current_wal_insert_lsn();
-pg_current_wal_insert_lsn
---------------------
-0/306EE20
+ pg_current_wal_insert_lsn
+---------------------------
+ 0/306EE20
 (1 row)
 </programlisting>
 
@@ -200,7 +284,7 @@ pg_current_wal_insert_lsn
 <programlisting>
 postgres=# WAIT FOR LSN '0/306EE20';
  status
---------
+---------
  success
 (1 row)
 postgres=# SELECT * FROM movie WHERE genre = 'Drama';
@@ -211,7 +295,43 @@ postgres=# SELECT * FROM movie WHERE genre = 'Drama';
   </para>
 
   <para>
-    If the target LSN is not reached before the timeout, the error is thrown.
+   Wait for flush (data durable on replica):
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_flush');
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   Wait for write with timeout:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'standby_write', TIMEOUT '100ms', NO_THROW);
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   Wait for flush on primary:
+
+<programlisting>
+postgres=# WAIT FOR LSN '0/306EE20' WITH (MODE 'primary_flush');
+ status
+---------
+ success
+(1 row)
+</programlisting>
+  </para>
+
+  <para>
+   If the target LSN is not reached before the timeout, an error is thrown:
 
 <programlisting>
 postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '0.1s');
@@ -221,11 +341,12 @@ ERROR:  timed out while waiting for target LSN 0/306EE20 to be replayed; current
 
   <para>
    The same example uses <command>WAIT FOR</command> with
-   <parameter>NO_THROW</parameter> option.
+   <parameter>NO_THROW</parameter> option:
+
 <programlisting>
 postgres=# WAIT FOR LSN '0/306EE20' WITH (TIMEOUT '100ms', NO_THROW);
  status
---------
+---------
  timeout
 (1 row)
 </programlisting>
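The documentation leaves it to the client application or connection pooler to remember the LSN of the last modification. As an illustration only, here is a minimal client-side sketch in C; parse_lsn(), format_lsn() and remember_write_lsn() are hypothetical helpers, not part of the patch or of libpq. It assumes the usual text form of pg_lsn (high and low 32 bits in hexadecimal, separated by a slash) and keeps the highest LSN a session has written, to be passed later to WAIT FOR on a replica:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical client-side helper (not part of the patch): parse the text
 * form of a pg_lsn ("hi/lo", both hexadecimal) into a 64-bit value.
 * Returns false on malformed input.
 */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
	unsigned int hi;
	unsigned int lo;
	int			consumed = 0;

	assert(text != NULL && lsn != NULL);
	if (sscanf(text, "%X/%X%n", &hi, &lo, &consumed) != 2 ||
		text[consumed] != '\0')
		return false;
	*lsn = ((uint64_t) hi << 32) | lo;
	return true;
}

/*
 * Format an LSN in the %X/%08X style used by the error messages in this
 * patch.  Returns the snprintf() result (length of the full string).
 */
static int
format_lsn(uint64_t lsn, char *buf, size_t buflen)
{
	return snprintf(buf, buflen, "%X/%08X",
					(unsigned int) (lsn >> 32), (unsigned int) lsn);
}

/* Remember the highest LSN this session has written so far. */
static void
remember_write_lsn(uint64_t *session_lsn, uint64_t new_lsn)
{
	if (new_lsn > *session_lsn)
		*session_lsn = new_lsn;
}
```

For instance, parsing 0/306EE20 from the example above and formatting it back gives 0/0306EE20, the zero-padded style the patch's error messages use.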
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 05ac7c5f7f8..81dc86847c0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2918,6 +2918,14 @@ XLogFlush(XLogRecPtr record)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for, notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush);
+
 	/*
 	 * If we still haven't flushed to the request point then we have a
 	 * problem; most likely, the requested flush point is past end of XLOG.
@@ -3100,6 +3108,14 @@ XLogBackgroundFlush(void)
 	/* wake up walsenders now that we've released heavily contended locks */
 	WalSndWakeupProcessRequests(true, !RecoveryInProgress());
 
+	/*
+	 * If we flushed an LSN that someone was waiting for, notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogwrtResult.Flush >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_PRIMARY_FLUSH, LogwrtResult.Flush);
+
 	/*
 	 * Great, done. To take some work off the critical path, try to initialize
 	 * as many of the no-longer-needed WAL buffers for future use as we can.
@@ -6277,10 +6293,12 @@ StartupXLOG(void)
 	WakeupCheckpointer();
 
 	/*
-	 * Wake up all waiters for replay LSN.  They need to report an error that
-	 * recovery was ended before reaching the target LSN.
+	 * Wake up all waiters.  They need to report an error that recovery was
+	 * ended before reaching the target LSN.
 	 */
 	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
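The XLogFlush() and XLogBackgroundFlush() hunks use a cheap fast path: a single atomic read of minWaitedLSN[WAIT_LSN_TYPE_PRIMARY_FLUSH], and a call to WaitLSNWakeup() only when the new flush position has reached it. A simplified, single-threaded sketch of that idea follows, with C11 atomics standing in for pg_atomic_read_u64(). WaitQueue and its functions are hypothetical and do not reflect the actual xlogwait.c data structures, which among other things need locking around the waiter set:

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_WAITERS 8
#define NO_WAITER UINT64_MAX	/* min_waited_lsn when nobody is waiting */

/*
 * Hypothetical, single-threaded model of one wait queue.  Target LSN 0 plays
 * the role of InvalidXLogRecPtr and marks a free slot.  Real code would also
 * need a lock around the waiter set; that is omitted here.
 */
typedef struct
{
	_Atomic uint64_t min_waited_lsn;
	uint64_t	target[MAX_WAITERS];
	bool		woken[MAX_WAITERS];
} WaitQueue;

/* Recompute the cached minimum after the waiter set changes. */
static void
update_min(WaitQueue *q)
{
	uint64_t	min = NO_WAITER;

	for (int i = 0; i < MAX_WAITERS; i++)
		if (q->target[i] != 0 && q->target[i] < min)
			min = q->target[i];
	atomic_store(&q->min_waited_lsn, min);
}

static void
queue_init(WaitQueue *q)
{
	atomic_init(&q->min_waited_lsn, NO_WAITER);
	for (int i = 0; i < MAX_WAITERS; i++)
	{
		q->target[i] = 0;
		q->woken[i] = false;
	}
}

/* Register a waiter; returns its slot, or -1 if the queue is full. */
static int
add_waiter(WaitQueue *q, uint64_t target)
{
	assert(target != 0);
	for (int i = 0; i < MAX_WAITERS; i++)
	{
		if (q->target[i] == 0)
		{
			q->target[i] = target;
			q->woken[i] = false;
			update_min(q);
			return i;
		}
	}
	return -1;
}

/*
 * Slow path: wake waiters whose target is <= reached.  reached == 0 wakes
 * everybody, modelled on the patch passing InvalidXLogRecPtr at the end of
 * recovery.  Returns the number of waiters woken.
 */
static int
wakeup(WaitQueue *q, uint64_t reached)
{
	int			n = 0;

	for (int i = 0; i < MAX_WAITERS; i++)
	{
		if (q->target[i] != 0 && (reached == 0 || q->target[i] <= reached))
		{
			q->woken[i] = true;
			q->target[i] = 0;
			n++;
		}
	}
	update_min(q);
	return n;
}

/* Fast path after a flush: one atomic load when nobody needs waking. */
static int
maybe_wakeup(WaitQueue *q, uint64_t flushed)
{
	if (flushed >= atomic_load(&q->min_waited_lsn))
		return wakeup(q, flushed);
	return 0;
}
```

The point of caching the minimum is that the flush path, which runs very frequently, pays for only one atomic load when no waiter can be satisfied yet.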
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index 4867f59691e..264f81571d4 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -2,7 +2,7 @@
  *
  * wait.c
  *	  Implements WAIT FOR, which allows waiting for events such as
- *	  time passing or LSN having been replayed on replica.
+ *	  time passing or LSN having been replayed, flushed, or written.
  *
  * Portions Copyright (c) 2025-2026, PostgreSQL Global Development Group
  *
@@ -15,6 +15,7 @@
 
 #include <math.h>
 
+#include "access/xlog.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogwait.h"
 #include "commands/defrem.h"
@@ -34,12 +35,14 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	XLogRecPtr	lsn;
 	int64		timeout = 0;
 	WaitLSNResult waitLSNResult;
+	WaitLSNType lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY; /* default */
 	bool		throw = true;
 	TupleDesc	tupdesc;
 	TupOutputState *tstate;
 	const char *result = "<unset>";
 	bool		timeout_specified = false;
 	bool		no_throw_specified = false;
+	bool		mode_specified = false;
 
 	/* Parse and validate the mandatory LSN */
 	lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in,
@@ -47,7 +50,32 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
 	foreach_node(DefElem, defel, stmt->options)
 	{
-		if (strcmp(defel->defname, "timeout") == 0)
+		if (strcmp(defel->defname, "mode") == 0)
+		{
+			char	   *mode_str;
+
+			if (mode_specified)
+				errorConflictingDefElem(defel, pstate);
+			mode_specified = true;
+
+			mode_str = defGetString(defel);
+
+			if (pg_strcasecmp(mode_str, "standby_replay") == 0)
+				lsnType = WAIT_LSN_TYPE_STANDBY_REPLAY;
+			else if (pg_strcasecmp(mode_str, "standby_write") == 0)
+				lsnType = WAIT_LSN_TYPE_STANDBY_WRITE;
+			else if (pg_strcasecmp(mode_str, "standby_flush") == 0)
+				lsnType = WAIT_LSN_TYPE_STANDBY_FLUSH;
+			else if (pg_strcasecmp(mode_str, "primary_flush") == 0)
+				lsnType = WAIT_LSN_TYPE_PRIMARY_FLUSH;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("unrecognized value for %s option \"%s\": \"%s\"",
+								"WAIT", defel->defname, mode_str),
+						 parser_errposition(pstate, defel->location)));
+		}
+		else if (strcmp(defel->defname, "timeout") == 0)
 		{
 			char	   *timeout_str;
 			const char *hintmsg;
@@ -107,8 +135,8 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	}
 
 	/*
-	 * We are going to wait for the LSN replay.  We should first care that we
-	 * don't hold a snapshot and correspondingly our MyProc->xmin is invalid.
+	 * We are going to wait for the LSN.  We should first care that we don't
+	 * hold a snapshot and correspondingly our MyProc->xmin is invalid.
 	 * Otherwise, our snapshot could prevent the replay of WAL records
 	 * implying a kind of self-deadlock.  This is the reason why WAIT FOR is a
 	 * command, not a procedure or function.
@@ -140,7 +168,22 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
+	/*
+	 * primary_flush can only be used on a primary.  Standby modes are checked
+	 * by WaitForLSN(), which returns WAIT_LSN_RESULT_NOT_IN_RECOVERY.
+	 */
+	if (lsnType == WAIT_LSN_TYPE_PRIMARY_FLUSH)
+	{
+		if (RecoveryInProgress())
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("recovery is in progress"),
+					 errhint("Waiting for primary_flush can only be done on a primary server. "
+							 "Use standby_flush mode on a standby server.")));
+	}
+
+	/* Now wait for the LSN */
+	waitLSNResult = WaitForLSN(lsnType, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
@@ -154,11 +197,48 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 
 		case WAIT_LSN_RESULT_TIMEOUT:
 			if (throw)
-				ereport(ERROR,
-						errcode(ERRCODE_QUERY_CANCELED),
-						errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current replay LSN %X/%08X",
-							   LSN_FORMAT_ARGS(lsn),
-							   LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+			{
+				XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
+				switch (lsnType)
+				{
+					case WAIT_LSN_TYPE_STANDBY_REPLAY:
+						ereport(ERROR,
+								errcode(ERRCODE_QUERY_CANCELED),
+								errmsg("timed out while waiting for target LSN %X/%08X to be replayed; current standby_replay LSN %X/%08X",
+									   LSN_FORMAT_ARGS(lsn),
+									   LSN_FORMAT_ARGS(currentLSN)));
+						break;
+
+					case WAIT_LSN_TYPE_STANDBY_WRITE:
+						ereport(ERROR,
+								errcode(ERRCODE_QUERY_CANCELED),
+								errmsg("timed out while waiting for target LSN %X/%08X to be written; current standby_write LSN %X/%08X",
+									   LSN_FORMAT_ARGS(lsn),
+									   LSN_FORMAT_ARGS(currentLSN)));
+						break;
+
+					case WAIT_LSN_TYPE_STANDBY_FLUSH:
+						ereport(ERROR,
+								errcode(ERRCODE_QUERY_CANCELED),
+								errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current standby_flush LSN %X/%08X",
+									   LSN_FORMAT_ARGS(lsn),
+									   LSN_FORMAT_ARGS(currentLSN)));
+						break;
+
+					case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+						ereport(ERROR,
+								errcode(ERRCODE_QUERY_CANCELED),
+								errmsg("timed out while waiting for target LSN %X/%08X to be flushed; current primary_flush LSN %X/%08X",
+									   LSN_FORMAT_ARGS(lsn),
+									   LSN_FORMAT_ARGS(currentLSN)));
+						break;
+
+					default:
+						elog(ERROR, "unexpected wait LSN type %d", lsnType);
+						pg_unreachable();
+				}
+			}
 			else
 				result = "timeout";
 			break;
@@ -168,18 +248,72 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 			{
 				if (PromoteIsTriggered())
 				{
-					ereport(ERROR,
-							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-							errmsg("recovery is not in progress"),
-							errdetail("Recovery ended before replaying target LSN %X/%08X; last replay LSN %X/%08X.",
-									  LSN_FORMAT_ARGS(lsn),
-									  LSN_FORMAT_ARGS(GetXLogReplayRecPtr(NULL))));
+					XLogRecPtr	currentLSN = GetCurrentLSNForWaitType(lsnType);
+
+					switch (lsnType)
+					{
+						case WAIT_LSN_TYPE_STANDBY_REPLAY:
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("recovery is not in progress"),
+									errdetail("Recovery ended before target LSN %X/%08X was replayed; last standby_replay LSN %X/%08X.",
+											  LSN_FORMAT_ARGS(lsn),
+											  LSN_FORMAT_ARGS(currentLSN)));
+							break;
+
+						case WAIT_LSN_TYPE_STANDBY_WRITE:
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("recovery is not in progress"),
+									errdetail("Recovery ended before target LSN %X/%08X was written; last standby_write LSN %X/%08X.",
+											  LSN_FORMAT_ARGS(lsn),
+											  LSN_FORMAT_ARGS(currentLSN)));
+							break;
+
+						case WAIT_LSN_TYPE_STANDBY_FLUSH:
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("recovery is not in progress"),
+									errdetail("Recovery ended before target LSN %X/%08X was flushed; last standby_flush LSN %X/%08X.",
+											  LSN_FORMAT_ARGS(lsn),
+											  LSN_FORMAT_ARGS(currentLSN)));
+							break;
+
+						default:
+							elog(ERROR, "unexpected wait LSN type %d", lsnType);
+							pg_unreachable();
+					}
 				}
 				else
-					ereport(ERROR,
-							errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-							errmsg("recovery is not in progress"),
-							errhint("Waiting for the replay LSN can only be executed during recovery."));
+				{
+					switch (lsnType)
+					{
+						case WAIT_LSN_TYPE_STANDBY_REPLAY:
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("recovery is not in progress"),
+									errhint("Waiting for the standby_replay LSN can only be executed during recovery."));
+							break;
+
+						case WAIT_LSN_TYPE_STANDBY_WRITE:
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("recovery is not in progress"),
+									errhint("Waiting for the standby_write LSN can only be executed during recovery."));
+							break;
+
+						case WAIT_LSN_TYPE_STANDBY_FLUSH:
+							ereport(ERROR,
+									errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									errmsg("recovery is not in progress"),
+									errhint("Waiting for the standby_flush LSN can only be executed during recovery."));
+							break;
+
+						default:
+							elog(ERROR, "unexpected wait LSN type %d", lsnType);
+							pg_unreachable();
+					}
+				}
 			}
 			else
 				result = "not in recovery";
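The MODE option is parsed with a chain of pg_strcasecmp() calls. If more modes are added later, a lookup table might be easier to maintain. The sketch below is only one possible shape for that, with a hypothetical WaitMode enum standing in for WaitLSNType and a small ASCII case-folding compare standing in for pg_strcasecmp(). It deliberately covers parsing only: the timeout and promotion errors above spell out each message in full, which keeps them translatable instead of assembling them from %s fragments.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical mirror of the patch's WaitLSNType values. */
typedef enum
{
	MODE_STANDBY_REPLAY,
	MODE_STANDBY_WRITE,
	MODE_STANDBY_FLUSH,
	MODE_PRIMARY_FLUSH
} WaitMode;

typedef struct
{
	const char *name;
	WaitMode	mode;
} WaitModeEntry;

static const WaitModeEntry wait_modes[] = {
	{"standby_replay", MODE_STANDBY_REPLAY},
	{"standby_write", MODE_STANDBY_WRITE},
	{"standby_flush", MODE_STANDBY_FLUSH},
	{"primary_flush", MODE_PRIMARY_FLUSH},
};

/* ASCII-only case-insensitive equality, similar in spirit to pg_strcasecmp(). */
static bool
mode_name_equals(const char *a, const char *b)
{
	for (;; a++, b++)
	{
		unsigned char ca = (unsigned char) *a;
		unsigned char cb = (unsigned char) *b;

		if (ca >= 'A' && ca <= 'Z')
			ca += 'a' - 'A';
		if (cb >= 'A' && cb <= 'Z')
			cb += 'a' - 'A';
		if (ca != cb)
			return false;
		if (ca == '\0')
			return true;
	}
}

/* Look up a MODE value; returns NULL for an unrecognized mode. */
static const WaitModeEntry *
lookup_wait_mode(const char *name)
{
	assert(name != NULL);
	for (size_t i = 0; i < sizeof(wait_modes) / sizeof(wait_modes[0]); i++)
	{
		if (mode_name_equals(name, wait_modes[i].name))
			return &wait_modes[i];
	}
	return NULL;
}
```

A NULL result would map to the existing "unrecognized value for WAIT option" error.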
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ac002f730c3..a41453530a1 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xlogrecovery.h"
+#include "access/xlogwait.h"
 #include "catalog/pg_authid.h"
 #include "funcapi.h"
 #include "libpq/pqformat.h"
@@ -965,6 +966,14 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
 	/* Update shared-memory status */
 	pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
 
+	/*
+	 * If we wrote an LSN that someone was waiting for, notify the waiters.
+	 */
+	if (waitLSNState &&
+		(LogstreamResult.Write >=
+		 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_WRITE])))
+		WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, LogstreamResult.Write);
+
 	/*
 	 * Close the current segment if it's fully written up in the last cycle of
 	 * the loop, to create its archive notification file soon. Otherwise WAL
@@ -1004,6 +1013,15 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
 		}
 		SpinLockRelease(&walrcv->mutex);
 
+		/*
+		 * If we flushed an LSN that someone was waiting for, notify the
+		 * waiters.
+		 */
+		if (waitLSNState &&
+			(LogstreamResult.Flush >=
+			 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_FLUSH])))
+			WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, LogstreamResult.Flush);
+
 		/* Signal the startup process and walsender that new WAL has arrived */
 		WakeupRecovery();
 		if (AllowCascadeReplication())
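The walreceiver hunks wake standby_write waiters as soon as WAL has been written and standby_flush waiters only once it has been flushed. A minimal POSIX sketch of that ordering, using write() and fsync() on a temporary file, follows. ReceiverState and receive_chunk() are hypothetical and only loosely model WalRcv->writtenUpto and LogstreamResult.Flush:

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical receiver progress, loosely modelled on the walreceiver. */
typedef struct
{
	int			fd;
	uint64_t	written_upto;	/* handed to the kernel via write() */
	uint64_t	flushed_upto;	/* made durable via fsync() */
} ReceiverState;

/*
 * Append a chunk of WAL ending at end_lsn.  standby_write waiters could be
 * woken as soon as write() succeeds; standby_flush waiters only after
 * fsync() returns.
 */
static bool
receive_chunk(ReceiverState *st, const char *buf, size_t len,
			  uint64_t end_lsn, bool flush_now)
{
	size_t		done = 0;

	while (done < len)
	{
		ssize_t		n = write(st->fd, buf + done, len - done);

		if (n < 0)
			return false;
		done += (size_t) n;
	}
	st->written_upto = end_lsn;	/* standby_write condition met here */

	if (flush_now)
	{
		if (fsync(st->fd) != 0)
			return false;
		st->flushed_upto = end_lsn;	/* standby_flush condition met here */
	}
	assert(st->flushed_upto <= st->written_upto);
	return true;
}
```

In this sketch the flushed position never passes the written one, so a satisfied standby_flush waiter implies that a standby_write waiter for the same LSN has been satisfied too.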
diff --git a/src/test/recovery/t/049_wait_for_lsn.pl b/src/test/recovery/t/049_wait_for_lsn.pl
index e0ddb06a2f0..b767b475ff7 100644
--- a/src/test/recovery/t/049_wait_for_lsn.pl
+++ b/src/test/recovery/t/049_wait_for_lsn.pl
@@ -1,5 +1,6 @@
-# Checks waiting for the LSN replay on standby using
-# the WAIT FOR command.
+# Checks waiting for the LSN using the WAIT FOR command.
+# Tests standby modes (standby_replay/standby_write/standby_flush) on standby
+# and primary_flush mode on primary.
 use strict;
 use warnings FATAL => 'all';
 
@@ -7,6 +8,42 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+# Helper functions to control walreceiver for testing wait conditions.
+# These allow us to stop WAL streaming so waiters block, then resume it.
+my $saved_primary_conninfo;
+
+sub stop_walreceiver
+{
+	my ($node) = @_;
+	$saved_primary_conninfo = $node->safe_psql(
+		'postgres', qq[
+		SELECT pg_catalog.quote_literal(setting)
+		FROM pg_settings
+		WHERE name = 'primary_conninfo';
+	]);
+	$node->safe_psql(
+		'postgres', qq[
+		ALTER SYSTEM SET primary_conninfo = '';
+		SELECT pg_reload_conf();
+	]);
+
+	$node->poll_query_until('postgres',
+		"SELECT NOT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
+sub resume_walreceiver
+{
+	my ($node) = @_;
+	$node->safe_psql(
+		'postgres', qq[
+		ALTER SYSTEM SET primary_conninfo = $saved_primary_conninfo;
+		SELECT pg_reload_conf();
+	]);
+
+	$node->poll_query_until('postgres',
+		"SELECT EXISTS (SELECT * FROM pg_stat_wal_receiver);");
+}
+
 # Initialize primary node
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
 $node_primary->init(allows_streaming => 1);
@@ -62,7 +99,52 @@ $output = $node_standby->safe_psql(
 ok((split("\n", $output))[-1] eq 30,
 	"standby reached the same LSN as primary");
 
-# 3. Check that waiting for unreachable LSN triggers the timeout.  The
+# 3. Check that WAIT FOR works with standby_write, standby_flush, and
+# primary_flush modes.
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(31, 40))");
+my $lsn_write =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_write}' WITH (MODE 'standby_write', timeout '1d');
+	SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${lsn_write}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+	"standby wrote WAL up to target LSN after WAIT FOR with MODE 'standby_write'"
+);
+
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(41, 50))");
+my $lsn_flush =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_flush}' WITH (MODE 'standby_flush', timeout '1d');
+	SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '${lsn_flush}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+	"standby flushed WAL up to target LSN after WAIT FOR with MODE 'standby_flush'"
+);
+
+# Check primary_flush mode on primary
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(51, 60))");
+my $lsn_primary_flush =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+$output = $node_primary->safe_psql(
+	'postgres', qq[
+	WAIT FOR LSN '${lsn_primary_flush}' WITH (MODE 'primary_flush', timeout '1d');
+	SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '${lsn_primary_flush}'::pg_lsn);
+]);
+
+ok( (split("\n", $output))[-1] >= 0,
+	"primary flushed WAL up to target LSN after WAIT FOR with MODE 'primary_flush'"
+);
+
+# 4. Check that waiting for unreachable LSN triggers the timeout.  The
 # unreachable LSN must be well in advance.  So WAL records issued by
 # the concurrent autovacuum could not affect that.
 my $lsn3 =
@@ -88,14 +170,26 @@ $output = $node_standby->safe_psql(
 	WAIT FOR LSN '${lsn3}' WITH (timeout '10ms', no_throw);]);
 ok($output eq "timeout", "WAIT FOR returns correct status after timeout");
 
-# 4. Check that WAIT FOR triggers an error if called on primary,
-# within another function, or inside a transaction with an isolation level
-# higher than READ COMMITTED.
+# 5. Check mode validation: standby modes error on primary and primary_flush
+# errors on standby.  Also check that WAIT FOR triggers an error if called
+# within another function or inside a transaction with an isolation level
+# higher than READ COMMITTED.
+
+# Test standby_flush on primary - should error
+$node_primary->psql(
+	'postgres',
+	"WAIT FOR LSN '${lsn3}' WITH (MODE 'standby_flush');",
+	stderr => \$stderr);
+ok($stderr =~ /recovery is not in progress/,
+	"get an error when running standby_flush on the primary");
 
-$node_primary->psql('postgres', "WAIT FOR LSN '${lsn3}';",
+# Test primary_flush on standby - should error
+$node_standby->psql(
+	'postgres',
+	"WAIT FOR LSN '${lsn3}' WITH (MODE 'primary_flush');",
 	stderr => \$stderr);
-ok( $stderr =~ /recovery is not in progress/,
-	"get an error when running on the primary");
+ok($stderr =~ /recovery is in progress/,
+	"get an error when running primary_flush on the standby");
 
 $node_standby->psql(
 	'postgres',
@@ -125,7 +219,7 @@ ok( $stderr =~
 	  /WAIT FOR must be only called without an active or registered snapshot/,
 	"get an error when running within another function");
 
-# 5. Check parameter validation error cases on standby before promotion
+# 6. Check parameter validation error cases on standby before promotion
 my $test_lsn =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
 
@@ -208,10 +302,26 @@ $node_standby->psql(
 ok( $stderr =~ /option "invalid_option" not recognized/,
 	"get error for invalid WITH clause option");
 
-# 6. Check the scenario of multiple LSN waiters.  We make 5 background
-# psql sessions each waiting for a corresponding insertion.  When waiting is
-# finished, stored procedures logs if there are visible as many rows as
-# should be.
+# Test invalid MODE value
+$node_standby->psql(
+	'postgres',
+	"WAIT FOR LSN '${test_lsn}' WITH (MODE 'invalid');",
+	stderr => \$stderr);
+ok($stderr =~ /unrecognized value for WAIT option "mode": "invalid"/,
+	"get error for invalid MODE value");
+
+# Test duplicate MODE parameter
+$node_standby->psql(
+	'postgres',
+	"WAIT FOR LSN '${test_lsn}' WITH (MODE 'standby_replay', MODE 'standby_write');",
+	stderr => \$stderr);
+ok( $stderr =~ /conflicting or redundant options/,
+	"get error for duplicate MODE parameter");
+
+# 7a. Check the scenario of multiple standby_replay waiters.  We make 5
+# background psql sessions each waiting for a corresponding insertion.  When
+# waiting is finished, a stored function logs whether as many rows are
+# visible as expected.
 $node_primary->safe_psql(
 	'postgres', qq[
 CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
@@ -225,8 +335,17 @@ CREATE FUNCTION log_count(i int) RETURNS void AS \$\$
   END
 \$\$
 LANGUAGE plpgsql;
+
+CREATE FUNCTION log_wait_done(prefix text, i int) RETURNS void AS \$\$
+  BEGIN
+    RAISE LOG '% %', prefix, i;
+  END
+\$\$
+LANGUAGE plpgsql;
 ]);
+
 $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
 my @psql_sessions;
 for (my $i = 0; $i < 5; $i++)
 {
@@ -243,6 +362,7 @@ for (my $i = 0; $i < 5; $i++)
 		SELECT log_count(${i});
 	]);
 }
+
 my $log_offset = -s $node_standby->logfile;
 $node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
 for (my $i = 0; $i < 5; $i++)
@@ -251,23 +371,246 @@ for (my $i = 0; $i < 5; $i++)
 	$psql_sessions[$i]->quit;
 }
 
-ok(1, 'multiple LSN waiters reported consistent data');
+ok(1, 'multiple standby_replay waiters reported consistent data');
+
+# 7b. Check the scenario of multiple standby_write waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @write_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (100 + ${i});");
+	$write_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start standby_write waiters (they will block since walreceiver is stopped)
+my @write_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+	$write_sessions[$i] = $node_standby->background_psql('postgres');
+	$write_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$write_lsns[$i]' WITH (MODE 'standby_write', timeout '1d');
+		SELECT log_wait_done('write_done', $i);
+	]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalWrite'"
+);
+
+# Restore walreceiver to unblock waiters
+my $write_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_standby->wait_for_log("write_done $i", $write_log_offset);
+	$write_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was written up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '$write_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple standby_write waiters: standby wrote WAL up to target LSN");
+
+# 7c. Check the scenario of multiple standby_flush waiters.
+# Stop walreceiver to ensure waiters actually block.
+stop_walreceiver($node_standby);
+
+# Generate WAL on primary (standby won't receive it yet)
+my @flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (200 + ${i});");
+	$flush_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+# Start standby_flush waiters (they will block since walreceiver is stopped)
+my @flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+	$flush_sessions[$i] = $node_standby->background_psql('postgres');
+	$flush_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$flush_lsns[$i]' WITH (MODE 'standby_flush', timeout '1d');
+		SELECT log_wait_done('flush_done', $i);
+	]);
+}
+
+# Verify waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 5 FROM pg_stat_activity WHERE wait_event = 'WaitForWalFlush'"
+);
+
+# Restore walreceiver to unblock waiters
+my $flush_log_offset = -s $node_standby->logfile;
+resume_walreceiver($node_standby);
+
+# Wait for all waiters to complete and close sessions
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_standby->wait_for_log("flush_done $i", $flush_log_offset);
+	$flush_sessions[$i]->quit;
+}
+
+# Verify on standby that WAL was flushed up to the target LSN
+$output = $node_standby->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_last_wal_receive_lsn(), '$flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple standby_flush waiters: standby flushed WAL up to target LSN");
+
+# 7d. Check the scenario of mixed standby mode waiters (standby_replay,
+# standby_write, standby_flush) running concurrently.  We start 6 sessions:
+# 2 for each mode, all waiting for the same target LSN.  We stop the
+# walreceiver and pause replay to ensure all waiters block.  Then we resume
+# replay and restart the walreceiver to verify they unblock and complete
+# correctly.
+
+# Stop walreceiver first to ensure we can control the flow without hanging
+# (stopping it after pausing replay can hang if the startup process is paused).
+stop_walreceiver($node_standby);
+
+# Pause replay
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_pause();");
+
+# Generate WAL on primary
+$node_primary->safe_psql('postgres',
+	"INSERT INTO wait_test VALUES (generate_series(301, 310));");
+my $mixed_target_lsn =
+  $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
+
+# Start 6 waiters: 2 for each mode
+my @mixed_sessions;
+my @mixed_modes = ('standby_replay', 'standby_write', 'standby_flush');
+for (my $i = 0; $i < 6; $i++)
+{
+	$mixed_sessions[$i] = $node_standby->background_psql('postgres');
+	$mixed_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${mixed_target_lsn}' WITH (MODE '$mixed_modes[$i % 3]', timeout '1d');
+		SELECT log_wait_done('mixed_done', $i);
+	]);
+}
+
+# Verify all waiters are blocked
+$node_standby->poll_query_until('postgres',
+	"SELECT count(*) = 6 FROM pg_stat_activity WHERE wait_event LIKE 'WaitForWal%'"
+);
+
+# Resume replay (waiters should still be blocked as no WAL has arrived)
+my $mixed_log_offset = -s $node_standby->logfile;
+$node_standby->safe_psql('postgres', "SELECT pg_wal_replay_resume();");
+$node_standby->poll_query_until('postgres',
+	"SELECT NOT pg_is_wal_replay_paused();");
+
+# Restore walreceiver to allow WAL to arrive
+resume_walreceiver($node_standby);
 
-# 7. Check that the standby promotion terminates the wait on LSN.  Start
-# waiting for an unreachable LSN then promote.  Check the log for the relevant
-# error message.  Also, check that waiting for already replayed LSN doesn't
-# cause an error even after promotion.
+# Wait for all sessions to complete and close them
+for (my $i = 0; $i < 6; $i++)
+{
+	$node_standby->wait_for_log("mixed_done $i", $mixed_log_offset);
+	$mixed_sessions[$i]->quit;
+}
+
+# Verify all modes reached the target LSN
+$output = $node_standby->safe_psql(
+	'postgres', qq[
+	SELECT pg_lsn_cmp((SELECT written_lsn FROM pg_stat_wal_receiver), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+	       pg_lsn_cmp(pg_last_wal_receive_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0 AND
+	       pg_lsn_cmp(pg_last_wal_replay_lsn(), '${mixed_target_lsn}'::pg_lsn) >= 0;
+]);
+
+ok($output eq 't',
+	"mixed mode waiters: all modes completed and reached target LSN");
+
+# 7e. Check the scenario of multiple primary_flush waiters on primary.
+# We start 5 background sessions waiting for different LSNs with primary_flush
+# mode.  Each waiter logs when done.
+my @primary_flush_lsns;
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->safe_psql('postgres',
+		"INSERT INTO wait_test VALUES (400 + ${i});");
+	$primary_flush_lsns[$i] =
+	  $node_primary->safe_psql('postgres',
+		"SELECT pg_current_wal_insert_lsn()");
+}
+
+my $primary_flush_log_offset = -s $node_primary->logfile;
+
+# Start primary_flush waiters
+my @primary_flush_sessions;
+for (my $i = 0; $i < 5; $i++)
+{
+	$primary_flush_sessions[$i] = $node_primary->background_psql('postgres');
+	$primary_flush_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '$primary_flush_lsns[$i]' WITH (MODE 'primary_flush', timeout '1d');
+		SELECT log_wait_done('primary_flush_done', $i);
+	]);
+}
+
+# The WAL should already be flushed, so waiters should complete quickly
+for (my $i = 0; $i < 5; $i++)
+{
+	$node_primary->wait_for_log("primary_flush_done $i",
+		$primary_flush_log_offset);
+	$primary_flush_sessions[$i]->quit;
+}
+
+# Verify on primary that WAL was flushed up to the target LSN
+$output = $node_primary->safe_psql('postgres',
+	"SELECT pg_lsn_cmp(pg_current_wal_flush_lsn(), '$primary_flush_lsns[4]'::pg_lsn);"
+);
+
+ok($output >= 0,
+	"multiple primary_flush waiters: primary flushed WAL up to target LSN");
+
+# 8. Check that the standby promotion terminates all standby wait modes.  Start
+# waiting for unreachable LSNs with standby_replay, standby_write, and
+# standby_flush modes, then promote.  Check the log for the relevant error
+# messages.  Also, check that waiting for already replayed LSN doesn't cause
+# an error even after promotion.
 my $lsn4 =
   $node_primary->safe_psql('postgres',
 	"SELECT pg_current_wal_insert_lsn() + 10000000000");
+
 my $lsn5 =
   $node_primary->safe_psql('postgres', "SELECT pg_current_wal_insert_lsn()");
-my $psql_session = $node_standby->background_psql('postgres');
-$psql_session->query_until(
-	qr/start/, qq[
-	\\echo start
-	WAIT FOR LSN '${lsn4}';
-]);
+
+# Start background sessions waiting for unreachable LSN with all modes
+my @wait_modes = ('standby_replay', 'standby_write', 'standby_flush');
+my @wait_sessions;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i] = $node_standby->background_psql('postgres');
+	$wait_sessions[$i]->query_until(
+		qr/start/, qq[
+		\\echo start
+		WAIT FOR LSN '${lsn4}' WITH (MODE '$wait_modes[$i]');
+	]);
+}
 
 # Make sure standby will be promoted at least at the primary insert LSN we
 # have just observed.  Use pg_switch_wal() to force the insert LSN to be
@@ -277,9 +620,16 @@ $node_primary->wait_for_catchup($node_standby);
 
 $log_offset = -s $node_standby->logfile;
 $node_standby->promote;
-$node_standby->wait_for_log('recovery is not in progress', $log_offset);
 
-ok(1, 'got error after standby promote');
+# Wait for all three sessions to get the error (each mode has a distinct message)
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was written/,
+	$log_offset);
+$node_standby->wait_for_log(qr/Recovery ended before target LSN.*was flushed/,
+	$log_offset);
+$node_standby->wait_for_log(
+	qr/Recovery ended before target LSN.*was replayed/, $log_offset);
+
+ok(1, 'promotion interrupted all wait modes');
 
 $node_standby->safe_psql('postgres', "WAIT FOR LSN '${lsn5}';");
 
@@ -295,8 +645,11 @@ ok($output eq "not in recovery",
 $node_standby->stop;
 $node_primary->stop;
 
-# If we send \q with $psql_session->quit the command can be sent to the session
+# If we send \q with $session->quit the command can be sent to the session
 # already closed. So \q is in initial script, here we only finish IPC::Run.
-$psql_session->{run}->finish;
+for (my $i = 0; $i < 3; $i++)
+{
+	$wait_sessions[$i]->{run}->finish;
+}
 
 done_testing();
-- 
2.39.5 (Apple Git-154)


