From: Imran Zaheer <[email protected]>
To: pgsql-hackers <[email protected]>
Subject: [WIP] Pipelined Recovery
Date: Fri, 30 Jan 2026 11:28:47 +0500
Message-ID: <CA+UBfa=vDV8wbmAV0pgrx-FuJh+x8YOW23vJ90Jzr=14rV+9jA@mail.gmail.com>
Hi,
Based on a suggestion by my colleague Ants Aasma, I have been working on
adding parallelism to the WAL recovery process.
The crux of the idea is to decode the WAL in parallel worker processes, so
that the replay process can take already-decoded records directly from a
shared memory queue. This takes some CPU load off the recovery process.
Implementing this idea yielded an improvement of roughly 20% to 42% in
recovery times for the workloads listed below, but results may differ
based on workload. I have attached some benchmarks for different workloads.
Following are some recovery tests with the default configs. Here p0 is
the run with the pipeline disabled and p1 the run with it enabled; "db size"
is the size of the backup database on which the recovery runs. More detail
on the benchmarks is in the attached file `recoveries-benchmark-v01`.
workload          elapsed (p0)  elapsed (p1)  % perf  db size
inserts.sql       272s 10ms     197s 570ms    27.37%  480 MB
updates.sql       177s 420ms    117s 80ms     34.01%  480 MB
hot-updates.sql   36s 940ms     29s 240ms     20.84%  480 MB
nonhot.sql        36s 570ms     28s 980ms     20.75%  480 MB
simple-update     20s 160ms     11s 580ms     42.56%  4913 MB
tpcb-like         20s 590ms     13s 640ms     33.75%  4913 MB
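For reference, the "% perf" figures match the relative reduction in
elapsed time, (p0 - p1) / p0. This reading is inferred from the numbers
rather than stated in the benchmark file. A minimal C sketch of the
arithmetic follows; the helper names `elapsed_ms` and `percent_improvement`
are made up for illustration and are not part of the patch.

```c
#include <assert.h>

/* Convert a "<s>s <ms>ms" pair, e.g. 272s 10ms, to milliseconds. */
long
elapsed_ms(long seconds, long millis)
{
    assert(seconds >= 0 && millis >= 0 && millis < 1000);
    return seconds * 1000 + millis;
}

/* Relative reduction in elapsed time, in percent: (p0 - p1) / p0 * 100. */
double
percent_improvement(long p0_ms, long p1_ms)
{
    assert(p0_ms > 0);
    return 100.0 * (double) (p0_ms - p1_ms) / (double) p0_ms;
}
```

For inserts.sql this gives (272.010 - 197.570) / 272.010, i.e. about 27.37%.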
A similar approach was suggested earlier by Matthias van de Meent in a
separate thread [1]. Right now I am using one bgw to decode records and
fill the shared message queue, and the redo apply loop simply receives
the decoded records from the queue. After redo is finished, the consumer
(startup process) can request a shutdown from the producer (pipeline bgw)
before exiting recovery.
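To illustrate the hand-off described above, here is a toy, single-process
C sketch of the producer/consumer protocol. It is not the patch's code:
the real implementation uses a background worker and shm_mq, whereas
`Queue`, `Msg`, `queue_send`, `queue_receive` and `run_pipeline` are
hypothetical names, and the two roles are simply interleaved in one loop.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical message kinds, loosely modelled on the patch's WAL_MSG_* types. */
typedef enum { MSG_RECORD, MSG_END_OF_WAL } MsgType;

typedef struct
{
    MsgType       type;
    unsigned long lsn;      /* stand-in for a decoded record */
} Msg;

#define QUEUE_SLOTS 4       /* illustrative; the real queue is sized in bytes */

typedef struct
{
    Msg    slots[QUEUE_SLOTS];
    size_t head;            /* index of the oldest queued message */
    size_t count;           /* number of queued messages */
} Queue;

/* Producer side: returns false when the queue is full. */
bool
queue_send(Queue *q, Msg m)
{
    if (q->count == QUEUE_SLOTS)
        return false;
    q->slots[(q->head + q->count) % QUEUE_SLOTS] = m;
    q->count++;
    return true;
}

/* Consumer side: returns false when the queue is empty. */
bool
queue_receive(Queue *q, Msg *out)
{
    if (q->count == 0)
        return false;
    *out = q->slots[q->head];
    q->head = (q->head + 1) % QUEUE_SLOTS;
    q->count--;
    return true;
}

/* Simulate replay of nrecords records; returns how many were applied. */
unsigned long
run_pipeline(unsigned long nrecords)
{
    Queue         q = {0};
    unsigned long next_lsn = 1;
    unsigned long applied = 0;
    bool          producer_done = false;
    bool          shutdown_requested = false;

    while (!shutdown_requested)
    {
        Msg m;

        /* Producer: decode ahead until the queue is full or WAL runs out. */
        while (!producer_done)
        {
            m.type = (next_lsn <= nrecords) ? MSG_RECORD : MSG_END_OF_WAL;
            m.lsn = next_lsn;
            if (!queue_send(&q, m))
                break;      /* queue full: wait for the consumer to drain it */
            if (m.type == MSG_END_OF_WAL)
                producer_done = true;
            else
                next_lsn++;
        }

        /* Consumer: take one message and apply it in WAL order. */
        if (!queue_receive(&q, &m) || m.type == MSG_END_OF_WAL)
            shutdown_requested = true;  /* redo finished: ask producer to exit */
        else
        {
            assert(m.lsn == applied + 1);
            applied++;
        }
    }
    return applied;
}
```

The bounded queue is what lets the decoder run ahead of the apply loop
without running ahead indefinitely: once it is full, the producer has to
wait for the consumer to catch up.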
This idea can be coupled with another one: pinning buffers in parallel
before the recovery process needs them. That would parallelize most of
the work being done in `XLogReadBufferForRedoExtended`. Redo could then
simply receive the already-pinned buffers from a queue, but this still
needs some R&D, as IPC and pinning/unpinning of buffers across two
processes can be tricky.
If someone wants to reproduce the benchmark, they can do so using
these scripts [2].
Looking forward to your reviews, comments, etc.
[1]: https://www.postgresql.org/message-id/CAEze2Wh6C_QfxLii%2B%2BeZue5%3DKvbVXKkHyZW8PLmtLgyjmFzwCQ%40ma...
[2]: https://github.com/imranzaheer612/pg-recovery-testing
--
Regards,
Imran Zaheer
CYBERTEC PostgreSQL International GmbH
Attachments:
[application/octet-stream] v1-0001-Pipelined-Recoveries.patch (46.7K, 3-v1-0001-Pipelined-Recoveries.patch)
From d8f11b8382310eac70259e29d996e13435c40902 Mon Sep 17 00:00:00 2001
From: Imran Zaheer <[email protected]>
Date: Thu, 29 Jan 2026 20:07:43 +0500
Subject: [PATCH v1] Pipelined Recoveries
Implement a producer-consumer architecture for WAL replay that separates
WAL decoding from the recovery process, enabling parallel processing between
different steps of replay.
A background worker reads and decodes WAL records while the startup
process applies them. IPC happens via shared memory message queues (shm_mq),
allowing the decoder to run ahead of the apply process.
This provides some improvement in recovery performance for CPU-bound workloads.
New GUC: wal_pipeline (default: off)
Author: Imran Zaheer <[email protected]>
Idea by: Ants Aasma <[email protected]>
---
src/backend/access/transam/Makefile | 1 +
src/backend/access/transam/wal_pipeline.c | 1004 +++++++++++++++++++
src/backend/access/transam/xlogprefetcher.c | 2 +-
src/backend/access/transam/xlogrecovery.c | 143 ++-
src/backend/postmaster/bgworker.c | 4 +
src/backend/storage/ipc/ipci.c | 5 +
src/backend/utils/misc/guc_parameters.dat | 6 +
src/include/access/wal_pipeline.h | 184 ++++
src/include/access/xlogrecovery.h | 12 +
src/include/utils/guc.h | 2 +
10 files changed, 1347 insertions(+), 16 deletions(-)
create mode 100644 src/backend/access/transam/wal_pipeline.c
create mode 100644 src/include/access/wal_pipeline.h
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index a32f473e0a2..2cd0425f7a1 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -25,6 +25,7 @@ OBJS = \
transam.o \
twophase.o \
twophase_rmgr.o \
+ wal_pipeline.o \
varsup.o \
xact.o \
xlog.o \
diff --git a/src/backend/access/transam/wal_pipeline.c b/src/backend/access/transam/wal_pipeline.c
new file mode 100644
index 00000000000..a27d9f2083f
--- /dev/null
+++ b/src/backend/access/transam/wal_pipeline.c
@@ -0,0 +1,1004 @@
+/*-------------------------------------------------------------------------
+ *
+ * wal_pipeline.c
+ * WAL replay pipeline implementation
+ *
+ * This module implements a producer-consumer pipeline for WAL replay.
+ * The producer (background worker) reads and decodes WAL records in parallel
+ * with the consumer (startup process) that applies them.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/wal_pipeline.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <fcntl.h>
+#include <unistd.h>
+
+#include "access/heapam_xlog.h"
+#include "access/rmgr.h"
+#include "access/wal_pipeline.h"
+#include "access/xlog.h"
+#include "access/xlogprefetcher.h"
+#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
+#include "access/xlogrecovery.h"
+#include "access/xlogutils.h"
+#include "access/xlog_internal.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/bufmgr.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/md.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "storage/smgr.h"
+#include "tcop/tcopprot.h"
+#include "utils/elog.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+#include "utils/rel.h"
+
+
+/* Global shared memory control structure */
+WalPipelineShmCtl *WalPipelineShm = NULL;
+
+/* Local state for producer */
+static dsm_segment *producer_dsm_seg = NULL;
+static shm_mq *producer_mq = NULL;
+static shm_mq_handle *producer_mq_handle = NULL;
+
+/* Local state for consumer */
+static dsm_segment *consumer_dsm_seg = NULL;
+static shm_mq *consumer_mq = NULL;
+static shm_mq_handle *consumer_mq_handle = NULL;
+
+/* Forward declarations */
+static void wal_pipeline_cleanup_callback(int code, Datum arg);
+static Size serialize_wal_record(XLogReaderState *record, char **buffer);
+static DecodedXLogRecord *deserialize_wal_record(const char *buffer, Size len);
+static void cleanup_producer_resources(void);
+static void cleanup_consumer_resources(void);
+
+/* copied from xlogrecovery.c */
+/* Parameters passed down from ReadRecord to the XLogPageRead callback. */
+typedef struct XLogPageReadPrivate
+{
+ int emode;
+ bool fetching_ckpt; /* are we fetching a checkpoint record? */
+ bool randAccess;
+ TimeLineID replayTLI;
+} XLogPageReadPrivate;
+
+/*
+ * WalPipelineShmemSize
+ * Compute space needed for WAL pipeline shared memory
+ */
+Size
+WalPipelineShmemSize(void)
+{
+ Size size = 0;
+
+ size = add_size(size, sizeof(WalPipelineShmCtl));
+
+ return size;
+}
+
+/*
+ * WalPipelineShmemInit
+ * Initialize WAL pipeline shared memory structures
+ */
+void
+WalPipelineShmemInit(void)
+{
+ bool found;
+
+ WalPipelineShm = (WalPipelineShmCtl *)
+ ShmemInitStruct("WAL Pipeline Control",
+ sizeof(WalPipelineShmCtl),
+ &found);
+
+ if (!found)
+ {
+ /* First time through, initialize */
+ SpinLockInit(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ WalPipelineShm->shutdown_requested = false;
+ WalPipelineShm->producer_exited = false;
+ WalPipelineShm->producer_pid = 0;
+ WalPipelineShm->consumer_pid = 0;
+ WalPipelineShm->producer_lsn = InvalidXLogRecPtr;
+ WalPipelineShm->consumer_lsn = InvalidXLogRecPtr;
+ WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+ WalPipelineShm->producer_mq_handle = NULL;
+ WalPipelineShm->consumer_mq_handle = NULL;
+ WalPipelineShm->records_sent = 0;
+ WalPipelineShm->records_received = 0;
+ WalPipelineShm->bytes_sent = 0;
+ WalPipelineShm->bytes_received = 0;
+ WalPipelineShm->error_code = 0;
+ WalPipelineShm->error_message[0] = '\0';
+ }
+}
+
+
+/*
+ * WalPipeline_Start
+ * Initialize and start the WAL pipeline
+ */
+void
+WalPipeline_Start(WalPipelineParams *params)
+{
+ BackgroundWorker worker;
+ BackgroundWorkerHandle *handle;
+ dsm_segment *seg;
+ shm_toc_estimator e;
+ shm_toc *toc;
+ Size segsize;
+ shm_mq *mq;
+ WalPipelineParams *shared_params;
+ pid_t pid;
+ BgwHandleStatus status;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+
+ if (WalPipelineShm->initialized)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return; /* Already started */
+ }
+
+ shm_toc_initialize_estimator(&e);
+ shm_toc_estimate_chunk(&e, sizeof(WalPipelineParams));
+ shm_toc_estimate_chunk(&e, WAL_PIPELINE_QUEUE_SIZE);
+ shm_toc_estimate_keys(&e, 2); /* key=1 → params, key=2 → mq */
+ segsize = shm_toc_estimate(&e);
+
+ seg = dsm_create(segsize, 0);
+ dsm_pin_segment(seg);
+
+ toc = shm_toc_create(PG_WAL_PIPELINE_MAGIC,
+ dsm_segment_address(seg), segsize);
+
+ /*
+ * These are some global variables from xlogrecovery.c that our
+ * pipeline should be aware of, so passing it through the shmem.
+ *
+ * WalPipelineParams is shared-memory safe and contains
+ * only plain data (no pointers).
+ */
+ shared_params = shm_toc_allocate(toc, sizeof(WalPipelineParams));
+ shm_toc_insert(toc, 1, shared_params);
+ *shared_params = *params;
+
+ /* create the message queue */
+ mq = shm_mq_create(shm_toc_allocate(toc, WAL_PIPELINE_QUEUE_SIZE),
+ WAL_PIPELINE_QUEUE_SIZE);
+ shm_toc_insert(toc, 2, mq);
+
+ /* update shared state */
+ WalPipelineShm->dsm_seg_handle = dsm_segment_handle(seg);
+ WalPipelineShm->consumer_pid = MyProcPid;
+ WalPipelineShm->initialized = true;
+
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* Set up consumer side of the queue */
+ consumer_dsm_seg = seg;
+ consumer_mq = mq;
+ shm_mq_set_receiver(consumer_mq, MyProc);
+
+ /* Register cleanup callback */
+ before_shmem_exit(wal_pipeline_cleanup_callback, (Datum) 0);
+
+ /* Register background worker */
+ memset(&worker, 0, sizeof(worker));
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+ worker.bgw_start_time = BgWorkerStart_PostmasterStart;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ sprintf(worker.bgw_library_name, "postgres");
+ sprintf(worker.bgw_function_name, "WalPipeline_ProducerMain");
+ snprintf(worker.bgw_name, BGW_MAXLEN, "wal pipeline producer");
+ snprintf(worker.bgw_type, BGW_MAXLEN, "wal pipeline producer");
+ worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+ worker.bgw_notify_pid = MyProcPid;
+
+ if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not register background worker for WAL pipeline")));
+
+ status = WaitForBackgroundWorkerStartup(handle, &pid);
+
+ if (status != BGWH_STARTED)
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+ errmsg("could not start background process"),
+ errhint("More details may be available in the server log.")));
+ else
+ ereport(LOG, (errmsg("[walpipeline] started.")));
+
+ consumer_mq_handle = shm_mq_attach(consumer_mq, seg, NULL);
+}
+
+/*
+ * WalPipeline_Stop
+ * Stop the WAL pipeline
+ */
+void
+WalPipeline_Stop(void)
+{
+ if (!WalPipelineShm || !WalPipelineShm->initialized)
+ return;
+
+ WalPipeline_RequestShutdown();
+
+ /* Wait for producer to exit (with timeout) */
+ for (int i = 0; i < 100; i++) /* 10 second timeout */
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ if (WalPipelineShm->producer_pid == 0)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ break;
+ }
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ pg_usleep(100000); /* 100ms */
+ }
+
+ cleanup_consumer_resources();
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->initialized = false;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ elog(LOG, "WAL pipeline stopped");
+}
+
+/*
+ * WalPipeline_RequestShutdown
+ * Request graceful shutdown of the pipeline
+ */
+void
+WalPipeline_RequestShutdown(void)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->shutdown_requested = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* Send shutdown message if queue is available */
+ if (consumer_mq_handle)
+ WalPipeline_SendShutdown();
+}
+
+/*
+ * WalPipeline_ProducerMain
+ * Main loop for the producer background worker
+ */
+void
+WalPipeline_ProducerMain(Datum main_arg)
+{
+ dsm_handle handle = DatumGetUInt32(main_arg);
+ dsm_segment *seg;
+ shm_toc *toc;
+ WalPipelineParams *params;
+ XLogReaderState *xlogreader;
+ XLogPrefetcher *xlogprefetcher;
+ XLogPageReadPrivate *private;
+ XLogRecord *record;
+ TimeLineID replayTLI = 0;
+
+ pqsignal(SIGTERM, die);
+ BackgroundWorkerUnblockSignals();
+
+ seg = dsm_attach(handle);
+ if (seg == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("[walpipeline] producer: could not map dynamic shared memory segment")));
+
+ toc = shm_toc_attach(PG_WAL_PIPELINE_MAGIC, dsm_segment_address(seg));
+ if (toc == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("[walpipeline] producer: bad magic number in dynamic shared memory segment")));
+
+ /* Lookup params and queue */
+ params = shm_toc_lookup(toc, 1, false);
+ producer_mq = shm_toc_lookup(toc, 2, false);
+
+ /* Set up producer side of queue */
+ producer_dsm_seg = seg;
+ shm_mq_set_sender(producer_mq, MyProc);
+ producer_mq_handle = shm_mq_attach(producer_mq, seg, NULL);
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = MyProcPid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* Set up WAL reading processor */
+ private = palloc0(sizeof(XLogPageReadPrivate));
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ private);
+
+ if (!xlogreader)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory"),
+ errdetail("Failed while allocating a WAL reading processor.")));
+ xlogreader->system_identifier = GetSystemIdentifier();
+
+ /*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /* Create a WAL prefetcher. */
+ xlogprefetcher = XLogPrefetcherAllocate(xlogreader);
+
+ /* Init some important globals before starting */
+ StandbyModeRequested = params->StandbyModeRequested;
+ ArchiveRecoveryRequested = params->ArchiveRecoveryRequested;
+ InArchiveRecovery = params->InArchiveRecovery;
+ replayTLI = params->ReplayTLI;
+ recoveryTargetTLI = params->recoveryTargetTLI;
+ minRecoveryPointTLI = params->minRecoveryPointTLI;
+ minRecoveryPoint = params->minRecoveryPoint;
+ InRedo = params->InRedo;
+ InRecovery = true;
+
+ elog(DEBUG1, "[walpipeline] producer: started at %X/%X, TLI %u buffer",
+ LSN_FORMAT_ARGS(params->NextRecPtr), replayTLI);
+
+ XLogPrefetcherBeginRead(xlogprefetcher, params->NextRecPtr);
+
+ /* Main decoding loop */
+ while (true)
+ {
+ record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
+
+ if (record == NULL)
+ {
+ /*
+ * No error message means we've caught up to the end of available WAL.
+ * Check if there's a valid EndRecPtr - if not, we're truly at the end.
+ */
+ if (XLogRecPtrIsInvalid(xlogreader->EndRecPtr))
+ {
+ /*
+ * We've reached the end of available WAL.
+ * In crash recovery, this means we're done.
+ * In streaming replication, wait for more WAL.
+ */
+ if (!RecoveryInProgress())
+ {
+ elog(LOG, "[walpipeline] producer: recovery completed");
+ break;
+ }
+
+ /* Wait for more WAL to arrive */
+ pg_usleep(10000); /* 10ms */
+ continue;
+ }
+ else
+ {
+ /*
+ * Cannot read more records, shut it down.
+ */
+ WalPipeline_SendShutdown();
+ break;
+ }
+ }
+
+ /*
+ * Successfully read and decoded a record.
+ * Send it to the consumer.
+ */
+ if (!WalPipeline_SendRecord(xlogreader))
+ {
+ elog(WARNING, "[walpipeline] producer: failed to send record, queue full or detached");
+ break;
+ }
+
+ /* Update our position for monitoring */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_lsn = xlogreader->EndRecPtr;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ CHECK_FOR_INTERRUPTS();
+ }
+
+ /* Wait until consumer finished processing records and sent a shutdown request */
+ while (true)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ if (WalPipelineShm->shutdown_requested)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ break;
+ }
+ SpinLockRelease(&WalPipelineShm->mutex);
+ pg_usleep(10000); /* sleep 10ms */
+ }
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+ WalPipelineShm->records_sent, WalPipelineShm->records_received);
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ /* Cleanup */
+ XLogReaderFree(xlogreader);
+ pfree(private);
+ cleanup_producer_resources();
+}
+
+/*
+ * WalPipeline_SendRecord
+ * Send a decoded WAL record to the consumer
+ */
+bool
+WalPipeline_SendRecord(XLogReaderState *record)
+{
+ char *buffer = NULL;
+ Size msglen;
+ shm_mq_result res;
+
+
+ if (!producer_mq_handle)
+ return false;
+
+ /* Serialize the record */
+ msglen = serialize_wal_record(record, &buffer);
+
+ if (msglen > WAL_PIPELINE_MAX_MSG_SIZE)
+ {
+ elog(WARNING, "[walpipeline] producer: wal record at %X/%X too large (%zu bytes), skipping",
+ LSN_FORMAT_ARGS(record->ReadRecPtr), msglen);
+ pfree(buffer);
+ return true;
+ }
+
+ res = shm_mq_send(producer_mq_handle, msglen, buffer, false, true);
+
+ if (res == SHM_MQ_SUCCESS)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->records_sent++;
+ WalPipelineShm->bytes_sent += msglen;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ pfree(buffer);
+ return true;
+ }
+
+ if (res == SHM_MQ_DETACHED)
+ {
+ elog(WARNING, "[walpipeline] producer: consumer detached");
+ pfree(buffer);
+ return false;
+ }
+
+ /* Some other error */
+ elog(WARNING, "[walpipeline] producer: shm_mq_send failed with result %d", res);
+ pfree(buffer);
+ return false;
+}
+
+/*
+ * WalPipeline_SendShutdown
+ * Send shutdown message to consumer
+ */
+bool
+WalPipeline_SendShutdown(void)
+{
+ WalMsgHeader hdr;
+ shm_mq_result res;
+
+ if (!producer_mq_handle)
+ return false;
+
+ hdr.msg_type = WAL_MSG_SHUTDOWN;
+ hdr.msg_len = sizeof(WalMsgHeader);
+ hdr.lsn = InvalidXLogRecPtr;
+
+ res = shm_mq_send(producer_mq_handle, sizeof(hdr), &hdr, false, true);
+ return (res == SHM_MQ_SUCCESS);
+}
+
+/*
+ * WalPipeline_SendError
+ * Send error message to consumer
+ */
+bool
+WalPipeline_SendError(int errcode, const char *errmsg)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->error_code = errcode;
+ strlcpy(WalPipelineShm->error_message, errmsg,
+ sizeof(WalPipelineShm->error_message));
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return true;
+}
+
+/*
+ * WalPipeline_ReceiveRecord
+ * Receive and deserialize a WAL record from the producer
+ */
+DecodedXLogRecord *
+WalPipeline_ReceiveRecord(void)
+{
+ shm_mq_result res;
+ Size nbytes;
+ void *data;
+ WalMsgHeader *hdr;
+ DecodedXLogRecord *record;
+
+ if (!consumer_mq_handle)
+ return NULL;
+
+ /* Receive message from queue */
+ res = shm_mq_receive(consumer_mq_handle, &nbytes, &data, false);
+
+ if (res != SHM_MQ_SUCCESS)
+ {
+ /* Detached or error */
+ return NULL;
+ }
+
+ hdr = (WalMsgHeader *) data;
+
+ /* Handle different message types */
+ switch (hdr->msg_type)
+ {
+ case WAL_MSG_RECORD:
+ record = deserialize_wal_record((char *) data, nbytes);
+
+ /* Update statistics */
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->records_received++;
+ WalPipelineShm->bytes_received += nbytes;
+ WalPipelineShm->consumer_lsn = hdr->lsn;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return record;
+
+ case WAL_MSG_SHUTDOWN:
+ elog(LOG, "[walpipeline] consumer: received shutdown message from the producer");
+ return NULL;
+
+ case WAL_MSG_ERROR:
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ ereport(ERROR,
+ (errcode(WalPipelineShm->error_code),
+ errmsg("[walpipeline] consumer: received error from the producer: %s",
+ WalPipelineShm->error_message)));
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return NULL;
+
+ default:
+ elog(WARNING, "[walpipeline] consumer: unknown message type: %d",
+ hdr->msg_type);
+ return NULL;
+ }
+}
+
+/*
+ * WalPipeline_CheckProducerAlive
+ * Check if producer is still running
+ */
+bool
+WalPipeline_CheckProducerAlive(void)
+{
+ pid_t pid;
+ bool alive;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ pid = WalPipelineShm->producer_pid;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ if (pid == 0)
+ return false;
+
+ alive = (kill(pid, 0) == 0);
+
+ if (!alive)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = 0;
+ SpinLockRelease(&WalPipelineShm->mutex);
+ }
+
+ return alive;
+}
+
+/*
+ * WalPipeline_IsActive
+ * Check if pipeline is active
+ */
+bool
+WalPipeline_IsActive(void)
+{
+ bool active;
+
+ if (!WalPipelineShm)
+ return false;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ active = WalPipelineShm->initialized && !WalPipelineShm->shutdown_requested;
+ SpinLockRelease(&WalPipelineShm->mutex);
+
+ return active;
+}
+
+/*
+ * WalPipeline_GetStats
+ * Get pipeline statistics
+ */
+void
+WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+ XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn)
+{
+ SpinLockAcquire(&WalPipelineShm->mutex);
+
+ if (records_sent)
+ *records_sent = WalPipelineShm->records_sent;
+ if (records_received)
+ *records_received = WalPipelineShm->records_received;
+ if (producer_lsn)
+ *producer_lsn = WalPipelineShm->producer_lsn;
+ if (consumer_lsn)
+ *consumer_lsn = WalPipelineShm->consumer_lsn;
+
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * serialize_wal_record
+ *
+ * Serialize the currently decoded WAL record owned by the given
+ * XLogReaderState into a contiguous buffer.
+ *
+ * Allocation/layout of the output buffer:
+ *
+ * [WalRecordMsgHeader]
+ * [XLogRecord]
+ * [main_data]
+ * Repeated for each in-use block:
+ * [SerializedBlockMeta]
+ * [backup image bytes] (optional)
+ * [block data bytes] (optional)
+ *
+ * The resulting buffer contains no pointers and is safe to transfer
+ * across processes or store externally.
+ */
+static Size
+serialize_wal_record(XLogReaderState *xlogreader, char **outbuf)
+{
+ DecodedXLogRecord *dec = xlogreader->record;
+ WalRecordMsgHeader hdr;
+ Size total;
+ char *ptr;
+
+ if (dec == NULL)
+ return 0;
+
+ /* ---- size calculation ---- */
+ total =
+ sizeof(WalRecordMsgHeader) +
+ sizeof(XLogRecord) +
+ dec->main_data_len;
+
+ for (int i = 0; i <= dec->max_block_id; i++)
+ {
+ const DecodedBkpBlock *blk = &dec->blocks[i];
+
+ if (!blk->in_use)
+ continue;
+
+ total += sizeof(SerializedBlockMeta);
+
+ if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+ total += blk->bimg_len;
+
+ if (blk->has_data && blk->data && blk->data_len > 0)
+ total += blk->data_len;
+ }
+
+ ptr = *outbuf = palloc(total);
+
+ /* ---- message header ---- */
+ memset(&hdr, 0, sizeof(hdr));
+ hdr.msg_type = WAL_MSG_RECORD;
+ hdr.readRecPtr = xlogreader->ReadRecPtr;
+ hdr.endRecPtr = xlogreader->EndRecPtr;
+ hdr.decoded_size = total - sizeof(WalRecordMsgHeader);
+ hdr.max_block_id = dec->max_block_id;
+ hdr.main_data_len = dec->main_data_len;
+
+ memcpy(ptr, &hdr, sizeof(hdr));
+ ptr += sizeof(hdr);
+
+ /* ---- XLogRecord ---- */
+ memcpy(ptr, &dec->header, sizeof(XLogRecord));
+ ptr += sizeof(XLogRecord);
+
+ /* ---- main data ---- */
+ if (dec->main_data_len > 0)
+ {
+ memcpy(ptr, dec->main_data, dec->main_data_len);
+ ptr += dec->main_data_len;
+ }
+
+ /* ---- blocks ---- */
+ for (int i = 0; i <= dec->max_block_id; i++)
+ {
+ const DecodedBkpBlock *blk = &dec->blocks[i];
+ SerializedBlockMeta meta;
+
+ if (!blk->in_use)
+ continue;
+
+ memset(&meta, 0, sizeof(meta));
+ meta.block_id = i;
+ meta.in_use = true;
+ meta.rlocator = blk->rlocator;
+ meta.forknum = blk->forknum;
+ meta.blkno = blk->blkno;
+ meta.flags = blk->flags;
+ meta.has_image = blk->has_image;
+ meta.apply_image = blk->apply_image;
+ meta.has_data = blk->has_data;
+ meta.bimg_len = blk->bimg_len;
+ meta.bimg_info = blk->bimg_info;
+ meta.hole_offset = blk->hole_offset;
+ meta.hole_length = blk->hole_length;
+ meta.data_len = blk->data_len;
+
+ memcpy(ptr, &meta, sizeof(meta));
+ ptr += sizeof(meta);
+
+ if (blk->has_image && blk->bkp_image && blk->bimg_len > 0)
+ {
+ memcpy(ptr, blk->bkp_image, blk->bimg_len);
+ ptr += blk->bimg_len;
+ }
+
+ if (blk->has_data && blk->data && blk->data_len > 0)
+ {
+ memcpy(ptr, blk->data, blk->data_len);
+ ptr += blk->data_len;
+ }
+ }
+
+ Assert(ptr - *outbuf == total);
+ return total;
+}
+
+/*
+ * deserialize_wal_record
+ *
+ * Deserialize a WAL record from a buffer into a DecodedXLogRecord.
+ *
+ * Memory layout:
+ * [DecodedXLogRecord + blocks][main_data][block_images][block_data]
+ */
+DecodedXLogRecord *
+deserialize_wal_record(const char *buf, Size len)
+{
+ const char *ptr = buf;
+ const char *end = buf + len;
+ WalRecordMsgHeader hdr;
+ DecodedXLogRecord *dec = NULL;
+ char *alloc_ptr;
+ int nblocks;
+ Size total;
+
+ if (len < sizeof(WalRecordMsgHeader))
+ return NULL;
+
+ memcpy(&hdr, ptr, sizeof(hdr));
+ ptr += sizeof(hdr);
+
+ if (hdr.decoded_size != len - sizeof(WalRecordMsgHeader))
+ return NULL;
+
+ nblocks = (hdr.max_block_id >= 0) ? hdr.max_block_id + 1 : 0;
+
+ /* ---- space allocation ---- */
+ total = MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock))
+ + MAXALIGN(hdr.decoded_size);
+
+ dec = palloc(total);
+ memset(dec, 0, total);
+
+ alloc_ptr = (char *)dec + MAXALIGN(offsetof(DecodedXLogRecord, blocks) + nblocks * sizeof(DecodedBkpBlock));
+
+ /* ---- record metadata ---- */
+ dec->lsn = hdr.readRecPtr;
+ dec->next_lsn = hdr.endRecPtr;
+ dec->max_block_id = hdr.max_block_id;
+ dec->main_data_len = hdr.main_data_len;
+
+ /* ---- XLogRecord ---- */
+ if (ptr + sizeof(XLogRecord) > end)
+ goto fail;
+
+ memcpy(&dec->header, ptr, sizeof(XLogRecord));
+ ptr += sizeof(XLogRecord);
+
+ /* ---- main data ---- */
+ if (hdr.main_data_len > 0)
+ {
+ if (ptr + hdr.main_data_len > end)
+ goto fail;
+
+ dec->main_data = alloc_ptr;
+ memcpy(dec->main_data, ptr, hdr.main_data_len);
+ ptr += hdr.main_data_len;
+ alloc_ptr += MAXALIGN(hdr.main_data_len);
+ }
+
+ /* ---- blocks ---- */
+ for (int i = 0; i < nblocks && ptr < end; i++)
+ {
+ SerializedBlockMeta meta;
+ DecodedBkpBlock *blk;
+
+ if (ptr + sizeof(meta) > end)
+ break;
+
+ memcpy(&meta, ptr, sizeof(meta));
+ ptr += sizeof(meta);
+
+ if (!meta.in_use)
+ continue;
+
+ if (meta.block_id < 0 || meta.block_id >= nblocks)
+ goto fail;
+
+ blk = &dec->blocks[meta.block_id];
+
+ blk->in_use = true;
+ blk->rlocator = meta.rlocator;
+ blk->forknum = meta.forknum;
+ blk->blkno = meta.blkno;
+ blk->flags = meta.flags;
+ blk->has_image = meta.has_image;
+ blk->apply_image = meta.apply_image;
+ blk->has_data = meta.has_data;
+ blk->bimg_len = meta.bimg_len;
+ blk->bimg_info = meta.bimg_info;
+ blk->hole_offset = meta.hole_offset;
+ blk->hole_length = meta.hole_length;
+ blk->data_len = meta.data_len;
+
+ if (blk->has_image && blk->bimg_len > 0)
+ {
+ if (ptr + blk->bimg_len > end)
+ goto fail;
+
+ blk->bkp_image = alloc_ptr;
+ memcpy(blk->bkp_image, ptr, blk->bimg_len);
+ ptr += blk->bimg_len;
+ alloc_ptr += MAXALIGN(blk->bimg_len);
+ }
+
+ if (blk->has_data && blk->data_len > 0)
+ {
+ if (ptr + blk->data_len > end)
+ goto fail;
+
+ blk->data = alloc_ptr;
+ memcpy(blk->data, ptr, blk->data_len);
+ ptr += blk->data_len;
+ alloc_ptr += MAXALIGN(blk->data_len);
+ }
+ }
+
+ dec->size = alloc_ptr - (char *)dec;
+ dec->oversized = false;
+
+ return dec;
+
+fail:
+ if (dec)
+ pfree(dec);
+
+ elog(LOG, "deserialize_wal_record: failed");
+ return NULL;
+}
+
+/*
+ * cleanup_producer_resources
+ * Clean up producer-side resources
+ */
+static void
+cleanup_producer_resources(void)
+{
+ if (producer_mq_handle)
+ {
+ shm_mq_detach(producer_mq_handle);
+ producer_mq_handle = NULL;
+ }
+
+ if (producer_dsm_seg)
+ {
+ dsm_detach(producer_dsm_seg);
+ producer_dsm_seg = NULL;
+ }
+
+ producer_mq = NULL;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->producer_pid = 0;
+ WalPipelineShm->producer_exited = true;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * cleanup_consumer_resources
+ * Clean up consumer-side resources
+ */
+static void
+cleanup_consumer_resources(void)
+{
+ if (consumer_mq_handle)
+ {
+ shm_mq_detach(consumer_mq_handle);
+ consumer_mq_handle = NULL;
+ }
+
+ if (consumer_dsm_seg)
+ {
+ dsm_unpin_segment(dsm_segment_handle(consumer_dsm_seg));
+ dsm_detach(consumer_dsm_seg);
+ consumer_dsm_seg = NULL;
+ }
+
+ consumer_mq = NULL;
+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ WalPipelineShm->consumer_pid = 0;
+ WalPipelineShm->dsm_seg_handle = DSM_HANDLE_INVALID;
+ SpinLockRelease(&WalPipelineShm->mutex);
+}
+
+/*
+ * wal_pipeline_cleanup_callback
+ * Cleanup callback for process exit
+ */
+static void
+wal_pipeline_cleanup_callback(int code, Datum arg)
+{
+ pid_t mypid = MyProcPid;
+ bool is_producer = false;
+
+ if (WalPipelineShm)
+ {
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ is_producer = (WalPipelineShm->producer_pid == mypid);
+ SpinLockRelease(&WalPipelineShm->mutex);
+ }
+
+ if (is_producer)
+ cleanup_producer_resources();
+ else
+ cleanup_consumer_resources();
+}
diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c
index 3c3f067aafb..2ffa82db913 100644
--- a/src/backend/access/transam/xlogprefetcher.c
+++ b/src/backend/access/transam/xlogprefetcher.c
@@ -350,7 +350,7 @@ XLogPrefetchReconfigure(void)
static inline void
XLogPrefetchIncrement(pg_atomic_uint64 *counter)
{
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ // Assert(AmStartupProcess() || !IsUnderPostmaster);
pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
}
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index a81dcbb5d79..0da26c32608 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -32,6 +32,7 @@
#include "access/timeline.h"
#include "access/transam.h"
+#include "access/wal_pipeline.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
@@ -98,6 +99,7 @@ int recovery_min_apply_delay = 0;
char *PrimaryConnInfo = NULL;
char *PrimarySlotName = NULL;
bool wal_receiver_create_temp_slot = false;
+bool wal_pipeline_enabled = false;
/*
* recoveryTargetTimeLineGoal: what the user requested, if any
@@ -145,7 +147,7 @@ bool InArchiveRecovery = false;
* in standby mode. These variables are only valid in the startup process.
* They work similarly to ArchiveRecoveryRequested and InArchiveRecovery.
*/
-static bool StandbyModeRequested = false;
+bool StandbyModeRequested = false;
bool StandbyMode = false;
/* was a signal file present at startup? */
@@ -202,7 +204,7 @@ typedef struct XLogPageReadPrivate
} XLogPageReadPrivate;
/* flag to tell XLogPageRead that we have started replaying */
-static bool InRedo = false;
+bool InRedo = false;
/*
* Codes indicating where we got a WAL file from during recovery, or where
@@ -277,8 +279,8 @@ static TimeLineID receiveTLI = 0;
* file. But this copy of minRecoveryPoint variable reflects the value at the
* beginning of recovery, and is *not* updated after consistency is reached.
*/
-static XLogRecPtr minRecoveryPoint;
-static TimeLineID minRecoveryPointTLI;
+XLogRecPtr minRecoveryPoint;
+TimeLineID minRecoveryPointTLI;
static XLogRecPtr backupStartPoint;
static XLogRecPtr backupEndPoint;
@@ -419,12 +421,6 @@ static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
-static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
- int emode, bool fetching_ckpt,
- TimeLineID replayTLI);
-
-static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr,
- int reqLen, XLogRecPtr targetRecPtr, char *readBuf);
static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr,
bool randAccess,
bool fetching_ckpt,
@@ -1663,6 +1659,87 @@ ShutdownWalRecovery(void)
DisownLatch(&XLogRecoveryCtl->recoveryWakeupLatch);
}
+/*
+ * Get the next record for redo.
+ * Use the pipeline if it is enabled; otherwise read the record directly.
+ * `localreader` is only used and updated when the pipeline is
+ * enabled.
+ */
+static XLogRecord *
+ReceiveRecord(XLogPrefetcher *xlogprefetcher, int emode,
+ bool fetching_ckpt, TimeLineID replayTLI,
+ XLogReaderState **localreader, uint64 loop_count)
+{
+
+ XLogRecord *record = NULL;
+ XLogReaderState *reader = *localreader;
+
+ /*
+ * If pipeline not enabled read the record directly
+ */
+ if (!wal_pipeline_enabled)
+ {
+ record = ReadRecord(xlogprefetcher, emode, fetching_ckpt, replayTLI);
+ return record;
+ }
+
+ /*
+ * Get record from the pipeline
+ */
+ if (WalPipeline_IsActive())
+ {
+ DecodedXLogRecord *decoded_record = NULL;
+
+ decoded_record = WalPipeline_ReceiveRecord();
+
+ if (decoded_record)
+ {
+ record = &decoded_record->header;
+
+ /*
+ * The previous decoded record was deserialized from the
+ * pipeline, so its memory must be freed once it has been
+ * replayed.
+ *
+ * On the first iteration, however, `reader->record` was
+ * allocated from the `decode_buffer`, and freeing it can be
+ * fatal. That memory is released automatically at the end of
+ * recovery in `FinishWalRecovery()`, so we skip pfree for the
+ * first iteration.
+ */
+ if (reader->record && loop_count != 0)
+ pfree(reader->record);
+
+ /*
+ * Update to local reader state. We don't have to update the
+ * internal reader state, only updating the primary parameters.
+ */
+ reader->record = decoded_record;
+ reader->ReadRecPtr = decoded_record->lsn;
+ reader->DecodeRecPtr = decoded_record->lsn;
+ reader->EndRecPtr = decoded_record->next_lsn;
+ reader->NextRecPtr = decoded_record->next_lsn;
+ reader->decode_queue_head = decoded_record;
+ reader->decode_queue_tail = decoded_record;
+
+ return record;
+ }
+ else
+ {
+ /*
+ * We end up here only when the pipeline could not read more
+ * records and has sent a shutdown message. We acknowledge this
+ * and send back a shutdown request to properly stop the workers.
+ */
+ WalPipeline_Stop();
+ return NULL;
+ }
+ }
+
+ elog(FATAL, "[walpipeline] consumer: either pipeline not active, or no record available from pipeline.");
+ return record;
+}
+
/*
* Perform WAL recovery.
*
@@ -1749,11 +1826,42 @@ PerformWalRecovery(void)
{
TimestampTz xtime;
PGRUsage ru0;
+ uint64 loop_count = 0;
pg_rusage_init(&ru0);
InRedo = true;
+ if (wal_pipeline_enabled)
+ {
+ /*
+ * Start the WAL pipeline to decode the WAL in parallel. Also pass
+ * some important global parameters. These parameters are already
+ * set for the startup process but not for our pipeline worker, so
+ * they must be set there before decoding through the pipeline can
+ * start.
+ *
+ * ArchiveRecoveryRequested T
+ * InArchiveRecovery. |____ used by ReadRecord when called by the pipeline process
+ * minRecoveryPoint; |
+ * minRecoveryPointTLI; ⊥
+ * RedoStartLSN T____ used by XLogPageRead when called by the pipeline process
+ * RedoStartTLI ⊥
+ */
+ WalPipelineParams *params = palloc0(sizeof(WalPipelineParams));
+ params->ReplayTLI = replayTLI;
+ params->NextRecPtr = xlogreader->NextRecPtr;
+ params->recoveryTargetTLI = recoveryTargetTLI;
+ params->StandbyModeRequested = StandbyModeRequested;
+ params->ArchiveRecoveryRequested = ArchiveRecoveryRequested;
+ params->InArchiveRecovery = InArchiveRecovery;
+ params->minRecoveryPointTLI = minRecoveryPointTLI;
+ params->minRecoveryPoint = minRecoveryPoint;
+ params->InRedo = InRedo;
+
+ WalPipeline_Start(params);
+ }
+
RmgrStartup();
ereport(LOG,
@@ -1859,7 +1967,7 @@ PerformWalRecovery(void)
}
/* Else, try to fetch the next WAL record */
- record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
+ record = ReceiveRecord(xlogprefetcher, LOG, false, replayTLI, &xlogreader, loop_count++);
} while (record != NULL);
/*
@@ -1868,6 +1976,9 @@ PerformWalRecovery(void)
if (reachedRecoveryTarget)
{
+ if (wal_pipeline_enabled)
+ WalPipeline_Stop();
+
if (!reachedConsistency)
ereport(FATAL,
(errmsg("requested recovery stop point is before consistent recovery point")));
@@ -1901,6 +2012,8 @@ PerformWalRecovery(void)
RmgrCleanup();
+ // XXX: testing purpose only
+ ereport(DEBUG1, (errmsg("replay loop finished, apply loop count: " UINT64_FORMAT, loop_count)));
ereport(LOG,
errmsg("redo done at %X/%08X system usage: %s",
LSN_FORMAT_ARGS(xlogreader->ReadRecPtr),
@@ -3160,7 +3273,7 @@ ConfirmRecoveryPaused(void)
* (emode must be either PANIC, LOG). In standby mode, retries until a valid
* record is available.
*/
-static XLogRecord *
+XLogRecord *
ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
bool fetching_ckpt, TimeLineID replayTLI)
{
@@ -3168,7 +3281,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher);
XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data;
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ // Assert(AmStartupProcess() || !IsUnderPostmaster);
/* Pass through parameters to XLogPageRead */
private->fetching_ckpt = fetching_ckpt;
@@ -3329,7 +3442,7 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
*/
-static int
+int
XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *readBuf)
{
@@ -3341,7 +3454,7 @@ XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
int r;
instr_time io_start;
- Assert(AmStartupProcess() || !IsUnderPostmaster);
+ // Assert(AmStartupProcess() || !IsUnderPostmaster);
XLByteToSeg(targetPagePtr, targetSegNo, wal_segment_size);
targetPageOff = XLogSegmentOffset(targetPagePtr, wal_segment_size);
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index 65deabe91a7..a86111e4a1f 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,6 +13,7 @@
#include "postgres.h"
#include "access/parallel.h"
+#include "access/wal_pipeline.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -136,6 +137,9 @@ static const struct
},
{
"SequenceSyncWorkerMain", SequenceSyncWorkerMain
+ },
+ {
+ "WalPipeline_ProducerMain", WalPipeline_ProducerMain
}
};
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 2a3dfedf7e9..0fe2dd8de57 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/syncscan.h"
#include "access/transam.h"
#include "access/twophase.h"
+#include "access/wal_pipeline.h"
#include "access/xlogprefetcher.h"
#include "access/xlogrecovery.h"
#include "access/xlogwait.h"
@@ -141,6 +142,7 @@ CalculateShmemSize(void)
size = add_size(size, AioShmemSize());
size = add_size(size, WaitLSNShmemSize());
size = add_size(size, LogicalDecodingCtlShmemSize());
+ size = add_size(size, WalPipelineShmemSize());
/* include additional requested shmem from preload libraries */
size = add_size(size, total_addin_request);
@@ -225,6 +227,9 @@ CreateSharedMemoryAndSemaphores(void)
/* Initialize dynamic shared memory facilities. */
dsm_postmaster_startup(shim);
+ /* Initialize WAL pipeline module */
+ WalPipelineShmemInit();
+
/*
* Now give loadable modules a chance to set up their shmem allocations
*/
diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat
index f0260e6e412..d538fc7be0c 100644
--- a/src/backend/utils/misc/guc_parameters.dat
+++ b/src/backend/utils/misc/guc_parameters.dat
@@ -3379,6 +3379,12 @@
boot_val => 'false',
},
+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY',
+ short_desc => 'Use parallel workers to speed up recovery.',
+ variable => 'wal_pipeline_enabled',
+ boot_val => 'false',
+},
+
{ name => 'wal_receiver_create_temp_slot', type => 'bool', context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
short_desc => 'Sets whether a WAL receiver should create a temporary replication slot if no permanent slot is configured.',
variable => 'wal_receiver_create_temp_slot',
diff --git a/src/include/access/wal_pipeline.h b/src/include/access/wal_pipeline.h
new file mode 100644
index 00000000000..0463b8e2c76
--- /dev/null
+++ b/src/include/access/wal_pipeline.h
@@ -0,0 +1,184 @@
+/*-------------------------------------------------------------------------
+ *
+ * wal_pipeline.h
+ * WAL replay pipeline for parallel recovery
+ *
+ * This module implements a producer-consumer pipeline for WAL replay:
+ * - Producer: background worker that reads and decodes WAL records
+ * - Consumer: startup process (core redo loop)
+ *
+ * The pipeline uses shared memory queues (shm_mq) to pass decoded WAL
+ * records from producer to consumer, enabling parallelism while
+ * maintaining sequential replay semantics.
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ *
+ * src/include/access/wal_pipeline.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAL_PIPELINE_H
+#define WAL_PIPELINE_H
+
+#include "access/xlogreader.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/spin.h"
+
+/*
+ * Magic number for shared memory TOC
+ */
+#define PG_WAL_PIPELINE_MAGIC 0x57414C50 /* "WALP" */
+
+/*
+ * Message types sent through the pipeline
+ */
+typedef enum WalMsgType
+{
+ WAL_MSG_INVALID = 0,
+ WAL_MSG_RECORD, /* Decoded WAL record */
+ WAL_MSG_SHUTDOWN, /* Graceful shutdown request */
+ WAL_MSG_ERROR /* Error occurred in producer */
+} WalMsgType;
+
+/*
+ * Message header for all pipeline messages
+ * Each message starts with this header, followed by type-specific data
+ */
+typedef struct WalMsgHeader
+{
+ WalMsgType msg_type;
+ uint32 msg_len; /* Total message length including header */
+ XLogRecPtr lsn; /* LSN of the record (for WAL_MSG_RECORD) */
+} WalMsgHeader;
+
+/* Wire header for a serialized WAL message */
+typedef struct WalRecordMsgHeader
+{
+ uint32 msg_type; /* WAL_MSG_RECORD etc */
+ XLogRecPtr readRecPtr; /* XLogReaderState->ReadRecPtr */
+ XLogRecPtr endRecPtr; /* EndRecPtr */
+ uint32 decoded_size; /* total size of decoded record payload (not including this header) */
+ int32 max_block_id; /* highest block id (could be -1) */
+ uint32 main_data_len; /* length of main_data */
+ /* followed by decoded payload bytes */
+} WalRecordMsgHeader;
+
+/* Per-block metadata on the wire (no pointers) */
+typedef struct SerializedBlockMeta
+{
+ int32 block_id;
+ bool in_use;
+ RelFileLocator rlocator; /* same as decoded block */
+ ForkNumber forknum;
+ BlockNumber blkno;
+ uint8 flags;
+ bool has_image;
+ bool apply_image;
+ bool has_data;
+ uint16 bimg_len;
+ uint8 bimg_info;
+ uint16 hole_offset;
+ uint16 hole_length;
+ uint16 data_len;
+ /* followed by bimg bytes then data bytes */
+} SerializedBlockMeta;
+
+/*
+ * Parameters passed from StartupXLOG (consumer side)
+ * to the WAL pipeline producer background worker.
+ */
+typedef struct WalPipelineParams
+{
+ TimeLineID RedoStartTLI;
+ TimeLineID CheckPointTLI;
+ TimeLineID ReplayTLI;
+ TimeLineID recoveryTargetTLI;
+ TimeLineID minRecoveryPointTLI;
+ XLogRecPtr RedoStartLSN;
+ XLogRecPtr CheckPointLoc;
+ XLogRecPtr NextRecPtr;
+ XLogRecPtr minRecoveryPoint;
+ bool haveBackupLabel;
+ bool StandbyModeRequested;
+ bool ArchiveRecoveryRequested;
+ bool InArchiveRecovery;
+ bool InRedo;
+} WalPipelineParams;
+
+/*
+ * Shared memory control structure for the WAL pipeline
+ */
+typedef struct WalPipelineShmCtl
+{
+ /* Lifecycle management */
+ slock_t mutex;
+ bool initialized;
+ bool shutdown_requested;
+ bool producer_exited;
+
+ /* Producer state */
+ pid_t producer_pid;
+ ProcNumber producer_procnum;
+ XLogRecPtr producer_lsn; /* Last LSN read by producer */
+
+ /* Consumer state */
+ pid_t consumer_pid;
+ XLogRecPtr consumer_lsn; /* Last LSN applied by consumer */
+
+ /* Queue handles */
+ dsm_handle dsm_seg_handle;
+ shm_mq_handle *producer_mq_handle;
+ shm_mq_handle *consumer_mq_handle;
+
+ /* Statistics */
+ uint64 records_sent;
+ uint64 records_received;
+ uint64 bytes_sent;
+ uint64 bytes_received;
+
+ /* Error state */
+ int error_code;
+ char error_message[256];
+} WalPipelineShmCtl;
+
+/* Size of the shared memory queue (can be made configurable) */
+#define WAL_PIPELINE_QUEUE_SIZE (128 * 1024 * 1024) /* 128 MB */
+
+/* Maximum size of a single message */
+#define WAL_PIPELINE_MAX_MSG_SIZE (2 * 1024 * 1024) /* 2 MB */
+
+/*
+ * Public API functions
+ */
+
+/* Initialize the WAL pipeline shared memory structures */
+extern Size WalPipelineShmemSize(void);
+extern void WalPipelineShmemInit(void);
+
+/* Start/stop the pipeline */
+extern void WalPipeline_Start(WalPipelineParams *params);
+extern void WalPipeline_Stop(void);
+extern void WalPipeline_RequestShutdown(void);
+
+/* Producer functions (called by background worker) */
+extern void WalPipeline_ProducerMain(Datum main_arg);
+extern bool WalPipeline_SendRecord(XLogReaderState *record);
+extern bool WalPipeline_SendShutdown(void);
+extern bool WalPipeline_SendError(int errcode, const char *errmsg);
+
+/* Consumer functions (called by startup/walreceiver process) */
+extern DecodedXLogRecord *WalPipeline_ReceiveRecord(void);
+extern bool WalPipeline_CheckProducerAlive(void);
+extern void WalPipeline_FreeRecord(XLogReaderState *record);
+extern void WalPipeline_UpdateReaderState(XLogReaderState *dst, XLogReaderState *src);
+
+/* Status and monitoring */
+extern bool WalPipeline_IsActive(void);
+extern void WalPipeline_GetStats(uint64 *records_sent, uint64 *records_received,
+ XLogRecPtr *producer_lsn, XLogRecPtr *consumer_lsn);
+
+/* Global shared memory pointer */
+extern WalPipelineShmCtl *WalPipelineShm;
+
+#endif /* WAL_PIPELINE_H */
\ No newline at end of file
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index f926d89cb2f..76094982a49 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -11,6 +11,7 @@
#ifndef XLOGRECOVERY_H
#define XLOGRECOVERY_H
+#include "access/xlogprefetcher.h"
#include "access/xlogreader.h"
#include "catalog/pg_control.h"
#include "lib/stringinfo.h"
@@ -85,6 +86,13 @@ extern PGDLLIMPORT bool reachedConsistency;
/* Are we currently in standby mode? */
extern PGDLLIMPORT bool StandbyMode;
+extern PGDLLIMPORT bool StandbyModeRequested;
+
+/* flag to tell XLogPageRead that we have started replaying */
+extern PGDLLIMPORT bool InRedo;
+
+extern PGDLLIMPORT XLogRecPtr minRecoveryPoint;
+extern PGDLLIMPORT TimeLineID minRecoveryPointTLI;
extern Size XLogRecoveryShmemSize(void);
extern void XLogRecoveryShmemInit(void);
@@ -153,6 +161,10 @@ extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
extern TimestampTz GetLatestXTime(void);
extern TimestampTz GetCurrentChunkReplayStartTime(void);
extern XLogRecPtr GetCurrentReplayRecPtr(TimeLineID *replayEndTLI);
+extern XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
+ bool fetching_ckpt, TimeLineID replayTLI);
+extern int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen,
+ XLogRecPtr targetRecPtr, char *readBuf);
extern bool PromoteIsTriggered(void);
extern bool CheckPromoteSignal(void);
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index bf39878c43e..7dc1101c95b 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -324,6 +324,8 @@ extern PGDLLIMPORT int tcp_user_timeout;
extern PGDLLIMPORT char *role_string;
extern PGDLLIMPORT bool in_hot_standby_guc;
extern PGDLLIMPORT bool trace_sort;
+extern PGDLLIMPORT bool wal_pipeline_enabled;
+
#ifdef DEBUG_BOUNDED_SORT
extern PGDLLIMPORT bool optimize_bounded_sort;
--
2.49.0
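
Not part of the patch: wal_pipeline.h above declares messages as a fixed header (`WalMsgHeader`) followed by type-specific payload bytes, with `msg_len` covering the header as well. The serialization code itself is not in this part of the diff, so the following is only a minimal standalone sketch of that framing idea. All `Demo*` and `demo_*` names are hypothetical, plain `malloc` stands in for `palloc`, and no `shm_mq` calls are made.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-ins for the patch's WalMsgType and WalMsgHeader. */
typedef enum DemoMsgType
{
    DEMO_MSG_INVALID = 0,
    DEMO_MSG_RECORD,
    DEMO_MSG_SHUTDOWN
} DemoMsgType;

typedef struct DemoMsgHeader
{
    uint32_t    msg_type;
    uint32_t    msg_len;    /* total length, header included */
    uint64_t    lsn;        /* plays the role of XLogRecPtr */
} DemoMsgHeader;

/* Build one flat, pointer-free message: header, then payload. Caller frees. */
static char *
demo_pack(DemoMsgType type, uint64_t lsn, const void *payload,
          uint32_t payload_len, uint32_t *out_len)
{
    DemoMsgHeader hdr;
    char       *buf;

    hdr.msg_type = (uint32_t) type;
    hdr.msg_len = (uint32_t) sizeof(hdr) + payload_len;
    hdr.lsn = lsn;

    buf = malloc(hdr.msg_len);
    if (buf == NULL)
        return NULL;
    memcpy(buf, &hdr, sizeof(hdr));
    if (payload_len > 0)
        memcpy(buf + sizeof(hdr), payload, payload_len);
    *out_len = hdr.msg_len;
    return buf;
}

/* Validate a received message; returns 0 on success, -1 if malformed. */
static int
demo_unpack(const char *buf, uint32_t len, DemoMsgHeader *hdr,
            const char **payload, uint32_t *payload_len)
{
    if (len < sizeof(*hdr))
        return -1;
    memcpy(hdr, buf, sizeof(*hdr));     /* copy out; buf may be unaligned */
    if (hdr->msg_len != len)
        return -1;
    if (hdr->msg_type != DEMO_MSG_RECORD && hdr->msg_type != DEMO_MSG_SHUTDOWN)
        return -1;
    *payload = buf + sizeof(*hdr);
    *payload_len = len - (uint32_t) sizeof(*hdr);
    return 0;
}

/* Pack a record message, unpack it, and check that truncation is rejected. */
static int
demo_roundtrip(void)
{
    const char  body[] = "decoded-record-bytes";
    uint32_t    len = 0;
    char       *msg = demo_pack(DEMO_MSG_RECORD, 0x16B3748, body,
                                (uint32_t) sizeof(body), &len);
    DemoMsgHeader hdr;
    const char *payload;
    uint32_t    payload_len;
    int         ok;

    if (msg == NULL)
        return -1;
    ok = demo_unpack(msg, len, &hdr, &payload, &payload_len) == 0
        && hdr.lsn == 0x16B3748
        && payload_len == sizeof(body)
        && memcmp(payload, body, sizeof(body)) == 0
        && demo_unpack(msg, len - 1, &hdr, &payload, &payload_len) == -1;
    free(msg);
    return ok ? 0 : -1;
}
```

Calling `demo_roundtrip()` from a small `main` exercises both paths. Checking the length before trusting any header field is the main design point: the consumer should reject a short or inconsistent message instead of reading past the buffer.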
[application/pdf] recoveries-becnhmark-v01.pdf (48.6K, 4-recoveries-becnhmark-v01.pdf)
[application/zip] recoveries-benchmarks-v01.zip (2.7M, 5-recoveries-benchmarks-v01.zip)