public inbox for [email protected]  
From: Imran Zaheer <[email protected]>
To: Zsolt Parragi <[email protected]>
To: Jakub Wartak <[email protected]>
Cc: Hayato Kuroda (Fujitsu) <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: [WIP] Pipelined Recovery
Date: Wed, 18 Mar 2026 15:18:55 +0500
Message-ID: <CA+UBfa=PKdShpSTTTSHwXdGPZnm2rGMKPjERNOdS0SG9t9CT3Q@mail.gmail.com>
In-Reply-To: <CA+UBfa=Dv-2tLSEKHJ0YFFH8PCTHxnX9rtVZeV8gd8q1a-GmYA@mail.gmail.com>
References: <CA+UBfa=vDV8wbmAV0pgrx-FuJh+x8YOW23vJ90Jzr=14rV+9jA@mail.gmail.com>
	<OS9PR01MB12149A4E7927072A215AEED69F565A@OS9PR01MB12149.jpnprd01.prod.outlook.com>
	<CA+UBfakmkdtauuRsOVXFqhFVJt0nTdEadx94tJn+qG0Pe8Wjfw@mail.gmail.com>
	<CAN4CZFM7FV0VTNkujD=Mb7tNa+jkmEfnX7carvj95fY6Tp11FQ@mail.gmail.com>
	<CA+UBfamW6NuuMMQTDRPDQ0a9fWN_u2OvjEF98u3CfYKTBcOZMw@mail.gmail.com>
	<CA+UBfa=Dv-2tLSEKHJ0YFFH8PCTHxnX9rtVZeV8gd8q1a-GmYA@mail.gmail.com>

(Resending the mail; the previous one was held for moderation for some reason.)

Hi

I am attaching a new, rebased version of the patch. The following are the
major changes in the new patch set.

* Streaming replication now works. The prefetcher was not fully
decoupled from the startup process, which caused inconsistencies
in some scenarios and made most of the recovery TAP tests fail.

* The patch is now split into producer and consumer patches, which should
make review easier.

* The pipeline shutdown flow is also improved: the producer now always
checks the shutdown flag set by the consumer.

* The pipeline message queue size is now configurable via
`wal_pipeline_queue_size`.

* TAP tests now pass with PG_TEST_INITDB_EXTRA_OPTS="-c wal_pipeline=on".

* New TAP test `recovery/t/053_walpipeline.pl`, which covers some basic
functionality of the pipeline.

* The files are now named xlogpipeline.{h,c}.
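To make the reworked shutdown flow easier to follow, here is a simplified, single-threaded C model of the handshake. It is not code from the patch: a bounded FIFO stands in for the shm_mq, a plain `bool` stands in for `WalPipelineShm->shutdown_requested`, and the producer and consumer take turns instead of running concurrently. All `sim_*` names are hypothetical. It only illustrates the ordering rules from the bullet about the shutdown flow and from the earlier reply quoted below: the producer checks the flag before every send, sends a SHUTDOWN message at end of WAL and then waits, and the consumer sets the flag either when it reaches the recovery target or when it receives SHUTDOWN.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for WAL_MSG_RECORD and WAL_MSG_SHUTDOWN. */
typedef enum
{
    SIM_RECORD,
    SIM_SHUTDOWN
} sim_msg_type;

#define SIM_QUEUE_CAP 4         /* plays the role of wal_pipeline_queue_size */

/* Bounded FIFO standing in for the shm_mq ring buffer. */
typedef struct
{
    sim_msg_type msgs[SIM_QUEUE_CAP];
    size_t      head;
    size_t      count;
} sim_queue;

static bool
sim_push(sim_queue *q, sim_msg_type m)
{
    if (q->count == SIM_QUEUE_CAP)
        return false;           /* full: a blocking send would wait here */
    q->msgs[(q->head + q->count) % SIM_QUEUE_CAP] = m;
    q->count++;
    return true;
}

static bool
sim_pop(sim_queue *q, sim_msg_type *m)
{
    if (q->count == 0)
        return false;           /* empty: a blocking receive would wait here */
    *m = q->msgs[q->head];
    q->head = (q->head + 1) % SIM_QUEUE_CAP;
    q->count--;
    return true;
}

typedef struct
{
    size_t      sent;           /* records the producer queued */
    size_t      received;       /* records the consumer applied */
} sim_result;

/*
 * Replay a WAL stream of wal_len records.  If target > 0, the consumer stops
 * after applying that many records (recovery target reached); otherwise it
 * runs until the producer reports end of WAL.  Producer and consumer take one
 * step each per iteration instead of running concurrently.
 */
static sim_result
sim_run(size_t wal_len, size_t target)
{
    sim_queue   q = {{SIM_RECORD}, 0, 0};
    sim_result  r = {0, 0};
    bool        shutdown_requested = false; /* written only by the consumer */
    bool        shutdown_msg_sent = false;
    bool        producer_exited = false;
    bool        consumer_done = false;

    while (!producer_exited || !consumer_done)
    {
        /* Producer: check the consumer's flag before anything else. */
        if (!producer_exited)
        {
            if (shutdown_requested)
                producer_exited = true;
            else if (r.sent < wal_len)
            {
                if (sim_push(&q, SIM_RECORD))
                    r.sent++;
            }
            else if (!shutdown_msg_sent)
                shutdown_msg_sent = sim_push(&q, SIM_SHUTDOWN);
            /* else: end of WAL already reported; wait for the flag */
        }

        /* Consumer: apply at most one message per step. */
        sim_msg_type m;

        if (!consumer_done && sim_pop(&q, &m))
        {
            bool        stop = true;    /* SHUTDOWN always stops */

            /* target == 0 never matches, since received starts at 1 */
            if (m == SIM_RECORD)
                stop = (++r.received == target);

            if (stop)
            {
                shutdown_requested = true;  /* the consumer's stop request */
                consumer_done = true;
            }
        }
    }
    return r;
}
```

In the end-of-WAL case, the producer's wait for the flag is what makes `sent` and `received` match, which is the rationale the patch gives for `WalPipeline_WaitForConsumerShutdownRequest()`. When the recovery target is reached first, a real concurrent producer may already have queued records that are never applied; the lock-step model happens to hide that.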

Thanks to all for the valuable feedback.
Looking forward to your reviews, comments, etc.

--
Regards,
Imran Zaheer


On Wed, Mar 18, 2026 at 3:15 PM Imran Zaheer <[email protected]> wrote:

> (Resending the mail; the previous one was held for moderation for some
> reason. The PDF is now moved into the tar.gz.)
>
>
> On Wed, Mar 18, 2026 at 12:43 PM Imran Zaheer <[email protected]>
> wrote:
>
>> Hi Zsolt.
>>
>> Thanks a lot for the review and for pointing out the bugs. I have fixed
>> the bugs you mentioned in my new patch set, but the patch set mail is
>> held for moderation for some reason.
>>
>> >
>> >   if (reachedRecoveryTarget)
>> >   {
>> > + if (wal_pipeline_enabled)
>> > + WalPipeline_Stop();
>> >
>> > What if we didn't reach the recovery target, shouldn't we stop the
>> > pipelines then?
>> >
>>
>> I have fixed the bugs in the shutdown logic.
>>
>> As we already know, we exit the recovery redo loop in
>> `PerformWalRecovery()` in only two cases:
>>
>> 1: Recovery target reached:
>> In this case the consumer calls `WalPipeline_Stop()` to stop the pipeline.
>>
>> @@ -1807,6 +1931,9 @@ PerformWalRecovery(void)
>>
>>   if (reachedRecoveryTarget)
>>   {
>> + if (wal_pipeline_enabled)
>> + WalPipeline_Stop();
>> +
>>
>> 2: The available WAL in pg_wal is consumed and there is no more WAL to read:
>> In this case the pipeline producer sends the shutdown message to the
>> consumer. The consumer detects this message and then calls
>> `WalPipeline_Stop()`. This is the case where we cannot read more
>> records, and the while loop breaks here.
>>
>> + if (decoded_record)
>> + {
>> + record = &decoded_record->header;
>> + return record;
>> + }
>> + else
>> + {
>> + /*
>> + * We will end up here only when the pipeline couldn't read more
>> + * records and has sent a shutdown msg. We acknowledge this
>> + * and request the pipeline workers to stop.
>> + */
>> + WalPipeline_Stop();
>> + return NULL;
>> + }
>>
>>
>> Hope this makes sense.
>>
>> Once again, thanks for reporting the bugs. You will receive the new
>> patch set mail soon, once it clears moderation.
>>
>> Looking forward to your reviews, comments, etc.
>>
>> Regards,
>> Imran Zaheer
>>
>
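The producer patch below flattens each decoded record into a single pointer-free buffer (message header, `XLogRecord`, main data, then per-block metadata followed by optional image and data bytes), so that it can be copied through shm_mq as-is. A minimal round-trip sketch of that idea in plain C follows. The types `sim_record`, `sim_block`, `sim_hdr` and `sim_block_meta` are hypothetical simplifications, backup images and most header fields are omitted, and the receiver is assumed to point into its own copy of the buffer rather than copying the payload again. It is not the patch's consumer-side code.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#define SIM_MAX_BLOCKS 4

/* Simplified, hypothetical stand-ins for DecodedBkpBlock / DecodedXLogRecord. */
typedef struct
{
    int         in_use;
    uint32_t    blkno;
    uint32_t    data_len;
    const char *data;           /* pointer: only valid in this process */
} sim_block;

typedef struct
{
    uint64_t    end_lsn;
    int32_t     max_block_id;   /* -1 if the record has no blocks */
    uint32_t    main_data_len;
    const char *main_data;
    sim_block   blocks[SIM_MAX_BLOCKS];
} sim_record;

/* Wire structs: plain values only, no pointers. */
typedef struct { uint64_t end_lsn; int32_t max_block_id; uint32_t main_data_len; } sim_hdr;
typedef struct { int32_t block_id; uint32_t blkno; uint32_t data_len; } sim_block_meta;

/* Flatten rec into one malloc'd buffer; returns its size (0 on OOM). */
static size_t
sim_serialize(const sim_record *rec, char **out)
{
    size_t      total = sizeof(sim_hdr) + rec->main_data_len;

    for (int i = 0; i <= rec->max_block_id; i++)
        if (rec->blocks[i].in_use)
            total += sizeof(sim_block_meta) + rec->blocks[i].data_len;

    char       *ptr = *out = malloc(total);

    if (ptr == NULL)
        return 0;

    sim_hdr     hdr = {rec->end_lsn, rec->max_block_id, rec->main_data_len};

    memcpy(ptr, &hdr, sizeof hdr);
    ptr += sizeof hdr;
    if (rec->main_data_len > 0)
        memcpy(ptr, rec->main_data, rec->main_data_len);
    ptr += rec->main_data_len;

    for (int i = 0; i <= rec->max_block_id; i++)
    {
        const sim_block *b = &rec->blocks[i];

        if (!b->in_use)
            continue;           /* unused slots are not written at all */
        sim_block_meta meta = {i, b->blkno, b->data_len};

        memcpy(ptr, &meta, sizeof meta);
        ptr += sizeof meta;
        if (b->data_len > 0)
            memcpy(ptr, b->data, b->data_len);
        ptr += b->data_len;
    }
    assert((size_t) (ptr - *out) == total); /* same check as the patch */
    return total;
}

/* Rebuild rec from buf[0..len); rec's pointers refer into buf afterwards. */
static void
sim_deserialize(const char *buf, size_t len, sim_record *rec)
{
    const char *end = buf + len;
    sim_hdr     hdr;

    memcpy(&hdr, buf, sizeof hdr);  /* memcpy: buf may be unaligned */
    buf += sizeof hdr;
    memset(rec, 0, sizeof *rec);
    rec->end_lsn = hdr.end_lsn;
    rec->max_block_id = hdr.max_block_id;
    rec->main_data_len = hdr.main_data_len;
    rec->main_data = buf;
    buf += hdr.main_data_len;

    /* Every remaining chunk is one in-use block; block ids may have gaps. */
    while (buf < end)
    {
        sim_block_meta meta;

        memcpy(&meta, buf, sizeof meta);
        buf += sizeof meta;
        assert(meta.block_id >= 0 && meta.block_id < SIM_MAX_BLOCKS);
        rec->blocks[meta.block_id] = (sim_block) {1, meta.blkno, meta.data_len, buf};
        buf += meta.data_len;
    }
}
```

Because nothing in the buffer is a pointer, the bytes can pass through any queue unchanged, and the receiver only has to set up pointers into its own copy. The sketch walks the blocks until the payload length is used up (the patch's header carries a comparable `decoded_size`), because unused block slots up to `max_block_id` are skipped on the wire.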


Attachments:

  [text/x-patch] v2-0001-Pipelined-Recovery-Producer.patch (40.3K, 3-v2-0001-Pipelined-Recovery-Producer.patch)
From e8619cb11f065a8a86495a9a7aa16c073cd6e37e Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Tue, 17 Mar 2026 21:47:56 +0500
Subject: [PATCH v2 1/2] Pipelined Recovery - Producer

This includes the producer-specific code of the producer-consumer
architecture for WAL replay, which separates WAL decoding from the recovery process,
enabling parallel processing between different steps of replay.

The producer includes a background worker that reads and decodes WAL records,
then sends them to the startup process for redo. IPC happens via shared
memory message queues (shm_mq), allowing the decoder to run ahead of the apply process.

This provides some improvement in recovery performance for CPU-bound workloads.

New GUC: wal_pipeline (default: off)

Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
 src/backend/access/transam/Makefile           |   1 +
 src/backend/access/transam/meson.build        |   1 +
 src/backend/access/transam/xlogpipeline.c     | 677 ++++++++++++++++++
 src/backend/access/transam/xlogrecovery.c     |  43 ++
 src/backend/postmaster/bgworker.c             |   5 +
 src/backend/storage/ipc/ipci.c                |   5 +
 src/backend/utils/misc/guc_parameters.dat     |  15 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/access/xlog.h                     |   2 +
 src/include/access/xlogpipeline.h             | 188 +++++
 src/include/access/xlogrecovery.h             |  19 +
 src/test/recovery/meson.build                 |   1 +
 src/test/recovery/t/053_walpipeline.pl        | 208 ++++++
 13 files changed, 1167 insertions(+)
 create mode 100644 src/backend/access/transam/xlogpipeline.c
 create mode 100644 src/include/access/xlogpipeline.h
 create mode 100644 src/test/recovery/t/053_walpipeline.pl

diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..ba0bf343769 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,6 +32,7 @@ OBJS = \
 	xlogbackup.o \
 	xlogfuncs.o \
 	xloginsert.o \
+	xlogpipeline.o \
 	xlogprefetcher.o \
 	xlogreader.o \
 	xlogrecovery.o \
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 06aadc7f315..be37b40581d 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -20,6 +20,7 @@ backend_sources += files(
   'xlogbackup.c',
   'xlogfuncs.c',
   'xloginsert.c',
+  'xlogpipeline.c',
   'xlogprefetcher.c',
   'xlogrecovery.c',
   'xlogstats.c',
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
new file mode 100644
index 00000000000..4b95a11d16b
--- /dev/null
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -0,0 +1,677 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.c
+ *    WAL replay pipeline implementation
+ *
+ * This module implements a producer-consumer pipeline for WAL replay.
+ * The producer (background worker) reads and decodes WAL records in parallel
+ * with the consumer (startup process) that applies them.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *    src/backend/access/transam/xlogpipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlogpipeline.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/bufmgr.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/md.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/rel.h"
+#include "utils/timeout.h"
+
+
+/* Global shared memory control structure */
+WalPipelineShmCtl *WalPipelineShm = NULL;
+
+XLogPrefetcher *xlogprefetcher_pipelined = NULL;
+
+/* Local state for producer */
+static dsm_segment *producer_dsm_seg = NULL;
+static shm_mq *producer_mq = NULL;
+static shm_mq_handle *producer_mq_handle = NULL;
+
+
+/*
+ * Flags set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+
+/* Signal handlers */
+static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
+
+/* Forward declarations */
+static void wal_pipeline_cleanup_callback(int code, Datum arg);
+static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static void cleanup_producer_resources(void);
+static void WalPipeline_WaitForConsumerShutdownRequest(void);
+
+/* copied from xlogrecovery.c */
+/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
+typedef struct XLogPageReadPrivate
+{
+	int			emode;
+	bool		fetching_ckpt;	/* are we fetching a checkpoint record? */
+	bool		randAccess;
+	TimeLineID	replayTLI;
+} XLogPageReadPrivate;
+
+/*
+ * Compute space needed for WAL pipeline shared memory
+ */
+Size
+WalPipelineShmemSize(void)
+{
+	Size        size = 0;
+
+	size = add_size(size, sizeof(WalPipelineShmCtl));
+
+	return size;
+}
+
+/*
+ * Initialize WAL pipeline shared memory structures
+ */
+void
+WalPipelineShmemInit(void)
+{
+	bool        found;
+
+	WalPipelineShm = (WalPipelineShmCtl *)
+		ShmemInitStruct("WAL Pipeline Control",
+						sizeof(WalPipelineShmCtl),
+						&found);
+
+	if (!found)
+	{
+		/* First time through, initialize */
+		SpinLockInit(&WalPipelineShm->mutex);
+		WalPipelineShm->initialized = false;
+		WalPipelineShm->shutdown_requested = false;
+		WalPipelineShm->producer_exited = false;
+		WalPipelineShm->producer_pid = 0;
+		WalPipelineShm->consumer_pid = 0;
+		WalPipelineShm->producer_lsn = InvalidXLogRecPtr;
+		WalPipelineShm->consumer_lsn = InvalidXLogRecPtr;
+		WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+		WalPipelineShm->records_sent = 0;
+		WalPipelineShm->records_received = 0;
+		WalPipelineShm->bytes_sent = 0;
+		WalPipelineShm->bytes_received = 0;
+		WalPipelineShm->error_code = 0;
+		WalPipelineShm->error_message[0] = '\0';
+	}
+}
+
+
+
+/*
+ * Producer Function.
+ * Main loop for the producer background worker.
+ */
+void
+WalPipeline_ProducerMain(Datum main_arg)
+{
+	dsm_handle           handle = DatumGetUInt32(main_arg);
+	dsm_segment        	*seg;
+	shm_toc            	*toc;
+	WalPipelineParams  	*params;
+	XLogReaderState    	*xlogreader;
+	XLogPageReadPrivate *private;
+	XLogRecord         	*record;
+	TimeLineID           replayTLI = 0;
+	bool 				 end_of_wal = false;
+	uint64				 records_sent;
+	uint64				 records_received;
+
+	/*
+	 * Properly accept or ignore signals the postmaster might send us.
+	 */
+	pqsignal(SIGHUP, PipelineBgwSigHupHandler); 	/* reload config file */
+
+	/* Register cleanup callback */
+	before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+	seg = dsm_attach(handle);
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("[walpipeline] producer: could not map dynamic shared memory segment")));
+
+	toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment")));
+
+	/* Lookup params and queue */
+	params = shm_toc_lookup(toc, 1, false);
+	producer_mq = shm_toc_lookup(toc, 2, false);
+
+	/* Set up producer side of queue */
+	producer_dsm_seg = seg;
+	shm_mq_set_sender(producer_mq, MyProc);
+	producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL);
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_pid = MyProcPid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	/* DSM is now attached, so safe to unblock the signals */
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up WAL reading processor */
+	private = palloc0(sizeof(XLogPageReadPrivate));
+	xlogreader =
+		XLogReaderAllocate(wal_segment_size, NULL,
+						   XL_ROUTINE(.page_read = &XLogPageRead,
+									  .segment_open = NULL,
+									  .segment_close = wal_segment_close),
+						   private);
+
+	if (!xlogreader)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+	xlogreader->system_identifier = GetSystemIdentifier();
+
+	/*
+	 * Set the WAL decode buffer size.  This limits how far ahead we can read
+	 * in the WAL.
+	 */
+	XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+	/* Init some important globals before starting */
+	replayTLI = params->ReplayTLI;
+	InitializePipelineRecoveryEnv(params);
+
+	/* Reinit the WAL prefetcher. */
+	xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader);
+
+
+	elog(LOG, "[walpipeline] producer: started at %X/%08X, TLI %u",
+		 LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI);
+
+	XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr);
+
+	/* Main decoding loop */
+	while (true)
+	{
+		bool shutdown_requested;
+
+		/* Check if consumer requested shutdown */
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		shutdown_requested = WalPipelineShm->shutdown_requested;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (shutdown_requested)
+		{
+			elog(DEBUG1, "[walpipeline] producer: shutdown requested by consumer");
+			break;
+		}
+
+		/* Read next WAL record */
+		record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI);
+
+		if (record == NULL)
+		{
+			end_of_wal = true;
+			elog(DEBUG1, "[walpipeline] producer: reached end of WAL");
+			break;
+		}
+
+		/*
+		 * Successfully decoded a record. Send it to the consumer.
+		 */
+		if (!WalPipeline_SendRecord(xlogreader))
+		{
+			elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached");
+			break;
+		}
+
+		/* Update our position for monitoring */
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->producer_lsn = xlogreader->EndRecPtr;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+
+	if (end_of_wal)
+	{
+		/* Notify the consumer that we have reached the end of WAL */
+		WalPipeline_SendShutdown();
+
+		/* Wait until the consumer sets the shutdown flag */
+		WalPipeline_WaitForConsumerShutdownRequest();
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	records_sent = WalPipelineShm->records_sent;
+	records_received = WalPipelineShm->records_received;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+		 records_sent, records_received);
+
+	/* Cleanup */
+	XLogReaderFree(xlogreader);
+	DisownRecoveryWakeupLatch();
+	pfree(private);
+	cleanup_producer_resources();
+}
+
+/*
+ * Producer Function.
+ * Send a decoded WAL record to the consumer
+ */
+bool
+WalPipeline_SendRecord(XLogReaderState *record)
+{
+	char       *buffer = NULL;
+	Size        msglen;
+	shm_mq_result res;
+
+
+	if (!producer_mq_handle)
+		return false;
+
+	/* Serialize the record */
+	msglen = serialize_wal_record(record, &buffer);
+
+	res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true);
+
+	if (res == SHM_MQ_SUCCESS)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->records_sent++;
+		WalPipelineShm->bytes_sent += msglen;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		pfree(buffer);
+		return true;
+	}
+
+	if (res == SHM_MQ_DETACHED)
+	{
+		elog(PANIC, "[walpipeline] producer: consumer detached");
+		pfree(buffer);
+		return false;
+	}
+
+	/* Some other error */
+	elog(PANIC, "[walpipeline] producer: shm_mq_send failed with result %d", res);
+	pfree(buffer);
+	return false;
+}
+
+/*
+ * Producer Function.
+ * Send shutdown message to consumer
+ */
+bool
+WalPipeline_SendShutdown(void)
+{
+	WalRecordMsgHeader hdr;
+	shm_mq_result res;
+
+	if (!producer_mq_handle)
+		return false;
+
+	hdr.msg_type = WAL_MSG_SHUTDOWN;
+	hdr.endRecPtr = InvalidXLogRecPtr;
+
+	res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true);
+	return (res == SHM_MQ_SUCCESS);
+}
+
+/*
+ * Producer Function.
+ * Send error message to consumer
+ */
+bool
+WalPipeline_SendError(int errcode, const char *errmsg)
+{
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->error_code = errcode;
+	strlcpy(WalPipelineShm->error_message, errmsg,
+			sizeof(WalPipelineShm->error_message));
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	return true;
+}
+
+/*
+ * Producer Function.
+ * The producer could exit without waiting for the consumer, but it is better
+ * to wait until the consumer requests shutdown. This way the log messages
+ * show equal records_sent and records_received counts.
+ */
+static void
+WalPipeline_WaitForConsumerShutdownRequest(void)
+{
+	int			iters = 0;
+
+	while (true)
+	{
+		bool		shutdown_requested;
+
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		shutdown_requested = WalPipelineShm->shutdown_requested;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (shutdown_requested)
+			break;
+
+		if (++iters >= MAX_SHUTDOWN_WAIT_ITERS)
+		{
+			elog(WARNING,
+					"[walpipeline] producer: timed out waiting for consumer "
+					"to acknowledge shutdown, exiting anyway");
+			break;
+		}
+
+		/* Allow SIGTERM / SIGHUP to interrupt the wait */
+		ProcessPipelineBgwInterrupts();
+
+		pg_usleep(10000);  /* sleep 10ms */
+	}
+}
+
+
+/*
+ * serialize_wal_record (Producer)
+ *
+ * Serialize the currently decoded WAL record owned by the given
+ * XLogReaderState into a contiguous buffer.
+ *
+ * Allocation/layout of the output buffer:
+ *
+ *   [WalRecordMsgHeader]
+ *   [XLogRecord]
+ *   [main_data]
+ *   Repeated for each in-use block:
+ *     [SerializedBlockMeta]
+ *     [backup image bytes]   (optional)
+ *     [block data bytes]     (optional)
+ *
+ * The resulting buffer contains no pointers and is safe to transfer
+ * across processes or store externally.
+ */
+static Size
+serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
+{
+	DecodedXLogRecord *dec = xlogreader->record;
+	WalRecordMsgHeader hdr;
+	Size total;
+	char *ptr;
+
+	if (dec == NULL)
+		return 0;
+
+	/* ---- size calculation ---- */
+	total =
+		sizeof(WalRecordMsgHeader) +
+		sizeof(XLogRecord) +
+		dec->main_data_len;
+
+	for (int i = 0; i <= dec->max_block_id; i++)
+	{
+		const DecodedBkpBlock *blk = &dec->blocks[i];
+
+		if (!blk->in_use)
+			continue;
+
+		total += sizeof(SerializedBlockMeta);
+
+		if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+			total += blk->bimg_len;
+
+		if (blk->has_data && blk->data && blk->data_len > 0)
+			total += blk->data_len;
+	}
+
+	ptr = *outbuf = palloc(total);
+
+	/* ---- message header ---- */
+	memset(&hdr, 0, sizeof(hdr));
+	hdr.msg_type      		= WAL_MSG_RECORD;
+	hdr.readRecPtr    		= xlogreader->ReadRecPtr;
+	hdr.abortedRecPtr    	= xlogreader->abortedRecPtr;
+	hdr.missingContrecPtr	= xlogreader->missingContrecPtr;
+	hdr.overwrittenRecPtr	= xlogreader->overwrittenRecPtr;
+	hdr.endRecPtr     		= xlogreader->EndRecPtr;
+	hdr.decoded_size  		= total - sizeof(WalRecordMsgHeader);
+	hdr.max_block_id  		= dec->max_block_id;
+	hdr.main_data_len 		= dec->main_data_len;
+	hdr.toplevel_xid 		= dec->toplevel_xid;
+	hdr.record_origin 		= dec->record_origin;
+
+	memcpy(ptr, &hdr, sizeof(hdr));
+	ptr += sizeof(hdr);
+
+	/* ---- XLogRecord ---- */
+	memcpy(ptr, &dec->header, sizeof(XLogRecord));
+	ptr += sizeof(XLogRecord);
+
+	/* ---- main data ---- */
+	if (dec->main_data_len > 0)
+	{
+		memcpy(ptr, dec->main_data, dec->main_data_len);
+		ptr += dec->main_data_len;
+	}
+
+	/* ---- blocks ---- */
+	for (int i = 0; i <= dec->max_block_id; i++)
+	{
+		const DecodedBkpBlock *blk = &dec->blocks[i];
+		SerializedBlockMeta meta;
+
+		if (!blk->in_use)
+			continue;
+
+		memset(&meta, 0, sizeof(meta));
+		meta.block_id    = i;
+		meta.in_use      = true;
+		meta.rlocator    = blk->rlocator;
+		meta.forknum     = blk->forknum;
+		meta.blkno       = blk->blkno;
+		meta.flags       = blk->flags;
+		meta.has_image   = blk->has_image;
+		meta.apply_image = blk->apply_image;
+		meta.has_data    = blk->has_data;
+		meta.bimg_len    = blk->bimg_len;
+		meta.bimg_info   = blk->bimg_info;
+		meta.hole_offset = blk->hole_offset;
+		meta.hole_length = blk->hole_length;
+		meta.data_len    = blk->data_len;
+
+		memcpy(ptr, &meta, sizeof(meta));
+		ptr += sizeof(meta);
+
+		if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+		{
+			memcpy(ptr, blk->bkp_image, blk->bimg_len);
+			ptr += blk->bimg_len;
+		}
+
+		if (blk->has_data && blk->data && blk->data_len > 0)
+		{
+			memcpy(ptr, blk->data, blk->data_len);
+			ptr += blk->data_len;
+		}
+	}
+
+	Assert(ptr - *outbuf == total);
+	return total;
+}
+
+
+/*
+ * Returns true in the WAL pipeline background worker; intended for assertions
+ * that only the pipeline worker touches pipeline-specific code.
+ */
+bool AmWalPipeline(void)
+{
+	if (MyBackendType == B_BG_WORKER && MyBgworkerEntry)
+	{
+		if (strncmp(MyBgworkerEntry->bgw_name, "wal pipeline", 12) == 0)
+			return true;
+	}
+
+	return false;
+}
+
+/*
+ * Clean up producer-side resources
+ */
+static void
+cleanup_producer_resources(void)
+{
+	if (producer_mq_handle)
+	{
+		shm_mq_detach(producer_mq_handle);
+		producer_mq_handle = NULL;
+	}
+
+	if (producer_dsm_seg)
+	{
+		dsm_detach(producer_dsm_seg);
+		producer_dsm_seg = NULL;
+	}
+
+	producer_mq = NULL;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->producer_pid = 0;
+	WalPipelineShm->producer_exited = true;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Cleanup callback for process exit
+ */
+static void
+wal_pipeline_cleanup_callback(int code, Datum arg)
+{
+	pid_t mypid = MyProcPid;
+	bool is_producer = false;
+
+	if (WalPipelineShm)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		is_producer = (WalPipelineShm->producer_pid == mypid);
+		SpinLockRelease(&WalPipelineShm->mutex);
+	}
+
+	if (is_producer)
+		cleanup_producer_resources();
+	else
+		cleanup_consumer_resources();
+}
+
+/* --------------------------------
+ *		signal handler routines
+ * --------------------------------
+ */
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+PipelineBgwSigHupHandler(SIGNAL_ARGS)
+{
+	got_SIGHUP = true;
+	WakeupRecovery();
+}
+
+/*
+ * Re-read the config file.
+ *
+ * If one of the critical walreceiver options has changed, flag xlogrecovery.c
+ * to restart it.
+ */
+static void
+PipelineRereadConfig(void)
+{
+	char	   *conninfo = pstrdup(PrimaryConnInfo);
+	char	   *slotname = pstrdup(PrimarySlotName);
+	bool		tempSlot = wal_receiver_create_temp_slot;
+	bool		conninfoChanged;
+	bool		slotnameChanged;
+	bool		tempSlotChanged = false;
+
+	ProcessConfigFile(PGC_SIGHUP);
+
+	conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0;
+	slotnameChanged = strcmp(slotname, PrimarySlotName) != 0;
+
+	/*
+	 * wal_receiver_create_temp_slot is used only when we have no slot
+	 * configured.  We do not need to track this change if it has no effect.
+	 */
+	if (!slotnameChanged && strcmp(PrimarySlotName, "") == 0)
+		tempSlotChanged = tempSlot != wal_receiver_create_temp_slot;
+	pfree(conninfo);
+	pfree(slotname);
+
+	if (conninfoChanged || slotnameChanged || tempSlotChanged)
+		StartupRequestWalReceiverRestart();
+}
+
+/*
+ * Process any requests or signals received recently.
+ */
+void
+ProcessPipelineBgwInterrupts(void)
+{
+
+	bool shutdown_requested;
+
+	if (got_SIGHUP)
+	{
+		got_SIGHUP = false;
+		PipelineRereadConfig();
+	}
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	shutdown_requested = WalPipelineShm->shutdown_requested;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	if (shutdown_requested)
+		proc_exit(0);
+
+	CHECK_FOR_INTERRUPTS();
+}
\ No newline at end of file
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6d2c4a86b96..b66ec80fa25 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4738,6 +4738,49 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
 }
 
 
+/*
+ * The pipeline bgw should be aware of all the parameters that have been
+ * initialized by the startup process before performing the actual recovery.
+ * Other than these, there are also some variables that keep changing. The
+ * pipeline and the startup process should both see state changes of such
+ * variables; we can use shared memory for those.
+ */
+void
+InitializePipelineRecoveryEnv(WalPipelineParams *params)
+{
+	StandbyMode = params->StandbyMode;
+	StandbyModeRequested = params->StandbyModeRequested;
+	ArchiveRecoveryRequested = params->ArchiveRecoveryRequested;
+	InArchiveRecovery = params->InArchiveRecovery;
+	recoveryTargetTLI = params->recoveryTargetTLI;
+	minRecoveryPointTLI = params->minRecoveryPointTLI;
+	minRecoveryPoint = params->minRecoveryPoint;
+	currentSource = params->currentSource;
+	lastSourceFailed = params->lastSourceFailed;
+	pendingWalRcvRestart = params->pendingWalRcvRestart;
+	RedoStartTLI = params->RedoStartTLI;
+	RedoStartLSN = params->RedoStartLSN;
+	standbyState = params->standbyState;
+	CheckPointLoc = params->CheckPointLoc;
+	CheckPointTLI = params->CheckPointTLI;
+	flushedUpto = params->flushedUpto;
+	receiveTLI = params->receiveTLI;
+	abortedRecPtr = params->abortedRecPtr;
+	missingContrecPtr = params->missingContrecPtr;
+	InRedo = params->InRedo;
+	backupEndRequired = params->backupEndRequired;
+	backupStartPoint = params->backupStartPoint;
+	backupEndPoint = params->backupEndPoint;
+	curFileTLI = params->curFileTLI;
+	InRecovery = true;
+
+	/*
+	 * Since the pipeline will be reading the WAL, it should own the latch it waits on.
+	 */
+	if (ArchiveRecoveryRequested)
+		OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
 /*
  * GUC check_hook for primary_slot_name
  */
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 0104a86b9ec..192295ad695 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
 #include "postgres.h"
 
 #include "access/parallel.h"
+#include "access/xlogpipeline.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -143,6 +144,10 @@ static const struct
 	{
 		.fn_name = "SequenceSyncWorkerMain",
 		.fn_addr = SequenceSyncWorkerMain
+	},
+	{
+		.fn_name = "WalPipeline_ProducerMain",
+		.fn_addr = WalPipeline_ProducerMain
 	}
 };
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index a4785daf1e5..f5eaff675f0 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/syncscan.h"
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogwait.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(void)
 	size = add_size(size, AioShmemSize());
 	size = add_size(size, WaitLSNShmemSize());
 	size = add_size(size, LogicalDecodingCtlShmemSize());
+	size = add_size(size, WalPipelineShmemSize());
 
 	/* include additional requested shmem from preload libraries */
 	size = add_size(size, total_addin_request);
@@ -224,6 +226,9 @@ CreateSharedMemoryAndSemaphores(void)
 	/* Initialize dynamic shared memory facilities. */
 	dsm_postmaster_startup(shim);
 
+	/* Initialize WAL pipeline module */
+	WalPipelineShmemInit();
+
 	/*
 	 * Now give loadable modules a chance to set up their shmem allocations
 	 */
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index a5a0edf2534..3523e7d459f 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3398,6 +3398,21 @@
   boot_val => 'false',
 },
 
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+  short_desc => 'Use parallel workers to speed up recovery.',
+  variable => 'wal_pipeline_enabled',
+  boot_val => 'false',
+},
+
+{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+  short_desc => 'Size of the shared memory queue used by the WAL pipeline.',
+  flags => 'GUC_UNIT_MB',
+  variable => 'wal_pipeline_mq_size_mb',
+  boot_val => '128',
+  min => '1',
+  max => '1024',
+},
+
 { name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
   short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
   variable => 'wal_receiver_create_temp_slot',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e686d88afc4..1d2fdb9747a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -277,6 +277,8 @@
 #recovery_prefetch = try        # prefetch pages referenced in the WAL?
 #wal_decode_buffer_size = 512kB # lookahead window used for prefetching
                                 # (change requires restart)
+#wal_pipeline = off              # decode in parallel
+#wal_pipeline_queue_size = 128MB
 
 # - Archiving -
 
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dcc12eb8cbe..a0c26f8005f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb;
 extern PGDLLIMPORT int max_wal_size_mb;
 extern PGDLLIMPORT int wal_keep_size_mb;
 extern PGDLLIMPORT int max_slot_wal_keep_size_mb;
+extern PGDLLIMPORT int wal_pipeline_mq_size_mb;
+extern PGDLLIMPORT bool wal_pipeline_enabled;
 extern PGDLLIMPORT int XLOGbuffers;
 extern PGDLLIMPORT int XLogArchiveTimeout;
 extern PGDLLIMPORT int wal_retrieve_retry_interval;
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
new file mode 100644
index 00000000000..5740b05f79c
--- /dev/null
+++ b/src/include/access/xlogpipeline.h
@@ -0,0 +1,188 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.h
+ *    WAL replay pipeline for parallel recovery
+ *
+ * This module implements a producer-consumer pipeline for WAL replay:
+ * - Producer: background worker that reads and decodes WAL records
+ * - Consumer: the startup process, which runs the core redo loop
+ *
+ * The pipeline uses a shared memory message queue (shm_mq) to pass decoded
+ * WAL records from producer to consumer, enabling parallelism while
+ * maintaining sequential replay semantics.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogpipeline.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAL_PIPELINE_H
+#define WAL_PIPELINE_H
+
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * Magic number for shared memory TOC
+ */
+#define PG_WAL_PIPELINE_MAGIC 0x57414C50  /* "WALP" */
+
+/*
+ * Message types sent through the pipeline
+ */
+typedef enum WalMsgType
+{
+	WAL_MSG_INVALID = 0,
+	WAL_MSG_RECORD,         /* Decoded WAL record */
+	WAL_MSG_SHUTDOWN,       /* Graceful shutdown request */
+	WAL_MSG_ERROR           /* Error occurred in producer */
+} WalMsgType;
+
+/* Wire header for a serialized WAL message */
+typedef struct WalRecordMsgHeader
+{
+	WalMsgType  	 msg_type;       	/* WAL_MSG_RECORD etc */
+	XLogRecPtr  	readRecPtr;     	/* XLogReaderState->ReadRecPtr */
+	XLogRecPtr		abortedRecPtr;		/* XLogReaderState->abortedRecPtr */
+	XLogRecPtr		missingContrecPtr;	/* XLogReaderState->missingContrecPtr */
+	XLogRecPtr		overwrittenRecPtr;	/* XLogReaderState->overwrittenRecPtr */
+	ReplOriginId 	record_origin;		/* DecodedXLogRecord->record_origin */
+	TransactionId 	toplevel_xid; 		/* DecodedXLogRecord->toplevel_xid */
+	XLogRecPtr  	endRecPtr;      /* XLogReaderState->EndRecPtr */
+	uint32      	decoded_size;   /* total size of decoded record payload (not including this header) */
+	int32       	max_block_id;   /* highest block id (could be -1) */
+	uint32      	main_data_len;  /* length of main_data */
+	/* followed by decoded payload bytes */
+} WalRecordMsgHeader;
+
+/* Per-block metadata on the wire (no pointers) */
+typedef struct SerializedBlockMeta
+{
+	int32               block_id;
+	bool                in_use;
+	RelFileLocator      rlocator;   /* same as decoded block */
+	ForkNumber          forknum;
+	BlockNumber         blkno;
+	uint8               flags;
+	bool                has_image;
+	bool                apply_image;
+	bool                has_data;
+	uint16              bimg_len;
+	uint8               bimg_info;
+	uint16              hole_offset;
+	uint16              hole_length;
+	uint16              data_len;
+	/* followed by bimg bytes then data bytes */
+} SerializedBlockMeta;
+
+/*
+ * Parameters passed from StartupXLOG (consumer side)
+ * to the WAL pipeline producer background worker.
+ */
+typedef struct WalPipelineParams
+{
+	bool		StandbyMode;
+	bool		StandbyModeRequested;
+	bool		ArchiveRecoveryRequested;
+	bool		InArchiveRecovery;
+	bool		InRedo;
+	bool 		lastSourceFailed;
+	bool 		pendingWalRcvRestart;
+	bool 		backupEndRequired;
+
+	TimeLineID  RedoStartTLI;
+	TimeLineID  CheckPointTLI;
+	TimeLineID  recoveryTargetTLI;
+	TimeLineID	minRecoveryPointTLI;
+	TimeLineID  ReplayTLI;
+	TimeLineID	receiveTLI;
+
+	XLogRecPtr 	backupStartPoint;
+	XLogRecPtr 	backupEndPoint;
+	XLogRecPtr  CheckPointLoc;
+	XLogRecPtr  RedoStartLSN;
+	XLogRecPtr  NextRecPtr;
+	XLogRecPtr	minRecoveryPoint;
+	XLogRecPtr 	flushedUpto;
+	XLogRecPtr 	abortedRecPtr;
+	XLogRecPtr 	missingContrecPtr;
+
+	int	readFile;
+	XLogSegNo readSegNo;
+	uint32 readOff;
+	uint32 readLen;
+	XLogSource readSource;
+	TimeLineID curFileTLI;
+
+
+	HotStandbyState standbyState;
+	XLogSource 	currentSource;
+
+} WalPipelineParams;
+
+/*
+ * Shared memory control structure for the WAL pipeline
+ */
+typedef struct WalPipelineShmCtl
+{
+	/* Lifecycle management */
+	slock_t         mutex;
+	bool            initialized;
+	bool            shutdown_requested;
+	bool            producer_exited;
+
+	/* Producer state */
+	pid_t           producer_pid;
+	ProcNumber      producer_procnum;
+	XLogRecPtr      producer_lsn;   /* Last LSN read by producer */
+
+	/* Consumer state */
+	pid_t           consumer_pid;
+	XLogRecPtr      consumer_lsn;   /* Last LSN received by consumer */
+	XLogRecPtr      applied_lsn;   	/* Last LSN applied by consumer */
+
+	/* Queue handles */
+	dsm_handle      dsm_seg_handle;
+	shm_mq_handle   *producer_mq_handle;
+	shm_mq_handle   *consumer_mq_handle;
+
+	/* Statistics */
+	uint64          records_sent;
+	uint64          records_received;
+	uint64          bytes_sent;
+	uint64          bytes_received;
+
+	/* Error state */
+	int             error_code;
+	char            error_message[256];
+} WalPipelineShmCtl;
+
+/* The consumer may need to compute prefetcher stats */
+extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
+
+/*
+ * Public API functions
+ */
+
+/* Initialize the WAL pipeline shared memory structures */
+extern Size WalPipelineShmemSize(void);
+extern void WalPipelineShmemInit(void);
+
+/* Producer functions (called by background worker) */
+extern void WalPipeline_ProducerMain(Datum main_arg);
+extern bool WalPipeline_SendRecord(XLogReaderState *record);
+extern bool WalPipeline_SendShutdown(void);
+extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+
+
+extern void ProcessPipelineBgwInterrupts(void);
+
+/* Global shared memory pointer */
+extern PGDLLIMPORT WalPipelineShmCtl *WalPipelineShm;
+
+#endif   /* WAL_PIPELINE_H */
\ No newline at end of file
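
The wire layout described by WalRecordMsgHeader and SerializedBlockMeta (a fixed header with no pointers, followed by variable-length bytes) can be illustrated with a small standalone round trip. This is a minimal sketch, not the patch's serialize_wal_record()/deserialize_wal_record(): DemoMsgHeader, DemoBlockMeta and the demo_* functions are hypothetical, simplified stand-ins, and only a single block is encoded.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for WalRecordMsgHeader: plain values, no pointers. */
typedef struct DemoMsgHeader
{
    int32_t     msg_type;       /* e.g. 1 = record */
    uint64_t    lsn;            /* start LSN of the record */
    uint32_t    payload_len;    /* bytes that follow this header */
} DemoMsgHeader;

/* Hypothetical stand-in for SerializedBlockMeta. */
typedef struct DemoBlockMeta
{
    int32_t     block_id;
    uint16_t    data_len;       /* followed by data_len bytes of block data */
} DemoBlockMeta;

/*
 * Lay out [header][block meta][block data] in one buffer.
 * Returns the message length (0 on allocation failure); caller frees *out.
 */
static size_t
demo_serialize(uint64_t lsn, int32_t block_id, const char *data,
               uint16_t data_len, char **out)
{
    DemoMsgHeader hdr = {1, lsn, (uint32_t) (sizeof(DemoBlockMeta) + data_len)};
    DemoBlockMeta meta = {block_id, data_len};
    size_t      total = sizeof(hdr) + hdr.payload_len;
    char       *buf = malloc(total);
    char       *p = buf;

    *out = buf;
    if (buf == NULL)
        return 0;
    memcpy(p, &hdr, sizeof(hdr));
    p += sizeof(hdr);
    memcpy(p, &meta, sizeof(meta));
    p += sizeof(meta);
    memcpy(p, data, data_len);
    p += data_len;
    assert((size_t) (p - buf) == total);    /* layout matches the header */
    return total;
}

/*
 * Parse a message, rejecting any whose declared sizes disagree with the
 * received length. Returns 0 on success, -1 if the message is malformed.
 * *data points into buf; nothing is copied.
 */
static int
demo_deserialize(const char *buf, size_t len, DemoMsgHeader *hdr,
                 DemoBlockMeta *meta, const char **data)
{
    const char *ptr = buf;
    const char *end = buf + len;

    if (len < sizeof(*hdr))
        return -1;
    memcpy(hdr, ptr, sizeof(*hdr));
    ptr += sizeof(*hdr);

    /* same idea as checking decoded_size against len - header size */
    if (hdr->payload_len != len - sizeof(*hdr))
        return -1;

    /* compare remaining bytes; never form a pointer past the end */
    if ((size_t) (end - ptr) < sizeof(*meta))
        return -1;
    memcpy(meta, ptr, sizeof(*meta));
    ptr += sizeof(*meta);

    if ((size_t) (end - ptr) < meta->data_len)
        return -1;
    *data = ptr;
    return 0;
}
```

One design choice worth noting: the sketch compares the remaining length (`end - ptr`) with the size it is about to read instead of computing `ptr + n > end`, because forming a pointer more than one past the end of a buffer is undefined behavior in C.
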
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 2842106b285..e675ab8353d 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,7 @@
 #ifndef XLOGRECOVERY_H
 #define XLOGRECOVERY_H
 
+#include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
@@ -60,6 +61,17 @@ typedef enum RecoveryPauseState
 	RECOVERY_PAUSED,			/* recovery is paused */
 } RecoveryPauseState;
 
+/* Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one.
+ */
+typedef enum
+{
+	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
+	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
+	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
+	XLOG_FROM_STREAM,			/* streamed from primary */
+} XLogSource;
+
 /*
  * Shared-memory state for WAL recovery.
  */
@@ -208,6 +220,8 @@ typedef struct
 	bool		recovery_signal_file_found;
 } EndOfWalRecoveryInfo;
 
+struct WalPipelineParams;   /* forward declaration */
+
 extern EndOfWalRecoveryInfo *FinishWalRecovery(void);
 extern void ShutdownWalRecovery(void);
 extern void RemovePromoteSignalFiles(void);
@@ -220,6 +234,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
 extern TimestampTz GetLatestXTime(void);
 extern TimestampTz GetCurrentChunkReplayStartTime(void);
 extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI);
+extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
+		   bool fetching_ckpt, TimeLineID replayTLI);
+extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+			 XLogRecPtr targetRecPtr, char *readBuf);
 
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
@@ -229,6 +247,7 @@ extern void StartupRequestWalReceiverRestart(void);
 extern void XLogRequestWalReceiverReply(void);
 
 extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue);
+extern void InitializePipelineRecoveryEnv(struct WalPipelineParams *params);
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 36d789720a3..fc23acbaec2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -61,6 +61,7 @@ tests += {
       't/050_redo_segment_missing.pl',
       't/051_effective_wal_level.pl',
       't/052_checkpoint_segment_missing.pl',
+      't/053_walpipeline.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/053_walpipeline.pl b/src/test/recovery/t/053_walpipeline.pl
new file mode 100644
index 00000000000..2fb790aadc5
--- /dev/null
+++ b/src/test/recovery/t/053_walpipeline.pl
@@ -0,0 +1,208 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+#
+# Tests for the WAL pipeline feature (wal_pipeline GUC).
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ----------
+# Helpers
+# ----------
+
+sub slurp_log
+{
+	my ($node) = @_;
+	open(my $fh, '<', $node->logfile()) or die "Cannot open log: $!";
+	my @lines = <$fh>;
+	close($fh);
+	return @lines;
+}
+
+sub log_matches
+{
+	my ($node, $re) = @_;
+	return grep { /$re/ } slurp_log($node);
+}
+
+
+# ########################################
+#   wal_pipeline = on, basic recovery
+# ########################################
+
+my $node1 = PostgreSQL::Test::Cluster->new('p1-recovery');
+$node1->init;
+$node1->start;
+
+$node1->safe_psql('postgres', q{
+    CREATE TABLE t (id serial PRIMARY KEY, v text);
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node1->safe_psql('postgres', q{
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node1->stop('immediate');
+
+# restart → recovery happens
+$node1->append_conf('postgresql.conf', "wal_pipeline = on");
+$node1->start;
+
+
+# Producer started
+ok(scalar log_matches($node1, qr/\[walpipeline\] producer: started at/),
+	'producer started message found in log');
+
+# Pipeline stopped cleanly
+ok(scalar log_matches($node1, qr/\[walpipeline\] stopped/),
+	'pipeline stopped message found in log');
+
+# Consumer received shutdown from producer
+ok(scalar log_matches($node1, qr/\[walpipeline\] consumer: received shutdown message/),
+	'consumer received shutdown message from producer');
+
+# sent == received
+my @exit_lines = log_matches($node1,
+	qr/\[walpipeline\] producer: exiting: sent=\d+ received=\d+/);
+ok(scalar @exit_lines >= 1, 'producer exiting line found in log');
+
+my ($sent, $recv) = $exit_lines[-1] =~ /sent=(\d+) received=(\d+)/;
+ok(defined $sent && $sent > 0, "sent count ($sent) is positive");
+ok(defined $recv && $recv > 0, "received count ($recv) is positive");
+is($sent, $recv, "no records lost in pipeline queue: sent=$sent received=$recv");
+
+# No PANIC
+ok(!(scalar log_matches($node1, qr/\bPANIC\b/)),
+	'no PANIC messages during pipeline recovery');
+
+# Data integrity
+my $count = $node1->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count + 0, 100_000, 'all 100000 rows visible after pipeline recovery');
+
+$node1->stop;
+
+# ##############################################################
+#    wal_pipeline = off (baseline, no pipeline log messages)
+# ##############################################################
+
+my $node2 = PostgreSQL::Test::Cluster->new('p0-recovery');
+$node2->init;
+$node2->start;
+
+$node2->safe_psql('postgres', q{
+    CREATE TABLE t (id serial PRIMARY KEY, v text);
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node2->safe_psql('postgres', q{
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node2->stop('immediate');
+
+# restart → recovery happens
+$node2->append_conf('postgresql.conf', "wal_pipeline = off");
+$node2->start;
+
+ok(!(scalar log_matches($node2, qr/\[walpipeline\] producer: started/)),
+	'no pipeline log messages when wal_pipeline = off');
+
+my $count2 = $node2->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count2 + 0, 100_000, 'all rows present after non-pipeline recovery');
+
+$node2->stop;
+
+
+
+# ###################################################################
+#    pipeline on vs off produce identical data (checksum comparison)
+# ###################################################################
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+$primary->safe_psql('postgres', q{
+    CREATE TABLE t (id serial PRIMARY KEY, v text);
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1, 30000) i;
+});
+
+$primary->backup('backup3');
+
+$primary->safe_psql('postgres', q{
+    INSERT INTO t (v)
+    SELECT md5(i::text) FROM generate_series(1, 30000) i;
+    UPDATE t SET v = 'x' WHERE id % 10 = 0;
+});
+
+# force a WAL segment switch
+$primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+my $target_lsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_lsn()');
+
+my $replica_on = PostgreSQL::Test::Cluster->new('replica_p1');
+$replica_on->init_from_backup($primary, 'backup3',
+    has_streaming => 1);
+$replica_on->append_conf('postgresql.conf', "wal_pipeline = on\n");
+$replica_on->start;
+
+my $replica_off = PostgreSQL::Test::Cluster->new('replica_p0');
+$replica_off->init_from_backup($primary, 'backup3',
+    has_streaming => 1);
+$replica_off->append_conf('postgresql.conf', "wal_pipeline = off\n");
+$replica_off->start;
+
+# wait for replicas to catch up
+$primary->wait_for_catchup($replica_on);
+$primary->wait_for_catchup($replica_off);
+
+my $md5_on  = $replica_on->safe_psql('postgres',
+    "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+my $md5_off = $replica_off->safe_psql('postgres',
+    "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+is($md5_on, $md5_off,
+    'table checksum identical between pipeline=on and pipeline=off');
+
+$replica_on->stop;
+$replica_off->stop;
+$primary->stop('fast');
+
+
+
+# #################################
+#    wal_pipeline = on, tiny replay
+# #################################
+
+my $node3 = PostgreSQL::Test::Cluster->new('p1-small-replay');
+$node3->init;
+$node3->start;
+
+# crash stop to force WAL recovery
+$node3->stop('immediate');
+
+# restart → recovery happens
+$node3->append_conf('postgresql.conf', "wal_pipeline = on");
+$node3->start;
+
+ok(scalar log_matches($node3, qr/\[walpipeline\] producer: exiting: sent=0 received=0/),
+	'pipeline producer sent zero records');
+
+ok((scalar log_matches($node3, qr/redo done at/)),
+	'pipeline redo done even with tiny replay');
+
+$node3->stop;
+
+done_testing();
\ No newline at end of file
-- 
2.34.1



  [text/x-patch] v2-0002-Pipelined-Recovery-Consumer.patch (50.7K, 4-v2-0002-Pipelined-Recovery-Consumer.patch)
From 323482e965096afb5fc2a0ddefb4a075f2716f28 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Tue, 17 Mar 2026 21:48:05 +0500
Subject: [PATCH v2 2/2] Pipelined Recovery - Consumer

This patch adds the consumer side of the producer-consumer architecture
for WAL replay. The architecture moves WAL reading and decoding out of
the startup process into a background worker, so that decoding and redo
can run in parallel.

The consumer (the startup process) receives decoded records from the
shared memory message queue. Both producer and consumer must see state
changes made by the other, so mutable state is synchronized through the
shared-memory `XLogRecoveryCtlData` struct. State that does not change
while the pipeline runs is passed once from the consumer to the producer
in `WalPipelineParams`.

Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
 src/backend/access/transam/xlog.c           |   5 +-
 src/backend/access/transam/xlogpipeline.c   | 525 ++++++++++++++++++++
 src/backend/access/transam/xlogprefetcher.c |   3 +-
 src/backend/access/transam/xlogrecovery.c   | 469 +++++++++++++++--
 src/backend/storage/ipc/standby.c           |   1 +
 src/include/access/xlogpipeline.h           |  15 +
 src/include/access/xlogrecovery.h           |  50 ++
 7 files changed, 1028 insertions(+), 40 deletions(-)
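
As a rough model of the message flow described in the commit message, the following single-threaded sketch runs a producer and a consumer against a bounded FIFO. The producer stops when the queue is full (where a producer calling shm_mq_send() with nowait = false would wait), the consumer replays records in LSN order, and a shutdown message ends the loop. It models only ordering, back-pressure and shutdown, not the concurrency between two processes. DemoQueue, QUEUE_CAP and the demo_* functions are hypothetical and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define QUEUE_CAP 4     /* hypothetical capacity, standing in for wal_pipeline_queue_size */

typedef enum { DEMO_MSG_RECORD, DEMO_MSG_SHUTDOWN } DemoMsgType;

typedef struct
{
    DemoMsgType type;
    uint64_t    lsn;
} DemoMsg;

typedef struct
{
    DemoMsg     slots[QUEUE_CAP];
    int         head;           /* index of the oldest queued message */
    int         count;          /* number of queued messages */
} DemoQueue;

/* Returns false when the queue is full: the producer must wait. */
static bool
demo_try_send(DemoQueue *q, DemoMsg m)
{
    if (q->count == QUEUE_CAP)
        return false;
    q->slots[(q->head + q->count) % QUEUE_CAP] = m;
    q->count++;
    return true;
}

/* Returns false when the queue is empty: the consumer must wait. */
static bool
demo_try_receive(DemoQueue *q, DemoMsg *m)
{
    if (q->count == 0)
        return false;
    *m = q->slots[q->head];
    q->head = (q->head + 1) % QUEUE_CAP;
    q->count--;
    return true;
}

/*
 * The producer "decodes" nrecords records and then sends a shutdown
 * message; the consumer "replays" records until it sees that message.
 * Each round, the producer fills the queue as far as it can and the
 * consumer then takes one message. Returns the number of records
 * replayed and stores the number sent in *sent_out.
 */
static int
demo_run_pipeline(int nrecords, int *sent_out)
{
    DemoQueue   q = {0};
    int         sent = 0;
    int         received = 0;
    bool        shutdown_sent = false;
    bool        done = false;
    uint64_t    last_lsn = 0;

    while (!done)
    {
        /* producer: send until the queue is full */
        while (!shutdown_sent)
        {
            DemoMsg     out;

            out.type = (sent < nrecords) ? DEMO_MSG_RECORD : DEMO_MSG_SHUTDOWN;
            out.lsn = (uint64_t) (sent + 1) * 100;
            if (!demo_try_send(&q, out))
                break;          /* back-pressure */
            if (out.type == DEMO_MSG_SHUTDOWN)
                shutdown_sent = true;
            else
                sent++;
        }

        /* consumer: replay one message, strictly in LSN order */
        DemoMsg     in;

        if (demo_try_receive(&q, &in))
        {
            if (in.type == DEMO_MSG_SHUTDOWN)
                done = true;
            else
            {
                assert(in.lsn > last_lsn);
                last_lsn = in.lsn;
                received++;
            }
        }
    }
    *sent_out = sent;
    return received;
}
```

With nrecords = 0 the consumer sees only the shutdown message, which mirrors the `sent=0 received=0` case checked by the tiny-replay TAP test.
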

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5c9a34374d..3e1b7d78055 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -59,6 +59,7 @@
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
 #include "access/xloginsert.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
 #include "access/xlogutils.h"
@@ -5924,6 +5925,7 @@ StartupXLOG(void)
 
 				ProcArrayApplyRecoveryInfo(&running);
 			}
+			SetSharedHotStandbyState();
 		}
 
 		/*
@@ -8488,6 +8490,7 @@ xlog_redo(XLogReaderState *record)
 			running.xids = xids;
 
 			ProcArrayApplyRecoveryInfo(&running);
+			SetSharedHotStandbyState();
 		}
 
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
@@ -9667,7 +9670,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
 void
 XLogShutdownWalRcv(void)
 {
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
 
 	ShutdownWalRcv();
 	ResetInstallXLogFileSegmentActive();
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
index 4b95a11d16b..5bb575b1f14 100644
--- a/src/backend/access/transam/xlogpipeline.c
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -52,6 +52,16 @@
 #include "utils/timeout.h"
 
 
+/*
+ * Convert values of GUCs measured in megabytes to bytes
+ */
+#define MBToBytes(mbvar) ((Size) (mbvar) * 1024 * 1024)
+
+/*
+ * Max iterations to wait for the consumer before exiting gracefully.
+ */
+#define MAX_SHUTDOWN_WAIT_ITERS 1000	/* 1000 * 10ms = 10 seconds */
+
 /* Global shared memory control structure */
 WalPipelineShmCtl *WalPipelineShm = NULL;
 
@@ -62,6 +72,10 @@ static dsm_segment *producer_dsm_seg = NULL;
 static shm_mq *producer_mq = NULL;
 static shm_mq_handle *producer_mq_handle = NULL;
 
+/* Local state for consumer */
+static dsm_segment *consumer_dsm_seg = NULL;
+static shm_mq *consumer_mq = NULL;
+static shm_mq_handle *consumer_mq_handle = NULL;
 
 /*
  * Flags set by interrupt handlers for later service in the redo loop.
@@ -74,7 +88,9 @@ static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
 /* Forward declarations */
 static void wal_pipeline_cleanup_callback(int code, Datum arg);
 static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static DecodedXLogRecord *deserialize_wal_record(const char *buffer, Size len, XLogReaderState *startup_reader, bool first_iteration);
 static void cleanup_producer_resources(void);
+static void cleanup_consumer_resources(void);
 static void WalPipeline_WaitForConsumerShutdownRequest(void);
 
 /* copied from xlogrecovery.c */
@@ -135,6 +151,168 @@ WalPipelineShmemInit(void)
 }
 
 
+/*
+ * Consumer Function.
+ *
+ * Initialize and start the WAL pipeline: create the DSM segment and the
+ * message queue, then launch the producer background worker.
+ */
+void
+WalPipeline_Start(WalPipelineParams *params)
+{
+	BackgroundWorker worker;
+	BackgroundWorkerHandle *handle;
+	dsm_segment *seg;
+	shm_toc_estimator e;
+	shm_toc *toc;
+	Size        segsize;
+	shm_mq     *mq;
+	WalPipelineParams *shared_params;
+	pid_t		pid;
+	BgwHandleStatus status;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	if (WalPipelineShm->initialized)
+	{
+		SpinLockRelease(&WalPipelineShm->mutex);
+		return;  /* Already started */
+	}
+	WalPipelineShm->initialized = true;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams));
+	shm_toc_estimate_chunk(&e, MBToBytes(wal_pipeline_mq_size_mb));
+	shm_toc_estimate_keys(&e, 2);   /* key=1 → params, key=2 → mq */
+	segsize = shm_toc_estimate(&e);
+
+	seg = dsm_create(segsize, 0);
+	dsm_pin_segment(seg);
+
+	toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC,
+						 dsm_segment_address(seg), segsize);
+
+	/*
+	 * Pass the startup process's variable state through WalPipelineParams
+	 */
+	shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams));
+	shm_toc_insert(toc, 1, shared_params);
+	*shared_params = *params;
+
+	/* create the message queue */
+	mq = shm_mq_create(shm_toc_allocate(toc, MBToBytes(wal_pipeline_mq_size_mb)),
+					   MBToBytes(wal_pipeline_mq_size_mb));
+	shm_toc_insert(toc, 2, mq);
+
+	/* update shared state */
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg);
+	WalPipelineShm->consumer_pid = MyProcPid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	/* Set up consumer side of the queue */
+	consumer_dsm_seg = seg;
+	consumer_mq = mq;
+	shm_mq_set_receiver(consumer_mq, MyProc);
+	consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL);
+
+	/* Register cleanup callback */
+	before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+	/* Register background worker */
+	memset(&worker, 0, sizeof(worker));
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+	worker.bgw_start_time = BgWorkerStart_PostmasterStart;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	sprintf(worker.bgw_library_name, "postgres");
+	sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain");
+	snprintf(worker.bgw_name, BGW_MAXLEN, "wal pipeline producer");
+	snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer");
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->initialized = false;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		dsm_unpin_segment(dsm_segment_handle(seg));
+		dsm_detach(seg);
+		consumer_dsm_seg = NULL;
+		consumer_mq = NULL;
+		consumer_mq_handle = NULL;
+
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background worker for WAL pipeline")));
+	}
+
+	status = WaitForBackgroundWorkerStartup(handle, &pid);
+
+	if (status != BGWH_STARTED)
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not start background worker for WAL pipeline"),
+				 errhint("More details may be available in the server log.")));
+	else
+		ereport(LOG, (errmsg("[walpipeline] started.")));
+}
+
+/*
+ * Request producer shutdown.
+ * This is called by the consumer when it no longer needs records.
+ */
+static void
+WalPipeline_RequestShutdown(void)
+{
+	if (!WalPipelineShm)
+		return;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->shutdown_requested = true;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * Consumer Function.
+ * Stop the WAL pipeline. This is also called by the startup process
+ * (consumer), once it no longer needs any decoded records. It waits up
+ * to 10 seconds for the producer to exit before releasing the
+ * consumer-side resources.
+ */
+void
+WalPipeline_Stop(void)
+{
+	if (!WalPipelineShm || !WalPipelineShm->initialized)
+		return;
+
+	/* Ask producer to stop */
+	WalPipeline_RequestShutdown();
+
+	/* Wait for producer to exit (max 10 seconds) */
+	for (int i = 0; i < 100; i++)
+	{
+		bool producer_alive;
+
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		producer_alive = (WalPipelineShm->producer_pid != 0);
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (!producer_alive)
+			break;
+
+		pg_usleep(100000); /* 100 ms */
+	}
+
+	cleanup_consumer_resources();
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->initialized = false;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	elog(LOG, "[walpipeline] stopped");
+}
 
 /*
  * Producer Function.
@@ -374,6 +552,117 @@ WalPipeline_SendError(int errcode, const char *errmsg)
 	return true;
 }
 
+/*
+ * Consumer Function.
+ * Receive and deserialize a WAL record from the producer
+ */
+DecodedXLogRecord *
+WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration)
+{
+	shm_mq_result res;
+	Size        nbytes;
+	void       *data;
+	char       *err_msg;
+	int        err_code;
+	WalRecordMsgHeader *hdr;
+	DecodedXLogRecord *record;
+
+	if (!consumer_mq_handle)
+		return NULL;
+
+	/* Receive message from queue */
+	res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false);
+
+	if (res != SHM_MQ_SUCCESS)
+		elog(ERROR, "[walpipeline] consumer: failed to receive record");
+
+	hdr = (WalRecordMsgHeader *) data;
+
+	/* Handle different message types */
+	switch (hdr->msg_type)
+	{
+		case WAL_MSG_RECORD:
+			record = deserialize_wal_record((char *) data, nbytes, startup_reader, first_iteration);
+
+			/* Update statistics */
+			SpinLockAcquire(&WalPipelineShm->mutex);
+			WalPipelineShm->records_received++;
+			WalPipelineShm->bytes_received += nbytes;
+			WalPipelineShm->consumer_lsn = hdr->endRecPtr;
+			SpinLockRelease(&WalPipelineShm->mutex);
+
+			return record;
+
+		case WAL_MSG_SHUTDOWN:
+			elog(LOG, "[walpipeline] consumer: received shutdown message from the producer");
+			return NULL;
+
+		case WAL_MSG_ERROR:
+			SpinLockAcquire(&WalPipelineShm->mutex);
+			err_code = WalPipelineShm->error_code;
+			err_msg = WalPipelineShm->error_message;
+			SpinLockRelease(&WalPipelineShm->mutex);
+
+			ereport(ERROR,
+					(errcode(err_code),
+					 errmsg("[walpipeline] consumer: received error from the producer: %s", err_msg)));
+			return NULL;
+
+		default:
+			elog(PANIC, "[walpipeline] consumer: unknown message type: %d",
+				 hdr->msg_type);
+			return NULL;
+	}
+}
+
+/*
+ * Consumer Function.
+ * Check if producer is still running
+ */
+bool
+WalPipeline_CheckProducerAlive(void)
+{
+	pid_t       pid;
+	bool        alive;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	pid = WalPipelineShm->producer_pid;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	if (pid == 0)
+		return false;
+
+	alive = (kill(pid, 0) == 0);
+
+	if (!alive)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		WalPipelineShm->producer_pid = 0;
+		SpinLockRelease(&WalPipelineShm->mutex);
+	}
+
+	return alive;
+}
+
+/*
+ * Consumer Function.
+ * Check if pipeline is active
+ */
+bool
+WalPipeline_IsActive(void)
+{
+	bool        active;
+
+	if (!WalPipelineShm)
+		return false;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested;
+	SpinLockRelease(&WalPipelineShm->mutex);
+
+	return active;
+}
+
 /*
  * Producer Function.
  * The producer could exit without waiting for the consumer, but it's better to
@@ -411,6 +700,55 @@ WalPipeline_WaitForConsumerShutdownRequest(void)
 	}
 }
 
+/*
+ * Consumer Function.
+ * Wait until the last record sent by the producer has been applied by the
+ * startup process.
+ */
+void
+WalPipeline_WaitForConsumerCatchup(void)
+{
+	XLogRecPtr producer_lsn;
+	XLogRecPtr applied_lsn;
+
+	for (;;)
+	{
+		SpinLockAcquire(&WalPipelineShm->mutex);
+		producer_lsn = WalPipelineShm->producer_lsn;
+		applied_lsn = WalPipelineShm->applied_lsn;
+		SpinLockRelease(&WalPipelineShm->mutex);
+
+		if (producer_lsn == applied_lsn)
+			return;
+
+		CHECK_FOR_INTERRUPTS();
+
+		/* short sleep to avoid busy looping */
+		pg_usleep(50);   /* 50 microseconds */
+	}
+}
+
+/*
+ * Consumer Function.
+ * Get pipeline statistics
+ */
+void
+WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+					 XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn)
+{
+	SpinLockAcquire(&WalPipelineShm->mutex);
+
+	if (records_sent)
+		*records_sent = WalPipelineShm->records_sent;
+	if (records_received)
+		*records_received = WalPipelineShm->records_received;
+	if (producer_lsn)
+		*producer_lsn = WalPipelineShm->producer_lsn;
+	if (consumer_lsn)
+		*consumer_lsn = WalPipelineShm->consumer_lsn;
+
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
 
 /*
  * serialize_wal_record (Producer)
@@ -539,6 +877,167 @@ serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
 	return total;
 }
 
+/*
+ * deserialize_wal_record (Consumer)
+ *
+ * Deserialize a WAL record from a buffer into a DecodedXLogRecord.
+ *
+ * Memory layout:
+ *   [DecodedXLogRecord + blocks][main_data][blk0 image][blk0 data][blk1 image]...
+ */
+static DecodedXLogRecord *
+deserialize_wal_record(const char *buf, Size len, XLogReaderState *startup_reader, bool first_iteration)
+{
+	const char *ptr = buf;
+	const char *end = buf + len;
+	WalRecordMsgHeader hdr;
+	DecodedXLogRecord *dec = NULL;
+	char *alloc_ptr;
+	int nblocks;
+	Size total;
+
+	if (len < sizeof(WalRecordMsgHeader))
+		return NULL;
+
+	memcpy(&hdr, ptr, sizeof(hdr));
+	ptr += sizeof(hdr);
+
+	if (hdr.decoded_size != len - sizeof(WalRecordMsgHeader))
+		return NULL;
+
+	nblocks = (hdr.max_block_id >= 0) ? hdr.max_block_id + 1 : 0;
+
+	/* ---- space allocation ---- */
+	total = MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock))
+		  + MAXALIGN(hdr.decoded_size);
+
+	dec = palloc(total);
+	memset(dec, 0, total);
+
+	alloc_ptr = (char *)dec + MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock));
+
+	/* ---- record metadata ---- */
+	dec->lsn          = hdr.readRecPtr;
+	dec->next_lsn     = hdr.endRecPtr;
+	dec->max_block_id = hdr.max_block_id;
+	dec->main_data_len = hdr.main_data_len;
+	dec->toplevel_xid = hdr.toplevel_xid;
+	dec->record_origin = hdr.record_origin;
+
+	/* ---- XLogRecord ---- */
+	if (ptr + sizeof(XLogRecord) > end)
+		goto fail;
+
+	memcpy(&dec->header, ptr, sizeof(XLogRecord));
+	ptr += sizeof(XLogRecord);
+
+	/* ---- main data ---- */
+	if (hdr.main_data_len > 0)
+	{
+		if (ptr + hdr.main_data_len > end)
+			goto fail;
+
+		dec->main_data = alloc_ptr;
+		memcpy(dec->main_data, ptr, hdr.main_data_len);
+		ptr += hdr.main_data_len;
+		alloc_ptr += MAXALIGN(hdr.main_data_len);
+	}
+
+	/* ---- blocks ---- */
+	for (int i = 0; i < nblocks && ptr < end; i++)
+	{
+		SerializedBlockMeta meta;
+		DecodedBkpBlock *blk;
+
+		if (ptr + sizeof(meta) > end)
+			break;
+
+		memcpy(&meta, ptr, sizeof(meta));
+		ptr += sizeof(meta);
+
+		if (!meta.in_use)
+			continue;
+
+		if (meta.block_id < 0 || meta.block_id >= nblocks)
+			goto fail;
+
+		blk = &dec->blocks[meta.block_id];
+
+		blk->in_use       = true;
+		blk->rlocator     = meta.rlocator;
+		blk->forknum      = meta.forknum;
+		blk->blkno        = meta.blkno;
+		blk->flags        = meta.flags;
+		blk->has_image    = meta.has_image;
+		blk->apply_image  = meta.apply_image;
+		blk->has_data     = meta.has_data;
+		blk->bimg_len     = meta.bimg_len;
+		blk->bimg_info    = meta.bimg_info;
+		blk->hole_offset  = meta.hole_offset;
+		blk->hole_length  = meta.hole_length;
+		blk->data_len     = meta.data_len;
+
+		if (blk->has_image && blk->bimg_len > 0)
+		{
+			if (ptr + blk->bimg_len > end)
+				goto fail;
+
+			blk->bkp_image = alloc_ptr;
+			memcpy(blk->bkp_image, ptr, blk->bimg_len);
+			ptr += blk->bimg_len;
+			alloc_ptr += MAXALIGN(blk->bimg_len);
+		}
+
+		if (blk->has_data && blk->data_len > 0)
+		{
+			if (ptr + blk->data_len > end)
+				goto fail;
+
+			blk->data = alloc_ptr;
+			memcpy(blk->data, ptr, blk->data_len);
+			ptr += blk->data_len;
+			alloc_ptr += MAXALIGN(blk->data_len);
+		}
+	}
+
+	dec->size = alloc_ptr - (char *)dec;
+	dec->oversized = false;
+
+	/*
+	 * The previous decoded record was deserialized from the pipeline
+	 * (i.e. palloc'd by an earlier call of this function), so its memory
+	 * must be freed once it has been used.
+	 *
+	 * On the first iteration, however, the memory for `reader->record`
+	 * was allocated from the reader's `decode_buffer`, and passing it to
+	 * pfree() could be fatal. That memory is released automatically at
+	 * the end of recovery in FinishWalRecovery(), so pfree() is skipped
+	 * for the first iteration (apply).
+	 */
+	if (startup_reader->record && !first_iteration)
+		pfree(startup_reader->record);
+
+	/* Attach to reader, only updating the public parameters */
+	startup_reader->record = dec;
+	startup_reader->ReadRecPtr   = dec->lsn;
+	startup_reader->DecodeRecPtr = dec->lsn;
+	startup_reader->EndRecPtr    = dec->next_lsn;
+	startup_reader->NextRecPtr   = dec->next_lsn;
+	startup_reader->decode_queue_head = dec;
+	startup_reader->decode_queue_tail = dec;
+	startup_reader->missingContrecPtr = hdr.missingContrecPtr;
+	startup_reader->abortedRecPtr = hdr.abortedRecPtr;
+	startup_reader->overwrittenRecPtr = hdr.overwrittenRecPtr;
+
+	return dec;
+
+fail:
+	if (dec)
+		pfree(dec);
+
+	elog(LOG, "deserialize_wal_record: failed");
+	return NULL;
+}
 
 /*
  * We need to put some assertion that only pipeline worker should be touching
@@ -581,6 +1080,32 @@ cleanup_producer_resources(void)
 	SpinLockRelease(&WalPipelineShm->mutex);
 }
 
+/*
+ * Clean up consumer-side resources
+ */
+static void
+cleanup_consumer_resources(void)
+{
+	if (consumer_mq_handle)
+	{
+		shm_mq_detach(consumer_mq_handle);
+		consumer_mq_handle = NULL;
+	}
+
+	if (consumer_dsm_seg)
+	{
+		dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg));
+		dsm_detach(consumer_dsm_seg);
+		consumer_dsm_seg = NULL;
+	}
+
+	consumer_mq = NULL;
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->consumer_pid = 0;
+	WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+	SpinLockRelease(&WalPipelineShm->mutex);
+}
 
 /*
  * Cleanup callback for process exit
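The deserializer above copies each block image and data payload out of the queue message. Before each memcpy() it checks that the payload fits, and afterwards it advances the output pointer by MAXALIGN(len). Below is a minimal standalone sketch of that step. It assumes an 8-byte maximum alignment: `SKETCH_MAXALIGN` is a local stand-in for PostgreSQL's MAXALIGN(), and `copy_payload` is a hypothetical helper. The sketch writes the bounds test as `len > end - ptr`, which never forms a pointer past the end of the message. By contrast, `ptr + len > end` computes such a pointer when a message is truncated, and C leaves that undefined.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Local stand-in for PostgreSQL's MAXALIGN(), assuming 8-byte alignment. */
#define SKETCH_MAXALIGN(len) (((size_t) (len) + 7) & ~(size_t) 7)

/*
 * Hypothetical helper: copy a payload of 'len' bytes from the serialized
 * message [*ptr, end) into the output area at *alloc_ptr.  On success,
 * advance *ptr by len and *alloc_ptr by the MAXALIGN'd length and return
 * the copy; return NULL (changing nothing) if the message is too short.
 */
static char *
copy_payload(const char **ptr, const char *end, size_t len, char **alloc_ptr)
{
	char	   *dst;

	assert(*ptr <= end);

	/* Compare against the remaining byte count; never form *ptr + len. */
	if (len > (size_t) (end - *ptr))
		return NULL;

	dst = *alloc_ptr;
	memcpy(dst, *ptr, len);
	*ptr += len;
	*alloc_ptr += SKETCH_MAXALIGN(len);
	return dst;
}
```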
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index c235eca7c51..1536ea34c41 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -27,6 +27,7 @@
 
 #include "postgres.h"
 
+#include "access/xlogpipeline.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "catalog/pg_control.h"
@@ -352,7 +353,7 @@ XLogPrefetchReconfigure(void)
 static inline void
 XLogPrefetchIncrement(pg_atomic_uint64 *counter)
 {
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
 	pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
 }
 
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index b66ec80fa25..49cc0b9aafa 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -35,6 +35,7 @@
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogarchive.h"
+#include "access/xlogpipeline.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
 #include "access/xlogrecovery.h"
@@ -99,6 +100,8 @@ int			recovery_min_apply_delay = 0;
 char	   *PrimaryConnInfo = NULL;
 char	   *PrimarySlotName = NULL;
 bool		wal_receiver_create_temp_slot = false;
+bool		wal_pipeline_enabled = false;
+int			wal_pipeline_mq_size_mb = 128;
 
 /*
  * recoveryTargetTimeLineGoal: what the user requested, if any
@@ -205,17 +208,6 @@ typedef struct XLogPageReadPrivate
 /* flag to tell XLogPageRead that we have started replaying */
 static bool InRedo = false;
 
-/*
- * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.
- */
-typedef enum
-{
-	XLOG_FROM_ANY = 0,			/* request to read WAL from any source */
-	XLOG_FROM_ARCHIVE,			/* restored using restore_command */
-	XLOG_FROM_PG_WAL,			/* existing file in pg_wal */
-	XLOG_FROM_STREAM,			/* streamed from primary */
-} XLogSource;
 
 /* human-readable names for XLogSources, for debugging output */
 static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
@@ -356,12 +348,6 @@ static void recoveryPausesHere(bool endOfRecovery);
 static bool recoveryApplyDelay(XLogReaderState *record);
 static void ConfirmRecoveryPaused(void);
 
-static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
-							  int emode, bool fetching_ckpt,
-							  TimeLineID replayTLI);
-
-static int	XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
-						 int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
 static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
 													  bool randAccess,
 													  bool fetching_ckpt,
@@ -383,6 +369,7 @@ static bool HotStandbyActiveInReplay(void);
 
 static void SetCurrentChunkStartTime(TimestampTz xtime);
 static void SetLatestXTime(TimestampTz xtime);
+static void InitializePipelineStartupEnv(WalPipelineParams *params);
 
 /*
  * Initialization of shared memory for WAL recovery
@@ -411,9 +398,27 @@ XLogRecoveryShmemInit(void)
 
 	SpinLockInit(&XLogRecoveryCtl->info_lck);
 	InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	InitSharedLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 	ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
 }
 
+/*
+ * expectedTLEs is a backend-local List, so it cannot be shared through shared
+ * memory. Instead, whenever the pipeline updates expectedTLEs, publish the
+ * target TLI so that the startup process (consumer) rereads the timeline
+ * history file and updates its local copy of expectedTLEs.
+ */
+static void
+PipelineConsumerexpectedTLEsUpdateTLI(TimeLineID recoveryTargetTLI)
+{
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogRecoveryCtl->expectedTLEsUpdateTLI = recoveryTargetTLI;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+}
+
 /*
  * A thin wrapper to enable StandbyMode and do other preparatory work as
  * needed.
@@ -429,7 +434,8 @@ EnableStandbyMode(void)
 	 * startup progress timeout in standby mode to avoid calling
 	 * startup_progress_timeout_handler() unnecessarily.
 	 */
-	disable_startup_progress_timeout();
+	if (!AmWalPipeline())
+		disable_startup_progress_timeout();
 }
 
 /*
@@ -489,7 +495,10 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 	 * recovery, if required.
 	 */
 	if (ArchiveRecoveryRequested)
+	{
+		OwnLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 		OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	}
 
 	/*
 	 * Set the WAL reading processor now, as it will be needed when reading
@@ -961,6 +970,14 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 		minRecoveryPointTLI = 0;
 	}
 
+	/* update shared state. */
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	/*
 	 * Start recovery assuming that the final record isn't lost.
 	 */
@@ -972,6 +989,12 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
 	*haveTblspcMap_ptr = haveTblspcMap;
 }
 
+void DisownRecoveryWakeupLatch(void)
+{
+	if (ArchiveRecoveryRequested)
+		DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
 /*
  * See if there are any recovery signal files and if so, set state for
  * recovery.
@@ -1420,6 +1443,15 @@ FinishWalRecovery(void)
 	TimeLineID	lastRecTLI;
 	XLogRecPtr	endOfLog;
 
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		InArchiveRecovery = XLogRecoveryCtl->InArchiveRecovery;
+		missingContrecPtr = XLogRecoveryCtl->missingContrecPtr;
+		abortedRecPtr = XLogRecoveryCtl->abortedRecPtr;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	/*
 	 * Kill WAL receiver, if it's still running, before we continue to write
 	 * the startup checkpoint and aborted-contrecord records. It will trump
@@ -1477,6 +1509,17 @@ FinishWalRecovery(void)
 		lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
 		lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
 	}
+
+	/*
+	 * Invalidate contents of internal buffer before read attempt.  Just set
+	 * the length to 0, rather than a full XLogReaderInvalReadState().
+	 *
+	 * This is needed because, with the pipeline enabled, records have so far
+	 * been read by the pipeline's reader, so invalidate the startup process's
+	 * reader state before using it again.
+	 */
+	xlogreader->readLen = 0;
+
 	XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
 	(void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
 	endOfLog = xlogreader->EndRecPtr;
@@ -1599,7 +1642,69 @@ ShutdownWalRecovery(void)
 	 * it, but let's do it for the sake of tidiness.
 	 */
 	if (ArchiveRecoveryRequested)
-		DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	{
+		DisownLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
+
+		/*
+		 * Only disown the latch if we were the owner (pipeline disabled).
+		 */
+		if (!wal_pipeline_enabled)
+			DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	}
+
+
+}
+
+/*
+ * Get the next record for redo.
+ * If the pipeline is enabled, receive a record already decoded by the
+ * pipeline worker from the shared queue; otherwise read it directly.
+ */
+static XLogRecord *
+ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode,
+				bool fetching_ckpt, TimeLineID replayTLI,
+				XLogReaderState **localreader, bool first_iteration)
+{
+
+	XLogRecord *record = NULL;
+	XLogReaderState *reader = *localreader;
+
+	/*
+	 * If pipeline not enabled read the record directly
+	 */
+	if (!wal_pipeline_enabled)
+	{
+		record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI);
+		return record;
+	}
+
+	/*
+	 * Get record from the pipeline
+	 */
+	if (WalPipeline_IsActive())
+	{
+		DecodedXLogRecord *decoded_record = NULL;
+
+		decoded_record = WalPipeline_ReceiveRecord(reader, first_iteration);
+
+		if (decoded_record)
+		{
+			record = &decoded_record->header;
+			return record;
+		}
+		else
+		{
+			/*
+			 * We end up here only when the pipeline could not read more
+			 * records and has sent a shutdown message. Acknowledge it and
+			 * request the pipeline workers to stop.
+			 */
+			WalPipeline_Stop();
+			return NULL;
+		}
+	}
+
+	elog(PANIC, "[walpipeline] consumer: pipeline not active, even though wal_pipeline is set to on");
 }
 
 /*
@@ -1614,6 +1719,12 @@ PerformWalRecovery(void)
 	bool		reachedRecoveryTarget = false;
 	TimeLineID	replayTLI;
 
+	/*
+	 * Standalone backends (e.g. run by pg_rewind) cannot start the pipeline.
+	 */
+	if (!IsUnderPostmaster)
+		wal_pipeline_enabled = false;
+
 	/*
 	 * Initialize shared variables for tracking progress of WAL replay, as if
 	 * we had just replayed the record before the REDO location (or the
@@ -1688,11 +1799,24 @@ PerformWalRecovery(void)
 	{
 		TimestampTz xtime;
 		PGRUsage	ru0;
+		uint64		loop_count = 0;
 
 		pg_rusage_init(&ru0);
 
 		InRedo = true;
 
+		if (wal_pipeline_enabled)
+		{
+			/*
+			 * Startup process parameters that the pipeline must also know about.
+			 */
+			WalPipelineParams *params = palloc0(sizeof(WalPipelineParams));
+
+			params->ReplayTLI = replayTLI;
+			InitializePipelineStartupEnv(params);
+			WalPipeline_Start(params);
+		}
+
 		RmgrStartup();
 
 		ereport(LOG,
@@ -1798,7 +1922,7 @@ PerformWalRecovery(void)
 			}
 
 			/* Else, try to fetch the next WAL record */
-			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
+			record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader, loop_count++ == 0);
 		} while (record != NULL);
 
 		/*
@@ -1807,6 +1931,9 @@ PerformWalRecovery(void)
 
 		if (reachedRecoveryTarget)
 		{
+			if (wal_pipeline_enabled)
+				WalPipeline_Stop();
+
 			if (!reachedConsistency)
 				ereport(FATAL,
 						(errmsg("requested recovery stop point is before consistent recovery point")));
@@ -1841,6 +1968,8 @@ PerformWalRecovery(void)
 
 		RmgrCleanup();
 
+		/* XXX: for testing purposes only */
+		ereport(DEBUG1, (errmsg("replay loop finished with loop count: " UINT64_FORMAT, loop_count)));
 		ereport(LOG,
 				errmsg("redo done at %X/%08X system usage: %s",
 					   LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
@@ -1879,7 +2008,9 @@ static void
 ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI)
 {
 	ErrorContextCallback errcallback;
+	XLogRecPtr	walrcv_flushedupto;
 	bool		switchedTLI = false;
+	bool		pipeline_enabled_stanby = false;
 
 	/* Setup error traceback support for ereport() */
 	errcallback.callback = rm_redo_error_callback;
@@ -1980,6 +2111,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 	XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
 	XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
 	XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
+	walrcv_flushedupto = XLogRecoveryCtl->flushedUptoRecPtr;
+	pipeline_enabled_stanby = XLogRecoveryCtl->stanbyEnabled;
 	SpinLockRelease(&XLogRecoveryCtl->info_lck);
 
 	/* ------
@@ -2018,6 +2151,16 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		WalRcvForceReply();
 	}
 
+	/*
+	 * The pipeline (producer) may be running well ahead of the startup
+	 * process (consumer). Check whether the producer asked us to wake up the
+	 * walreceiver by setting flushedUptoRecPtr.
+	 */
+	if ((walrcv_flushedupto != InvalidXLogRecPtr) &&
+		((walrcv_flushedupto == xlogreader->EndRecPtr) ||
+		(walrcv_flushedupto == xlogreader->ReadRecPtr)))
+		WalRcvForceReply();
+
 	/* Allow read-only connections if we're consistent now */
 	CheckRecoveryConsistency();
 
@@ -2033,6 +2176,14 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
 		/* Reset the prefetcher. */
 		XLogPrefetchReconfigure();
 	}
+
+	/* The consumer must also enable standby mode if the pipeline has. */
+	if (pipeline_enabled_stanby)
+		EnableStandbyMode();
+
+	SpinLockAcquire(&WalPipelineShm->mutex);
+	WalPipelineShm->applied_lsn = xlogreader->EndRecPtr;
+	SpinLockRelease(&WalPipelineShm->mutex);
 }
 
 /*
@@ -2158,12 +2309,27 @@ CheckRecoveryConsistency(void)
 
 	Assert(InArchiveRecovery);
 
-	/*
-	 * assume that we are called in the startup process, and hence don't need
-	 * a lock to read lastReplayedEndRecPtr
-	 */
-	lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
-	lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+
+	if (AmStartupProcess())
+	{
+		/*
+		 * We are in the startup process, which is the only writer of these
+		 * fields, so no lock is needed to read lastReplayedEndRecPtr.
+		 */
+		lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+		lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+	}
+	else
+	{
+		/*
+		 * We may be in the pipeline worker: read the shared state under the lock.
+		 */
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+		lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+		standbyState = XLogRecoveryCtl->standbyState;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
 
 	/*
 	 * Have we reached the point where our base backup was completed?
@@ -2350,12 +2516,28 @@ static void
 checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI,
 					TimeLineID replayTLI)
 {
+
+
 	/* Check that the record agrees on what the current (old) timeline is */
 	if (prevTLI != replayTLI)
 		ereport(PANIC,
 				(errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
 						prevTLI, replayTLI)));
 
+
+	/* The pipeline may have updated expectedTLEs; if so, reread them. */
+	if (wal_pipeline_enabled)
+	{
+		TimeLineID	targetTLI;
+
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		targetTLI = XLogRecoveryCtl->expectedTLEsUpdateTLI;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+		if (targetTLI)
+			expectedTLEs = readTimeLineHistory(targetTLI);
+	}
+
 	/*
 	 * The new timeline better be in the list of timelines we expect to see,
 	 * according to the timeline history. It should also not decrease.
@@ -3003,7 +3185,7 @@ recoveryApplyDelay(XLogReaderState *record)
 
 	while (true)
 	{
-		ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+		ResetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 
 		/* This might change recovery_min_apply_delay. */
 		ProcessStartupProcInterrupts();
@@ -3028,7 +3210,7 @@ recoveryApplyDelay(XLogReaderState *record)
 
 		elog(DEBUG2, "recovery apply delay %ld milliseconds", msecs);
 
-		(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
+		(void) WaitLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch,
 						 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 						 msecs,
 						 WAIT_EVENT_RECOVERY_APPLY_DELAY);
@@ -3100,7 +3282,7 @@ ConfirmRecoveryPaused(void)
  * (emode must be either PANIC, LOG). In standby mode, retries until a valid
  * record is available.
  */
-static XLogRecord *
+XLogRecord *
 ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 		   bool fetching_ckpt, TimeLineID replayTLI)
 {
@@ -3108,7 +3290,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 	XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
 	XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
 
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
 
 	/* Pass through parameters to XLogPageRead */
 	private->fetching_ckpt = fetching_ckpt;
@@ -3143,6 +3325,17 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			{
 				abortedRecPtr = xlogreader->abortedRecPtr;
 				missingContrecPtr = xlogreader->missingContrecPtr;
+
+				/*
+				 * Also update the shared state if necessary
+				 */
+				if (wal_pipeline_enabled)
+				{
+					SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+					XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+					XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+					SpinLockRelease(&XLogRecoveryCtl->info_lck);
+				}
 			}
 
 			if (readFile >= 0)
@@ -3210,9 +3403,31 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
 			if (!InArchiveRecovery && ArchiveRecoveryRequested &&
 				!fetching_ckpt)
 			{
+				/*
+				 * Wait for the startup process to apply the last record sent by
+				 * the pipeline. Otherwise the consistency check would fail,
+				 * because not all records decoded by the pipeline have been
+				 * received and applied by the consumer (startup process) yet.
+				 */
+				if (wal_pipeline_enabled && AmWalPipeline())
+					WalPipeline_WaitForConsumerCatchup();
+
 				ereport(DEBUG1,
 						(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
 				InArchiveRecovery = true;
+
+				if (wal_pipeline_enabled)
+				{
+					/* also update the shared state */
+					SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+					XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+
+					/* Tell the startup process (consumer) that standby mode is on */
+					if (StandbyModeRequested)
+						XLogRecoveryCtl->stanbyEnabled = true;
+					SpinLockRelease(&XLogRecoveryCtl->info_lck);
+				}
+
 				if (StandbyModeRequested)
 					EnableStandbyMode();
 
@@ -3269,7 +3484,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
  * XLogPageRead() to try fetching the record from another source, or to
  * sleep and retry.
  */
-static int
+int
 XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 			 XLogRecPtr targetRecPtr, char *readBuf)
 {
@@ -3281,7 +3496,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
 	int			r;
 	instr_time	io_start;
 
-	Assert(AmStartupProcess() || !IsUnderPostmaster);
+	Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
 
 	XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size);
 	targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size);
@@ -3707,7 +3922,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							 LSN_FORMAT_ARGS(RecPtr));
 
 						/* Do background tasks that might benefit us later. */
-						KnownAssignedTransactionIdsIdleMaintenance();
+						if (AmStartupProcess())
+							KnownAssignedTransactionIdsIdleMaintenance();
 
 						(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
 										 WL_LATCH_SET | WL_TIMEOUT |
@@ -3719,6 +3935,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 
 						/* Handle interrupt signals of startup process */
 						ProcessStartupProcInterrupts();
+						if (wal_pipeline_enabled)
+							ProcessPipelineBgwInterrupts();
 					}
 					last_fail_time = now;
 					currentSource = XLOG_FROM_ARCHIVE;
@@ -3744,6 +3962,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 				 xlogSourceNames[oldSource], xlogSourceNames[currentSource],
 				 lastSourceFailed ? "failure" : "success");
 
+		if (wal_pipeline_enabled)
+		{
+			/* also update the shared state */
+			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+			XLogRecoveryCtl->currentSource = currentSource;
+			pendingWalRcvRestart = XLogRecoveryCtl->pendingWalRcvRestart;
+			SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		}
+
 		/*
 		 * We've now handled possible failure. Try to read from the chosen
 		 * source.
@@ -3818,6 +4045,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					}
 					pendingWalRcvRestart = false;
 
+					if (wal_pipeline_enabled)
+					{
+						/* also update the shared state */
+						SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+						XLogRecoveryCtl->pendingWalRcvRestart = false;
+						SpinLockRelease(&XLogRecoveryCtl->info_lck);
+					}
+
 					/*
 					 * Launch walreceiver if needed.
 					 *
@@ -3895,6 +4130,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							if (latestChunkStart <= RecPtr)
 							{
 								XLogReceiptTime = GetCurrentTimestamp();
+
+								if (wal_pipeline_enabled)
+								{
+									/* also update the shared state */
+									SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+									XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+									SpinLockRelease(&XLogRecoveryCtl->info_lck);
+								}
+
 								SetCurrentChunkStartTime(XLogReceiptTime);
 							}
 						}
@@ -3923,7 +4167,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 						if (readFile < 0)
 						{
 							if (!expectedTLEs)
+							{
 								expectedTLEs = readTimeLineHistory(recoveryTargetTLI);
+								PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+							}
+
+
 							readFile = XLogFileRead(readSegNo, receiveTLI,
 													XLOG_FROM_STREAM, false);
 							Assert(readFile >= 0);
@@ -3933,6 +4182,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 							/* just make sure source info is correct... */
 							readSource = XLOG_FROM_STREAM;
 							XLogReceiptSource = XLOG_FROM_STREAM;
+							if (wal_pipeline_enabled)
+							{
+								/* also update the shared state */
+								SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+								XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+								SpinLockRelease(&XLogRecoveryCtl->info_lck);
+							}
 							return XLREAD_SUCCESS;
 						}
 						break;
@@ -3970,15 +4226,35 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 					 */
 					if (!streaming_reply_sent)
 					{
-						WalRcvForceReply();
-						streaming_reply_sent = true;
+						if (wal_pipeline_enabled && AmWalPipeline())
+						{
+							/*
+							 * With the pipeline enabled we cannot call WalRcvForceReply()
+							 * directly: the consumer (startup process) has not yet replayed
+							 * all the WAL received by the walreceiver. Record flushedUpto
+							 * instead, so the consumer replies once it has applied that far.
+							 */
+							SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+							XLogRecoveryCtl->flushedUptoRecPtr = flushedUpto;
+							SpinLockRelease(&XLogRecoveryCtl->info_lck);
+							streaming_reply_sent = true;
+						}
+						else
+						{
+							WalRcvForceReply();
+							streaming_reply_sent = true;
+						}
 					}
 
 					/* Do any background tasks that might benefit us later. */
-					KnownAssignedTransactionIdsIdleMaintenance();
+					if (AmStartupProcess())
+						KnownAssignedTransactionIdsIdleMaintenance();
 
 					/* Update pg_stat_recovery_prefetch before sleeping. */
-					XLogPrefetcherComputeStats(xlogprefetcher);
+					if (AmWalPipeline())
+						XLogPrefetcherComputeStats(xlogprefetcher_pipelined);
+					else
+						XLogPrefetcherComputeStats(xlogprefetcher);
 
 					/*
 					 * Wait for more WAL to arrive, when we will be woken
@@ -4009,6 +4285,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 		 * process.
 		 */
 		ProcessStartupProcInterrupts();
+		if (wal_pipeline_enabled)
+			ProcessPipelineBgwInterrupts();
 	}
 
 	return XLREAD_FAIL;			/* not reached */
@@ -4173,6 +4451,7 @@ rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN)
 	recoveryTargetTLI = newtarget;
 	list_free_deep(expectedTLEs);
 	expectedTLEs = newExpectedTLEs;
+	PipelineConsumerexpectedTLEsUpdateTLI(newtarget);
 
 	/*
 	 * As in StartupXLOG(), try to ensure we have all the history files
@@ -4262,6 +4541,15 @@ XLogFileRead(XLogSegNo segno, TimeLineID tli,
 		if (source != XLOG_FROM_STREAM)
 			XLogReceiptTime = GetCurrentTimestamp();
 
+		if (wal_pipeline_enabled)
+		{
+			/* also update the shared state */
+			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+			XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+			XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+			SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		}
+
 		return fd;
 	}
 	if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -4346,7 +4634,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
 			{
 				elog(DEBUG1, "got WAL segment from archive");
 				if (!expectedTLEs)
+				{
 					expectedTLEs = tles;
+					PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+				}
 				return fd;
 			}
 		}
@@ -4357,7 +4648,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
 			if (fd != -1)
 			{
 				if (!expectedTLEs)
+				{
 					expectedTLEs = tles;
+					PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+				}
 				return fd;
 			}
 		}
@@ -4379,16 +4673,52 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
 void
 StartupRequestWalReceiverRestart(void)
 {
+
+	/*
+	 * currentSource is also mirrored in shared memory for the pipeline.
+	 * Refresh the local copy before proceeding.
+	 */
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		currentSource = XLogRecoveryCtl->currentSource;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	if (currentSource == XLOG_FROM_STREAM && WalRcvRunning())
 	{
 		ereport(LOG,
 				(errmsg("WAL receiver process shutdown requested")));
 
 		pendingWalRcvRestart = true;
+
+		/*
+		 * pendingWalRcvRestart is also mirrored in shared memory for the
+		 * pipeline. Publish the new value.
+		 */
+		if (wal_pipeline_enabled)
+		{
+			SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+			XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+			SpinLockRelease(&XLogRecoveryCtl->info_lck);
+		}
 	}
 }
 
 
+/*
+ * standbyState is also mirrored in shared memory because the pipeline worker
+ * reads it in CheckRecoveryConsistency(). Publish the startup process's value.
+ */
+void
+SetSharedHotStandbyState(void)
+{
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->standbyState = standbyState;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
+
+
 /*
  * Has a standby promotion already been triggered?
  *
@@ -4440,7 +4770,7 @@ CheckForStandbyTrigger(void)
 	if (LocalPromoteIsTriggered)
 		return true;
 
-	if (IsPromoteSignaled() && CheckPromoteSignal())
+	if (CheckPromoteSignal())
 	{
 		ereport(LOG, (errmsg("received promote request")));
 		RemovePromoteSignalFiles();
@@ -4483,6 +4813,7 @@ void
 WakeupRecovery(void)
 {
 	SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+	SetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
 }
 
 /*
@@ -4652,6 +4983,14 @@ GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
 	 */
 	Assert(InRecovery);
 
+	if (wal_pipeline_enabled)
+	{
+		SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+		XLogReceiptTime = XLogRecoveryCtl->XLogReceiptTime;
+		XLogReceiptSource = XLogRecoveryCtl->XLogReceiptSource;
+		SpinLockRelease(&XLogRecoveryCtl->info_lck);
+	}
+
 	*rtime = XLogReceiptTime;
 	*fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
 }
@@ -4737,6 +5076,60 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
 	}
 }
 
+static void
+InitializePipelineStartupEnv(WalPipelineParams *params)
+{
+	/*
+	 * These variables are already set in the startup process but not in the
+	 * pipeline worker. Save them here so that the worker can restore them
+	 * before it starts decoding.
+	 */
+	params->NextRecPtr = xlogreader->NextRecPtr;
+	params->recoveryTargetTLI = recoveryTargetTLI;
+	params->StandbyModeRequested = StandbyModeRequested;
+	params->StandbyMode = StandbyMode;
+	params->ArchiveRecoveryRequested = ArchiveRecoveryRequested;
+	params->InArchiveRecovery = InArchiveRecovery;
+	params->minRecoveryPointTLI = minRecoveryPointTLI;
+	params->minRecoveryPoint = minRecoveryPoint;
+	params->InRedo = InRedo;
+	params->currentSource = currentSource;
+	params->lastSourceFailed = lastSourceFailed;
+	params->pendingWalRcvRestart = pendingWalRcvRestart;
+	params->RedoStartTLI = RedoStartTLI;
+	params->CheckPointLoc = CheckPointLoc;
+	params->CheckPointTLI = CheckPointTLI;
+	params->RedoStartLSN = RedoStartLSN;
+	params->standbyState = standbyState;
+	params->flushedUpto = flushedUpto;
+	params->receiveTLI = receiveTLI;
+	params->abortedRecPtr = abortedRecPtr;
+	params->missingContrecPtr = missingContrecPtr;
+	params->backupEndRequired = backupEndRequired;
+	params->backupStartPoint = backupStartPoint;
+	params->backupEndPoint = backupEndPoint;
+	params->curFileTLI = curFileTLI;
+
+	/*
+	 * The pipeline worker now does the waiting for WAL, so the startup
+	 * process must disown recoveryWakeupLatch.
+	 */
+	DisownRecoveryWakeupLatch();
+
+	/*
+	 * Update shared state before starting.
+	 */
+	SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+	XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+	XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+	XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+	XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+	XLogRecoveryCtl->currentSource = currentSource;
+	XLogRecoveryCtl->standbyState = standbyState;
+	XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+	XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+	SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
 
 /*
  * Pipeline bgw should be aware of all the parameters thats been initialized by
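The xlogrecovery.c changes above change who sends the walreceiver reply. Once the pipeline worker has caught up with the walreceiver, it no longer calls WalRcvForceReply() itself. Instead it stores flushedUpto in XLogRecoveryCtl->flushedUptoRecPtr, and ApplyWalRecord() forces the reply when the applied position matches that value. The sketch below models this handshake in standalone C. A pthread mutex stands in for info_lck, uint64_t stands in for XLogRecPtr, and every name is hypothetical.

Two choices in the sketch are assumptions that differ from the patch and may be worth discussing:
- It uses `>=` instead of an equality test, because the walreceiver's flush point need not fall exactly on a record boundary.
- It clears the request when the reply becomes due, so the reply fires only once.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t SketchRecPtr;		/* stand-in for XLogRecPtr */
#define SKETCH_INVALID_RECPTR ((SketchRecPtr) 0)

/* Hypothetical shared area; the mutex plays the role of info_lck. */
typedef struct SketchRecoveryCtl
{
	pthread_mutex_t lock;
	SketchRecPtr flushedUptoRecPtr;	/* set by producer; 0 = no request */
} SketchRecoveryCtl;

/* Producer: it has caught up with the flush point and wants a reply sent. */
static void
producer_request_reply(SketchRecoveryCtl *ctl, SketchRecPtr flushedUpto)
{
	assert(flushedUpto != SKETCH_INVALID_RECPTR);
	pthread_mutex_lock(&ctl->lock);
	ctl->flushedUptoRecPtr = flushedUpto;
	pthread_mutex_unlock(&ctl->lock);
}

/*
 * Consumer: called after applying a record ending at applied_end.  Returns
 * true when the caller should send the walreceiver reply; the request is
 * consumed so the reply is sent only once.
 */
static bool
consumer_should_reply(SketchRecoveryCtl *ctl, SketchRecPtr applied_end)
{
	bool		reply = false;

	pthread_mutex_lock(&ctl->lock);
	if (ctl->flushedUptoRecPtr != SKETCH_INVALID_RECPTR &&
		applied_end >= ctl->flushedUptoRecPtr)
	{
		ctl->flushedUptoRecPtr = SKETCH_INVALID_RECPTR;
		reply = true;
	}
	pthread_mutex_unlock(&ctl->lock);
	return reply;
}
```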
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index f3ad90c7c7a..dbf0d39212a 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1196,6 +1196,7 @@ standby_redo(XLogReaderState *record)
 		running.xids = xlrec->xids;
 
 		ProcArrayApplyRecoveryInfo(&running);
+		SetSharedHotStandbyState();
 
 		/*
 		 * The startup process currently has no convenient way to schedule
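standby_redo() now calls SetSharedHotStandbyState() right after ProcArrayApplyRecoveryInfo(). As a result, a pipeline worker running CheckRecoveryConsistency() copies the startup process's standbyState from XLogRecoveryCtl instead of relying on its own possibly stale copy. Below is a standalone sketch of that publish/refresh pattern. It assumes a pthread mutex in place of info_lck. The enum values are named after PostgreSQL's HotStandbyState values, while the struct and function names are hypothetical.

```c
#include <assert.h>
#include <pthread.h>

/* Values named after PostgreSQL's HotStandbyState. */
typedef enum
{
	SKETCH_STANDBY_DISABLED,
	SKETCH_STANDBY_INITIALIZED,
	SKETCH_STANDBY_SNAPSHOT_PENDING,
	SKETCH_STANDBY_SNAPSHOT_READY
} SketchStandbyState;

/* Hypothetical shared area; 'lock' plays the role of info_lck. */
typedef struct
{
	pthread_mutex_t lock;
	SketchStandbyState standbyState;
} SketchShared;

/* Writer (startup process): publish the local value after it changes. */
static void
publish_standby_state(SketchShared *sh, SketchStandbyState local)
{
	pthread_mutex_lock(&sh->lock);
	sh->standbyState = local;
	pthread_mutex_unlock(&sh->lock);
}

/* Reader (pipeline worker): refresh its private copy before using it. */
static void
refresh_standby_state(SketchShared *sh, SketchStandbyState *local)
{
	assert(local != NULL);
	pthread_mutex_lock(&sh->lock);
	*local = sh->standbyState;
	pthread_mutex_unlock(&sh->lock);
}
```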
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
index 5740b05f79c..e2be9f9e395 100644
--- a/src/include/access/xlogpipeline.h
+++ b/src/include/access/xlogpipeline.h
@@ -173,12 +173,27 @@ extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
 extern Size WalPipelineShmemSize(void);
 extern void WalPipelineShmemInit(void);
 
+/* Start/stop the pipeline */
+extern void WalPipeline_Start(WalPipelineParams *params);
+extern void WalPipeline_Stop(void);
+
 /* Producer functions (called by background worker) */
 extern void WalPipeline_ProducerMain(Datum main_arg);
 extern bool WalPipeline_SendRecord(XLogReaderState *record);
 extern bool WalPipeline_SendShutdown(void);
 extern bool WalPipeline_SendError(int errcode, const char *errmsg);
 
+/* Consumer functions (called by startup process) */
+extern DecodedXLogRecord *WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration);
+extern bool WalPipeline_CheckProducerAlive(void);
+
+/* Status and monitoring */
+extern bool WalPipeline_IsActive(void);
+extern void WalPipeline_WaitForConsumerCatchup(void);
+extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+								  XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn);
+extern bool AmWalPipeline(void);
+
 
 extern void ProcessPipelineBgwInterrupts(void);
 
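The header above declares the consumer side of the protocol. ReceiveRecord() in xlogrecovery.c calls WalPipeline_ReceiveRecord() until it returns NULL, which happens once the producer has sent its shutdown message. It then calls WalPipeline_Stop(). Below is a minimal single-threaded sketch of that protocol. It assumes the queue is a prefilled array of hypothetical messages rather than a shm_mq. No PostgreSQL functions are called, and every name is hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef enum { SKETCH_MSG_RECORD, SKETCH_MSG_SHUTDOWN } SketchMsgKind;

/* Hypothetical queue message: a decoded record's LSN range, or shutdown. */
typedef struct
{
	SketchMsgKind kind;
	uint64_t	lsn;
	uint64_t	next_lsn;
} SketchMsg;

/* Hypothetical stand-in for the shared queue plus pipeline state. */
typedef struct
{
	const SketchMsg *msgs;
	size_t		nmsgs;
	size_t		pos;
	bool		stopped;
} SketchPipeline;

/*
 * Next decoded record, or NULL once the producer has sent shutdown.  An
 * exhausted array also yields NULL here; a real queue would block instead.
 */
static const SketchMsg *
sketch_receive_record(SketchPipeline *pl)
{
	const SketchMsg *m;

	assert(!pl->stopped);
	if (pl->pos >= pl->nmsgs)
		return NULL;
	m = &pl->msgs[pl->pos++];
	return (m->kind == SKETCH_MSG_RECORD) ? m : NULL;
}

/* Acknowledge the shutdown and stop the producer. */
static void
sketch_stop(SketchPipeline *pl)
{
	pl->stopped = true;
}

/*
 * Replay loop: "apply" records until the queue reports shutdown, then stop
 * the pipeline.  *applied_lsn tracks the end of the last applied record,
 * analogous to WalPipelineShm->applied_lsn in the patch.
 */
static size_t
sketch_replay(SketchPipeline *pl, uint64_t *applied_lsn)
{
	const SketchMsg *m;
	size_t		napplied = 0;

	while ((m = sketch_receive_record(pl)) != NULL)
	{
		*applied_lsn = m->next_lsn;
		napplied++;
	}
	sketch_stop(pl);
	return napplied;
}
```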
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index e675ab8353d..b61aad08627 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -13,6 +13,7 @@
 
 #include "access/xlogprefetcher.h"
 #include "access/xlogreader.h"
+#include "access/xlogutils.h"
 #include "catalog/pg_control.h"
 #include "lib/stringinfo.h"
 #include "storage/condition_variable.h"
@@ -106,6 +107,13 @@ typedef struct XLogRecoveryCtlData
 	 */
 	Latch		recoveryWakeupLatch;
 
+	/*
+	 * With the pipeline enabled we need two latches: recoveryWakeupLatch, used
+	 * by the pipeline worker while waiting for WAL, and this one, used by the
+	 * startup process for the recovery apply delay.
+	 */
+	Latch		recoveryApplyDelayLatch;
+
 	/*
 	 * Last record successfully replayed.
 	 */
@@ -133,6 +141,46 @@ typedef struct XLogRecoveryCtlData
 	ConditionVariable recoveryNotPausedCV;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
+
+	/* ------------------------------------------------------------------
+	 * Variables used for IPC between the pipeline and the startup process.
+	 * They mirror static variables in xlogrecovery.c whose values keep
+	 * changing, so they are kept in shared memory so that both the
+	 * pipeline and the startup process stay in sync on any change to
+	 * this state.
+	 * ------------------------------------------------------------------
+	 */
+
+	/*
+	 * The pipeline could be waiting for the startup process to catch up
+	 * with the decoder. This can happen when no WAL is available from the
+	 * current source and the pipeline has changed the WAL source, e.g.
+	 * by enabling standby mode if requested.
+	 */
+	bool		pipeline_waiting;
+	bool		InArchiveRecovery;
+	bool		pendingWalRcvRestart;
+	bool		stanbyEnabled;
+
+	/* The target TLI for which expectedTLEs should be recomputed */
+	TimeLineID	expectedTLEsUpdateTLI;
+
+	/*
+	 * Normally the walreceiver is woken up based on how far WAL has been
+	 * read, since reads are sequential with replay. With the pipeline,
+	 * reads (decoded records) can be far ahead of the consumer, so instead
+	 * the consumer wakes up the walreceiver once records up to
+	 * flushedUptoRecPtr have been applied.
+	 */
+	XLogRecPtr	flushedUptoRecPtr;
+	XLogRecPtr	abortedRecPtr;
+	XLogRecPtr	missingContrecPtr;
+
+	XLogSource	currentSource;
+	XLogSource	XLogReceiptSource;
+
+	HotStandbyState standbyState;
+	TimestampTz		XLogReceiptTime;
 } XLogRecoveryCtlData;
 
 extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl;
@@ -242,6 +290,8 @@ extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, i
 extern bool PromoteIsTriggered(void);
 extern bool CheckPromoteSignal(void);
 extern void WakeupRecovery(void);
+extern void DisownRecoveryWakeupLatch(void);
+extern void SetSharedHotStandbyState(void);
 
 extern void StartupRequestWalReceiverRestart(void);
 extern void XLogRequestWalReceiverReply(void);
-- 
2.34.1
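
The header above declares a producer/consumer split: the producer sends decoded records, a shutdown message or an error, the consumer receives them, and WalPipeline_GetStats() reports records sent/received plus producer and consumer LSNs. As a rough, non-authoritative illustration of that message flow, here is a standalone C sketch of a bounded ring-buffer queue. Every name in it (PipelineQueue, queue_send, queue_receive, record_msg, shutdown_msg, should_wake_walreceiver) is hypothetical and not taken from the patch; QUEUE_SIZE merely stands in for wal_pipeline_queue_size, and the shared memory, locking and latches a real two-process implementation would need are deliberately omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical message kinds, loosely modeled on the SendRecord /
 * SendShutdown / SendError entry points declared in xlogpipeline.h. */
typedef enum { MSG_RECORD, MSG_SHUTDOWN, MSG_ERROR } MsgKind;

typedef struct {
    MsgKind  kind;
    uint64_t lsn;       /* end LSN of the decoded record (MSG_RECORD only) */
    int      errcode;   /* MSG_ERROR only */
} PipelineMsg;

#define QUEUE_SIZE 4    /* hypothetical stand-in for wal_pipeline_queue_size */

/* Single-process sketch: no locking or shared memory (assumption). */
typedef struct {
    PipelineMsg slots[QUEUE_SIZE];
    size_t      head, tail, count;
    uint64_t    records_sent, records_received;
    uint64_t    producer_lsn, consumer_lsn;
} PipelineQueue;

static void queue_init(PipelineQueue *q) { memset(q, 0, sizeof(*q)); }

static PipelineMsg record_msg(uint64_t lsn) { return (PipelineMsg){MSG_RECORD, lsn, 0}; }
static PipelineMsg shutdown_msg(void) { return (PipelineMsg){MSG_SHUTDOWN, 0, 0}; }

/* Producer side: returns false when the queue is full (caller would wait). */
static bool queue_send(PipelineQueue *q, PipelineMsg msg) {
    if (q->count == QUEUE_SIZE)
        return false;
    q->slots[q->tail] = msg;
    q->tail = (q->tail + 1) % QUEUE_SIZE;
    q->count++;
    if (msg.kind == MSG_RECORD) {
        q->records_sent++;
        q->producer_lsn = msg.lsn;   /* how far the producer has decoded */
    }
    return true;
}

/* Consumer side: returns false when the queue is empty (caller would wait). */
static bool queue_receive(PipelineQueue *q, PipelineMsg *out) {
    if (q->count == 0)
        return false;
    *out = q->slots[q->head];
    q->head = (q->head + 1) % QUEUE_SIZE;
    q->count--;
    if (out->kind == MSG_RECORD) {
        q->records_received++;
        q->consumer_lsn = out->lsn;  /* how far the consumer has received */
    }
    return true;
}

/* Wake-up rule described in the flushedUptoRecPtr comment: decide based
 * on what the consumer has applied, not on how far the producer has read
 * ahead. 0 is treated as "no target set" (assumption). */
static bool should_wake_walreceiver(uint64_t applied_lsn, uint64_t flushed_upto) {
    return flushed_upto != 0 && applied_lsn >= flushed_upto;
}
```

In this sketch the producer can run up to QUEUE_SIZE messages ahead of the consumer, which is why a read-position-based wake-up would fire too early: producer_lsn may already exceed the flush target while consumer_lsn has not reached it.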



  [application/pdf] recoveries-becnhmark-v02.pdf (48.6K, 5-recoveries-becnhmark-v02.pdf)

  [application/x-xz] recoveries-benchmark-v02.tar.xz (1.4M, 6-recoveries-benchmark-v02.tar.xz)
