public inbox for [email protected]
help / color / mirror / Atom feedRe: [WIP] Pipelined Recovery
9+ messages / 5 participants
[nested] [flat]
* Re: [WIP] Pipelined Recovery
@ 2026-02-17 11:21 Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-04-03 03:18 ` RE: [WIP] Pipelined Recovery Hayato Kuroda (Fujitsu) <[email protected]>
0 siblings, 2 replies; 9+ messages in thread
From: Zsolt Parragi @ 2026-02-17 11:21 UTC (permalink / raw)
To: Imran Zaheer <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
Hello!
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+
+ if (WalPipelineShm->initialized)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return; /* Already started */
+ }
+
This doesn't seem to be a good use for a spinlock, as it guards a
longer operation. Spinlocks are supposed to guard "a few
instructions", not long initialization processes, according to their
documentation. Since the code already uses dsm segment, wouldn't it be
easier to use something like GetNamedDSMSegment which explicitly
supports this use case with an initialization callback?
Also see the next two more specific comments about errors and spinlocks.
+ case WAL_MSG_ERROR:
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ ereport(ERROR,
+ (errcode(WalPipelineShm->error_code),
+ errmsg("[walpipeline] consumer: received error from the producer: %s",
+ WalPipelineShm->error_message)));
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return NULL;
According to the documentation spinlocks are not automatically
released on errors, and ereport ERROR stops the code flow so
everything after that is dead code.
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT "
received=" UINT64_FORMAT,
+ WalPipelineShm->records_sent, WalPipelineShm->records_received);
+ SpinLockRelease(&WalPipelineShm->mutex);
A LOG is not an error, but elog can call palloc, which can cause an
out of memory error, and then again we never release the spinlock.
+ if (msglen > WAL_PIPELINE_MAX_MSG_SIZE)
+ {
+ elog(WARNING, "[walpipeline] producer: wal record at %X/%X too large
(%zu bytes), skipping",
+ LSN_FORMAT_ARGS(record->ReadRecPtr), msglen);
+ pfree(buffer);
+ return true;
+ }
This doesn't seem like a good idea to me, won't skipping records cause
data corruption?
+ shm_mq_handle *producer_mq_handle;
+ shm_mq_handle *consumer_mq_handle;
Aren't these handles process local, yet stored in WalPipelineShmCtl?
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_SIGHUP',
group => 'WAL_RECOVERY',
+ short_desc => 'Use parallel workers to speedup recovery.',
+ variable => 'wal_pipeline_enabled',
+ boot_val => 'false',
+},
Is SIGHUP really useful for this feature? It only runs at startup.
+ elog(FATAL, "[walpipeline] consumer: either pipeline not active, or
no record available from pipeline.");
+ return record;
FATAL also stops the codeflow, so that return is never executed.
+/* Size of the shared memory queue (can be made configurable) */
+#define WAL_PIPELINE_QUEUE_SIZE (128 * 1024 * 1024) /* 8 MB */
+
+/* Maximum size of a single message */
+#define WAL_PIPELINE_MAX_MSG_SIZE (2 * 1024 * 1024) /* 1 MB */
The comments about the sizes seem to be off.
if (reachedRecoveryTarget)
{
+ if (wal_pipeline_enabled)
+ WalPipeline_Stop();
What if we didn't reach the recovery target, shouldn't we stop the
pipelines then?
+ /* Send shutdown message if queue is available */
+ if (consumer_mq_handle)
+ WalPipeline_SendShutdown();
+}
This seems wrong, WalPipeline_SendShutdown checks for the producer
handle inside it instead? What's the exact contract, who should call
these methods? By looking at the code I'm not sure if this shutdown
logic works as intended.
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
@ 2026-03-18 07:43 ` Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
1 sibling, 1 reply; 9+ messages in thread
From: Imran Zaheer @ 2026-03-18 07:43 UTC (permalink / raw)
To: Zsolt Parragi <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
Hi Zsolt.
Thanks alot for the review and pointing out the bugs. I have fixed the bugs
you mentioned in my new patch set. But
patchset mail is held for moderation for some reason.
>
> if (reachedRecoveryTarget)
> {
> + if (wal_pipeline_enabled)
> + WalPipeline_Stop();
>
> What if we didn't reach the recovery target, shouldn't we stop the
> pipelines then?
>
I have fixed the bugs shutdown logic.
As we already know we will exist the recovery redo loop in
`PerformWalRecovery()` only in two cases
1: Recovery target reached:
In this case consumers will call to stop the pipeline.
@@ -1807,6 +1931,9 @@ PerformWalRecovery(void)
if (reachedRecoveryTarget)
{
+ if (wal_pipeline_enabled)
+ WalPipeline_Stop();
+
2: Available pg_wal is consumed and now more wal to read.
In this case pipeline producers will send the shutdown msg to the consumer.
Consumer will
detect this message and then call ` WalPipeline_Stop`. This is the case
where we cannot read
more records and the while loop will break here.
+ if (decoded_record)
+ {
+ record = &decoded_record->header;
+ return record;
+ }
+ else
+ {
+ /*
+ * We will end up here only when pipeline couldn't read more
+ * records and have sent a shutdown msg. We will acknowldge this
+ * and will trigger request to stop the pipeline workers.
+ */
+ WalPipeline_Stop();
+ return NULL;
+ }
Hope this makes sense.
Once again thanks for reporting the bugs. You will receive the new patchset
mail soon once it is cleared from
the moderation.
Looking forward to your reviews, comments, etc.
Regards,
Imran Zaheer
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
@ 2026-03-18 10:18 ` Imran Zaheer <[email protected]>
2026-03-19 02:33 ` Re: [WIP] Pipelined Recovery Xuneng Zhou <[email protected]>
0 siblings, 1 reply; 9+ messages in thread
From: Imran Zaheer @ 2026-03-18 10:18 UTC (permalink / raw)
To: Zsolt Parragi <[email protected]>; Jakub Wartak <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
(resending the mail, previous mail was held for moderation for some reason.)
Hi
I am attaching a new rebased version of the patch. Following are some
major changes in the new patch set.
* Streaming replication is now working. The prefetcher was not fully
decoupled from the startup process; that's why there were inconsistencies
in some scenarios and most of the recovery tap tests were failing.
* Patch is now split into consumer and producer patches. This will make
review easier.
* Pipeline shutdown flow is also improved. Now producer will always check
for the shutdown flag (being set by the consumer)
* Pipeline msg queue size is now configurable `wal_pipeline_queue_size`
* Tap tests now passes with PG_TEST_INITDB_EXTRA_OPTS="-c wal_pipeline=on"
* New tap test for `recovery/t/053_walpipeline.pl`. This covers some basic
functionality of the pipeline.
* The filename is changed to xlogpipeline.{h|C}
Thanks to all for the valuable feedback.
Looking forward to your reviews, comments, etc.
--
Regards,
Imran Zaheer
On Wed, Mar 18, 2026 at 3:15 PM Imran Zaheer <[email protected]> wrote:
> (resending the mail, previous mail was held for moderation for some
> reason. Now pdf is moved to the tar.gz)
>
> Hi
>
> I am attaching a new rebased version of the patch. Following are some
> major changes in the new patch set.
>
> * Streaming replication is now working. The prefetcher was not fully
> decoupled from the startup process; that's why there were inconsistencies
> in some scenarios and most of the recovery tap tests were failing.
>
> * Patch is now split into consumer and producer patches. This will make
> review easier.
>
> * Pipeline shutdown flow is also improved. Now producer will always check
> for the shutdown flag (being set by the consumer)
>
> * Pipeline msg queue size is now configurable `wal_pipeline_queue_size`
>
> * Tap tests now passes with PG_TEST_INITDB_EXTRA_OPTS="-c wal_pipeline=on"
>
> * New tap test for `recovery/t/053_walpipeline.pl`. This covers some
> basic functionality of the pipeline.
>
> * The filename is changed to xlogpipeline.{h|C}
>
> Thanks to all for the valuable feedback.
> Looking forward to your reviews, comments, etc.
>
> --
> Regards,
> Imran Zaheer
>
>
> On Wed, Mar 18, 2026 at 12:43 PM Imran Zaheer <[email protected]>
> wrote:
>
>> Hi Zsolt.
>>
>> Thanks alot for the review and pointing out the bugs. I have fixed the
>> bugs you mentioned in my new patch set. But
>> patchset mail is held for moderation for some reason.
>>
>> >
>> > if (reachedRecoveryTarget)
>> > {
>> > + if (wal_pipeline_enabled)
>> > + WalPipeline_Stop();
>> >
>> > What if we didn't reach the recovery target, shouldn't we stop the
>> > pipelines then?
>> >
>>
>> I have fixed the bugs shutdown logic.
>>
>> As we already know we will exist the recovery redo loop in
>> `PerformWalRecovery()` only in two cases
>>
>> 1: Recovery target reached:
>> In this case consumers will call to stop the pipeline.
>>
>> @@ -1807,6 +1931,9 @@ PerformWalRecovery(void)
>>
>> if (reachedRecoveryTarget)
>> {
>> + if (wal_pipeline_enabled)
>> + WalPipeline_Stop();
>> +
>>
>> 2: Available pg_wal is consumed and now more wal to read.
>> In this case pipeline producers will send the shutdown msg to the
>> consumer. Consumer will
>> detect this message and then call ` WalPipeline_Stop`. This is the case
>> where we cannot read
>> more records and the while loop will break here.
>>
>> + if (decoded_record)
>> + {
>> + record = &decoded_record->header;
>> + return record;
>> + }
>> + else
>> + {
>> + /*
>> + * We will end up here only when pipeline couldn't read more
>> + * records and have sent a shutdown msg. We will acknowldge this
>> + * and will trigger request to stop the pipeline workers.
>> + */
>> + WalPipeline_Stop();
>> + return NULL;
>> + }
>>
>>
>> Hope this makes sense.
>>
>> Once again thanks for reporting the bugs. You will receive the new
>> patchset mail soon once it is cleared from
>> the moderation.
>>
>> Looking forward to your reviews, comments, etc.
>>
>> Regards,
>> Imran Zaheer
>>
>
Attachments:
[text/x-patch] v2-0001-Pipelined-Recovery-Producer.patch (40.3K, 3-v2-0001-Pipelined-Recovery-Producer.patch)
download | inline diff:
From e8619cb11f065a8a86495a9a7aa16c073cd6e37e Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Tue, 17 Mar 2026 21:47:56 +0500
Subject: [PATCH v2 1/2] Pipelined Recovery - Producer
This includes the producer specific code for the producer-consumer
architecture for WAL replay that separates WAL decoding from the recovery process,
enabling parallel processing between differemt steps of replay.
The producer includes a background worker that reads and decodes WAL records,
then send them to the startup process for the redo. IPC happens via shared
memory message queues (shm_mq), allowing the decoder to run ahead of the apply process.
This provides some improvement in recovery performance for CPU-bound workloads.
New GUC: wal_pipeline (default: off)
Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/meson.build | 1 +
src/backend/access/transam/xlogpipeline.c | 677 ++++++++++++++++++
src/backend/access/transam/xlogrecovery.c | 43 ++
src/backend/postmaster/bgworker.c | 5 +
src/backend/storage/ipc/ipci.c | 5 +
src/backend/utils/misc/guc_parameters.dat | 15 +
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/include/access/xlog.h | 2 +
src/include/access/xlogpipeline.h | 188 +++++
src/include/access/xlogrecovery.h | 19 +
src/test/recovery/meson.build | 1 +
src/test/recovery/t/053_walpipeline.pl | 208 ++++++
13 files changed, 1167 insertions(+)
create mode 100644 src/backend/access/transam/xlogpipeline.c
create mode 100644 src/include/access/xlogpipeline.h
create mode 100644 src/test/recovery/t/053_walpipeline.pl
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..ba0bf343769 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,6 +32,7 @@ OBJS = \
xlogbackup.o \
xlogfuncs.o \
xloginsert.o \
+ xlogpipeline.o \
xlogprefetcher.o \
xlogreader.o \
xlogrecovery.o \
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 06aadc7f315..be37b40581d 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -20,6 +20,7 @@ backend_sources += files(
'xlogbackup.c',
'xlogfuncs.c',
'xloginsert.c',
+ 'xlogpipeline.c',
'xlogprefetcher.c',
'xlogrecovery.c',
'xlogstats.c',
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
new file mode 100644
index 00000000000..4b95a11d16b
--- /dev/null
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -0,0 +1,677 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.c
+ * WAL replay pipeline implementation
+ *
+ * This module implements a producer-consumer pipeline for WAL replay.
+ * The producer (background worker) reads and decodes WAL records in parallel
+ * with the consumer (startup process) that applies them.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogpipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlogpipeline.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/bufmgr.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/md.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/rel.h"
+#include "utils/timeout.h"
+
+
+/* Global shared memory control structure */
+WalPipelineShmCtl *WalPipelineShm = NULL;
+
+XLogPrefetcher *xlogprefetcher_pipelined = NULL;
+
+/* Local state for producer */
+static dsm_segment *producer_dsm_seg = NULL;
+static shm_mq *producer_mq = NULL;
+static shm_mq_handle *producer_mq_handle = NULL;
+
+
+/*
+ * Flags set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+
+/* Signal handlers */
+static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
+
+/* Forward declarations */
+static void wal_pipeline_cleanup_callback(int code, Datum arg);
+static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static void cleanup_producer_resources(void);
+static void WalPipeline_WaitForConsumerShutdownRequest(void);
+
+/* copied from xlogrecovery.c */
+/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
+typedef struct XLogPageReadPrivate
+{
+ int emode;
+ bool fetching_ckpt; /* are we fetching a checkpoint record? */
+ bool randAccess;
+ TimeLineID replayTLI;
+} XLogPageReadPrivate;
+
+/*
+ * Compute space needed for WAL pipeline shared memory
+ */
+Size
+WalPipelineShmemSize(void)
+{
+ Size size = 0;
+
+ size = add_size(size, sizeof(WalPipelineShmCtl));
+
+ return size;
+}
+
+/*
+ * Initialize WAL pipeline shared memory structures
+ */
+void
+WalPipelineShmemInit(void)
+{
+ bool found;
+
+ WalPipelineShm = (WalPipelineShmCtl *)
+ ShmemInitStruct("WAL Pipeline Control",
+ sizeof(WalPipelineShmCtl),
+ &found);
+
+ if (!found)
+ {
+ /* First time through, initialize */
+ SpinLockInit(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ WalPipelineShm->shutdown_requested = false;
+ WalPipelineShm->producer_exited = false;
+ WalPipelineShm->producer_pid = 0;
+ WalPipelineShm->consumer_pid = 0;
+ WalPipelineShm->producer_lsn = InvalidXLogRecPtr;
+ WalPipelineShm->consumer_lsn = InvalidXLogRecPtr;
+ WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+ WalPipelineShm->records_sent = 0;
+ WalPipelineShm->records_received = 0;
+ WalPipelineShm->bytes_sent = 0;
+ WalPipelineShm->bytes_received = 0;
+ WalPipelineShm->error_code = 0;
+ WalPipelineShm->error_message[0] = '\0';
+ }
+}
+
+
+
+/*
+ * Producer Function.
+ * Main loop for the producer background worker.
+ */
+void
+WalPipeline_ProducerMain(Datum main_arg)
+{
+ dsm_handle handle = DatumGetUInt32(main_arg);
+ dsm_segment *seg;
+ shm_toc *toc;
+ WalPipelineParams *params;
+ XLogReaderState *xlogreader;
+ XLogPageReadPrivate *private;
+ XLogRecord *record;
+ TimeLineID replayTLI = 0;
+ bool end_of_wal = false;
+ uint64 records_sent;
+ uint64 records_received;
+
+ /*
+ * Properly accept or ignore signals the postmaster might send us.
+ */
+ pqsignal(SIGHUP, PipelineBgwSigHupHandler); /* reload config file */
+
+ /* Register cleanup callback */
+ before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+ seg = dsm_attach(handle);
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("[walpipeline] producer: could not map dynamic shared memory segment")));
+
+ toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment")));
+
+ /* Lookup params and queue */
+ params = shm_toc_lookup(toc, 1, false);
+ producer_mq = shm_toc_lookup(toc, 2, false);
+
+ /* Set up producer side of queue */
+ producer_dsm_seg = seg;
+ shm_mq_set_sender(producer_mq, MyProc);
+ producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL);
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = MyProcPid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* DSM is now attached, so safe to unblock the signals */
+ BackgroundWorkerUnblockSignals();
+
+ /* Set up WAL reading processor */
+ private = palloc0(sizeof(XLogPageReadPrivate));
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ private);
+
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+ xlogreader->system_identifier = GetSystemIdentifier();
+
+ /*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /* Init some important globals before starting */
+ replayTLI = params->ReplayTLI;
+ InitializePipelineRecoveryEnv(params);
+
+ /* Reinit the WAL prefetcher. */
+ xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader);
+
+
+ elog(LOG, "[walpipeline] producer: started at %X/%X, TLI %u",
+ LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI);
+
+ XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr);
+
+ /* Main decoding loop */
+ while (true)
+ {
+ bool shutdown_requested;
+
+ /* Check if consumer requested shutdown */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ shutdown_requested = WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (shutdown_requested)
+ {
+ elog(DEBUG1, "[walpipeline] producer: shutdown requested by consumer");
+ break;
+ }
+
+ /* Read next WAL record */
+ record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI);
+
+ if (record == NULL)
+ {
+ end_of_wal = true;
+ elog(DEBUG1, "[walpipeline] producer: reached end of WAL");
+ break;
+ }
+
+ /*
+ * Successfully decoded a record. Send it to the consumer.
+ */
+ if (!WalPipeline_SendRecord(xlogreader))
+ {
+ elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached");
+ break;
+ }
+
+ /* Update our position for monitoring */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_lsn = xlogreader->EndRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+
+ if (end_of_wal)
+ {
+ /* Notify consumer we need to exit early */
+ WalPipeline_SendShutdown();
+
+ /* wait until consumer set the flag */
+ WalPipeline_WaitForConsumerShutdownRequest();
+ }
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ records_sent = WalPipelineShm->records_sent;
+ records_received = WalPipelineShm->records_received;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+ records_sent, records_received);
+
+ /* Cleanup */
+ XLogReaderFree(xlogreader);
+ DisownRecoveryWakeupLatch();
+ pfree(private);
+ cleanup_producer_resources();
+}
+
+/*
+ * Producer Function.
+ * Send a decoded WAL record to the consumer
+ */
+bool
+WalPipeline_SendRecord(XLogReaderState *record)
+{
+ char *buffer = NULL;
+ Size msglen;
+ shm_mq_result res;
+
+
+ if (!producer_mq_handle)
+ return false;
+
+ /* Serialize the record */
+ msglen = serialize_wal_record(record, &buffer);
+
+ res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true);
+
+ if (res == SHM_MQ_SUCCESS)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->records_sent++;
+ WalPipelineShm->bytes_sent += msglen;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ pfree(buffer);
+ return true;
+ }
+
+ if (res == SHM_MQ_DETACHED)
+ {
+ elog(PANIC, "[walpipeline] producer: consumer detached");
+ pfree(buffer);
+ return false;
+ }
+
+ /* Some other error */
+ elog(PANIC, "[walpipeline] producer: shm_mq_send failed with result %d", res);
+ pfree(buffer);
+ return false;
+}
+
+/*
+ * Producer Function.
+ * Send shutdown message to consumer
+ */
+bool
+WalPipeline_SendShutdown(void)
+{
+ WalRecordMsgHeader hdr;
+ shm_mq_result res;
+
+ if (!producer_mq_handle)
+ return false;
+
+ hdr.msg_type = WAL_MSG_SHUTDOWN;
+ hdr.endRecPtr = InvalidXLogRecPtr;
+
+ res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true);
+ return (res == SHM_MQ_SUCCESS);
+}
+
+/*
+ * Producer Function.
+ * Send error message to consumer
+ */
+bool
+WalPipeline_SendError(int errcode, const char *errmsg)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->error_code = errcode;
+ strlcpy(WalPipelineShm->error_message, errmsg,
+ sizeof(WalPipelineShm->error_message));
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return true;
+}
+
+/*
+ * Producer Function.
+ * Producer may can exit without waiting for the consumer, but its better to
+ * wait until consumer request shutdown. This way log messages will show
+ * no of records_sent & records_received records equal to each other.
+ */
+static void
+WalPipeline_WaitForConsumerShutdownRequest(void)
+{
+ int iters = 0;
+
+ while (true)
+ {
+ bool shutdown_requested;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ shutdown_requested = WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (shutdown_requested)
+ break;
+
+ if (++iters >= MAX_SHUTDOWN_WAIT_ITERS)
+ {
+ elog(WARNING,
+ "[walpipeline] producer: timed out waiting for consumer "
+ "to acknowledge shutdown, exiting anyway");
+ break;
+ }
+
+ /* Allow SIGTERM / SIGHUP to interrupt the wait */
+ ProcessPipelineBgwInterrupts();
+
+ pg_usleep(10000); /* sleep 10ms */
+ }
+}
+
+
+/*
+ * serialize_wal_record (Producer)
+ *
+ * Serialize the currently decoded WAL record owned by the given
+ * XLogReaderState into a contiguous buffer.
+ *
+ * Allocation/layout of the output buffer:
+ *
+ * [WalRecordMsgHeader]
+ * [XLogRecord]
+ * [main_data]
+ * Repeated for each in-use block:
+ * [SerializedBlockMeta]
+ * [backup image bytes] (optional)
+ * [block data bytes] (optional)
+ *
+ * The resulting buffer contains no pointers and is safe to transfer
+ * across processes or store externally.
+ */
+static Size
+serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
+{
+ DecodedXLogRecord *dec = xlogreader->record;
+ WalRecordMsgHeader hdr;
+ Size total;
+ char *ptr;
+
+ if (dec == NULL)
+ return 0;
+
+ /* ---- size calculation ---- */
+ total =
+ sizeof(WalRecordMsgHeader) +
+ sizeof(XLogRecord) +
+ dec->main_data_len;
+
+ for (int i = 0; i <= dec->max_block_id; i++)
+ {
+ const DecodedBkpBlock *blk = &dec->blocks[i];
+
+ if (!blk->in_use)
+ continue;
+
+ total += sizeof(SerializedBlockMeta);
+
+ if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+ total += blk->bimg_len;
+
+ if (blk->has_data && blk->data && blk->data_len > 0)
+ total += blk->data_len;
+ }
+
+ ptr = *outbuf = palloc(total);
+
+ /* ---- message header ---- */
+ memset(&hdr, 0, sizeof(hdr));
+ hdr.msg_type = WAL_MSG_RECORD;
+ hdr.readRecPtr = xlogreader->ReadRecPtr;
+ hdr.abortedRecPtr = xlogreader->abortedRecPtr;
+ hdr.missingContrecPtr = xlogreader->missingContrecPtr;
+ hdr.overwrittenRecPtr = xlogreader->overwrittenRecPtr;
+ hdr.endRecPtr = xlogreader->EndRecPtr;
+ hdr.decoded_size = total - sizeof(WalRecordMsgHeader);
+ hdr.max_block_id = dec->max_block_id;
+ hdr.main_data_len = dec->main_data_len;
+ hdr.toplevel_xid = dec->toplevel_xid;
+ hdr.record_origin = dec->record_origin;
+
+ memcpy(ptr, &hdr, sizeof(hdr));
+ ptr += sizeof(hdr);
+
+ /* ---- XLogRecord ---- */
+ memcpy(ptr, &dec->header, sizeof(XLogRecord));
+ ptr += sizeof(XLogRecord);
+
+ /* ---- main data ---- */
+ if (dec->main_data_len > 0)
+ {
+ memcpy(ptr, dec->main_data, dec->main_data_len);
+ ptr += dec->main_data_len;
+ }
+
+ /* ---- blocks ---- */
+ for (int i = 0; i <= dec->max_block_id; i++)
+ {
+ const DecodedBkpBlock *blk = &dec->blocks[i];
+ SerializedBlockMeta meta;
+
+ if (!blk->in_use)
+ continue;
+
+ memset(&meta, 0, sizeof(meta));
+ meta.block_id = i;
+ meta.in_use = true;
+ meta.rlocator = blk->rlocator;
+ meta.forknum = blk->forknum;
+ meta.blkno = blk->blkno;
+ meta.flags = blk->flags;
+ meta.has_image = blk->has_image;
+ meta.apply_image = blk->apply_image;
+ meta.has_data = blk->has_data;
+ meta.bimg_len = blk->bimg_len;
+ meta.bimg_info = blk->bimg_info;
+ meta.hole_offset = blk->hole_offset;
+ meta.hole_length = blk->hole_length;
+ meta.data_len = blk->data_len;
+
+ memcpy(ptr, &meta, sizeof(meta));
+ ptr += sizeof(meta);
+
+ if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+ {
+ memcpy(ptr, blk->bkp_image, blk->bimg_len);
+ ptr += blk->bimg_len;
+ }
+
+ if (blk->has_data && blk->data && blk->data_len > 0)
+ {
+ memcpy(ptr, blk->data, blk->data_len);
+ ptr += blk->data_len;
+ }
+ }
+
+ Assert(ptr - *outbuf == total);
+ return total;
+}
+
+
+/*
+ * We need to put some assertion that only pipeline worker should be touching
+ * the specific code.
+ */
+bool AmWalPipeline()
+{
+ if (MyBackendType == B_BG_WORKER && MyBgworkerEntry)
+ {
+ if (strncmp(MyBgworkerEntry->bgw_name, "wal pipeline", 12) == 0)
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Clean up producer-side resources
+ */
+static void
+cleanup_producer_resources(void)
+{
+ if (producer_mq_handle)
+ {
+ shm_mq_detach(producer_mq_handle);
+ producer_mq_handle = NULL;
+ }
+
+ if (producer_dsm_seg)
+ {
+ dsm_detach(producer_dsm_seg);
+ producer_dsm_seg = NULL;
+ }
+
+ producer_mq = NULL;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = 0;
+ WalPipelineShm->producer_exited = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Cleanup callback for process exit
+ */
+static void
+wal_pipeline_cleanup_callback(int code, Datum arg)
+{
+ pid_t mypid = MyProcPid;
+ bool is_producer = false;
+
+ if (WalPipelineShm)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ is_producer = (WalPipelineShm->producer_pid == mypid);
+ SpinLockRelease(&WalPipelineShm->mutex);
+ }
+
+ if (is_producer)
+ cleanup_producer_resources();
+ else
+ cleanup_consumer_resources();
+}
+
+/* --------------------------------
+ * signal handler routines
+ * --------------------------------
+ */
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+PipelineBgwSigHupHandler(SIGNAL_ARGS)
+{
+ got_SIGHUP = true;
+ WakeupRecovery();
+}
+
+/*
+ * Re-read the config file.
+ *
+ * If one of the critical walreceiver options has changed, flag xlogrecovery.c
+ * to restart it.
+ */
+static void
+PipelineRereadConfig(void)
+{
+ char *conninfo = pstrdup(PrimaryConnInfo);
+ char *slotname = pstrdup(PrimarySlotName);
+ bool tempSlot = wal_receiver_create_temp_slot;
+ bool conninfoChanged;
+ bool slotnameChanged;
+ bool tempSlotChanged = false;
+
+ ProcessConfigFile(PGC_SIGHUP);
+
+ conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0;
+ slotnameChanged = strcmp(slotname, PrimarySlotName) != 0;
+
+ /*
+ * wal_receiver_create_temp_slot is used only when we have no slot
+ * configured. We do not need to track this change if it has no effect.
+ */
+ if (!slotnameChanged && strcmp(PrimarySlotName, "") == 0)
+ tempSlotChanged = tempSlot != wal_receiver_create_temp_slot;
+ pfree(conninfo);
+ pfree(slotname);
+
+ if (conninfoChanged || slotnameChanged || tempSlotChanged)
+ StartupRequestWalReceiverRestart();
+}
+
+/*
+ * Process any requests or signals received recently.
+ */
+void
+ProcessPipelineBgwInterrupts(void)
+{
+
+ bool shutdown_requested;
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ PipelineRereadConfig();
+ }
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ shutdown_requested = WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (shutdown_requested)
+ proc_exit(0);
+
+ CHECK_FOR_INTERRUPTS();
+}
\ No newline at end of file
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 6d2c4a86b96..b66ec80fa25 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -4738,6 +4738,49 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
}
+/*
+ * Pipeline bgw should be aware of all the parameters thats been initialized by
+ * the startup process before performing the actual recoevery. Other then this
+ * there are also some variables that keep on changing. The pipeline & the startup
+ * process should be aware of the state change of such variables, we can use shared
+ * memory for such variables.
+ */
+void
+InitializePipelineRecoveryEnv(WalPipelineParams *params)
+{
+ StandbyMode = params->StandbyMode;
+ StandbyModeRequested = params->StandbyModeRequested;
+ ArchiveRecoveryRequested = params->ArchiveRecoveryRequested;
+ InArchiveRecovery = params->InArchiveRecovery;
+ recoveryTargetTLI = params->recoveryTargetTLI;
+ minRecoveryPointTLI = params->minRecoveryPointTLI;
+ minRecoveryPoint = params->minRecoveryPoint;
+ currentSource = params->currentSource;
+ lastSourceFailed = params->lastSourceFailed;
+ pendingWalRcvRestart = params->pendingWalRcvRestart;
+ RedoStartTLI = params->RedoStartTLI;
+ RedoStartLSN = params->RedoStartLSN;
+ standbyState = params->standbyState;
+ CheckPointLoc = params->CheckPointLoc;
+ CheckPointTLI = params->CheckPointTLI;
+ flushedUpto = params->flushedUpto;
+ receiveTLI = params->receiveTLI;
+ abortedRecPtr = params->abortedRecPtr;
+ missingContrecPtr = params->missingContrecPtr;
+ InRedo = params->InRedo;
+ backupEndRequired = params->backupEndRequired;
+ backupStartPoint = params->backupStartPoint;
+ backupEndPoint = params->backupEndPoint;
+ curFileTLI = params->curFileTLI;
+ InRecovery = true;
+
+ /*
+ * As pipeline will be reading the wal, so better to own the latch to wait at.
+ */
+ if (ArchiveRecoveryRequested)
+ OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
/*
* GUC check_hook for primary_slot_name
*/
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 0104a86b9ec..192295ad695 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/parallel.h"
+#include "access/xlogpipeline.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -143,6 +144,10 @@ static const struct
{
.fn_name = "SequenceSyncWorkerMain",
.fn_addr = SequenceSyncWorkerMain
+ },
+ {
+ .fn_name = "WalPipeline_ProducerMain",
+ .fn_addr = WalPipeline_ProducerMain
}
};
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index a4785daf1e5..f5eaff675f0 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/syncscan.h"
#include "access/transam.h"
#include "access/twophase.h"
+#include "access/xlogpipeline.h"
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
@@ -142,6 +143,7 @@ CalculateShmemSize(void)
size = add_size(size, AioShmemSize());
size = add_size(size, WaitLSNShmemSize());
size = add_size(size, LogicalDecodingCtlShmemSize());
+ size = add_size(size, WalPipelineShmemSize());
/* include additional requested shmem from preload libraries */
size = add_size(size, total_addin_request);
@@ -224,6 +226,9 @@ CreateSharedMemoryAndSemaphores(void)
/* Initialize dynamic shared memory facilities. */
dsm_postmaster_startup(shim);
+ /* Initialize WAL pipeline module */
+ WalPipelineShmemInit();
+
/*
* Now give loadable modules a chance to set up their shmem allocations
*/
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index a5a0edf2534..3523e7d459f 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3398,6 +3398,21 @@
boot_val => 'false',
},
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+ short_desc => 'Use parallel workers to speedup recovery.',
+ variable => 'wal_pipeline_enabled',
+ boot_val => 'false',
+},
+
+{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+ short_desc => 'Size of the shared memory queue used by the WAL pipeline.',
+ flags => 'GUC_UNIT_MB',
+ variable => 'wal_pipeline_mq_size_mb',
+ boot_val => '128',
+ min => '1',
+ max => '1024',
+},
+
{ name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
variable => 'wal_receiver_create_temp_slot',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e686d88afc4..1d2fdb9747a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -277,6 +277,8 @@
#recovery_prefetch = try # prefetch pages referenced in the WAL?
#wal_decode_buffer_size = 512kB # lookahead window used for prefetching
# (change requires restart)
+#wal_pipeline = off # decode in parallel
+#wal_pipeline_queue_size = 128MB
# - Archiving -
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index dcc12eb8cbe..a0c26f8005f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb;
extern PGDLLIMPORT int max_wal_size_mb;
extern PGDLLIMPORT int wal_keep_size_mb;
extern PGDLLIMPORT int max_slot_wal_keep_size_mb;
+extern PGDLLIMPORT int wal_pipeline_mq_size_mb;
+extern PGDLLIMPORT bool wal_pipeline_enabled;
extern PGDLLIMPORT int XLOGbuffers;
extern PGDLLIMPORT int XLogArchiveTimeout;
extern PGDLLIMPORT int wal_retrieve_retry_interval;
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
new file mode 100644
index 00000000000..5740b05f79c
--- /dev/null
+++ b/src/include/access/xlogpipeline.h
@@ -0,0 +1,188 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.h
+ * WAL replay pipeline for parallel recovery
+ *
+ * This module implements a producer-consumer pipeline for WAL replay:
+ * - Producer: background worker that reads and decodes WAL records
+ * - Consumer: startup process: core redo loop
+ *
+ * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL
+ * records from producer to consumer, enabling parallelism while
+ * maintaining sequential replay semantics.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogpipeline.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAL_PIPELINE_H
+#define WAL_PIPELINE_H
+
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * Magic number for shared memory TOC
+ */
+#define PG_WAL_PIPELINE_MAGIC 0x57414C50 /* "WALP" */
+
+/*
+ * Message types sent through the pipeline
+ */
+typedef enum WalMsgType
+{
+ WAL_MSG_INVALID = 0,
+ WAL_MSG_RECORD, /* Decoded WAL record */
+ WAL_MSG_SHUTDOWN, /* Graceful shutdown request */
+ WAL_MSG_ERROR /* Error occurred in producer */
+} WalMsgType;
+
+/* Wire header for a serialized WAL message */
+typedef struct WalRecordMsgHeader
+{
+ WalMsgType msg_type; /* WAL_MSG_RECORD etc */
+ XLogRecPtr readRecPtr; /* XLogReaderState->ReadRecPtr */
+ XLogRecPtr abortedRecPtr; /* XLogReaderState->abortedRecPtr */
+ XLogRecPtr missingContrecPtr; /* XLogReaderState->missingContrecPtr */
+ XLogRecPtr overwrittenRecPtr; /* XLogReaderState->overwrittenRecPtr */
+ ReplOriginId record_origin; /* DecodedXLogRecord->record_origin */
+ TransactionId toplevel_xid; /* DecodedXLogRecord->toplevel_xid */
+ XLogRecPtr endRecPtr; /* EndRecPtr */
+ uint32 decoded_size; /* total size of decoded record payload (not including this header) */
+ int32 max_block_id; /* highest block id (could be -1) */
+ uint32 main_data_len; /* length of main_data */
+ /* followed by decoded payload bytes */
+} WalRecordMsgHeader;
+
+/* Per-block metadata on the wire (no pointers) */
+typedef struct SerializedBlockMeta
+{
+ int32 block_id;
+ bool in_use;
+ RelFileLocator rlocator; /* same as decoded block */
+ ForkNumber forknum;
+ BlockNumber blkno;
+ uint8 flags;
+ bool has_image;
+ bool apply_image;
+ bool has_data;
+ uint16 bimg_len;
+ uint8 bimg_info;
+ uint16 hole_offset;
+ uint16 hole_length;
+ uint16 data_len;
+ /* followed by bimg bytes then data bytes */
+} SerializedBlockMeta;
+
+/*
+ * Parameters passed from StartupXLOG (consumer side)
+ * to the WAL pipeline producer background worker.
+ */
+typedef struct WalPipelineParams
+{
+ bool StandbyMode;
+ bool StandbyModeRequested;
+ bool ArchiveRecoveryRequested;
+ bool InArchiveRecovery;
+ bool InRedo;
+ bool lastSourceFailed;
+ bool pendingWalRcvRestart;
+ bool backupEndRequired;
+
+ TimeLineID RedoStartTLI;
+ TimeLineID CheckPointTLI;
+ TimeLineID recoveryTargetTLI;
+ TimeLineID minRecoveryPointTLI;
+ TimeLineID ReplayTLI;
+ TimeLineID receiveTLI;
+
+ XLogRecPtr backupStartPoint;
+ XLogRecPtr backupEndPoint;
+ XLogRecPtr CheckPointLoc;
+ XLogRecPtr RedoStartLSN;
+ XLogRecPtr NextRecPtr;
+ XLogRecPtr minRecoveryPoint;
+ XLogRecPtr flushedUpto;
+ XLogRecPtr abortedRecPtr;
+ XLogRecPtr missingContrecPtr;
+
+ int readFile;
+ XLogSegNo readSegNo;
+ uint32 readOff;
+ uint32 readLen;
+ XLogSource readSource;
+ TimeLineID curFileTLI;
+
+
+ HotStandbyState standbyState;
+ XLogSource currentSource;
+
+} WalPipelineParams;
+
+/*
+ * Shared memory control structure for the WAL pipeline
+ */
+typedef struct WalPipelineShmCtl
+{
+ /* Lifecycle management */
+ slock_t mutex;
+ bool initialized;
+ bool shutdown_requested;
+ bool producer_exited;
+
+ /* Producer state */
+ pid_t producer_pid;
+ ProcNumber producer_procnum;
+ XLogRecPtr producer_lsn; /* Last LSN read by producer */
+
+ /* Consumer state */
+ pid_t consumer_pid;
+ XLogRecPtr consumer_lsn; /* Last LSN recieved by consumer */
+ XLogRecPtr applied_lsn; /* Last LSN applied by consumer */
+
+ /* Queue handles */
+ dsm_handle dsm_seg_handle;
+ shm_mq_handle *producer_mq_handle;
+ shm_mq_handle *consumer_mq_handle;
+
+ /* Statistics */
+ uint64 records_sent;
+ uint64 records_received;
+ uint64 bytes_sent;
+ uint64 bytes_received;
+
+ /* Error state */
+ int error_code;
+ char error_message[256];
+} WalPipelineShmCtl;
+
+/* consumer may have to compute prefetecher stats */
+extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
+
+/*
+ * Public API functions
+ */
+
+/* Initialize the WAL pipeline shared memory structures */
+extern Size WalPipelineShmemSize(void);
+extern void WalPipelineShmemInit(void);
+
+/* Producer functions (called by background worker) */
+extern void WalPipeline_ProducerMain(Datum main_arg);
+extern bool WalPipeline_SendRecord(XLogReaderState *record);
+extern bool WalPipeline_SendShutdown(void);
+extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+
+
+extern void ProcessPipelineBgwInterrupts(void);
+
+/* Global shared memory pointer */
+extern WalPipelineShmCtl *WalPipelineShm;
+
+#endif /* WAL_PIPELINE_H */
\ No newline at end of file
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 2842106b285..e675ab8353d 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,7 @@
#ifndef XLOGRECOVERY_H
#define XLOGRECOVERY_H
+#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
@@ -60,6 +61,17 @@ typedef enum RecoveryPauseState
RECOVERY_PAUSED, /* recovery is paused */
} RecoveryPauseState;
+/* Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one.
+ */
+typedef enum
+{
+ XLOG_FROM_ANY = 0, /* request to read WAL from any source */
+ XLOG_FROM_ARCHIVE, /* restored using restore_command */
+ XLOG_FROM_PG_WAL, /* existing file in pg_wal */
+ XLOG_FROM_STREAM, /* streamed from primary */
+} XLogSource;
+
/*
* Shared-memory state for WAL recovery.
*/
@@ -208,6 +220,8 @@ typedef struct
bool recovery_signal_file_found;
} EndOfWalRecoveryInfo;
+struct WalPipelineParams; /* forward declaration */
+
extern EndOfWalRecoveryInfo *FinishWalRecovery(void);
extern void ShutdownWalRecovery(void);
extern void RemovePromoteSignalFiles(void);
@@ -220,6 +234,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern TimestampTz GetLatestXTime(void);
extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI);
+extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
+ bool fetching_ckpt, TimeLineID replayTLI);
+extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *readBuf);
extern bool PromoteIsTriggered(void);
extern bool CheckPromoteSignal(void);
@@ -229,6 +247,7 @@ extern void StartupRequestWalReceiverRestart(void);
extern void XLogRequestWalReceiverReply(void);
extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue);
+extern void InitializePipelineRecoveryEnv(struct WalPipelineParams *params);
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 36d789720a3..fc23acbaec2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -61,6 +61,7 @@ tests += {
't/050_redo_segment_missing.pl',
't/051_effective_wal_level.pl',
't/052_checkpoint_segment_missing.pl',
+ 't/053_walpipeline.pl',
],
},
}
diff --git a/src/test/recovery/t/053_walpipeline.pl b/src/test/recovery/t/053_walpipeline.pl
new file mode 100644
index 00000000000..2fb790aadc5
--- /dev/null
+++ b/src/test/recovery/t/053_walpipeline.pl
@@ -0,0 +1,208 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+#
+# Tests for the WAL pipeline feature (wal_pipeline GUC).
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ----------
+# Helpers
+# ----------
+
+sub slurp_log
+{
+ my ($node) = @_;
+ open(my $fh, '<', $node->logfile()) or die "Cannot open log: $!";
+ my @lines = <$fh>;
+ close($fh);
+ return @lines;
+}
+
+sub log_matches
+{
+ my ($node, $re) = @_;
+ return grep { /$re/ } slurp_log($node);
+}
+
+
+# ########################################
+# wal_pipeline = on, basic recovery
+# ########################################
+
+my $node1 = PostgreSQL::Test::Cluster->new('p1-recovery');
+$node1->init;
+$node1->start;
+
+$node1->safe_psql('postgres', q{
+ CREATE TABLE t (id serial PRIMARY KEY, v text);
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node1->safe_psql('postgres', q{
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node1->stop('immediate');
+
+# restart → recovery happens
+$node1->append_conf('postgresql.conf', "wal_pipeline = on");
+$node1->start;
+
+
+# Producer started
+ok(scalar log_matches($node1, qr/\[walpipeline\] producer: started at/),
+ 'producer started message found in log');
+
+# Pipeline stopped cleanly
+ok(scalar log_matches($node1, qr/\[walpipeline\] stopped/),
+ 'pipeline stopped message found in log');
+
+# Consumer received shutdown from producer
+ok(scalar log_matches($node1, qr/\[walpipeline\] consumer: received shutdown message/),
+ 'consumer received shutdown message from producer');
+
+# sent == received
+my @exit_lines = log_matches($node1,
+ qr/\[walpipeline\] producer: exiting: sent=\d+ received=\d+/);
+ok(scalar @exit_lines >= 1, 'producer exiting line found in log');
+
+my ($sent, $recv) = $exit_lines[-1] =~ /sent=(\d+) received=(\d+)/;
+ok(defined $sent && $sent > 0, "sent count ($sent) is positive");
+ok(defined $recv && $recv > 0, "received count ($recv) is positive");
+is($sent, $recv, "no records lost in pipeline queue: sent=$sent received=$recv");
+
+# No PANIC
+ok(!(scalar log_matches($node1, qr/\bPANIC\b/)),
+ 'no PANIC messages during pipeline recovery');
+
+# Data integrity
+my $count = $node1->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count + 0, 100_000, 'all 100000 rows visible after pipeline recovery');
+
+$node1->stop;
+
+# ##############################################################
+# wal_pipeline = off (baseline, no pipeline log messages)
+# ##############################################################
+
+my $node2 = PostgreSQL::Test::Cluster->new('p0-recovery');
+$node2->init;
+$node2->start;
+
+$node2->safe_psql('postgres', q{
+ CREATE TABLE t (id serial PRIMARY KEY, v text);
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node2->safe_psql('postgres', q{
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node2->stop('immediate');
+
+# restart → recovery happens
+$node2->append_conf('postgresql.conf', "wal_pipeline = off");
+$node2->start;
+
+ok(!(scalar log_matches($node2, qr/\[walpipeline\] producer: started/)),
+ 'no pipeline log messages when wal_pipeline = off');
+
+my $count2 = $node2->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count2 + 0, 100_000, 'all rows present after non-pipeline recovery');
+
+$node2->stop;
+
+
+
+# ###################################################################
+# pipeline on vs off produce identical data (checksum comparison)
+# ###################################################################
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+$primary->safe_psql('postgres', q{
+ CREATE TABLE t (id serial PRIMARY KEY, v text);
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1, 30000) i;
+});
+
+$primary->backup('backup3');
+
+$primary->safe_psql('postgres', q{
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1, 30000) i;
+ UPDATE t SET v = 'x' WHERE id % 10 = 0;
+});
+
+# ensure WAL boundary
+$primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+my $target_lsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_lsn()');
+
+my $replica_on = PostgreSQL::Test::Cluster->new('replica_p1');
+$replica_on->init_from_backup($primary, 'backup3',
+ has_streaming => 1);
+$replica_on->append_conf('postgresql.conf', "wal_pipeline = on\n");
+$replica_on->start;
+
+my $replica_off = PostgreSQL::Test::Cluster->new('replica_p0');
+$replica_off->init_from_backup($primary, 'backup3',
+ has_streaming => 1);
+$replica_off->append_conf('postgresql.conf', "wal_pipeline = off\n");
+$replica_off->start;
+
+# wait for replicas to catch up
+$primary->wait_for_catchup($replica_on);
+$primary->wait_for_catchup($replica_off);
+
+my $md5_on = $replica_on->safe_psql('postgres',
+ "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+my $md5_off = $replica_off->safe_psql('postgres',
+ "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+is($md5_on, $md5_off,
+ 'table checksum identical between pipeline=on and pipeline=off');
+
+$replica_on->stop;
+$replica_off->stop;
+$primary->stop('fast');
+
+
+
+# #################################
+# pipeline when on tiny replay
+# #################################
+
+my $node3 = PostgreSQL::Test::Cluster->new('p1-small-replay');
+$node3->init;
+$node3->start;
+
+# crash stop to force WAL recovery
+$node3->stop('immediate');
+
+# restart → recovery happens
+$node3->append_conf('postgresql.conf', "wal_pipeline = on");
+$node3->start;
+
+ok(scalar log_matches($node3, qr/\[walpipeline\] producer: exiting: sent=0 received=0/),
+ 'pipeline producer sent zero records');
+
+ok((scalar log_matches($node3, qr/redo done at/)),
+ 'pipeline redo done even with tiny replay');
+
+$node3->stop;
+
+done_testing();
\ No newline at end of file
--
2.34.1
[text/x-patch] v2-0002-Pipelined-Recovery-Consumer.patch (50.7K, 4-v2-0002-Pipelined-Recovery-Consumer.patch)
download | inline diff:
From 323482e965096afb5fc2a0ddefb4a075f2716f28 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Tue, 17 Mar 2026 21:48:05 +0500
Subject: [PATCH v2 2/2] Pipelined Recovery - Consumer
This includes the consumer-specific code for the producer-consumer
architecture for WAL replay that separates WAL decoding from the recovery process,
enabling parallel processing between different steps of replay.
The consumer receives the decoded record from the shared memory message
queue. Both producer and consumer should be aware of any state changes,
so variable states are synced through shared memory `XLogRecoveryCtlData`.
Also, constant states are shared from the consumer to the producer using
`WalPipelineParams`.
Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
src/backend/access/transam/xlog.c | 5 +-
src/backend/access/transam/xlogpipeline.c | 525 ++++++++++++++++++++
src/backend/access/transam/xlogprefetcher.c | 3 +-
src/backend/access/transam/xlogrecovery.c | 469 +++++++++++++++--
src/backend/storage/ipc/standby.c | 1 +
src/include/access/xlogpipeline.h | 15 +
src/include/access/xlogrecovery.h | 50 ++
7 files changed, 1028 insertions(+), 40 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f5c9a34374d..3e1b7d78055 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -59,6 +59,7 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xloginsert.h"
+#include "access/xlogpipeline.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
@@ -5924,6 +5925,7 @@ StartupXLOG(void)
ProcArrayApplyRecoveryInfo(&running);
}
+ SetSharedHotStandbyState();
}
/*
@@ -8488,6 +8490,7 @@ xlog_redo(XLogReaderState *record)
running.xids = xids;
ProcArrayApplyRecoveryInfo(&running);
+ SetSharedHotStandbyState();
}
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
@@ -9667,7 +9670,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
void
XLogShutdownWalRcv(void)
{
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
ShutdownWalRcv();
ResetInstallXLogFileSegmentActive();
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
index 4b95a11d16b..5bb575b1f14 100644
--- a/src/backend/access/transam/xlogpipeline.c
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -52,6 +52,16 @@
#include "utils/timeout.h"
+/*
+ * Convert values of GUCs measured in megabytes to bytes
+ */
+#define MBToBytes(mbvar) (mbvar * 1024 * 1024)
+
+/*
+ * Waiting for consumer before exiting gracefully.
+ */
+#define MAX_SHUTDOWN_WAIT_ITERS 1000 /* 1000 * 10ms = 10 seconds */
+
/* Global shared memory control structure */
WalPipelineShmCtl *WalPipelineShm = NULL;
@@ -62,6 +72,10 @@ static dsm_segment *producer_dsm_seg = NULL;
static shm_mq *producer_mq = NULL;
static shm_mq_handle *producer_mq_handle = NULL;
+/* Local state for consumer */
+static dsm_segment *consumer_dsm_seg = NULL;
+static shm_mq *consumer_mq = NULL;
+static shm_mq_handle *consumer_mq_handle = NULL;
/*
* Flags set by interrupt handlers for later service in the redo loop.
@@ -74,7 +88,9 @@ static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
/* Forward declarations */
static void wal_pipeline_cleanup_callback(int code, Datum arg);
static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static DecodedXLogRecord *deserialize_wal_record(const char *buffer, Size len, XLogReaderState *startup_reader, bool first_iteration);
static void cleanup_producer_resources(void);
+static void cleanup_consumer_resources(void);
static void WalPipeline_WaitForConsumerShutdownRequest(void);
/* copied from xlogrecovery.c */
@@ -135,6 +151,168 @@ WalPipelineShmemInit(void)
}
+/*
+ * Called by Consumer.
+ *
+ * Initialize and start the WAL pipeline. This will be called by the startup
+ * process (consumer) as a request to start the pipeline.
+ */
+void
+WalPipeline_Start(WalPipelineParams *params)
+{
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *handle;
+ dsm_segment *seg;
+ shm_toc_estimator e;
+ shm_toc *toc;
+ Size segsize;
+ shm_mq *mq;
+ WalPipelineParams *shared_params;
+ pid_t pid;
+ BgwHandleStatus status;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ if (WalPipelineShm->initialized)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return; /* Already started */
+ }
+ WalPipelineShm->initialized = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams));
+ shm_toc_estimate_chunk(&e, MBToBytes(wal_pipeline_mq_size_mb));
+ shm_toc_estimate_keys(&e, 2); /* key=1 → params, key=2 → mq */
+ segsize = shm_toc_estimate(&e);
+
+ seg = dsm_create(segsize, 0);
+ dsm_pin_segment(seg);
+
+ toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC,
+ dsm_segment_address(seg), segsize);
+
+ /*
+ * Pass the startup process vaaraibles state through WalPipelineParams
+ */
+ shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams));
+ shm_toc_insert(toc, 1, shared_params);
+ *shared_params = *params;
+
+ /* create the message queue */
+ mq = shm_mq_create(shm_toc_allocate(toc, MBToBytes(wal_pipeline_mq_size_mb)),
+ MBToBytes(wal_pipeline_mq_size_mb));
+ shm_toc_insert(toc, 2, mq);
+
+ /* update shared state */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg);
+ WalPipelineShm->consumer_pid = MyProcPid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* Set up consumer side of the queue */
+ consumer_dsm_seg = seg;
+ consumer_mq = mq;
+ shm_mq_set_receiver(consumer_mq, MyProc);
+ consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL);
+
+ /* Register cleanup callback */
+ before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+ /* Register background worker */
+ memset(&worker, 0, sizeof(worker));
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+ worker.bgw_start_time = BgWorkerStart_PostmasterStart;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ sprintf(worker.bgw_library_name, "postgres");
+ sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain");
+ snprintf(worker.bgw_name, BGW_MAXLEN, "wal pipeline producer");
+ snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer");
+ worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+ worker.bgw_notify_pid = MyProcPid;
+
+ if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ dsm_unpin_segment(dsm_segment_handle(seg));
+ dsm_detach(seg);
+ consumer_dsm_seg = NULL;
+ consumer_mq = NULL;
+ consumer_mq_handle = NULL;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not register background worker for WAL pipeline")));
+ }
+
+ status = WaitForBackgroundWorkerStartup(handle, &pid);
+
+ if (status != BGWH_STARTED)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not start background process"),
+ errhint("More details may be available in the server log.")));
+ else
+ ereport(LOG, (errmsg("[walpipeline] started.")));
+}
+
+/*
+ * Request producer shutdown.
+ * This is called by the consumer when it no longer needs records.
+ */
+static void
+WalPipeline_RequestShutdown(void)
+{
+ if (!WalPipelineShm)
+ return;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->shutdown_requested = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * Consumer Function.
+ * Stop the WAL pipeline. This will also be called be the startup process
+ * (consumer). This will only be called when consumer don't need any more
+ * decoded records. This function will also wait until the pipeline workers
+ * are stopped.
+ */
+void
+WalPipeline_Stop(void)
+{
+ if (!WalPipelineShm || !WalPipelineShm->initialized)
+ return;
+
+ /* Ask producer to stop */
+ WalPipeline_RequestShutdown();
+
+ /* Wait for producer to exit (max 10 seconds) */
+ for (int i = 0; i < 100; i++)
+ {
+ bool producer_alive;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ producer_alive = (WalPipelineShm->producer_pid != 0);
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (!producer_alive)
+ break;
+
+ pg_usleep(100000); /* 100 ms */
+ }
+
+ cleanup_consumer_resources();
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ elog(LOG, "[walpipeline] stopped");
+}
/*
* Producer Function.
@@ -374,6 +552,117 @@ WalPipeline_SendError(int errcode, const char *errmsg)
return true;
}
+/*
+ * Consumer Function.
+ * Receive and deserialize a WAL record from the producer
+ */
+DecodedXLogRecord *
+WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration)
+{
+ shm_mq_result res;
+ Size nbytes;
+ void *data;
+ char *err_msg;
+ int err_code;
+ WalRecordMsgHeader *hdr;
+ DecodedXLogRecord *record;
+
+ if (!consumer_mq_handle)
+ return NULL;
+
+ /* Receive message from queue */
+ res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false);
+
+ if (res != SHM_MQ_SUCCESS)
+ elog(ERROR, "[walpipeline] consumer: failed to receive record");
+
+ hdr = (WalRecordMsgHeader *) data;
+
+ /* Handle different message types */
+ switch (hdr->msg_type)
+ {
+ case WAL_MSG_RECORD:
+ record = deserialize_wal_record((char *) data, nbytes, startup_reader, first_iteration);
+
+ /* Update statistics */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->records_received++;
+ WalPipelineShm->bytes_received += nbytes;
+ WalPipelineShm->consumer_lsn = hdr->endRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return record;
+
+ case WAL_MSG_SHUTDOWN:
+ elog(LOG, "[walpipeline] consumer: received shutdown message from the producer");
+ return NULL;
+
+ case WAL_MSG_ERROR:
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ err_code = WalPipelineShm->error_code;
+ err_msg = WalPipelineShm->error_message;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ ereport(ERROR,
+ (errcode(err_code),
+ errmsg("[walpipeline] consumer: received error from the producer: %s", err_msg)));
+ return NULL;
+
+ default:
+ elog(PANIC, "[walpipeline] consumer: unknown message type: %d",
+ hdr->msg_type);
+ return NULL;
+ }
+}
+
+/*
+ * Consumer Function.
+ * Check if producer is still running
+ */
+bool
+WalPipeline_CheckProducerAlive(void)
+{
+ pid_t pid;
+ bool alive;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ pid = WalPipelineShm->producer_pid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (pid == 0)
+ return false;
+
+ alive = (kill(pid, 0) == 0);
+
+ if (!alive)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = 0;
+ SpinLockRelease(&WalPipelineShm->mutex);
+ }
+
+ return alive;
+}
+
+/*
+ * Consumer Function.
+ * Check if pipeline is active
+ */
+bool
+WalPipeline_IsActive(void)
+{
+ bool active;
+
+ if (!WalPipelineShm)
+ return false;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return active;
+}
+
/*
* Producer Function.
* Producer may can exit without waiting for the consumer, but its better to
@@ -411,6 +700,55 @@ WalPipeline_WaitForConsumerShutdownRequest(void)
}
}
+/*
+ * Consumer Function.
+ * Wait unless last sent record by the pipeline is applied by the
+ * startup process.
+ */
+void
+WalPipeline_WaitForConsumerCatchup(void)
+{
+ XLogRecPtr producer_lsn;
+ XLogRecPtr consumer_lsn;
+
+ for (;;)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ producer_lsn = WalPipelineShm->producer_lsn;
+ consumer_lsn = WalPipelineShm->applied_lsn;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (producer_lsn == consumer_lsn)
+ return;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* short sleep to avoid busy looping */
+ pg_usleep(50); /* 50 microseconds */
+ }
+}
+
+/*
+ * Consumer Function.
+ * Get pipeline statistics
+ */
+void
+WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+ XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+
+ if (records_sent)
+ *records_sent = WalPipelineShm->records_sent;
+ if (records_received)
+ *records_received = WalPipelineShm->records_received;
+ if (producer_lsn)
+ *producer_lsn = WalPipelineShm->producer_lsn;
+ if (consumer_lsn)
+ *consumer_lsn = WalPipelineShm->consumer_lsn;
+
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
/*
* serialize_wal_record (Producer)
@@ -539,6 +877,167 @@ serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
return total;
}
+/*
+ * deserialize_wal_record (Consumer)
+ *
+ * Deserialize a WAL record from a buffer into a DecodedXLogRecord.
+ *
+ * Memory layout:
+ * [DecodedXLogRecord + blocks][main_data][block_images][block_data]
+ */
+DecodedXLogRecord *
+deserialize_wal_record(const char *buf, Size len, XLogReaderState *startup_reader, bool first_iteration)
+{
+ const char *ptr = buf;
+ const char *end = buf + len;
+ WalRecordMsgHeader hdr;
+ DecodedXLogRecord *dec = NULL;
+ char *alloc_ptr;
+ int nblocks;
+ Size total;
+
+ if (len < sizeof(WalRecordMsgHeader))
+ return NULL;
+
+ memcpy(&hdr, ptr, sizeof(hdr));
+ ptr += sizeof(hdr);
+
+ if (hdr.decoded_size != len - sizeof(WalRecordMsgHeader))
+ return NULL;
+
+ nblocks = (hdr.max_block_id >= 0) ? hdr.max_block_id + 1 : 0;
+
+ /* ---- space allocation ---- */
+ total = MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock))
+ + MAXALIGN(hdr.decoded_size);
+
+ dec = palloc(total);
+ memset(dec, 0, total);
+
+ alloc_ptr = (char *)dec + MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock));
+
+ /* ---- record metadata ---- */
+ dec->lsn = hdr.readRecPtr;
+ dec->next_lsn = hdr.endRecPtr;
+ dec->max_block_id = hdr.max_block_id;
+ dec->main_data_len = hdr.main_data_len;
+ dec->toplevel_xid = hdr.toplevel_xid;
+ dec->record_origin = hdr.record_origin;
+
+ /* ---- XLogRecord ---- */
+ if (ptr + sizeof(XLogRecord) > end)
+ goto fail;
+
+ memcpy(&dec->header, ptr, sizeof(XLogRecord));
+ ptr += sizeof(XLogRecord);
+
+ /* ---- main data ---- */
+ if (hdr.main_data_len > 0)
+ {
+ if (ptr + hdr.main_data_len > end)
+ goto fail;
+
+ dec->main_data = alloc_ptr;
+ memcpy(dec->main_data, ptr, hdr.main_data_len);
+ ptr += hdr.main_data_len;
+ alloc_ptr += MAXALIGN(hdr.main_data_len);
+ }
+
+ /* ---- blocks ---- */
+ for (int i = 0; i < nblocks && ptr < end; i++)
+ {
+ SerializedBlockMeta meta;
+ DecodedBkpBlock *blk;
+
+ if (ptr + sizeof(meta) > end)
+ break;
+
+ memcpy(&meta, ptr, sizeof(meta));
+ ptr += sizeof(meta);
+
+ if (!meta.in_use)
+ continue;
+
+ if (meta.block_id < 0 || meta.block_id >= nblocks)
+ goto fail;
+
+ blk = &dec->blocks[meta.block_id];
+
+ blk->in_use = true;
+ blk->rlocator = meta.rlocator;
+ blk->forknum = meta.forknum;
+ blk->blkno = meta.blkno;
+ blk->flags = meta.flags;
+ blk->has_image = meta.has_image;
+ blk->apply_image = meta.apply_image;
+ blk->has_data = meta.has_data;
+ blk->bimg_len = meta.bimg_len;
+ blk->bimg_info = meta.bimg_info;
+ blk->hole_offset = meta.hole_offset;
+ blk->hole_length = meta.hole_length;
+ blk->data_len = meta.data_len;
+
+ if (blk->has_image && blk->bimg_len > 0)
+ {
+ if (ptr + blk->bimg_len > end)
+ goto fail;
+
+ blk->bkp_image = alloc_ptr;
+ memcpy(blk->bkp_image, ptr, blk->bimg_len);
+ ptr += blk->bimg_len;
+ alloc_ptr += MAXALIGN(blk->bimg_len);
+ }
+
+ if (blk->has_data && blk->data_len > 0)
+ {
+ if (ptr + blk->data_len > end)
+ goto fail;
+
+ blk->data = alloc_ptr;
+ memcpy(blk->data, ptr, blk->data_len);
+ ptr += blk->data_len;
+ alloc_ptr += MAXALIGN(blk->data_len);
+ }
+ }
+
+ dec->size = alloc_ptr - (char *)dec;
+ dec->oversized = false;
+
+ /*
+ * The previous decoded record has been deserialized from
+ * from the pipeline and hence need to free the memory after
+ * use.
+ *
+ * But for the first iteration memory space for `reader->record`
+ * was allocated from the `decode_buffer`, and freeing this
+ * memory can be fatal. This memory will be freed automatically
+ * at the end of the recovery in `finishwalrecovery()`. So we
+ * will skip pfree for the first iteration (apply).
+ */
+ if (startup_reader->record && !first_iteration)
+ pfree(startup_reader->record);
+
+ /* Attach to reader, only updating the public parameters */
+ startup_reader->record = dec;
+ startup_reader->ReadRecPtr = dec->lsn;
+ startup_reader->DecodeRecPtr = dec->lsn;
+ startup_reader->EndRecPtr = dec->next_lsn;
+ startup_reader->NextRecPtr = dec->next_lsn;
+ startup_reader->decode_queue_head = dec;
+ startup_reader->decode_queue_tail = dec;
+ startup_reader->missingContrecPtr = hdr.missingContrecPtr;
+ startup_reader->abortedRecPtr = hdr.abortedRecPtr;
+ startup_reader->overwrittenRecPtr = hdr.overwrittenRecPtr;
+
+ return dec;
+
+fail:
+ if (dec)
+ pfree(dec);
+
+ elog(LOG, "deserialize_wal_record: failed");
+ return NULL;
+}
/*
* We need to put some assertion that only pipeline worker should be touching
@@ -581,6 +1080,32 @@ cleanup_producer_resources(void)
SpinLockRelease(&WalPipelineShm->mutex);
}
+/*
+ * Clean up consumer-side resources
+ */
+static void
+cleanup_consumer_resources(void)
+{
+ if (consumer_mq_handle)
+ {
+ shm_mq_detach(consumer_mq_handle);
+ consumer_mq_handle = NULL;
+ }
+
+ if (consumer_dsm_seg)
+ {
+ dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg));
+ dsm_detach(consumer_dsm_seg);
+ consumer_dsm_seg = NULL;
+ }
+
+ consumer_mq = NULL;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->consumer_pid = 0;
+ WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
/*
* Cleanup callback for process exit
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index c235eca7c51..1536ea34c41 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -27,6 +27,7 @@
#include "postgres.h"
+#include "access/xlogpipeline.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
@@ -352,7 +353,7 @@ XLogPrefetchReconfigure(void)
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index b66ec80fa25..49cc0b9aafa 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -35,6 +35,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
+#include "access/xlogpipeline.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
@@ -99,6 +100,8 @@ int recovery_min_apply_delay = 0;
char *PrimaryConnInfo = NULL;
char *PrimarySlotName = NULL;
bool wal_receiver_create_temp_slot = false;
+bool wal_pipeline_enabled = false;
+int wal_pipeline_mq_size_mb = 128;
/*
* recoveryTargetTimeLineGoal: what the user requested, if any
@@ -205,17 +208,6 @@ typedef struct XLogPageReadPrivate
/* flag to tell XLogPageRead that we have started replaying */
static bool InRedo = false;
-/*
- * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.
- */
-typedef enum
-{
- XLOG_FROM_ANY = 0, /* request to read WAL from any source */
- XLOG_FROM_ARCHIVE, /* restored using restore_command */
- XLOG_FROM_PG_WAL, /* existing file in pg_wal */
- XLOG_FROM_STREAM, /* streamed from primary */
-} XLogSource;
/* human-readable names for XLogSources, for debugging output */
static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
@@ -356,12 +348,6 @@ static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
-static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
- int emode, bool fetching_ckpt,
- TimeLineID replayTLI);
-
-static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
bool randAccess,
bool fetching_ckpt,
@@ -383,6 +369,7 @@ static bool HotStandbyActiveInReplay(void);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void SetLatestXTime(TimestampTz xtime);
+static void InitializePipelineStartupEnv(WalPipelineParams *params);
/*
* Initialization of shared memory for WAL recovery
@@ -411,9 +398,27 @@ XLogRecoveryShmemInit(void)
SpinLockInit(&XLogRecoveryCtl->info_lck);
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ InitSharedLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
}
+/*
+ * We may not be able to share expectedTLEs list across the sharedmemory.
+ * For now just trigger the startup process (consumer) to
+ * reread the timelinehistory file whenever pipeline updates the value for
+ * expectedTLEs. So the consumer proc will expectedTLEs updated locally.
+ */
+static void
+PipelineConsumerexpectedTLEsUpdateTLI(TimeLineID recoveryTargetTLI)
+{
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->expectedTLEsUpdateTLI = recoveryTargetTLI;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+}
+
/*
* A thin wrapper to enable StandbyMode and do other preparatory work as
* needed.
@@ -429,7 +434,8 @@ EnableStandbyMode(void)
* startup progress timeout in standby mode to avoid calling
* startup_progress_timeout_handler() unnecessarily.
*/
- disable_startup_progress_timeout();
+ if (!AmWalPipeline())
+ disable_startup_progress_timeout();
}
/*
@@ -489,7 +495,10 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
* recovery, if required.
*/
if (ArchiveRecoveryRequested)
+ {
+ OwnLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ }
/*
* Set the WAL reading processor now, as it will be needed when reading
@@ -961,6 +970,14 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
minRecoveryPointTLI = 0;
}
+ /* update shared state. */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* Start recovery assuming that the final record isn't lost.
*/
@@ -972,6 +989,12 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
*haveTblspcMap_ptr = haveTblspcMap;
}
+void DisownRecoveryWakeupLatch()
+{
+ if (ArchiveRecoveryRequested)
+ DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
/*
* See if there are any recovery signal files and if so, set state for
* recovery.
@@ -1420,6 +1443,15 @@ FinishWalRecovery(void)
TimeLineID lastRecTLI;
XLogRecPtr endOfLog;
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ InArchiveRecovery = XLogRecoveryCtl->InArchiveRecovery;
+ missingContrecPtr = XLogRecoveryCtl->missingContrecPtr;
+ abortedRecPtr = XLogRecoveryCtl->abortedRecPtr;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* Kill WAL receiver, if it's still running, before we continue to write
* the startup checkpoint and aborted-contrecord records. It will trump
@@ -1477,6 +1509,17 @@ FinishWalRecovery(void)
lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
}
+
+ /*
+ * Invalidate contents of internal buffer before read attempt. Just set
+ * the length to 0, rather than a full XLogReaderInvalReadState().
+ *
+ * This is needed because we could be reading from the pipeline reader so
+ * far, so before moving back the the startup proc readerstate better to
+ * invalidate it.
+ */
+ xlogreader->readLen = 0;
+
XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
(void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
endOfLog = xlogreader->EndRecPtr;
@@ -1599,7 +1642,69 @@ ShutdownWalRecovery(void)
* it, but let's do it for the sake of tidiness.
*/
if (ArchiveRecoveryRequested)
- DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ {
+ DisownLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
+
+ /*
+ * Only disown the latch if we were the owner (pipeline disabled).
+ */
+ if (!wal_pipeline_enabled)
+ DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ }
+
+
+}
+
+/*
+ * Get next record for redo.
+ * Use the pipeline if enabled for parallel decoding and receive decoded
+ * records from a shared queue, else read it directly.
+ */
+static XLogRecord *
+ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode,
+ bool fetching_ckpt, TimeLineID replayTLI,
+ XLogReaderState **localreader, bool first_iteration)
+{
+
+ XLogRecord *record = NULL;
+ XLogReaderState *reader = *localreader;
+
+ /*
+ * If pipeline not enabled read the record directly
+ */
+ if (!wal_pipeline_enabled)
+ {
+ record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI);
+ return record;
+ }
+
+ /*
+ * Get record from the pipeline
+ */
+ if (WalPipeline_IsActive())
+ {
+ DecodedXLogRecord *decoded_record = NULL;
+
+ decoded_record = WalPipeline_ReceiveRecord(reader, first_iteration);
+
+ if (decoded_record)
+ {
+ record = &decoded_record->header;
+ return record;
+ }
+ else
+ {
+ /*
+ * We will end up here only when pipeline couldn't read more
+ * records and have sent a shutdown msg. We will acknowldge this
+ * and will trigger request to stop the pipeline workers.
+ */
+ WalPipeline_Stop();
+ return NULL;
+ }
+ }
+
+ elog(PANIC, "[walpipeline] consumer: pipeline not active, even though wal_pipeline is set to on.");
}
/*
@@ -1614,6 +1719,12 @@ PerformWalRecovery(void)
bool reachedRecoveryTarget = false;
TimeLineID replayTLI;
+ /*
+ * standalone backend may exist in case of pg_rewind.
+ */
+ if (!IsUnderPostmaster)
+ wal_pipeline_enabled = false;
+
/*
* Initialize shared variables for tracking progress of WAL replay, as if
* we had just replayed the record before the REDO location (or the
@@ -1688,11 +1799,24 @@ PerformWalRecovery(void)
{
TimestampTz xtime;
PGRUsage ru0;
+ uint64 loop_count = 0;
pg_rusage_init(&ru0);
InRedo = true;
+ if(wal_pipeline_enabled)
+ {
+ /*
+ * Startup proc parameters that pipeline shold also be aware of.
+ */
+ WalPipelineParams *params = palloc0(sizeof(WalPipelineParams));
+
+ params->ReplayTLI = replayTLI;
+ InitializePipelineStartupEnv(params);
+ WalPipeline_Start(params);
+ }
+
RmgrStartup();
ereport(LOG,
@@ -1798,7 +1922,7 @@ PerformWalRecovery(void)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
+ record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader, loop_count++ == 0);
} while (record != NULL);
/*
@@ -1807,6 +1931,9 @@ PerformWalRecovery(void)
if (reachedRecoveryTarget)
{
+ if (wal_pipeline_enabled)
+ WalPipeline_Stop();
+
if (!reachedConsistency)
ereport(FATAL,
(errmsg("requested recovery stop point is before consistent recovery point")));
@@ -1841,6 +1968,8 @@ PerformWalRecovery(void)
RmgrCleanup();
+ // XXX: testing purpose only
+ ereport(DEBUG1, (errmsg("replay loop fiinished with loop count: " UINT64_FORMAT, loop_count)));
ereport(LOG,
errmsg("redo done at %X/%08X system usage: %s",
LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
@@ -1879,7 +2008,9 @@ static void
ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI)
{
ErrorContextCallback errcallback;
+ XLogRecPtr walrcv_flushedupto;
bool switchedTLI = false;
+ bool pipeline_enabled_stanby = false;
/* Setup error traceback support for ereport() */
errcallback.callback = rm_redo_error_callback;
@@ -1980,6 +2111,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
+ walrcv_flushedupto = XLogRecoveryCtl->flushedUptoRecPtr;
+ pipeline_enabled_stanby = XLogRecoveryCtl->stanbyEnabled;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
/* ------
@@ -2018,6 +2151,16 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
WalRcvForceReply();
}
+ /*
+ * As the pipeline (producer) was running way ahead of the startup proc
+ * (consumer), see if the producer asked to wakeup the wal_reciever by
+ * updating the value of `flushedUptoRecPtr`.
+ */
+ if ((walrcv_flushedupto != InvalidXLogRecPtr) &&
+ ((walrcv_flushedupto == xlogreader->EndRecPtr) ||
+ (walrcv_flushedupto == xlogreader->ReadRecPtr)))
+ WalRcvForceReply();
+
/* Allow read-only connections if we're consistent now */
CheckRecoveryConsistency();
@@ -2033,6 +2176,14 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}
+
+ /* Conusmer should also enable the standby if pipline have */
+ if (pipeline_enabled_stanby)
+ EnableStandbyMode();
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->applied_lsn = xlogreader->EndRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
}
/*
@@ -2158,12 +2309,27 @@ CheckRecoveryConsistency(void)
Assert(InArchiveRecovery);
- /*
- * assume that we are called in the startup process, and hence don't need
- * a lock to read lastReplayedEndRecPtr
- */
- lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
- lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+
+ if (AmStartupProcess())
+ {
+ /*
+ * assume that we are called in the startup process, and hence don't need
+ * a lock to read lastReplayedEndRecPtr
+ */
+ lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+ lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+ }
+ else
+ {
+ /*
+ * We could be in the pipeline worker, so update the shared states.
+ */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+ lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+ standbyState = XLogRecoveryCtl->standbyState;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
/*
* Have we reached the point where our base backup was completed?
@@ -2350,12 +2516,28 @@ static void
checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI,
TimeLineID replayTLI)
{
+
+
/* Check that the record agrees on what the current (old) timeline is */
if (prevTLI != replayTLI)
ereport(PANIC,
(errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
prevTLI, replayTLI)));
+
+ /* Pipeline may have updated the expectedTLEs */
+ if (wal_pipeline_enabled)
+ {
+ TimeLineID targetTLI;
+
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ targetTLI = XLogRecoveryCtl->expectedTLEsUpdateTLI;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+ if (targetTLI)
+ expectedTLEs = readTimeLineHistory(targetTLI);
+ }
+
/*
* The new timeline better be in the list of timelines we expect to see,
* according to the timeline history. It should also not decrease.
@@ -3003,7 +3185,7 @@ recoveryApplyDelay(XLogReaderState *record)
while (true)
{
- ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ ResetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
/* This might change recovery_min_apply_delay. */
ProcessStartupProcInterrupts();
@@ -3028,7 +3210,7 @@ recoveryApplyDelay(XLogReaderState *record)
elog(DEBUG2, "recovery apply delay %ld milliseconds", msecs);
- (void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
+ (void) WaitLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
msecs,
WAIT_EVENT_RECOVERY_APPLY_DELAY);
@@ -3100,7 +3282,7 @@ ConfirmRecoveryPaused(void)
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
-static XLogRecord *
+XLogRecord *
ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
bool fetching_ckpt, TimeLineID replayTLI)
{
@@ -3108,7 +3290,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
@@ -3143,6 +3325,17 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
{
abortedRecPtr = xlogreader->abortedRecPtr;
missingContrecPtr = xlogreader->missingContrecPtr;
+
+ /*
+ * Also update the shared state if necessary
+ */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+ XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
}
if (readFile >= 0)
@@ -3210,9 +3403,31 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
if (!InArchiveRecovery && ArchiveRecoveryRequested &&
!fetching_ckpt)
{
+ /*
+ * Wait for the startup process to apply the last sent record
+ * by the pipeline, otherwise we will fail the consistency
+ * check as all the records decoded by the pipeline have not
+ * arrived/consumed by the consumer (statup proc) yet.
+ */
+ if (wal_pipeline_enabled && AmWalPipeline())
+ WalPipeline_WaitForConsumerCatchup();
+
ereport(DEBUG1,
(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
InArchiveRecovery = true;
+
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+
+ /* update startup proc (consumer) about the standbymode */
+ if (StandbyModeRequested)
+ XLogRecoveryCtl->stanbyEnabled = true;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
if (StandbyModeRequested)
EnableStandbyMode();
@@ -3269,7 +3484,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static int
+int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf)
{
@@ -3281,7 +3496,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
int r;
instr_time io_start;
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size);
targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size);
@@ -3707,7 +3922,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
LSN_FORMAT_ARGS(RecPtr));
/* Do background tasks that might benefit us later. */
- KnownAssignedTransactionIdsIdleMaintenance();
+ if (AmStartupProcess())
+ KnownAssignedTransactionIdsIdleMaintenance();
(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
WL_LATCH_SET | WL_TIMEOUT |
@@ -3719,6 +3935,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* Handle interrupt signals of startup process */
ProcessStartupProcInterrupts();
+ if (wal_pipeline_enabled)
+ ProcessPipelineBgwInterrupts();
}
last_fail_time = now;
currentSource = XLOG_FROM_ARCHIVE;
@@ -3744,6 +3962,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
xlogSourceNames[oldSource], xlogSourceNames[currentSource],
lastSourceFailed ? "failure" : "success");
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->currentSource = currentSource;
+ pendingWalRcvRestart = XLogRecoveryCtl->pendingWalRcvRestart;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* We've now handled possible failure. Try to read from the chosen
* source.
@@ -3818,6 +4045,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
}
pendingWalRcvRestart = false;
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->pendingWalRcvRestart = false;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* Launch walreceiver if needed.
*
@@ -3895,6 +4130,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (latestChunkStart <= RecPtr)
{
XLogReceiptTime = GetCurrentTimestamp();
+
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
SetCurrentChunkStartTime(XLogReceiptTime);
}
}
@@ -3923,7 +4167,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (readFile < 0)
{
if (!expectedTLEs)
+ {
expectedTLEs = readTimeLineHistory(recoveryTargetTLI);
+ PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+ }
+
+
readFile = XLogFileRead(readSegNo, receiveTLI,
XLOG_FROM_STREAM, false);
Assert(readFile >= 0);
@@ -3933,6 +4182,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM;
XLogReceiptSource = XLOG_FROM_STREAM;
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
return XLREAD_SUCCESS;
}
break;
@@ -3970,15 +4226,35 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
if (!streaming_reply_sent)
{
- WalRcvForceReply();
- streaming_reply_sent = true;
+ if (wal_pipeline_enabled && AmWalPipeline())
+ {
+ /*
+ * In case of pipeline enabled, we cannot just call
+ * WalRcvForceReply() directly as the consumer (startup proc)
+ * haven't actually received/replayed all the wal
+ * received from the wal_receiver yet.
+ */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->flushedUptoRecPtr = flushedUpto;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ streaming_reply_sent = true;
+ }
+ else
+ {
+ WalRcvForceReply();
+ streaming_reply_sent = true;
+ }
}
/* Do any background tasks that might benefit us later. */
- KnownAssignedTransactionIdsIdleMaintenance();
+ if (AmStartupProcess())
+ KnownAssignedTransactionIdsIdleMaintenance();
/* Update pg_stat_recovery_prefetch before sleeping. */
- XLogPrefetcherComputeStats(xlogprefetcher);
+ if (AmWalPipeline())
+ XLogPrefetcherComputeStats(xlogprefetcher_pipelined);
+ else
+ XLogPrefetcherComputeStats(xlogprefetcher);
/*
* Wait for more WAL to arrive, when we will be woken
@@ -4009,6 +4285,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* process.
*/
ProcessStartupProcInterrupts();
+ if (wal_pipeline_enabled)
+ ProcessPipelineBgwInterrupts();
}
return XLREAD_FAIL; /* not reached */
@@ -4173,6 +4451,7 @@ rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN)
recoveryTargetTLI = newtarget;
list_free_deep(expectedTLEs);
expectedTLEs = newExpectedTLEs;
+ PipelineConsumerexpectedTLEsUpdateTLI(newtarget);
/*
* As in StartupXLOG(), try to ensure we have all the history files
@@ -4262,6 +4541,15 @@ XLogFileRead(XLogSegNo segno, TimeLineID tli,
if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp();
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+ XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -4346,7 +4634,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
{
elog(DEBUG1, "got WAL segment from archive");
if (!expectedTLEs)
+ {
expectedTLEs = tles;
+ PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+ }
return fd;
}
}
@@ -4357,7 +4648,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
if (fd != -1)
{
if (!expectedTLEs)
+ {
expectedTLEs = tles;
+ PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+ }
return fd;
}
}
@@ -4379,16 +4673,52 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
void
StartupRequestWalReceiverRestart(void)
{
+
+ /*
+ * currentSource is also defined as pipeline shared state variable.
+ * Update the state before procedding.
+ */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ currentSource = XLogRecoveryCtl->currentSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
if (currentSource == XLOG_FROM_STREAM && WalRcvRunning())
{
ereport(LOG,
(errmsg("WAL receiver process shutdown requested")));
pendingWalRcvRestart = true;
+
+ /*
+ * pendingWalRcvRestart is also defined as pipeline shared state variable.
+ * Update the state before procedding.
+ */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
}
}
+/*
+ * standbyState is also defined as a shared state. Pipeline worker can also
+ * update its value, so always confirm the shared state before procedding.
+ */
+void
+SetSharedHotStandbyState(void)
+{
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->standbyState = standbyState;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
+
+
/*
* Has a standby promotion already been triggered?
*
@@ -4440,7 +4770,7 @@ CheckForStandbyTrigger(void)
if (LocalPromoteIsTriggered)
return true;
- if (IsPromoteSignaled() && CheckPromoteSignal())
+ if (CheckPromoteSignal())
{
ereport(LOG, (errmsg("received promote request")));
RemovePromoteSignalFiles();
@@ -4483,6 +4813,7 @@ void
WakeupRecovery(void)
{
SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ SetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
}
/*
@@ -4652,6 +4983,14 @@ GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
*/
Assert(InRecovery);
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogReceiptTime = XLogRecoveryCtl->XLogReceiptTime;
+ XLogReceiptSource = XLogRecoveryCtl->XLogReceiptSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
*rtime = XLogReceiptTime;
*fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
}
@@ -4737,6 +5076,60 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
}
}
+static void
+InitializePipelineStartupEnv(WalPipelineParams *params)
+{
+ /*
+ * These parameters are already set for the startup process but not for our
+ * pipeline worker. So in order to start decoding through the pipeline,
+ * these variables should be saved and then restored later.
+ */
+ params->NextRecPtr = xlogreader->NextRecPtr;
+ params->recoveryTargetTLI = recoveryTargetTLI;
+ params->StandbyModeRequested = StandbyModeRequested;
+ params->StandbyMode = StandbyMode;
+ params->ArchiveRecoveryRequested = ArchiveRecoveryRequested;
+ params->InArchiveRecovery = InArchiveRecovery;
+ params->minRecoveryPointTLI = minRecoveryPointTLI;
+ params->minRecoveryPoint = minRecoveryPoint;
+ params->InRedo = InRedo;
+ params->currentSource = currentSource;
+ params->lastSourceFailed = lastSourceFailed;
+ params->pendingWalRcvRestart = pendingWalRcvRestart;
+ params->RedoStartTLI = RedoStartTLI;
+ params->CheckPointLoc = CheckPointLoc;
+ params->CheckPointTLI = CheckPointTLI;
+ params->RedoStartLSN = RedoStartLSN;
+ params->standbyState = standbyState;
+ params->flushedUpto = flushedUpto;
+ params->receiveTLI = receiveTLI;
+ params->abortedRecPtr = abortedRecPtr;
+ params->missingContrecPtr = missingContrecPtr;
+ params->backupEndRequired = backupEndRequired;
+ params->backupStartPoint = backupStartPoint;
+ params->backupEndPoint = backupEndPoint;
+ params->curFileTLI = curFileTLI;
+
+ /*
+ * The pipeline will do the waiting in this case startup proc should disown
+ * the latch.
+ */
+ DisownRecoveryWakeupLatch();
+
+ /*
+ * Update shared state before starting.
+ */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+ XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+ XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+ XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+ XLogRecoveryCtl->currentSource = currentSource;
+ XLogRecoveryCtl->standbyState = standbyState;
+ XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+ XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
/*
* Pipeline bgw should be aware of all the parameters thats been initialized by
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index f3ad90c7c7a..dbf0d39212a 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1196,6 +1196,7 @@ standby_redo(XLogReaderState *record)
running.xids = xlrec->xids;
ProcArrayApplyRecoveryInfo(&running);
+ SetSharedHotStandbyState();
/*
* The startup process currently has no convenient way to schedule
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
index 5740b05f79c..e2be9f9e395 100644
--- a/src/include/access/xlogpipeline.h
+++ b/src/include/access/xlogpipeline.h
@@ -173,12 +173,27 @@ extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
extern Size WalPipelineShmemSize(void);
extern void WalPipelineShmemInit(void);
+/* Start/stop the pipeline */
+extern void WalPipeline_Start(WalPipelineParams *params);
+extern void WalPipeline_Stop(void);
+
/* Producer functions (called by background worker) */
extern void WalPipeline_ProducerMain(Datum main_arg);
extern bool WalPipeline_SendRecord(XLogReaderState *record);
extern bool WalPipeline_SendShutdown(void);
extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+/* Consumer functions (called by startup process) */
+extern DecodedXLogRecord *WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration);
+extern bool WalPipeline_CheckProducerAlive(void);
+
+/* Status and monitoring */
+extern bool WalPipeline_IsActive(void);
+extern void WalPipeline_WaitForConsumerCatchup(void);
+extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+ XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn);
+extern bool AmWalPipeline(void);
+
extern void ProcessPipelineBgwInterrupts(void);
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index e675ab8353d..b61aad08627 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -13,6 +13,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
+#include "access/xlogutils.h"
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
#include "storage/condition_variable.h"
@@ -106,6 +107,13 @@ typedef struct XLogRecoveryCtlData
*/
Latch recoveryWakeupLatch;
+ /*
+ * In case pipeline enabled we will need two latches. One that can be used
+ * by the pipeline for WAL waiting and other that can be used by the
+ * startup process for the apply delay
+ */
+ Latch recoveryApplyDelayLatch;
+
/*
* Last record successfully replayed.
*/
@@ -133,6 +141,46 @@ typedef struct XLogRecoveryCtlData
ConditionVariable recoveryNotPausedCV;
slock_t info_lck; /* locks shared variables shown above */
+
+ /* ------------------------------------------------------------------
+ * Variables use for IPC between pipeline and the startup proc.
+ * These are also the static variables in xlogrecovery.c but there values
+ * keep on changing. So we added then in the shared memory so that both
+ * the pipeline and the startup proc stay synced on any of this state
+ * change
+ * ------------------------------------------------------------------
+ */
+
+ /*
+ * Pipeline could be waiting for the startup process to catchup with the
+ * decoder. This could happend when no wait wal is available from the
+ * current resource and now pipline have change the wal srouce
+ * i.e enabling standby if requested.
+ */
+ bool pipeline_waiting;
+ bool InArchiveRecovery;
+ bool pendingWalRcvRestart;
+ bool stanbyEnabled;
+
+ /* The target TLI for which expectedTLEs should be recomputed */
+ TimeLineID expectedTLEsUpdateTLI;
+
+ /*
+ * Normaly we wakeup walrcvr after specific records have been applied, as
+ * reads are sequential so we wkaeup after specific read. But in case of pipeline
+ * reads (decoded records) could be way ahead of the consumer. We cannot wakeup
+ * wal rcvr based on read, so we tell consumer to wakup after apllied records
+ * upto flushedUptoRecPtr
+ */
+ XLogRecPtr flushedUptoRecPtr;
+ XLogRecPtr abortedRecPtr;
+ XLogRecPtr missingContrecPtr;
+
+ XLogSource currentSource;
+ XLogSource XLogReceiptSource;
+
+ HotStandbyState standbyState;
+ TimestampTz XLogReceiptTime;
} XLogRecoveryCtlData;
extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl;
@@ -242,6 +290,8 @@ extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, i
extern bool PromoteIsTriggered(void);
extern bool CheckPromoteSignal(void);
extern void WakeupRecovery(void);
+extern void DisownRecoveryWakeupLatch(void);
+extern void SetSharedHotStandbyState(void);
extern void StartupRequestWalReceiverRestart(void);
extern void XLogRequestWalReceiverReply(void);
--
2.34.1
[application/pdf] recoveries-becnhmark-v02.pdf (48.6K, 5-recoveries-becnhmark-v02.pdf)
download
[application/x-xz] recoveries-benchmark-v02.tar.xz (1.4M, 6-recoveries-benchmark-v02.tar.xz)
download
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
@ 2026-03-19 02:33 ` Xuneng Zhou <[email protected]>
2026-04-03 06:58 ` Re: [WIP] Pipelined Recovery Henson Choi <[email protected]>
0 siblings, 1 reply; 9+ messages in thread
From: Xuneng Zhou @ 2026-03-19 02:33 UTC (permalink / raw)
To: Imran Zaheer <[email protected]>; +Cc: Zsolt Parragi <[email protected]>; Jakub Wartak <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
Hi Imran,
On Wed, Mar 18, 2026 at 8:06 PM Imran Zaheer <[email protected]> wrote:
>
> (resending the mail, previous mail was held for moderation for some reason.)
>
> Hi
>
> I am attaching a new rebased version of the patch. Following are some major changes in the new patch set.
>
> * Streaming replication is now working. The prefetcher was not fully decoupled from the startup process; that's why there were inconsistencies in some scenarios and most of the recovery tap tests were failing.
>
> * Patch is now split into consumer and producer patches. This will make review easier.
>
> * Pipeline shutdown flow is also improved. Now producer will always check for the shutdown flag (being set by the consumer)
>
> * Pipeline msg queue size is now configurable `wal_pipeline_queue_size`
>
> * Tap tests now passes with PG_TEST_INITDB_EXTRA_OPTS="-c wal_pipeline=on"
>
> * New tap test for `recovery/t/053_walpipeline.pl`. This covers some basic functionality of the pipeline.
>
> * The filename is changed to xlogpipeline.{h|C}
>
> Thanks to all for the valuable feedback.
> Looking forward to your reviews, comments, etc.
>
> --
> Regards,
> Imran Zaheer
>
>
> On Wed, Mar 18, 2026 at 3:15 PM Imran Zaheer <[email protected]> wrote:
>>
>> (resending the mail, previous mail was held for moderation for some reason. Now pdf is moved to the tar.gz)
>>
>> Hi
>>
>> I am attaching a new rebased version of the patch. Following are some major changes in the new patch set.
>>
>> * Streaming replication is now working. The prefetcher was not fully decoupled from the startup process; that's why there were inconsistencies in some scenarios and most of the recovery tap tests were failing.
>>
>> * Patch is now split into consumer and producer patches. This will make review easier.
>>
>> * Pipeline shutdown flow is also improved. Now producer will always check for the shutdown flag (being set by the consumer)
>>
>> * Pipeline msg queue size is now configurable `wal_pipeline_queue_size`
>>
>> * Tap tests now passes with PG_TEST_INITDB_EXTRA_OPTS="-c wal_pipeline=on"
>>
>> * New tap test for `recovery/t/053_walpipeline.pl`. This covers some basic functionality of the pipeline.
>>
>> * The filename is changed to xlogpipeline.{h|C}
>>
>> Thanks to all for the valuable feedback.
>> Looking forward to your reviews, comments, etc.
>>
>> --
>> Regards,
>> Imran Zaheer
>>
>>
>> On Wed, Mar 18, 2026 at 12:43 PM Imran Zaheer <[email protected]> wrote:
>>>
>>> Hi Zsolt.
>>>
>>> Thanks alot for the review and pointing out the bugs. I have fixed the bugs you mentioned in my new patch set. But
>>> patchset mail is held for moderation for some reason.
>>>
>>> >
>>> > if (reachedRecoveryTarget)
>>> > {
>>> > + if (wal_pipeline_enabled)
>>> > + WalPipeline_Stop();
>>> >
>>> > What if we didn't reach the recovery target, shouldn't we stop the
>>> > pipelines then?
>>> >
>>>
>>> I have fixed the bugs shutdown logic.
>>>
>>> As we already know we will exist the recovery redo loop in `PerformWalRecovery()` only in two cases
>>>
>>> 1: Recovery target reached:
>>> In this case consumers will call to stop the pipeline.
>>>
>>> @@ -1807,6 +1931,9 @@ PerformWalRecovery(void)
>>>
>>> if (reachedRecoveryTarget)
>>> {
>>> + if (wal_pipeline_enabled)
>>> + WalPipeline_Stop();
>>> +
>>>
>>> 2: Available pg_wal is consumed and now more wal to read.
>>> In this case pipeline producers will send the shutdown msg to the consumer. Consumer will
>>> detect this message and then call ` WalPipeline_Stop`. This is the case where we cannot read
>>> more records and the while loop will break here.
>>>
>>> + if (decoded_record)
>>> + {
>>> + record = &decoded_record->header;
>>> + return record;
>>> + }
>>> + else
>>> + {
>>> + /*
>>> + * We will end up here only when pipeline couldn't read more
>>> + * records and have sent a shutdown msg. We will acknowldge this
>>> + * and will trigger request to stop the pipeline workers.
>>> + */
>>> + WalPipeline_Stop();
>>> + return NULL;
>>> + }
>>>
>>>
>>> Hope this makes sense.
>>>
>>> Once again thanks for reporting the bugs. You will receive the new patchset mail soon once it is cleared from
>>> the moderation.
>>>
>>> Looking forward to your reviews, comments, etc.
>>>
>>> Regards,
>>> Imran Zaheer
Thanks for this patch—it’s quite interesting. To my knowledge, there
have been prior attempts to introduce parallelism into recovery, as
you mentioned in your earlier email.
I’m curious how this approach differs from those previous efforts, and
why those attempts ultimately did not land. I imagine there were
constraints or complexities involved. It would be valuable to
understand what lessons can be drawn from them.
It also raises an implicit question: what makes the current approach
more promising—whether due to a simpler design or improved
performance.
While these may not be directly related to your current proposal, the
insights and experience from earlier work could help guide the
development and shape the direction of this patch. Of course, some of
this context can be pieced together from mailing list discussions and
past talks, but doing so raises the bar for future reviewers. Any
additional background you can share would be very helpful.
--
Best,
Xuneng
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-19 02:33 ` Re: [WIP] Pipelined Recovery Xuneng Zhou <[email protected]>
@ 2026-04-03 06:58 ` Henson Choi <[email protected]>
2026-04-08 08:46 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
0 siblings, 1 reply; 9+ messages in thread
From: Henson Choi @ 2026-04-03 06:58 UTC (permalink / raw)
To: Xuneng Zhou <[email protected]>; Imran Zaheer <[email protected]>; +Cc: Zsolt Parragi <[email protected]>; Jakub Wartak <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
Hi Xuneng, Imran, and everyone,
I’m curious how this approach differs from those previous efforts, and
> why those attempts ultimately did not land.
There is directly relevant prior art that may be worth looking at.
Koichi Suzuki presented parallel recovery at PGCon 2023 [1] and
published a detailed design on the PostgreSQL wiki [2] with a working
prototype on GitHub.
Koichi's approach is quite different from the current patch: instead of
pipelining decode, he parallelizes redo itself by dispatching WAL
records to block workers based on page identity. The key rule is that
for a given page, WAL records are applied in written order, but
different pages can be replayed in parallel by different workers.
His design uses a dispatcher to route records to workers, with
synchronization needed for multi-block WAL records. One thing I
wondered is whether the dispatcher could be avoided entirely: if each
child simply reads the whole WAL stream on its own and skips blocks
that are not assigned to it, there would be no IPC and no need to
coordinate multi-block records across workers.
The hard problem he ran into was Hot Standby visibility: when index and
heap pages are replayed by different workers at different speeds,
concurrent queries can see inconsistent state. The wiki itself notes
the idea is to "use this when hot standby is disabled." As far as I
know, this was never submitted as a patch to hackers.
It also raises an implicit question: what makes the current approach
> more promising—whether due to a simpler design or improved
> performance.
>
The two approaches target different bottlenecks. The current patch
parallelizes WAL decoding, which keeps the redo path single-threaded
and avoids the Hot Standby visibility problem entirely.
One thing I am curious about in the current patch: WAL records are
already in a serialized format on disk. The producer decodes them and
then re-serializes into a different custom format for shm_mq. What is
the advantage of this second serialization format over simply passing
the raw WAL bytes after CRC validation and letting the consumer decode
directly? Offloading CRC to a separate core could still improve
throughput at the cost of higher total CPU usage, without needing the
custom format.
Koichi's approach parallelizes redo (buffer I/O) itself, which attacks
a larger cost — Jakub's flamegraphs show BufferAlloc ->
GetVictimBuffer -> FlushBuffer dominating in both p0 and p1 — but at
the expense of much harder concurrency problems.
Whether the decode pipelining ceiling is high enough, or whether the
redo parallelization complexity is tractable, seems like the central
design question for this area.
[1]
https://www.pgcon.org/2023/schedule/session/392-parallel-recovery-in-postgresql/
[2] https://wiki.postgresql.org/wiki/Parallel_Recovery
Best regards,
Henson
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-19 02:33 ` Re: [WIP] Pipelined Recovery Xuneng Zhou <[email protected]>
2026-04-03 06:58 ` Re: [WIP] Pipelined Recovery Henson Choi <[email protected]>
@ 2026-04-08 08:46 ` Imran Zaheer <[email protected]>
2026-04-08 11:14 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
0 siblings, 1 reply; 9+ messages in thread
From: Imran Zaheer @ 2026-04-08 08:46 UTC (permalink / raw)
To: [email protected]; +Cc: Xuneng Zhou <[email protected]>; Zsolt Parragi <[email protected]>; Jakub Wartak <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
>
> Hi Xuneng, Imran, and everyone,
>
Hi Henson and Xuneng.
Thanks for explaining the approaches to Xuneng.
>
> The two approaches target different bottlenecks. The current patch
> parallelizes WAL decoding, which keeps the redo path single-threaded
> and avoids the Hot Standby visibility problem entirely.
>
You are right both approaches
target different bottlenecks. Pipeline patch aims to improve overall
cpu throughput
and to save CPU time by offloading the steps we can safely do in parallel with
out causing synchronization problems.
> One thing I am curious about in the current patch: WAL records are
> already in a serialized format on disk. The producer decodes them and
> then re-serializes into a different custom format for shm_mq. What is
> the advantage of this second serialization format over simply passing
> the raw WAL bytes after CRC validation and letting the consumer decode
> directly? Offloading CRC to a separate core could still improve
> throughput at the cost of higher total CPU usage, without needing the
> custom format.
>
Thanks. You are right there was no need to serialize the decoded record again.
I was not aware that we already have continuous bytes in memory. In my
next patch
I will remove this extra serialization step.
> Koichi's approach parallelizes redo (buffer I/O) itself, which attacks
> a larger cost — Jakub's flamegraphs show BufferAlloc ->
> GetVictimBuffer -> FlushBuffer dominating in both p0 and p1 — but at
> the expense of much harder concurrency problems.
>
> Whether the decode pipelining ceiling is high enough, or whether the
> redo parallelization complexity is tractable, seems like the central
> design question for this area.
I still have to investigate the problem related to `GetVictimBuffer` that
Jakub mentioned. But I was trying that how can we safely offload the work done
by `XLogReadBufferForRedoExtended` to a separate
pipeline worker, or maybe we can try prefetching the buffer header so
the main redo
loop doesn't have to spend time getting the buffer
Thanks for the feedback. That was helpful.
Regards,
Imran Zaheer
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-19 02:33 ` Re: [WIP] Pipelined Recovery Xuneng Zhou <[email protected]>
2026-04-03 06:58 ` Re: [WIP] Pipelined Recovery Henson Choi <[email protected]>
2026-04-08 08:46 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
@ 2026-04-08 11:14 ` Imran Zaheer <[email protected]>
2026-04-22 09:43 ` Re: [WIP] Pipelined Recovery Xuneng Zhou <[email protected]>
0 siblings, 1 reply; 9+ messages in thread
From: Imran Zaheer @ 2026-04-08 11:14 UTC (permalink / raw)
To: [email protected]; +Cc: Xuneng Zhou <[email protected]>; Zsolt Parragi <[email protected]>; Jakub Wartak <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
Hi
I am uploading the new version with the following fixes
* Rebased version.
* Skip serialization of decoded records. As pointed out by Henson,
there was no need to serialize the records again
for the sh_mq. We can simply pass the continuous bytes with minor
pointer fixing to the sh_mq
This time I am uploading the benchmarking results to drive and
attaching the link here. Otherwise my mail will get holded for
moderation (My guess is overall attachment size is greater than 1MB thats why).
I am still not sure whether my testing approach is good enough.
Because sometimes I am not able to get the same performance
improvement
with the pgbench builtin scripts as I got with the custom sql scripts.
Maybe pgbench is not creating enough WAL to test on
or maybe I am missing something.
Benchmarks: https://drive.google.com/file/d/1Y4SYVnrFEQRE5T2r87rrTr7SWC9m19Si/view?usp=sharing
Thanks & Regards
Imran Zaheer
Imran Zaheer
On Wed, Apr 8, 2026 at 1:46 PM Imran Zaheer <[email protected]> wrote:
>
> >
> > Hi Xuneng, Imran, and everyone,
> >
>
> Hi Henson and Xuneng.
>
> Thanks for explaining the approaches to Xuneng.
>
> >
> > The two approaches target different bottlenecks. The current patch
> > parallelizes WAL decoding, which keeps the redo path single-threaded
> > and avoids the Hot Standby visibility problem entirely.
> >
>
> You are right both approaches
> target different bottlenecks. Pipeline patch aims to improve overall
> cpu throughput
> and to save CPU time by offloading the steps we can safely do in parallel with
> out causing synchronization problems.
>
> > One thing I am curious about in the current patch: WAL records are
> > already in a serialized format on disk. The producer decodes them and
> > then re-serializes into a different custom format for shm_mq. What is
> > the advantage of this second serialization format over simply passing
> > the raw WAL bytes after CRC validation and letting the consumer decode
> > directly? Offloading CRC to a separate core could still improve
> > throughput at the cost of higher total CPU usage, without needing the
> > custom format.
> >
>
> Thanks. You are right there was no need to serialize the decoded record again.
> I was not aware that we already have continuous bytes in memory. In my
> next patch
> I will remove this extra serialization step.
>
> > Koichi's approach parallelizes redo (buffer I/O) itself, which attacks
> > a larger cost — Jakub's flamegraphs show BufferAlloc ->
> > GetVictimBuffer -> FlushBuffer dominating in both p0 and p1 — but at
> > the expense of much harder concurrency problems.
> >
> > Whether the decode pipelining ceiling is high enough, or whether the
> > redo parallelization complexity is tractable, seems like the central
> > design question for this area.
>
> I still have to investigate the problem related to `GetVictimBuffer` that
> Jakub mentioned. But I was trying that how can we safely offload the work done
> by `XLogReadBufferForRedoExtended` to a separate
> pipeline worker, or maybe we can try prefetching the buffer header so
> the main redo
> loop doesn't have to spend time getting the buffer
>
> Thanks for the feedback. That was helpful.
>
>
> Regards,
> Imran Zaheer
Attachments:
[application/octet-stream] v3-0002-Pipelined-Recovery-Consumer.patch (47.7K, 2-v3-0002-Pipelined-Recovery-Consumer.patch)
download | inline diff:
From 3265e397a3cd5ddde4c5376d8ce2f55908ae0af7 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Tue, 7 Apr 2026 14:40:49 +0500
Subject: [PATCH v3 2/2] Pipelined Recovery - Consumer
This includes the consumer-specific code for the producer-consumer
architecture for WAL replay that separates WAL decoding from the recovery process,
enabling parallel processing between different steps of replay.
The consumer receives the decoded record from the shared memory message
queue. Both producer and consumer should be aware of any state changes,
so variable states are synced through shared memory `XLogRecoveryCtlData`.
Also, constant states are shared from the consumer to the producer using
`WalPipelineParams`.
Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
src/backend/access/transam/xlog.c | 5 +-
src/backend/access/transam/xlogpipeline.c | 451 +++++++++++++++++++
src/backend/access/transam/xlogprefetcher.c | 3 +-
src/backend/access/transam/xlogrecovery.c | 455 ++++++++++++++++++--
src/backend/storage/ipc/standby.c | 1 +
src/include/access/xlogpipeline.h | 16 +
src/include/access/xlogrecovery.h | 50 +++
7 files changed, 952 insertions(+), 29 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 260fc801ce2..3fb2af913d8 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -59,6 +59,7 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xloginsert.h"
+#include "access/xlogpipeline.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
#include "access/xlogutils.h"
@@ -6269,6 +6270,7 @@ StartupXLOG(void)
ProcArrayApplyRecoveryInfo(&running);
}
+ SetSharedHotStandbyState();
}
/*
@@ -8929,6 +8931,7 @@ xlog_redo(XLogReaderState *record)
running.xids = xids;
ProcArrayApplyRecoveryInfo(&running);
+ SetSharedHotStandbyState();
}
/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
@@ -10161,7 +10164,7 @@ GetOldestRestartPoint(XLogRecPtr *oldrecptr, TimeLineID *oldtli)
void
XLogShutdownWalRcv(void)
{
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
ShutdownWalRcv();
ResetInstallXLogFileSegmentActive();
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
index 3f482c6ba44..2c98b5d7dec 100644
--- a/src/backend/access/transam/xlogpipeline.c
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -53,6 +53,16 @@
#include "utils/timeout.h"
+/*
+ * Convert values of GUCs measured in megabytes to bytes
+ */
+#define MBToBytes(mbvar) (mbvar * 1024 * 1024)
+
+/*
+ * Waiting for consumer before exiting gracefully.
+ */
+#define MAX_SHUTDOWN_WAIT_ITERS 1000 /* 1000 * 10ms = 10 seconds */
+
/* Global shared memory control structure */
WalPipelineShmCtl *WalPipelineShm = NULL;
@@ -72,6 +82,10 @@ static dsm_segment *producer_dsm_seg = NULL;
static shm_mq *producer_mq = NULL;
static shm_mq_handle *producer_mq_handle = NULL;
+/* Local state for consumer */
+static dsm_segment *consumer_dsm_seg = NULL;
+static shm_mq *consumer_mq = NULL;
+static shm_mq_handle *consumer_mq_handle = NULL;
/*
* Flags set by interrupt handlers for later service in the redo loop.
@@ -84,8 +98,10 @@ static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
/* Forward declarations */
static void wal_pipeline_cleanup_callback(int code, Datum arg);
static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static DecodedXLogRecord *deserialize_wal_record(const char *buffer, Size len, XLogReaderState *startup_reader, bool first_iteration);
static void cleanup_producer_resources(void);
static void cleanup_consumer_resources(void);
+static void WalPipeline_WaitForConsumerShutdownRequest(void);
/* copied from xlogrecovery.c */
/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
@@ -118,6 +134,168 @@ WalPipelineShmemInit(void *arg)
SpinLockInit(&WalPipelineShm->mutex);
}
+/*
+ * Called by Consumer.
+ *
+ * Initialize and start the WAL pipeline. This will be called by the startup
+ * process (consumer) as a request to start the pipeline.
+ */
+void
+WalPipeline_Start(WalPipelineParams *params)
+{
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *handle;
+ dsm_segment *seg;
+ shm_toc_estimator e;
+ shm_toc *toc;
+ Size segsize;
+ shm_mq *mq;
+ WalPipelineParams *shared_params;
+ pid_t pid;
+ BgwHandleStatus status;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ if (WalPipelineShm->initialized)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return; /* Already started */
+ }
+ WalPipelineShm->initialized = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams));
+ shm_toc_estimate_chunk(&e, MBToBytes(wal_pipeline_mq_size_mb));
+ shm_toc_estimate_keys(&e, 2); /* key=1 → params, key=2 → mq */
+ segsize = shm_toc_estimate(&e);
+
+ seg = dsm_create(segsize, 0);
+ dsm_pin_segment(seg);
+
+ toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC,
+ dsm_segment_address(seg), segsize);
+
+ /*
+ * Pass the startup process vaaraibles state through WalPipelineParams
+ */
+ shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams));
+ shm_toc_insert(toc, 1, shared_params);
+ *shared_params = *params;
+
+ /* create the message queue */
+ mq = shm_mq_create(shm_toc_allocate(toc, MBToBytes(wal_pipeline_mq_size_mb)),
+ MBToBytes(wal_pipeline_mq_size_mb));
+ shm_toc_insert(toc, 2, mq);
+
+ /* update shared state */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg);
+ WalPipelineShm->consumer_pid = MyProcPid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* Set up consumer side of the queue */
+ consumer_dsm_seg = seg;
+ consumer_mq = mq;
+ shm_mq_set_receiver(consumer_mq, MyProc);
+ consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL);
+
+ /* Register cleanup callback */
+ before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+ /* Register background worker */
+ memset(&worker, 0, sizeof(worker));
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+ worker.bgw_start_time = BgWorkerStart_PostmasterStart;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ sprintf(worker.bgw_library_name, "postgres");
+ sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain");
+ snprintf(worker.bgw_name, BGW_MAXLEN, "wal pipeline producer");
+ snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer");
+ worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+ worker.bgw_notify_pid = MyProcPid;
+
+ if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ dsm_unpin_segment(dsm_segment_handle(seg));
+ dsm_detach(seg);
+ consumer_dsm_seg = NULL;
+ consumer_mq = NULL;
+ consumer_mq_handle = NULL;
+
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not register background worker for WAL pipeline")));
+ }
+
+ status = WaitForBackgroundWorkerStartup(handle, &pid);
+
+ if (status != BGWH_STARTED)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not start background process"),
+ errhint("More details may be available in the server log.")));
+ else
+ ereport(LOG, (errmsg("[walpipeline] started.")));
+}
+
+/*
+ * Request producer shutdown.
+ * This is called by the consumer when it no longer needs records.
+ */
+static void
+WalPipeline_RequestShutdown(void)
+{
+ if (!WalPipelineShm)
+ return;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->shutdown_requested = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * Consumer Function.
+ * Stop the WAL pipeline. This will also be called be the startup process
+ * (consumer). This will only be called when consumer don't need any more
+ * decoded records. This function will also wait until the pipeline workers
+ * are stopped.
+ */
+void
+WalPipeline_Stop(void)
+{
+ if (!WalPipelineShm || !WalPipelineShm->initialized)
+ return;
+
+ /* Ask producer to stop */
+ WalPipeline_RequestShutdown();
+
+ /* Wait for producer to exit (max 10 seconds) */
+ for (int i = 0; i < 100; i++)
+ {
+ bool producer_alive;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ producer_alive = (WalPipelineShm->producer_pid != 0);
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (!producer_alive)
+ break;
+
+ pg_usleep(100000); /* 100 ms */
+ }
+
+ cleanup_consumer_resources();
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ elog(LOG, "[walpipeline] stopped");
+}
/*
* Producer Function.
@@ -357,6 +535,116 @@ WalPipeline_SendError(int errcode, const char *errmsg)
return true;
}
+/*
+ * Consumer Function.
+ * Receive and deserialize a WAL record from the producer
+ */
+DecodedXLogRecord *
+WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration)
+{
+ shm_mq_result res;
+ Size nbytes;
+ void *data;
+ char *err_msg;
+ int err_code;
+ WalRecordMsgHeader *hdr;
+ DecodedXLogRecord *record;
+
+ if (!consumer_mq_handle)
+ return NULL;
+
+ /* Receive message from queue */
+ res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false);
+
+ if (res != SHM_MQ_SUCCESS)
+ elog(ERROR, "[walpipeline] consumer: failed to receive record");
+
+ hdr = (WalRecordMsgHeader *) data;
+
+ /* Handle different message types */
+ switch (hdr->msg_type)
+ {
+ case WAL_MSG_RECORD:
+ record = deserialize_wal_record((char *) data, nbytes, startup_reader, first_iteration);
+
+ /* Update statistics */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->records_received++;
+ WalPipelineShm->bytes_received += nbytes;
+ WalPipelineShm->consumer_lsn = hdr->endRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return record;
+
+ case WAL_MSG_SHUTDOWN:
+ elog(LOG, "[walpipeline] consumer: received shutdown message from the producer");
+ return NULL;
+
+ case WAL_MSG_ERROR:
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ err_code = WalPipelineShm->error_code;
+ err_msg = WalPipelineShm->error_message;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ ereport(ERROR,
+ (errcode(err_code),
+ errmsg("[walpipeline] consumer: received error from the producer: %s", err_msg)));
+ return NULL;
+
+ default:
+ elog(PANIC, "[walpipeline] consumer: unknown message type: %d",
+ hdr->msg_type);
+ return NULL;
+ }
+}
+
+/*
+ * Consumer Function.
+ * Check if producer is still running
+ */
+bool
+WalPipeline_CheckProducerAlive(void)
+{
+ pid_t pid;
+ bool alive;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ pid = WalPipelineShm->producer_pid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (pid == 0)
+ return false;
+
+ alive = (kill(pid, 0) == 0);
+
+ if (!alive)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = 0;
+ SpinLockRelease(&WalPipelineShm->mutex);
+ }
+
+ return alive;
+}
+
+/*
+ * Consumer Function.
+ * Check if pipeline is active
+ */
+bool
+WalPipeline_IsActive(void)
+{
+ bool active;
+
+ if (!WalPipelineShm)
+ return false;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return active;
+}
/*
* Producer Function.
@@ -395,6 +683,55 @@ WalPipeline_WaitForConsumerShutdownRequest(void)
}
}
+/*
+ * Consumer Function.
+ * Wait unless last sent record by the pipeline is applied by the
+ * startup process.
+ */
+void
+WalPipeline_WaitForConsumerCatchup(void)
+{
+ XLogRecPtr producer_lsn;
+ XLogRecPtr consumer_lsn;
+
+ for (;;)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ producer_lsn = WalPipelineShm->producer_lsn;
+ consumer_lsn = WalPipelineShm->applied_lsn;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (producer_lsn == consumer_lsn)
+ return;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* short sleep to avoid busy looping */
+ pg_usleep(50); /* 50 microseconds */
+ }
+}
+
+/*
+ * Consumer Function.
+ * Get pipeline statistics
+ */
+void
+WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+ XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+
+ if (records_sent)
+ *records_sent = WalPipelineShm->records_sent;
+ if (records_received)
+ *records_received = WalPipelineShm->records_received;
+ if (producer_lsn)
+ *producer_lsn = WalPipelineShm->producer_lsn;
+ if (consumer_lsn)
+ *consumer_lsn = WalPipelineShm->consumer_lsn;
+
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
/*
@@ -454,6 +791,94 @@ serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
return total;
}
+/*
+ * deserialize_wal_record (Consumer)
+ *
+ * Unpack a buffer produced by serialize_wal_record, restore interior
+ * offsets to pointers, and attach the record to the startup reader.
+ */
+DecodedXLogRecord *
+deserialize_wal_record(const char *buf, Size len,
+ XLogReaderState *startup_reader, bool first_iteration)
+{
+ WalRecordMsgHeader hdr;
+ DecodedXLogRecord *dec;
+
+ if (len < sizeof(hdr))
+ return NULL;
+
+ memcpy(&hdr, buf, sizeof(hdr));
+
+ if (hdr.decoded_size != len - sizeof(hdr))
+ return NULL;
+
+ dec = palloc(hdr.decoded_size);
+ memcpy(dec, buf + sizeof(hdr), hdr.decoded_size);
+
+ /*
+ * Restore interior pointers from offsets.
+ * Offset 0 means the original pointer was NULL.
+ */
+ if (dec->main_data_len > 0)
+ dec->main_data = (char *)dec + (ptrdiff_t)dec->main_data;
+
+ for (int i = 0; i <= dec->max_block_id; i++)
+ {
+ DecodedBkpBlock *blk = &dec->blocks[i];
+ if (!blk->in_use)
+ continue;
+ if (blk->has_data)
+ blk->data = (char *)dec + (ptrdiff_t)blk->data;
+ if (blk->has_image)
+ blk->bkp_image = (char *)dec + (ptrdiff_t)blk->bkp_image;
+ }
+
+ /* clear the queue link — it belongs to the producer's queue */
+ dec->next = NULL;
+
+ /*
+ * The previous decoded record has been deserialized from
+ * from the pipeline and hence need to free the memory after
+ * use.
+ *
+ * But for the first iteration memory space for `reader->record`
+ * was allocated from the `decode_buffer`, and freeing this
+ * memory can be fatal. This memory will be freed automatically
+ * at the end of the recovery in `finishwalrecovery()`. So we
+ * will skip pfree for the first iteration (main redo loop).
+ */
+ if (startup_reader->record && !first_iteration)
+ pfree(startup_reader->record);
+
+ /* Attach to reader, only updating the public parameters */
+ startup_reader->record = dec;
+ startup_reader->ReadRecPtr = dec->lsn;
+ startup_reader->DecodeRecPtr = dec->lsn;
+ startup_reader->EndRecPtr = dec->next_lsn;
+ startup_reader->NextRecPtr = dec->next_lsn;
+ startup_reader->decode_queue_head = dec;
+ startup_reader->decode_queue_tail = dec;
+ startup_reader->missingContrecPtr = hdr.missingContrecPtr;
+ startup_reader->abortedRecPtr = hdr.abortedRecPtr;
+ startup_reader->overwrittenRecPtr = hdr.overwrittenRecPtr;
+
+ return dec;
+}
+
+/*
+ * We need to put some assertion that only pipeline worker should be touching
+ * the specific code.
+ */
+bool AmWalPipeline(void)
+{
+ if (MyBackendType == B_BG_WORKER && MyBgworkerEntry)
+ {
+ if (strncmp(MyBgworkerEntry->bgw_name, "wal pipeline", 12) == 0)
+ return true;
+ }
+
+ return false;
+}
/*
* Clean up producer-side resources
@@ -481,6 +906,32 @@ cleanup_producer_resources(void)
SpinLockRelease(&WalPipelineShm->mutex);
}
+/*
+ * Clean up consumer-side resources
+ */
+static void
+cleanup_consumer_resources(void)
+{
+ if (consumer_mq_handle)
+ {
+ shm_mq_detach(consumer_mq_handle);
+ consumer_mq_handle = NULL;
+ }
+
+ if (consumer_dsm_seg)
+ {
+ dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg));
+ dsm_detach(consumer_dsm_seg);
+ consumer_dsm_seg = NULL;
+ }
+
+ consumer_mq = NULL;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->consumer_pid = 0;
+ WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
/*
* Cleanup callback for process exit
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 83a3f97a57c..e378b646557 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -27,6 +27,7 @@
#include "postgres.h"
+#include "access/xlogpipeline.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
@@ -355,7 +356,7 @@ XLogPrefetchReconfigure(void)
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index 88c0b43ac76..3b894745033 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -357,12 +357,6 @@ static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
-static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
- int emode, bool fetching_ckpt,
- TimeLineID replayTLI);
-
-static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
bool randAccess,
bool fetching_ckpt,
@@ -384,6 +378,7 @@ static bool HotStandbyActiveInReplay(void);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void SetLatestXTime(TimestampTz xtime);
+static void InitializePipelineStartupEnv(WalPipelineParams *params);
/*
* Register shared memory for WAL recovery
@@ -404,9 +399,27 @@ XLogRecoveryShmemInit(void *arg)
SpinLockInit(&XLogRecoveryCtl->info_lck);
InitSharedLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ InitSharedLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
ConditionVariableInit(&XLogRecoveryCtl->recoveryNotPausedCV);
}
+/*
+ * We may not be able to share expectedTLEs list across the sharedmemory.
+ * For now just trigger the startup process (consumer) to
+ * reread the timelinehistory file whenever pipeline updates the value for
+ * expectedTLEs. So the consumer proc will expectedTLEs updated locally.
+ */
+static void
+PipelineConsumerexpectedTLEsUpdateTLI(TimeLineID recoveryTargetTLI)
+{
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->expectedTLEsUpdateTLI = recoveryTargetTLI;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+}
+
/*
* A thin wrapper to enable StandbyMode and do other preparatory work as
* needed.
@@ -422,7 +435,8 @@ EnableStandbyMode(void)
* startup progress timeout in standby mode to avoid calling
* startup_progress_timeout_handler() unnecessarily.
*/
- disable_startup_progress_timeout();
+ if (!AmWalPipeline())
+ disable_startup_progress_timeout();
}
/*
@@ -482,7 +496,10 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
* recovery, if required.
*/
if (ArchiveRecoveryRequested)
+ {
+ OwnLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ }
/*
* Set the WAL reading processor now, as it will be needed when reading
@@ -954,6 +971,14 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
minRecoveryPointTLI = 0;
}
+ /* update shared state. */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* Start recovery assuming that the final record isn't lost.
*/
@@ -965,6 +990,12 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr,
*haveTblspcMap_ptr = haveTblspcMap;
}
+void DisownRecoveryWakeupLatch(void)
+{
+ if (ArchiveRecoveryRequested)
+ DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
/*
* See if there are any recovery signal files and if so, set state for
* recovery.
@@ -1413,6 +1444,15 @@ FinishWalRecovery(void)
TimeLineID lastRecTLI;
XLogRecPtr endOfLog;
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ InArchiveRecovery = XLogRecoveryCtl->InArchiveRecovery;
+ missingContrecPtr = XLogRecoveryCtl->missingContrecPtr;
+ abortedRecPtr = XLogRecoveryCtl->abortedRecPtr;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* Kill WAL receiver, if it's still running, before we continue to write
* the startup checkpoint and aborted-contrecord records. It will trump
@@ -1470,6 +1510,17 @@ FinishWalRecovery(void)
lastRec = XLogRecoveryCtl->lastReplayedReadRecPtr;
lastRecTLI = XLogRecoveryCtl->lastReplayedTLI;
}
+
+ /*
+ * Invalidate contents of internal buffer before read attempt. Just set
+ * the length to 0, rather than a full XLogReaderInvalReadState().
+ *
+ * This is needed because we could be reading from the pipeline reader so
+ * far, so before moving back the the startup proc readerstate better to
+ * invalidate it.
+ */
+ xlogreader->readLen = 0;
+
XLogPrefetcherBeginRead(xlogprefetcher, lastRec);
(void) ReadRecord(xlogprefetcher, PANIC, false, lastRecTLI);
endOfLog = xlogreader->EndRecPtr;
@@ -1592,7 +1643,69 @@ ShutdownWalRecovery(void)
* it, but let's do it for the sake of tidiness.
*/
if (ArchiveRecoveryRequested)
- DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ {
+ DisownLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
+
+ /*
+ * Only disown the latch if we were the owner (pipeline disabled).
+ */
+ if (!wal_pipeline_enabled)
+ DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ }
+
+
+}
+
+/*
+ * Get next record for redo.
+ * Use the pipeline if enabled for parallel decoding and receive decoded
+ * records from a shared queue, else read it directly.
+ */
+static XLogRecord *
+ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode,
+ bool fetching_ckpt, TimeLineID replayTLI,
+ XLogReaderState **localreader, bool first_iteration)
+{
+
+ XLogRecord *record = NULL;
+ XLogReaderState *reader = *localreader;
+
+ /*
+ * If pipeline not enabled read the record directly
+ */
+ if (!wal_pipeline_enabled)
+ {
+ record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI);
+ return record;
+ }
+
+ /*
+ * Get record from the pipeline
+ */
+ if (WalPipeline_IsActive())
+ {
+ DecodedXLogRecord *decoded_record = NULL;
+
+ decoded_record = WalPipeline_ReceiveRecord(reader, first_iteration);
+
+ if (decoded_record)
+ {
+ record = &decoded_record->header;
+ return record;
+ }
+ else
+ {
+ /*
+ * We will end up here only when pipeline couldn't read more
+ * records and have sent a shutdown msg. We will acknowldge this
+ * and will trigger request to stop the pipeline workers.
+ */
+ WalPipeline_Stop();
+ return NULL;
+ }
+ }
+
+ elog(PANIC, "[walpipeline] consumer: pipeline not active, even though wal_pipeline is set to on.");
}
/*
@@ -1607,6 +1720,12 @@ PerformWalRecovery(void)
bool reachedRecoveryTarget = false;
TimeLineID replayTLI;
+ /*
+ * standalone backend may exist in case of pg_rewind.
+ */
+ if (!IsUnderPostmaster)
+ wal_pipeline_enabled = false;
+
/*
* Initialize shared variables for tracking progress of WAL replay, as if
* we had just replayed the record before the REDO location (or the
@@ -1681,11 +1800,24 @@ PerformWalRecovery(void)
{
TimestampTz xtime;
PGRUsage ru0;
+ uint64 loop_count = 0;
pg_rusage_init(&ru0);
InRedo = true;
+ if(wal_pipeline_enabled)
+ {
+ /*
+ * Startup proc parameters that pipeline shold also be aware of.
+ */
+ WalPipelineParams *params = palloc0(sizeof(WalPipelineParams));
+
+ params->ReplayTLI = replayTLI;
+ InitializePipelineStartupEnv(params);
+ WalPipeline_Start(params);
+ }
+
RmgrStartup();
ereport(LOG,
@@ -1791,7 +1923,7 @@ PerformWalRecovery(void)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
+ record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader, loop_count++ == 0);
} while (record != NULL);
/*
@@ -1800,6 +1932,9 @@ PerformWalRecovery(void)
if (reachedRecoveryTarget)
{
+ if (wal_pipeline_enabled)
+ WalPipeline_Stop();
+
if (!reachedConsistency)
ereport(FATAL,
(errmsg("requested recovery stop point is before consistent recovery point")));
@@ -1834,6 +1969,8 @@ PerformWalRecovery(void)
RmgrCleanup();
+ // XXX: testing purpose only
+ ereport(DEBUG1, (errmsg("replay loop fiinished with loop count: " UINT64_FORMAT, loop_count)));
ereport(LOG,
errmsg("redo done at %X/%08X system usage: %s",
LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
@@ -1872,7 +2009,9 @@ static void
ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI)
{
ErrorContextCallback errcallback;
+ XLogRecPtr walrcv_flushedupto;
bool switchedTLI = false;
+ bool pipeline_enabled_stanby = false;
/* Setup error traceback support for ereport() */
errcallback.callback = rm_redo_error_callback;
@@ -1973,6 +2112,8 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedReadRecPtr = xlogreader->ReadRecPtr;
XLogRecoveryCtl->lastReplayedEndRecPtr = xlogreader->EndRecPtr;
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
+ walrcv_flushedupto = XLogRecoveryCtl->flushedUptoRecPtr;
+ pipeline_enabled_stanby = XLogRecoveryCtl->stanbyEnabled;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
/* ------
@@ -2011,6 +2152,16 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
WalRcvRequestApplyReply();
}
+ /*
+ * As the pipeline (producer) was running way ahead of the startup proc
+ * (consumer), see if the producer asked to wakeup the wal_reciever by
+ * updating the value of `flushedUptoRecPtr`.
+ */
+ if ((walrcv_flushedupto != InvalidXLogRecPtr) &&
+ ((walrcv_flushedupto == xlogreader->EndRecPtr) ||
+ (walrcv_flushedupto == xlogreader->ReadRecPtr)))
+ WalRcvRequestApplyReply();
+
/* Allow read-only connections if we're consistent now */
CheckRecoveryConsistency();
@@ -2026,6 +2177,14 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}
+
+ /* Conusmer should also enable the standby if pipline have */
+ if (pipeline_enabled_stanby)
+ EnableStandbyMode();
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->applied_lsn = xlogreader->EndRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
}
/*
@@ -2151,12 +2310,27 @@ CheckRecoveryConsistency(void)
Assert(InArchiveRecovery);
- /*
- * assume that we are called in the startup process, and hence don't need
- * a lock to read lastReplayedEndRecPtr
- */
- lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
- lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+
+ if (AmStartupProcess())
+ {
+ /*
+ * assume that we are called in the startup process, and hence don't need
+ * a lock to read lastReplayedEndRecPtr
+ */
+ lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+ lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+ }
+ else
+ {
+ /*
+ * We could be in the pipeline worker, so update the shared states.
+ */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ lastReplayedEndRecPtr = XLogRecoveryCtl->lastReplayedEndRecPtr;
+ lastReplayedTLI = XLogRecoveryCtl->lastReplayedTLI;
+ standbyState = XLogRecoveryCtl->standbyState;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
/*
* Have we reached the point where our base backup was completed?
@@ -2343,12 +2517,28 @@ static void
checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI,
TimeLineID replayTLI)
{
+
+
/* Check that the record agrees on what the current (old) timeline is */
if (prevTLI != replayTLI)
ereport(PANIC,
(errmsg("unexpected previous timeline ID %u (current timeline ID %u) in checkpoint record",
prevTLI, replayTLI)));
+
+ /* Pipeline may have updated the expectedTLEs */
+ if (wal_pipeline_enabled)
+ {
+ TimeLineID targetTLI;
+
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ targetTLI = XLogRecoveryCtl->expectedTLEsUpdateTLI;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+
+ if (targetTLI)
+ expectedTLEs = readTimeLineHistory(targetTLI);
+ }
+
/*
* The new timeline better be in the list of timelines we expect to see,
* according to the timeline history. It should also not decrease.
@@ -2996,7 +3186,7 @@ recoveryApplyDelay(XLogReaderState *record)
while (true)
{
- ResetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ ResetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
/* This might change recovery_min_apply_delay. */
ProcessStartupProcInterrupts();
@@ -3021,7 +3211,7 @@ recoveryApplyDelay(XLogReaderState *record)
elog(DEBUG2, "recovery apply delay %ld milliseconds", msecs);
- (void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
+ (void) WaitLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
msecs,
WAIT_EVENT_RECOVERY_APPLY_DELAY);
@@ -3093,7 +3283,7 @@ ConfirmRecoveryPaused(void)
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
-static XLogRecord *
+XLogRecord *
ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
bool fetching_ckpt, TimeLineID replayTLI)
{
@@ -3101,7 +3291,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
@@ -3136,6 +3326,17 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
{
abortedRecPtr = xlogreader->abortedRecPtr;
missingContrecPtr = xlogreader->missingContrecPtr;
+
+ /*
+ * Also update the shared state if necessary
+ */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+ XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
}
if (readFile >= 0)
@@ -3203,9 +3404,31 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
if (!InArchiveRecovery && ArchiveRecoveryRequested &&
!fetching_ckpt)
{
+ /*
+ * Wait for the startup process to apply the last sent record
+ * by the pipeline, otherwise we will fail the consistency
+ * check as all the records decoded by the pipeline have not
+ * arrived/consumed by the consumer (statup proc) yet.
+ */
+ if (wal_pipeline_enabled && AmWalPipeline())
+ WalPipeline_WaitForConsumerCatchup();
+
ereport(DEBUG1,
(errmsg_internal("reached end of WAL in pg_wal, entering archive recovery")));
InArchiveRecovery = true;
+
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+
+ /* update startup proc (consumer) about the standbymode */
+ if (StandbyModeRequested)
+ XLogRecoveryCtl->stanbyEnabled = true;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
if (StandbyModeRequested)
EnableStandbyMode();
@@ -3262,7 +3485,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static int
+int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf)
{
@@ -3274,7 +3497,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
int r;
instr_time io_start;
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ Assert(AmStartupProcess() || !IsUnderPostmaster || AmWalPipeline());
XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size);
targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size);
@@ -3700,7 +3923,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
LSN_FORMAT_ARGS(RecPtr));
/* Do background tasks that might benefit us later. */
- KnownAssignedTransactionIdsIdleMaintenance();
+ if (AmStartupProcess())
+ KnownAssignedTransactionIdsIdleMaintenance();
(void) WaitLatch(&XLogRecoveryCtl->recoveryWakeupLatch,
WL_LATCH_SET | WL_TIMEOUT |
@@ -3712,6 +3936,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* Handle interrupt signals of startup process */
ProcessStartupProcInterrupts();
+ if (wal_pipeline_enabled)
+ ProcessPipelineBgwInterrupts();
}
last_fail_time = now;
currentSource = XLOG_FROM_ARCHIVE;
@@ -3737,6 +3963,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
xlogSourceNames[oldSource], xlogSourceNames[currentSource],
lastSourceFailed ? "failure" : "success");
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->currentSource = currentSource;
+ pendingWalRcvRestart = XLogRecoveryCtl->pendingWalRcvRestart;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* We've now handled possible failure. Try to read from the chosen
* source.
@@ -3811,6 +4046,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
}
pendingWalRcvRestart = false;
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->pendingWalRcvRestart = false;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
/*
* Launch walreceiver if needed.
*
@@ -3888,6 +4131,15 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (latestChunkStart <= RecPtr)
{
XLogReceiptTime = GetCurrentTimestamp();
+
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
SetCurrentChunkStartTime(XLogReceiptTime);
}
}
@@ -3916,7 +4168,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
if (readFile < 0)
{
if (!expectedTLEs)
+ {
expectedTLEs = readTimeLineHistory(recoveryTargetTLI);
+ PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+ }
+
+
readFile = XLogFileRead(readSegNo, receiveTLI,
XLOG_FROM_STREAM, false);
Assert(readFile >= 0);
@@ -3926,6 +4183,13 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
/* just make sure source info is correct... */
readSource = XLOG_FROM_STREAM;
XLogReceiptSource = XLOG_FROM_STREAM;
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
return XLREAD_SUCCESS;
}
break;
@@ -3963,15 +4227,35 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
if (!streaming_reply_sent)
{
- WalRcvRequestApplyReply();
- streaming_reply_sent = true;
+ if (wal_pipeline_enabled && AmWalPipeline())
+ {
+ /*
+ * In case of pipeline enabled, we cannot just call
+ * WalRcvForceReply() directly as the consumer (startup proc)
+ * haven't actually received/replayed all the wal
+ * received from the wal_receiver yet.
+ */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->flushedUptoRecPtr = flushedUpto;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ streaming_reply_sent = true;
+ }
+ else
+ {
+ WalRcvRequestApplyReply();
+ streaming_reply_sent = true;
+ }
}
/* Do any background tasks that might benefit us later. */
- KnownAssignedTransactionIdsIdleMaintenance();
+ if (AmStartupProcess())
+ KnownAssignedTransactionIdsIdleMaintenance();
/* Update pg_stat_recovery_prefetch before sleeping. */
- XLogPrefetcherComputeStats(xlogprefetcher);
+ if (AmWalPipeline())
+ XLogPrefetcherComputeStats(xlogprefetcher_pipelined);
+ else
+ XLogPrefetcherComputeStats(xlogprefetcher);
/*
* Wait for more WAL to arrive, when we will be woken
@@ -4002,6 +4286,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* process.
*/
ProcessStartupProcInterrupts();
+ if (wal_pipeline_enabled)
+ ProcessPipelineBgwInterrupts();
}
return XLREAD_FAIL; /* not reached */
@@ -4166,6 +4452,7 @@ rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN)
recoveryTargetTLI = newtarget;
list_free_deep(expectedTLEs);
expectedTLEs = newExpectedTLEs;
+ PipelineConsumerexpectedTLEsUpdateTLI(newtarget);
/*
* As in StartupXLOG(), try to ensure we have all the history files
@@ -4255,6 +4542,15 @@ XLogFileRead(XLogSegNo segno, TimeLineID tli,
if (source != XLOG_FROM_STREAM)
XLogReceiptTime = GetCurrentTimestamp();
+ if (wal_pipeline_enabled)
+ {
+ /* also update the shared state */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+ XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
return fd;
}
if (errno != ENOENT || !notfoundOk) /* unexpected failure? */
@@ -4339,7 +4635,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
{
elog(DEBUG1, "got WAL segment from archive");
if (!expectedTLEs)
+ {
expectedTLEs = tles;
+ PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+ }
return fd;
}
}
@@ -4350,7 +4649,10 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
if (fd != -1)
{
if (!expectedTLEs)
+ {
expectedTLEs = tles;
+ PipelineConsumerexpectedTLEsUpdateTLI(recoveryTargetTLI);
+ }
return fd;
}
}
@@ -4372,16 +4674,52 @@ XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source)
void
StartupRequestWalReceiverRestart(void)
{
+
+ /*
+ * currentSource is also defined as pipeline shared state variable.
+ * Update the state before procedding.
+ */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ currentSource = XLogRecoveryCtl->currentSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
if (currentSource == XLOG_FROM_STREAM && WalRcvRunning())
{
ereport(LOG,
(errmsg("WAL receiver process shutdown requested")));
pendingWalRcvRestart = true;
+
+ /*
+ * pendingWalRcvRestart is also defined as pipeline shared state variable.
+ * Update the state before procedding.
+ */
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
}
}
+/*
+ * standbyState is also defined as a shared state. Pipeline worker can also
+ * update its value, so always confirm the shared state before procedding.
+ */
+void
+SetSharedHotStandbyState(void)
+{
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->standbyState = standbyState;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
+
+
/*
* Has a standby promotion already been triggered?
*
@@ -4433,7 +4771,7 @@ CheckForStandbyTrigger(void)
if (LocalPromoteIsTriggered)
return true;
- if (IsPromoteSignaled() && CheckPromoteSignal())
+ if (CheckPromoteSignal())
{
ereport(LOG, (errmsg("received promote request")));
RemovePromoteSignalFiles();
@@ -4476,6 +4814,7 @@ void
WakeupRecovery(void)
{
SetLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+ SetLatch(&XLogRecoveryCtl->recoveryApplyDelayLatch);
}
/*
@@ -4645,6 +4984,14 @@ GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream)
*/
Assert(InRecovery);
+ if (wal_pipeline_enabled)
+ {
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogReceiptTime = XLogRecoveryCtl->XLogReceiptTime;
+ XLogReceiptSource = XLogRecoveryCtl->XLogReceiptSource;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ }
+
*rtime = XLogReceiptTime;
*fromStream = (XLogReceiptSource == XLOG_FROM_STREAM);
}
@@ -4730,6 +5077,60 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
}
}
+static void
+InitializePipelineStartupEnv(WalPipelineParams *params)
+{
+ /*
+ * These parameters are already set for the startup process but not for our
+ * pipeline worker. So in order to start decoding through the pipeline,
+ * these variables should be saved and then restored later.
+ */
+ params->NextRecPtr = xlogreader->NextRecPtr;
+ params->recoveryTargetTLI = recoveryTargetTLI;
+ params->StandbyModeRequested = StandbyModeRequested;
+ params->StandbyMode = StandbyMode;
+ params->ArchiveRecoveryRequested = ArchiveRecoveryRequested;
+ params->InArchiveRecovery = InArchiveRecovery;
+ params->minRecoveryPointTLI = minRecoveryPointTLI;
+ params->minRecoveryPoint = minRecoveryPoint;
+ params->InRedo = InRedo;
+ params->currentSource = currentSource;
+ params->lastSourceFailed = lastSourceFailed;
+ params->pendingWalRcvRestart = pendingWalRcvRestart;
+ params->RedoStartTLI = RedoStartTLI;
+ params->CheckPointLoc = CheckPointLoc;
+ params->CheckPointTLI = CheckPointTLI;
+ params->RedoStartLSN = RedoStartLSN;
+ params->standbyState = standbyState;
+ params->flushedUpto = flushedUpto;
+ params->receiveTLI = receiveTLI;
+ params->abortedRecPtr = abortedRecPtr;
+ params->missingContrecPtr = missingContrecPtr;
+ params->backupEndRequired = backupEndRequired;
+ params->backupStartPoint = backupStartPoint;
+ params->backupEndPoint = backupEndPoint;
+ params->curFileTLI = curFileTLI;
+
+ /*
+ * The pipeline will do the waiting in this case startup proc should disown
+ * the latch.
+ */
+ DisownRecoveryWakeupLatch();
+
+ /*
+ * Update shared state before starting.
+ */
+ SpinLockAcquire(&XLogRecoveryCtl->info_lck);
+ XLogRecoveryCtl->InArchiveRecovery = InArchiveRecovery;
+ XLogRecoveryCtl->pendingWalRcvRestart = pendingWalRcvRestart;
+ XLogRecoveryCtl->abortedRecPtr = abortedRecPtr;
+ XLogRecoveryCtl->missingContrecPtr = missingContrecPtr;
+ XLogRecoveryCtl->currentSource = currentSource;
+ XLogRecoveryCtl->standbyState = standbyState;
+ XLogRecoveryCtl->XLogReceiptSource = XLogReceiptSource;
+ XLogRecoveryCtl->XLogReceiptTime = XLogReceiptTime;
+ SpinLockRelease(&XLogRecoveryCtl->info_lck);
+}
/*
* Pipeline bgw should be aware of all the parameters thats been initialized by
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index de9092fdf5b..f7f31a916a7 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -1197,6 +1197,7 @@ standby_redo(XLogReaderState *record)
running.xids = xlrec->xids;
ProcArrayApplyRecoveryInfo(&running);
+ SetSharedHotStandbyState();
/*
* The startup process currently has no convenient way to schedule
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
index b4cde273e1c..7ff10055390 100644
--- a/src/include/access/xlogpipeline.h
+++ b/src/include/access/xlogpipeline.h
@@ -144,12 +144,28 @@ extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
* Public API functions
*/
+/* Start/stop the pipeline */
+extern void WalPipeline_Start(WalPipelineParams *params);
+extern void WalPipeline_Stop(void);
+
/* Producer functions (called by background worker) */
extern void WalPipeline_ProducerMain(Datum main_arg);
extern bool WalPipeline_SendRecord(XLogReaderState *record);
extern bool WalPipeline_SendShutdown(void);
extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+/* Consumer functions (called by startup process) */
+extern DecodedXLogRecord *WalPipeline_ReceiveRecord(XLogReaderState *startup_reader, bool first_iteration);
+extern bool WalPipeline_CheckProducerAlive(void);
+
+/* Status and monitoring */
+extern bool WalPipeline_IsActive(void);
+extern void WalPipeline_WaitForConsumerCatchup(void);
+extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+ XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn);
+extern bool AmWalPipeline(void);
+
+
extern void ProcessPipelineBgwInterrupts(void);
/* Global shared memory pointer */
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 8b943b8f395..80a25b2dee7 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -13,6 +13,7 @@
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
+#include "access/xlogutils.h"
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
#include "storage/condition_variable.h"
@@ -106,6 +107,13 @@ typedef struct XLogRecoveryCtlData
*/
Latch recoveryWakeupLatch;
+ /*
+ * In case pipeline enabled we will need two latches. One that can be used
+ * by the pipeline for WAL waiting and other that can be used by the
+ * startup process for the apply delay
+ */
+ Latch recoveryApplyDelayLatch;
+
/*
* Last record successfully replayed.
*/
@@ -133,6 +141,46 @@ typedef struct XLogRecoveryCtlData
ConditionVariable recoveryNotPausedCV;
slock_t info_lck; /* locks shared variables shown above */
+
+ /* ------------------------------------------------------------------
+ * Variables use for IPC between pipeline and the startup proc.
+ * These are also the static variables in xlogrecovery.c but there values
+ * keep on changing. So we added then in the shared memory so that both
+ * the pipeline and the startup proc stay synced on any of this state
+ * change
+ * ------------------------------------------------------------------
+ */
+
+ /*
+ * Pipeline could be waiting for the startup process to catchup with the
+ * decoder. This could happend when no wait wal is available from the
+ * current resource and now pipline have change the wal srouce
+ * i.e enabling standby if requested.
+ */
+ bool pipeline_waiting;
+ bool InArchiveRecovery;
+ bool pendingWalRcvRestart;
+ bool stanbyEnabled;
+
+ /* The target TLI for which expectedTLEs should be recomputed */
+ TimeLineID expectedTLEsUpdateTLI;
+
+ /*
+ * Normaly we wakeup walrcvr after specific records have been applied, as
+ * reads are sequential so we wkaeup after specific read. But in case of pipeline
+ * reads (decoded records) could be way ahead of the consumer. We cannot wakeup
+ * wal rcvr based on read, so we tell consumer to wakup after apllied records
+ * upto flushedUptoRecPtr
+ */
+ XLogRecPtr flushedUptoRecPtr;
+ XLogRecPtr abortedRecPtr;
+ XLogRecPtr missingContrecPtr;
+
+ XLogSource currentSource;
+ XLogSource XLogReceiptSource;
+
+ HotStandbyState standbyState;
+ TimestampTz XLogReceiptTime;
} XLogRecoveryCtlData;
extern PGDLLIMPORT XLogRecoveryCtlData *XLogRecoveryCtl;
@@ -239,6 +287,8 @@ extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, i
extern bool PromoteIsTriggered(void);
extern bool CheckPromoteSignal(void);
extern void WakeupRecovery(void);
+extern void DisownRecoveryWakeupLatch(void);
+extern void SetSharedHotStandbyState(void);
extern void StartupRequestWalReceiverRestart(void);
extern void XLogRequestWalReceiverReply(void);
--
2.49.0
[application/octet-stream] v3-0001-Pipelined-Recovery-Producer.patch (37.7K, 3-v3-0001-Pipelined-Recovery-Producer.patch)
download | inline diff:
From 3714219dde72857c033d0029c8ef9b30ca72f574 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Tue, 7 Apr 2026 14:40:15 +0500
Subject: [PATCH v3 1/2] Pipelined Recovery - Producer
This includes the producer specific code for the producer-consumer
architecture for WAL replay that separates WAL decoding from the recovery process,
enabling parallel processing between differemt steps of replay.
The producer includes a background worker that reads and decodes WAL records,
then send them to the startup process for the redo. IPC happens via shared
memory message queues (shm_mq), allowing the decoder to run ahead of the apply process.
This provides some improvement in recovery performance for CPU-bound workloads.
New GUC: wal_pipeline (default: off)
Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/meson.build | 1 +
src/backend/access/transam/xlogpipeline.c | 577 ++++++++++++++++++
src/backend/access/transam/xlogrecovery.c | 57 +-
src/backend/postmaster/bgworker.c | 5 +
src/backend/utils/misc/guc_parameters.dat | 15 +
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/include/access/xlog.h | 2 +
src/include/access/xlogpipeline.h | 158 +++++
src/include/access/xlogrecovery.h | 19 +
src/include/storage/subsystemlist.h | 1 +
src/test/recovery/meson.build | 1 +
src/test/recovery/t/053_walpipeline.pl | 208 +++++++
13 files changed, 1036 insertions(+), 11 deletions(-)
create mode 100644 src/backend/access/transam/xlogpipeline.c
create mode 100644 src/include/access/xlogpipeline.h
create mode 100644 src/test/recovery/t/053_walpipeline.pl
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..ba0bf343769 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -32,6 +32,7 @@ OBJS = \
xlogbackup.o \
xlogfuncs.o \
xloginsert.o \
+ xlogpipeline.o \
xlogprefetcher.o \
xlogreader.o \
xlogrecovery.o \
diff --git a/src/backend/access/transam/meson.build b/src/backend/access/transam/meson.build
index 06aadc7f315..be37b40581d 100644
--- a/src/backend/access/transam/meson.build
+++ b/src/backend/access/transam/meson.build
@@ -20,6 +20,7 @@ backend_sources += files(
'xlogbackup.c',
'xlogfuncs.c',
'xloginsert.c',
+ 'xlogpipeline.c',
'xlogprefetcher.c',
'xlogrecovery.c',
'xlogstats.c',
diff --git a/src/backend/access/transam/xlogpipeline.c b/src/backend/access/transam/xlogpipeline.c
new file mode 100644
index 00000000000..3f482c6ba44
--- /dev/null
+++ b/src/backend/access/transam/xlogpipeline.c
@@ -0,0 +1,577 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.c
+ * WAL replay pipeline implementation
+ *
+ * This module implements a producer-consumer pipeline for WAL replay.
+ * The producer (background worker) reads and decodes WAL records in parallel
+ * with the consumer (startup process) that applies them.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogpipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/xlog.h"
+#include "access/xlogpipeline.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/startup.h"
+#include "storage/bufmgr.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/md.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/smgr.h"
+#include "storage/subsystems.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/rel.h"
+#include "utils/timeout.h"
+
+
+
+/* Global shared memory control structure */
+WalPipelineShmCtl *WalPipelineShm = NULL;
+
+static void WalPipelineShmemRequest(void *arg);
+static void WalPipelineShmemInit(void *arg);
+
+const ShmemCallbacks WalPipelineShmemCallbacks = {
+ .request_fn = WalPipelineShmemRequest,
+ .init_fn = WalPipelineShmemInit,
+};
+
+XLogPrefetcher *xlogprefetcher_pipelined = NULL;
+
+/* Local state for producer */
+static dsm_segment *producer_dsm_seg = NULL;
+static shm_mq *producer_mq = NULL;
+static shm_mq_handle *producer_mq_handle = NULL;
+
+
+/*
+ * Flags set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+
+/* Signal handlers */
+static void PipelineBgwSigHupHandler(SIGNAL_ARGS);
+
+/* Forward declarations */
+static void wal_pipeline_cleanup_callback(int code, Datum arg);
+static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static void cleanup_producer_resources(void);
+static void cleanup_consumer_resources(void);
+
+/* copied from xlogrecovery.c */
+/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
+typedef struct XLogPageReadPrivate
+{
+ int emode;
+ bool fetching_ckpt; /* are we fetching a checkpoint record? */
+ bool randAccess;
+ TimeLineID replayTLI;
+} XLogPageReadPrivate;
+
+
+/*
+ * Register shared memory for WAL Pipeline
+ */
+static void
+WalPipelineShmemRequest(void *arg)
+{
+ ShmemRequestStruct(.name = "WAL Pipeline Ctl",
+ .size = sizeof(WalPipelineShmCtl),
+ .ptr = (void **) &WalPipelineShm,
+ );
+}
+
+static void
+WalPipelineShmemInit(void *arg)
+{
+ memset(WalPipelineShm, 0, sizeof(WalPipelineShmCtl));
+
+ SpinLockInit(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Producer Function.
+ * Main loop for the producer background worker.
+ */
+void
+WalPipeline_ProducerMain(Datum main_arg)
+{
+ dsm_handle handle = DatumGetUInt32(main_arg);
+ dsm_segment *seg;
+ shm_toc *toc;
+ WalPipelineParams *params;
+ XLogReaderState *xlogreader;
+ XLogPageReadPrivate *private;
+ XLogRecord *record;
+ TimeLineID replayTLI = 0;
+ bool end_of_wal = false;
+ uint64 records_sent;
+ uint64 records_received;
+
+ /*
+ * Properly accept or ignore signals the postmaster might send us.
+ */
+ pqsignal(SIGHUP, PipelineBgwSigHupHandler); /* reload config file */
+
+ /* Register cleanup callback */
+ before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+ seg = dsm_attach(handle);
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("[walpipeline] producer: could not map dynamic shared memory segment")));
+
+ toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment")));
+
+ /* Lookup params and queue */
+ params = shm_toc_lookup(toc, 1, false);
+ producer_mq = shm_toc_lookup(toc, 2, false);
+
+ /* Set up producer side of queue */
+ producer_dsm_seg = seg;
+ shm_mq_set_sender(producer_mq, MyProc);
+ producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL);
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = MyProcPid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* DSM is now attached, so safe to unblock the signals */
+ BackgroundWorkerUnblockSignals();
+
+ /* Set up WAL reading processor */
+ private = palloc0(sizeof(XLogPageReadPrivate));
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ private);
+
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+ xlogreader->system_identifier = GetSystemIdentifier();
+
+ /*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /* Init some important globals before starting */
+ replayTLI = params->ReplayTLI;
+ InitializePipelineRecoveryEnv(params);
+
+ /* Reinit the WAL prefetcher. */
+ xlogprefetcher_pipelined = XLogPrefetcherAllocate(xlogreader);
+
+
+ elog(LOG, "[walpipeline] producer: started at %X/%X, TLI %u",
+ LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI);
+
+ XLogPrefetcherBeginRead(xlogprefetcher_pipelined, params->NextRecPtr);
+
+ /* Main decoding loop */
+ while (true)
+ {
+ bool shutdown_requested;
+
+ /* Check if consumer requested shutdown */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ shutdown_requested = WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (shutdown_requested)
+ {
+ elog(DEBUG1, "[walpipeline] producer: shutdown requested by consumer");
+ break;
+ }
+
+ /* Read next WAL record */
+ record = ReadRecord(xlogprefetcher_pipelined, LOG, false, replayTLI);
+
+ if (record == NULL)
+ {
+ end_of_wal = true;
+ elog(DEBUG1, "[walpipeline] producer: reached end of WAL");
+ break;
+ }
+
+ /*
+ * Successfully decoded a record. Send it to the consumer.
+ */
+ if (!WalPipeline_SendRecord(xlogreader))
+ {
+ elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached");
+ break;
+ }
+
+ /* Update our position for monitoring */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_lsn = xlogreader->EndRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+
+ if (end_of_wal)
+ {
+ /* Notify consumer we need to exit early */
+ WalPipeline_SendShutdown();
+
+ /* wait until consumer set the flag */
+ WalPipeline_WaitForConsumerShutdownRequest();
+ }
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ records_sent = WalPipelineShm->records_sent;
+ records_received = WalPipelineShm->records_received;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+ records_sent, records_received);
+
+ /* Cleanup */
+ XLogReaderFree(xlogreader);
+ DisownRecoveryWakeupLatch();
+ pfree(private);
+ cleanup_producer_resources();
+}
+
+/*
+ * Producer Function.
+ * Send a decoded WAL record to the consumer
+ */
+bool
+WalPipeline_SendRecord(XLogReaderState *record)
+{
+ char *buffer = NULL;
+ Size msglen;
+ shm_mq_result res;
+
+
+ if (!producer_mq_handle)
+ return false;
+
+ /* Serialize the record */
+ msglen = serialize_wal_record(record, &buffer);
+
+ res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true);
+
+ if (res == SHM_MQ_SUCCESS)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->records_sent++;
+ WalPipelineShm->bytes_sent += msglen;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ pfree(buffer);
+ return true;
+ }
+
+ if (res == SHM_MQ_DETACHED)
+ {
+ elog(PANIC, "[walpipeline] producer: consumer detached");
+ pfree(buffer);
+ return false;
+ }
+
+ /* Some other error */
+ elog(PANIC, "[walpipeline] producer: shm_mq_send failed with result %d", res);
+ pfree(buffer);
+ return false;
+}
+
+/*
+ * Producer Function.
+ * Send shutdown message to consumer
+ */
+bool
+WalPipeline_SendShutdown(void)
+{
+ WalRecordMsgHeader hdr;
+ shm_mq_result res;
+
+ if (!producer_mq_handle)
+ return false;
+
+ hdr.msg_type = WAL_MSG_SHUTDOWN;
+ hdr.endRecPtr = InvalidXLogRecPtr;
+
+ res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true);
+ return (res == SHM_MQ_SUCCESS);
+}
+
+/*
+ * Producer Function.
+ * Send error message to consumer
+ */
+bool
+WalPipeline_SendError(int errcode, const char *errmsg)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->error_code = errcode;
+ strlcpy(WalPipelineShm->error_message, errmsg,
+ sizeof(WalPipelineShm->error_message));
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return true;
+}
+
+
+/*
+ * Producer Function.
+ * Producer may can exit without waiting for the consumer, but its better to
+ * wait until consumer request shutdown. This way log messages will show
+ * no of records_sent & records_received records equal to each other.
+ */
+static void
+WalPipeline_WaitForConsumerShutdownRequest(void)
+{
+ int iters = 0;
+
+ while (true)
+ {
+ bool shutdown_requested;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ shutdown_requested = WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (shutdown_requested)
+ break;
+
+ if (++iters >= MAX_SHUTDOWN_WAIT_ITERS)
+ {
+ elog(WARNING,
+ "[walpipeline] producer: timed out waiting for consumer "
+ "to acknowledge shutdown, exiting anyway");
+ break;
+ }
+
+ /* Allow SIGTERM / SIGHUP to interrupt the wait */
+ ProcessPipelineBgwInterrupts();
+
+ pg_usleep(10000); /* sleep 10ms */
+ }
+}
+
+
+
+/*
+ * serialize_wal_record (Producer)
+ *
+ * Pack a WalRecordMsgHeader followed by the DecodedXLogRecord into a
+ * contiguous buffer, converting interior pointers to relative offsets.
+ *
+ * Output buffer layout:
+ * [WalRecordMsgHeader][DecodedXLogRecord + trailing data]
+ */
+static Size
+serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
+{
+ DecodedXLogRecord *dec = xlogreader->record;
+ DecodedXLogRecord *dec_copy;
+ WalRecordMsgHeader hdr;
+ Size payload_size = dec->size;
+ Size total = sizeof(WalRecordMsgHeader) + payload_size;
+ char *buf = palloc(total);
+
+ /* build header */
+ hdr.msg_type = WAL_MSG_RECORD;
+ hdr.readRecPtr = xlogreader->ReadRecPtr;
+ hdr.endRecPtr = xlogreader->EndRecPtr;
+ hdr.missingContrecPtr = xlogreader->missingContrecPtr;
+ hdr.abortedRecPtr = xlogreader->abortedRecPtr;
+ hdr.overwrittenRecPtr = xlogreader->overwrittenRecPtr;
+ hdr.decoded_size = payload_size;
+
+ memcpy(buf, &hdr, sizeof(hdr));
+ memcpy(buf + sizeof(hdr), dec, payload_size);
+
+ /*
+ * Fix up the interior pointers: main_data and each block's data/bkp_image
+ * are absolute addresses in the producer. Convert them to byte offsets
+ * from the start of the copied DecodedXLogRecord so the consumer can
+ * reconstruct them.
+ */
+ dec_copy = (DecodedXLogRecord *)(buf + sizeof(hdr));
+
+ if (dec_copy->main_data_len > 0)
+ dec_copy->main_data = (char *)((char *)dec->main_data - (char *)dec);
+
+ for (int i = 0; i <= dec_copy->max_block_id; i++)
+ {
+ DecodedBkpBlock *blk = &dec_copy->blocks[i];
+ if (!blk->in_use)
+ continue;
+ if (blk->has_data)
+ blk->data = (char *)((char *)dec->blocks[i].data - (char *)dec);
+ if (blk->has_image)
+ blk->bkp_image = (char *)((char *)dec->blocks[i].bkp_image - (char *)dec);
+ }
+
+ *outbuf = buf;
+ return total;
+}
+
+
+/*
+ * Clean up producer-side resources
+ */
+static void
+cleanup_producer_resources(void)
+{
+ if (producer_mq_handle)
+ {
+ shm_mq_detach(producer_mq_handle);
+ producer_mq_handle = NULL;
+ }
+
+ if (producer_dsm_seg)
+ {
+ dsm_detach(producer_dsm_seg);
+ producer_dsm_seg = NULL;
+ }
+
+ producer_mq = NULL;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = 0;
+ WalPipelineShm->producer_exited = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+
+/*
+ * Cleanup callback for process exit
+ */
+static void
+wal_pipeline_cleanup_callback(int code, Datum arg)
+{
+ pid_t mypid = MyProcPid;
+ bool is_producer = false;
+
+ if (WalPipelineShm)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ is_producer = (WalPipelineShm->producer_pid == mypid);
+ SpinLockRelease(&WalPipelineShm->mutex);
+ }
+
+ if (is_producer)
+ cleanup_producer_resources();
+ else
+ cleanup_consumer_resources();
+}
+
+/* --------------------------------
+ * signal handler routines
+ * --------------------------------
+ */
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+PipelineBgwSigHupHandler(SIGNAL_ARGS)
+{
+ got_SIGHUP = true;
+ WakeupRecovery();
+}
+
+/*
+ * Re-read the config file.
+ *
+ * If one of the critical walreceiver options has changed, flag xlogrecovery.c
+ * to restart it.
+ */
+static void
+PipelineRereadConfig(void)
+{
+ char *conninfo = pstrdup(PrimaryConnInfo);
+ char *slotname = pstrdup(PrimarySlotName);
+ bool tempSlot = wal_receiver_create_temp_slot;
+ bool conninfoChanged;
+ bool slotnameChanged;
+ bool tempSlotChanged = false;
+
+ ProcessConfigFile(PGC_SIGHUP);
+
+ conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0;
+ slotnameChanged = strcmp(slotname, PrimarySlotName) != 0;
+
+ /*
+ * wal_receiver_create_temp_slot is used only when we have no slot
+ * configured. We do not need to track this change if it has no effect.
+ */
+ if (!slotnameChanged && strcmp(PrimarySlotName, "") == 0)
+ tempSlotChanged = tempSlot != wal_receiver_create_temp_slot;
+ pfree(conninfo);
+ pfree(slotname);
+
+ if (conninfoChanged || slotnameChanged || tempSlotChanged)
+ StartupRequestWalReceiverRestart();
+}
+
+/*
+ * Process any requests or signals received recently.
+ */
+void
+ProcessPipelineBgwInterrupts(void)
+{
+
+ bool shutdown_requested;
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ PipelineRereadConfig();
+ }
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ shutdown_requested = WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (shutdown_requested)
+ proc_exit(0);
+
+ CHECK_FOR_INTERRUPTS();
+}
\ No newline at end of file
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index c236e2b7969..88c0b43ac76 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -35,6 +35,7 @@
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
+#include "access/xlogpipeline.h"
#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "access/xlogrecovery.h"
@@ -100,6 +101,8 @@ int recovery_min_apply_delay = 0;
char *PrimaryConnInfo = NULL;
char *PrimarySlotName = NULL;
bool wal_receiver_create_temp_slot = false;
+bool wal_pipeline_enabled = false;
+int wal_pipeline_mq_size_mb = 128;
/*
* recoveryTargetTimeLineGoal: what the user requested, if any
@@ -206,17 +209,6 @@ typedef struct XLogPageReadPrivate
/* flag to tell XLogPageRead that we have started replaying */
static bool InRedo = false;
-/*
- * Codes indicating where we got a WAL file from during recovery, or where
- * to attempt to get one.
- */
-typedef enum
-{
- XLOG_FROM_ANY = 0, /* request to read WAL from any source */
- XLOG_FROM_ARCHIVE, /* restored using restore_command */
- XLOG_FROM_PG_WAL, /* existing file in pg_wal */
- XLOG_FROM_STREAM, /* streamed from primary */
-} XLogSource;
/* human-readable names for XLogSources, for debugging output */
static const char *const xlogSourceNames[] = {"any", "archive", "pg_wal", "stream"};
@@ -4739,6 +4731,49 @@ RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue
}
+/*
+ * Pipeline bgw should be aware of all the parameters thats been initialized by
+ * the startup process before performing the actual recoevery. Other then this
+ * there are also some variables that keep on changing. The pipeline & the startup
+ * process should be aware of the state change of such variables, we can use shared
+ * memory for such variables.
+ */
+void
+InitializePipelineRecoveryEnv(WalPipelineParams *params)
+{
+ StandbyMode = params->StandbyMode;
+ StandbyModeRequested = params->StandbyModeRequested;
+ ArchiveRecoveryRequested = params->ArchiveRecoveryRequested;
+ InArchiveRecovery = params->InArchiveRecovery;
+ recoveryTargetTLI = params->recoveryTargetTLI;
+ minRecoveryPointTLI = params->minRecoveryPointTLI;
+ minRecoveryPoint = params->minRecoveryPoint;
+ currentSource = params->currentSource;
+ lastSourceFailed = params->lastSourceFailed;
+ pendingWalRcvRestart = params->pendingWalRcvRestart;
+ RedoStartTLI = params->RedoStartTLI;
+ RedoStartLSN = params->RedoStartLSN;
+ standbyState = params->standbyState;
+ CheckPointLoc = params->CheckPointLoc;
+ CheckPointTLI = params->CheckPointTLI;
+ flushedUpto = params->flushedUpto;
+ receiveTLI = params->receiveTLI;
+ abortedRecPtr = params->abortedRecPtr;
+ missingContrecPtr = params->missingContrecPtr;
+ InRedo = params->InRedo;
+ backupEndRequired = params->backupEndRequired;
+ backupStartPoint = params->backupStartPoint;
+ backupEndPoint = params->backupEndPoint;
+ curFileTLI = params->curFileTLI;
+ InRecovery = true;
+
+ /*
+ * As pipeline will be reading the wal, so better to own the latch to wait at.
+ */
+ if (ArchiveRecoveryRequested)
+ OwnLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
+}
+
/*
* GUC check_hook for primary_slot_name
*/
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 3914d22a514..b118bf11505 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/parallel.h"
+#include "access/xlogpipeline.h"
#include "commands/repack.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
@@ -166,6 +167,10 @@ static const struct
{
.fn_name = "DataChecksumsWorkerMain",
.fn_addr = DataChecksumsWorkerMain
+ },
+ {
+ .fn_name = "WalPipeline_ProducerMain",
+ .fn_addr = WalPipeline_ProducerMain
}
};
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index fcb6ab80583..b1ae94215f3 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3462,6 +3462,21 @@
boot_val => 'false',
},
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+ short_desc => 'Use parallel workers to speedup recovery.',
+ variable => 'wal_pipeline_enabled',
+ boot_val => 'false',
+},
+
+{ name => 'wal_pipeline_queue_size', type => 'int', context => 'PGC_POSTMASTER', group => 'WAL_RECOVERY',
+ short_desc => 'Size of the shared memory queue used by the WAL pipeline.',
+ flags => 'GUC_UNIT_MB',
+ variable => 'wal_pipeline_mq_size_mb',
+ boot_val => '128',
+ min => '1',
+ max => '1024',
+},
+
{ name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
variable => 'wal_receiver_create_temp_slot',
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e3e462f3efb..2b130cfbd6b 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -280,6 +280,8 @@
#recovery_prefetch = try # prefetch pages referenced in the WAL?
#wal_decode_buffer_size = 512kB # lookahead window used for prefetching
# (change requires restart)
+#wal_pipeline = off # decode in parallel
+#wal_pipeline_queue_size = 128MB
# - Archiving -
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 437b4f32349..9465e52cf2b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -40,6 +40,8 @@ extern PGDLLIMPORT int min_wal_size_mb;
extern PGDLLIMPORT int max_wal_size_mb;
extern PGDLLIMPORT int wal_keep_size_mb;
extern PGDLLIMPORT int max_slot_wal_keep_size_mb;
+extern PGDLLIMPORT int wal_pipeline_mq_size_mb;
+extern PGDLLIMPORT bool wal_pipeline_enabled;
extern PGDLLIMPORT int XLOGbuffers;
extern PGDLLIMPORT int XLogArchiveTimeout;
extern PGDLLIMPORT int wal_retrieve_retry_interval;
diff --git a/src/include/access/xlogpipeline.h b/src/include/access/xlogpipeline.h
new file mode 100644
index 00000000000..b4cde273e1c
--- /dev/null
+++ b/src/include/access/xlogpipeline.h
@@ -0,0 +1,158 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogpipeline.h
+ * WAL replay pipeline for parallel recovery
+ *
+ * This module implements a producer-consumer pipeline for WAL replay:
+ * - Producer: background worker that reads and decodes WAL records
+ * - Consumer: startup process: core redo loop
+ *
+ * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL
+ * records from producer to consumer, enabling parallelism while
+ * maintaining sequential replay semantics.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * src/include/access/xlogpipeline.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAL_PIPELINE_H
+#define WAL_PIPELINE_H
+
+#include "access/xlogreader.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * Magic number for shared memory TOC
+ */
+#define PG_WAL_PIPELINE_MAGIC 0x57414C50 /* "WALP" */
+
+/*
+ * Message types sent through the pipeline
+ */
+typedef enum WalMsgType
+{
+ WAL_MSG_INVALID = 0,
+ WAL_MSG_RECORD, /* Decoded WAL record */
+ WAL_MSG_SHUTDOWN, /* Graceful shutdown request */
+ WAL_MSG_ERROR /* Error occurred in producer */
+} WalMsgType;
+
+/* Wire header for a serialized WAL message */
+typedef struct WalRecordMsgHeader
+{
+ WalMsgType msg_type; /* WAL_MSG_RECORD etc */
+ uint32 decoded_size; /* byte length of the payload that follows */
+ XLogRecPtr readRecPtr; /* XLogReaderState->ReadRecPtr */
+ XLogRecPtr endRecPtr; /* XLogReaderState->EndRecPtr */
+ XLogRecPtr missingContrecPtr; /* XLogReaderState->missingContrecPtr */
+ XLogRecPtr abortedRecPtr; /* XLogReaderState->abortedRecPtr */
+ XLogRecPtr overwrittenRecPtr; /* XLogReaderState->overwrittenRecPtr */
+} WalRecordMsgHeader;
+
+/*
+ * Parameters passed from StartupXLOG (consumer side)
+ * to the WAL pipeline producer background worker.
+ */
+typedef struct WalPipelineParams
+{
+ bool StandbyMode;
+ bool StandbyModeRequested;
+ bool ArchiveRecoveryRequested;
+ bool InArchiveRecovery;
+ bool InRedo;
+ bool lastSourceFailed;
+ bool pendingWalRcvRestart;
+ bool backupEndRequired;
+
+ TimeLineID RedoStartTLI;
+ TimeLineID CheckPointTLI;
+ TimeLineID recoveryTargetTLI;
+ TimeLineID minRecoveryPointTLI;
+ TimeLineID ReplayTLI;
+ TimeLineID receiveTLI;
+
+ XLogRecPtr backupStartPoint;
+ XLogRecPtr backupEndPoint;
+ XLogRecPtr CheckPointLoc;
+ XLogRecPtr RedoStartLSN;
+ XLogRecPtr NextRecPtr;
+ XLogRecPtr minRecoveryPoint;
+ XLogRecPtr flushedUpto;
+ XLogRecPtr abortedRecPtr;
+ XLogRecPtr missingContrecPtr;
+
+ int readFile;
+ XLogSegNo readSegNo;
+ uint32 readOff;
+ uint32 readLen;
+ XLogSource readSource;
+ TimeLineID curFileTLI;
+
+
+ HotStandbyState standbyState;
+ XLogSource currentSource;
+
+} WalPipelineParams;
+
+/*
+ * Shared memory control structure for the WAL pipeline
+ */
+typedef struct WalPipelineShmCtl
+{
+ /* Lifecycle management */
+ slock_t mutex;
+ bool initialized;
+ bool shutdown_requested;
+ bool producer_exited;
+
+ /* Producer state */
+ pid_t producer_pid;
+ ProcNumber producer_procnum;
+ XLogRecPtr producer_lsn; /* Last LSN read by producer */
+
+ /* Consumer state */
+ pid_t consumer_pid;
+ XLogRecPtr consumer_lsn; /* Last LSN recieved by consumer */
+ XLogRecPtr applied_lsn; /* Last LSN applied by consumer */
+
+ /* Queue handles */
+ dsm_handle dsm_seg_handle;
+ shm_mq_handle *producer_mq_handle;
+ shm_mq_handle *consumer_mq_handle;
+
+ /* Statistics */
+ uint64 records_sent;
+ uint64 records_received;
+ uint64 bytes_sent;
+ uint64 bytes_received;
+
+ /* Error state */
+ int error_code;
+ char error_message[256];
+} WalPipelineShmCtl;
+
+/* consumer may have to compute prefetecher stats */
+extern PGDLLIMPORT XLogPrefetcher *xlogprefetcher_pipelined;
+
+/*
+ * Public API functions
+ */
+
+/* Producer functions (called by background worker) */
+extern void WalPipeline_ProducerMain(Datum main_arg);
+extern bool WalPipeline_SendRecord(XLogReaderState *record);
+extern bool WalPipeline_SendShutdown(void);
+extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+
+extern void ProcessPipelineBgwInterrupts(void);
+
+/* Global shared memory pointer */
+extern WalPipelineShmCtl *WalPipelineShm;
+
+#endif /* WAL_PIPELINE_H */
\ No newline at end of file
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index ba7750dca0b..8b943b8f395 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,7 @@
#ifndef XLOGRECOVERY_H
#define XLOGRECOVERY_H
+#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
@@ -60,6 +61,17 @@ typedef enum RecoveryPauseState
RECOVERY_PAUSED, /* recovery is paused */
} RecoveryPauseState;
+/* Codes indicating where we got a WAL file from during recovery, or where
+ * to attempt to get one.
+ */
+typedef enum
+{
+ XLOG_FROM_ANY = 0, /* request to read WAL from any source */
+ XLOG_FROM_ARCHIVE, /* restored using restore_command */
+ XLOG_FROM_PG_WAL, /* existing file in pg_wal */
+ XLOG_FROM_STREAM, /* streamed from primary */
+} XLogSource;
+
/*
* Shared-memory state for WAL recovery.
*/
@@ -205,6 +217,8 @@ typedef struct
bool recovery_signal_file_found;
} EndOfWalRecoveryInfo;
+struct WalPipelineParams; /* forward declaration */
+
extern EndOfWalRecoveryInfo *FinishWalRecovery(void);
extern void ShutdownWalRecovery(void);
extern void RemovePromoteSignalFiles(void);
@@ -217,6 +231,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern TimestampTz GetLatestXTime(void);
extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI);
+extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
+ bool fetching_ckpt, TimeLineID replayTLI);
+extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *readBuf);
extern bool PromoteIsTriggered(void);
extern bool CheckPromoteSignal(void);
@@ -226,6 +244,7 @@ extern void StartupRequestWalReceiverRestart(void);
extern void XLogRequestWalReceiverReply(void);
extern void RecoveryRequiresIntParameter(const char *param_name, int currValue, int minValue);
+extern void InitializePipelineRecoveryEnv(struct WalPipelineParams *params);
extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
diff --git a/src/include/storage/subsystemlist.h b/src/include/storage/subsystemlist.h
index 9ad619080be..e9ff6de9a1a 100644
--- a/src/include/storage/subsystemlist.h
+++ b/src/include/storage/subsystemlist.h
@@ -42,6 +42,7 @@ PG_SHMEM_SUBSYSTEM(MultiXactShmemCallbacks)
PG_SHMEM_SUBSYSTEM(BufferManagerShmemCallbacks)
PG_SHMEM_SUBSYSTEM(StrategyCtlShmemCallbacks)
PG_SHMEM_SUBSYSTEM(BufTableShmemCallbacks)
+PG_SHMEM_SUBSYSTEM(WalPipelineShmemCallbacks)
/* lock manager */
PG_SHMEM_SUBSYSTEM(LockManagerShmemCallbacks)
diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build
index 36d789720a3..fc23acbaec2 100644
--- a/src/test/recovery/meson.build
+++ b/src/test/recovery/meson.build
@@ -61,6 +61,7 @@ tests += {
't/050_redo_segment_missing.pl',
't/051_effective_wal_level.pl',
't/052_checkpoint_segment_missing.pl',
+ 't/053_walpipeline.pl',
],
},
}
diff --git a/src/test/recovery/t/053_walpipeline.pl b/src/test/recovery/t/053_walpipeline.pl
new file mode 100644
index 00000000000..2fb790aadc5
--- /dev/null
+++ b/src/test/recovery/t/053_walpipeline.pl
@@ -0,0 +1,208 @@
+# Copyright (c) 2025-2026, PostgreSQL Global Development Group
+#
+# Tests for the WAL pipeline feature (wal_pipeline GUC).
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# ----------
+# Helpers
+# ----------
+
+sub slurp_log
+{
+ my ($node) = @_;
+ open(my $fh, '<', $node->logfile()) or die "Cannot open log: $!";
+ my @lines = <$fh>;
+ close($fh);
+ return @lines;
+}
+
+sub log_matches
+{
+ my ($node, $re) = @_;
+ return grep { /$re/ } slurp_log($node);
+}
+
+
+# ########################################
+# wal_pipeline = on, basic recovery
+# ########################################
+
+my $node1 = PostgreSQL::Test::Cluster->new('p1-recovery');
+$node1->init;
+$node1->start;
+
+$node1->safe_psql('postgres', q{
+ CREATE TABLE t (id serial PRIMARY KEY, v text);
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node1->safe_psql('postgres', q{
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node1->stop('immediate');
+
+# restart → recovery happens
+$node1->append_conf('postgresql.conf', "wal_pipeline = on");
+$node1->start;
+
+
+# Producer started
+ok(scalar log_matches($node1, qr/\[walpipeline\] producer: started at/),
+ 'producer started message found in log');
+
+# Pipeline stopped cleanly
+ok(scalar log_matches($node1, qr/\[walpipeline\] stopped/),
+ 'pipeline stopped message found in log');
+
+# Consumer received shutdown from producer
+ok(scalar log_matches($node1, qr/\[walpipeline\] consumer: received shutdown message/),
+ 'consumer received shutdown message from producer');
+
+# sent == received
+my @exit_lines = log_matches($node1,
+ qr/\[walpipeline\] producer: exiting: sent=\d+ received=\d+/);
+ok(scalar @exit_lines >= 1, 'producer exiting line found in log');
+
+my ($sent, $recv) = $exit_lines[-1] =~ /sent=(\d+) received=(\d+)/;
+ok(defined $sent && $sent > 0, "sent count ($sent) is positive");
+ok(defined $recv && $recv > 0, "received count ($recv) is positive");
+is($sent, $recv, "no records lost in pipeline queue: sent=$sent received=$recv");
+
+# No PANIC
+ok(!(scalar log_matches($node1, qr/\bPANIC\b/)),
+ 'no PANIC messages during pipeline recovery');
+
+# Data integrity
+my $count = $node1->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count + 0, 100_000, 'all 100000 rows visible after pipeline recovery');
+
+$node1->stop;
+
+# ##############################################################
+# wal_pipeline = off (baseline, no pipeline log messages)
+# ##############################################################
+
+my $node2 = PostgreSQL::Test::Cluster->new('p0-recovery');
+$node2->init;
+$node2->start;
+
+$node2->safe_psql('postgres', q{
+ CREATE TABLE t (id serial PRIMARY KEY, v text);
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# generate more WAL
+$node2->safe_psql('postgres', q{
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1,50000) i;
+});
+
+# crash stop to force WAL recovery
+$node2->stop('immediate');
+
+# restart → recovery happens
+$node2->append_conf('postgresql.conf', "wal_pipeline = off");
+$node2->start;
+
+ok(!(scalar log_matches($node2, qr/\[walpipeline\] producer: started/)),
+ 'no pipeline log messages when wal_pipeline = off');
+
+my $count2 = $node2->safe_psql('postgres', 'SELECT count(*) FROM t');
+is($count2 + 0, 100_000, 'all rows present after non-pipeline recovery');
+
+$node2->stop;
+
+
+
+# ###################################################################
+# pipeline on vs off produce identical data (checksum comparison)
+# ###################################################################
+
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->start;
+
+$primary->safe_psql('postgres', q{
+ CREATE TABLE t (id serial PRIMARY KEY, v text);
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1, 30000) i;
+});
+
+$primary->backup('backup3');
+
+$primary->safe_psql('postgres', q{
+ INSERT INTO t (v)
+ SELECT md5(i::text) FROM generate_series(1, 30000) i;
+ UPDATE t SET v = 'x' WHERE id % 10 = 0;
+});
+
+# ensure WAL boundary
+$primary->safe_psql('postgres', 'SELECT pg_switch_wal()');
+my $target_lsn = $primary->safe_psql('postgres', 'SELECT pg_current_wal_lsn()');
+
+my $replica_on = PostgreSQL::Test::Cluster->new('replica_p1');
+$replica_on->init_from_backup($primary, 'backup3',
+ has_streaming => 1);
+$replica_on->append_conf('postgresql.conf', "wal_pipeline = on\n");
+$replica_on->start;
+
+my $replica_off = PostgreSQL::Test::Cluster->new('replica_p0');
+$replica_off->init_from_backup($primary, 'backup3',
+ has_streaming => 1);
+$replica_off->append_conf('postgresql.conf', "wal_pipeline = off\n");
+$replica_off->start;
+
+# wait for replicas to catch up
+$primary->wait_for_catchup($replica_on);
+$primary->wait_for_catchup($replica_off);
+
+my $md5_on = $replica_on->safe_psql('postgres',
+ "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+my $md5_off = $replica_off->safe_psql('postgres',
+ "SELECT md5(string_agg(id::text||v, ',' ORDER BY id)) FROM t");
+
+is($md5_on, $md5_off,
+ 'table checksum identical between pipeline=on and pipeline=off');
+
+$replica_on->stop;
+$replica_off->stop;
+$primary->stop('fast');
+
+
+
+# #################################
+# pipeline when on tiny replay
+# #################################
+
+my $node3 = PostgreSQL::Test::Cluster->new('p1-small-replay');
+$node3->init;
+$node3->start;
+
+# crash stop to force WAL recovery
+$node3->stop('immediate');
+
+# restart → recovery happens
+$node3->append_conf('postgresql.conf', "wal_pipeline = on");
+$node3->start;
+
+ok(scalar log_matches($node3, qr/\[walpipeline\] producer: exiting: sent=0 received=0/),
+ 'pipeline producer sent zero records');
+
+ok((scalar log_matches($node3, qr/redo done at/)),
+ 'pipeline redo done even with tiny replay');
+
+$node3->stop;
+
+done_testing();
\ No newline at end of file
--
2.49.0
[application/pdf] recoveries-becnhmark-v03-pdf.pdf (48.6K, 4-recoveries-becnhmark-v03-pdf.pdf)
download
^ permalink raw reply [nested|flat] 9+ messages in thread
* Re: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-03-19 02:33 ` Re: [WIP] Pipelined Recovery Xuneng Zhou <[email protected]>
2026-04-03 06:58 ` Re: [WIP] Pipelined Recovery Henson Choi <[email protected]>
2026-04-08 08:46 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
2026-04-08 11:14 ` Re: [WIP] Pipelined Recovery Imran Zaheer <[email protected]>
@ 2026-04-22 09:43 ` Xuneng Zhou <[email protected]>
0 siblings, 0 replies; 9+ messages in thread
From: Xuneng Zhou @ 2026-04-22 09:43 UTC (permalink / raw)
To: Imran Zaheer <[email protected]>; [email protected]; +Cc: Zsolt Parragi <[email protected]>; Jakub Wartak <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; pgsql-hackers
Hi Henson, Imran,
On Wed, Apr 8, 2026 at 7:14 PM Imran Zaheer <[email protected]> wrote:
>
> Hi
>
> I am uploading the new version with the following fixes
>
> * Rebased version.
> * Skip serialization of decoded records. As pointed out by Henson,
> there was no need to serialize the records again
> for the sh_mq. We can simply pass the continuous bytes with minor
> pointer fixing to the sh_mq
>
> This time I am uploading the benchmarking results to drive and
> attaching the link here. Otherwise my mail will get holded for
> moderation (My guess is overall attachment size is greater than 1MB thats why).
>
> I am still not sure whether my testing approach is good enough.
> Because sometimes I am not able to get the same performance
> improvement
> with the pgbench builtin scripts as I got with the custom sql scripts.
> Maybe pgbench is not creating enough WAL to test on
> or maybe I am missing something.
>
> Benchmarks: https://drive.google.com/file/d/1Y4SYVnrFEQRE5T2r87rrTr7SWC9m19Si/view?usp=sharing
>
> Thanks & Regards
> Imran Zaheer
>
> Imran Zaheer
>
> On Wed, Apr 8, 2026 at 1:46 PM Imran Zaheer <[email protected]> wrote:
> >
> > >
> > > Hi Xuneng, Imran, and everyone,
> > >
> >
> > Hi Henson and Xuneng.
> >
> > Thanks for explaining the approaches to Xuneng.
> >
> > >
> > > The two approaches target different bottlenecks. The current patch
> > > parallelizes WAL decoding, which keeps the redo path single-threaded
> > > and avoids the Hot Standby visibility problem entirely.
> > >
> >
> > You are right both approaches
> > target different bottlenecks. Pipeline patch aims to improve overall
> > cpu throughput
> > and to save CPU time by offloading the steps we can safely do in parallel with
> > out causing synchronization problems.
> >
> > > One thing I am curious about in the current patch: WAL records are
> > > already in a serialized format on disk. The producer decodes them and
> > > then re-serializes into a different custom format for shm_mq. What is
> > > the advantage of this second serialization format over simply passing
> > > the raw WAL bytes after CRC validation and letting the consumer decode
> > > directly? Offloading CRC to a separate core could still improve
> > > throughput at the cost of higher total CPU usage, without needing the
> > > custom format.
> > >
> >
> > Thanks. You are right there was no need to serialize the decoded record again.
> > I was not aware that we already have continuous bytes in memory. In my
> > next patch
> > I will remove this extra serialization step.
> >
> > > Koichi's approach parallelizes redo (buffer I/O) itself, which attacks
> > > a larger cost — Jakub's flamegraphs show BufferAlloc ->
> > > GetVictimBuffer -> FlushBuffer dominating in both p0 and p1 — but at
> > > the expense of much harder concurrency problems.
> > >
> > > Whether the decode pipelining ceiling is high enough, or whether the
> > > redo parallelization complexity is tractable, seems like the central
> > > design question for this area.
> >
> > I still have to investigate the problem related to `GetVictimBuffer` that
> > Jakub mentioned. But I was trying that how can we safely offload the work done
> > by `XLogReadBufferForRedoExtended` to a separate
> > pipeline worker, or maybe we can try prefetching the buffer header so
> > the main redo
> > loop doesn't have to spend time getting the buffer
Thanks for your clarification! I'll try to review this patch later.
--
Best,
Xuneng
^ permalink raw reply [nested|flat] 9+ messages in thread
* RE: [WIP] Pipelined Recovery
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
@ 2026-04-03 03:18 ` Hayato Kuroda (Fujitsu) <[email protected]>
1 sibling, 0 replies; 9+ messages in thread
From: Hayato Kuroda (Fujitsu) @ 2026-04-03 03:18 UTC (permalink / raw)
To: 'Imran Zaheer' <[email protected]>; Zsolt Parragi <[email protected]>; +Cc: pgsql-hackers; Jakub Wartak <[email protected]>
Dear Imran,
v2 could not be applied anymore. And even when I solve the conflict it has
complier warnings. Can you fix them?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
^ permalink raw reply [nested|flat] 9+ messages in thread
end of thread, other threads:[~2026-04-22 09:43 UTC | newest]
Thread overview: 9+ messages (download: mbox mbox.gz follow: Atom feed)
-- links below jump to the message on this page --
2026-02-17 11:21 Re: [WIP] Pipelined Recovery Zsolt Parragi <[email protected]>
2026-03-18 07:43 ` Imran Zaheer <[email protected]>
2026-03-18 10:18 ` Imran Zaheer <[email protected]>
2026-03-19 02:33 ` Xuneng Zhou <[email protected]>
2026-04-03 06:58 ` Henson Choi <[email protected]>
2026-04-08 08:46 ` Imran Zaheer <[email protected]>
2026-04-08 11:14 ` Imran Zaheer <[email protected]>
2026-04-22 09:43 ` Xuneng Zhou <[email protected]>
2026-04-03 03:18 ` Hayato Kuroda (Fujitsu) <[email protected]>
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox