From: Rui Zhao <[email protected]>
To: [email protected]
Subject: Lazy snapshot distribution in logical decoding
Date: Tue, 12 May 2026 16:55:23 +0800
Message-ID: <[email protected]>
Hi hackers,
When a long-running write transaction coexists with many concurrent
catalog-modifying commits (e.g., autovacuum updating pg_class stats for
many tables, or batch DDL), the reorder buffer's spill files for logical
decoding can grow quadratically in the number N of such commits, reaching
~80 GB when our measurements are extrapolated to the N=100K seen in our
production. This patch eliminates the problem by deferring snapshot
distribution from commit-time to data-change-time.
== Problem ==
The trigger requires two conditions:
(i) Something prevents xmin from advancing. This can be the long write
transaction itself (its own XID holds xmin back), or an unrelated
xmin-holder like pg_dump's REPEATABLE READ session or a slow
logical-replication consumer.
(ii) A transaction that has at least one data change is present in the
reorder buffer. Pure read-only sessions (pg_dump, long SELECT) do
not appear in the reorder buffer and are not victims themselves;
they only enable condition (i).
When both are met, the chain reaction runs across four stages:
1) VACUUM updates pg_class statistics via heap_inplace_update_and_unlock(),
which triggers CacheInvalidateHeapTuple(). At command end,
LogLogicalInvalidations() emits an XLOG_XACT_INVALIDATIONS WAL record.
2) During logical decoding, xact_decode() processes XLOG_XACT_INVALIDATIONS
by calling ReorderBufferXidSetCatalogChanges(), marking the VACUUM
transaction as having catalog changes.
3) When the VACUUM transaction commits, SnapBuildCommitTxn() sees it has
catalog changes, builds a new snapshot via SnapBuildBuildSnapshot(), and
calls SnapBuildDistributeSnapshotAndInval() to distribute the snapshot
to ALL in-progress transactions.
4) The snapshot includes the entire builder->committed.xip array. Since a
long-running transaction prevents SnapBuildPurgeOlderTxn() from cleaning
old entries (builder->xmin cannot advance), the committed array grows
monotonically. Each successive snapshot is larger than the last:
Snapshot N contains N xids, size = 192 + 4*N bytes
Total disk usage = sum(i=1..N)(192 + 4*i) = 2*N^2 + 194*N ~ 2*N^2 bytes
In our measurements (see Performance impact below), master spills
207 MB at N=5000 and 813 MB at N=10000 with logical_decoding_work_mem
= 64MB, scaling as O(N^2). Extrapolation to N=100K yields ~80 GB.
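As a sanity check on the size formula above, here is a small Python sketch (not part of the patch; the function names are made up for illustration). It sums the per-snapshot sizes directly and compares the result with the closed form. The exact sum is 2*N^2 + 194*N, which 2*N^2 approximates well once N is large.

```python
HEADER_BYTES = 192  # fixed per-snapshot size quoted in the message
XID_BYTES = 4       # one 4-byte xid per committed.xip entry


def eager_snapshot_bytes(n: int) -> int:
    """Total bytes of snapshots 1..n, where snapshot i carries i xids."""
    return sum(HEADER_BYTES + XID_BYTES * i for i in range(1, n + 1))


def closed_form(n: int) -> int:
    """The same sum in closed form: 192*n + 4*n*(n+1)/2 = 2*n^2 + 194*n."""
    return 2 * n * n + 194 * n


for n in (1_000, 5_000, 10_000, 100_000):
    total = eager_snapshot_bytes(n)
    assert total == closed_form(n)
    # Show how close the 2*N^2 approximation is at each scale.
    print(f"N={n:>7}: exact {total / 1e6:10.1f} MB, 2*N^2 {2 * n * n / 1e6:10.1f} MB")
```

This only models the raw snapshot payload; per-entry ReorderBufferChange overhead and invalidation entries come on top, which is why measured spill is higher.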
Beyond disk usage, these accumulated snapshots also degrade decoding
performance:
a) Memory pressure: the snapshots consume logical_decoding_work_mem
quickly, forcing the reorder buffer to spill to disk much earlier
than necessary.
b) Spill/restore I/O: each INTERNAL_SNAPSHOT change is serialized
during spill and deserialized during commit-time processing,
adding significant disk I/O for what is essentially dead data.
c) Commit-time iteration overhead: ReorderBufferProcessTXN() iterates
all changes via a binary heap. 100K extra INTERNAL_SNAPSHOT entries
mean 100K additional heap pop operations and snapshot_now switches
(refcount inc/dec, potential old snapshot free).
d) Replication lag: the combined CPU and I/O overhead directly
increases the time between a transaction's commit and its decoded
output reaching subscribers.
== Why not filter VACUUM? ==
One might consider filtering out VACUUM-generated XLOG_XACT_INVALIDATIONS
so they don't trigger snapshot rebuilds. However, the O(N^2) problem is
not specific to VACUUM. Any workload that generates many catalog-modifying
commits triggers it: batch partition creation, online migration tools
running thousands of DDLs, extension installations creating many functions
and types, etc. A general solution at the distribution layer is needed.
== Prior work ==
The distribution function modified by this patch was last touched by the
"long-standing data loss bug in initial sync of logical replication"
thread started by Tomas Vondra in 2023 [1], committed by Amit Kapila in
2025. That fix renamed SnapBuildDistributeNewCatalogSnapshot to
SnapBuildDistributeSnapshotAndInval and added invalidation
distribution alongside the existing snapshot distribution, to ensure
that in-progress transactions invalidate their relcache when concurrent
publication DDL changes the set of published tables. The commit
message acknowledged "some performance regression ... primarily during
frequent execution of publication DDL statements". The present patch
addresses the snapshot-distribution side of that overhead while keeping
the invalidation-distribution semantics unchanged, since invalidations
cannot be deduplicated the way snapshots can (see "Correctness" below).
Related but orthogonal work in the same area:
- Sawada's "Using per-transaction memory contexts for storing decoded
tuples" [2] addresses a different root cause of decoding memory
bloat under long-running transactions: GenerationContext
fragmentation in the reorder buffer's tuple context. His patch is
complementary; both contribute to keeping decoding memory usage
within logical_decoding_work_mem.
- Tachoires's "Compress ReorderBuffer spill files using LZ4" [3]
reduces the disk cost of spilled changes via compression. The two
approaches are complementary: this patch reduces what needs to be
spilled in the first place (eliminating O(N^2) INTERNAL_SNAPSHOT
entries for write transactions with few data changes), while [3]
reduces the cost of whatever does spill.
[1] https://www.postgresql.org/message-id/flat/de52b282-1166-1180-45a2-8d8917ca74c6%40enterprisedb.com
[2] https://www.postgresql.org/message-id/flat/CAD21AoBTY1LATZUmvSXEssvq07qDZufV4AF-OHh9VD2pC0VY2A%40mai...
[3] https://www.postgresql.org/message-id/flat/CAA4eK1Kd6vP1g5pJXoiPgfLQsn54hecsc06hxpNWV-9_xMa%2B5Q%40m...
== Solution: Lazy Snapshot Distribution ==
Instead of distributing a snapshot to every in-progress transaction on each
catalog-modifying commit, we defer distribution until a transaction actually
needs to decode a data change (in SnapBuildProcessChange()).
A generation counter (SnapBuild.snapshot_generation) is incremented each
time a new catalog snapshot is built. Each transaction tracks the last
generation it received (ReorderBufferTXN.last_snapshot_generation). When
SnapBuildProcessChange() is called for a data change, it checks whether the
transaction's generation is behind the builder's and distributes the current
snapshot only if needed.
This means:
- A long-running write transaction with K data changes accumulates at
most min(K, G) snapshots in its change list (where G is the number of
catalog-modifying commits during its lifetime), instead of G snapshots
each of growing size. The generation counter deduplicates further:
consecutive data changes without intervening DDLs share one snapshot.
- Total disk usage attributable to snapshot distribution drops from
O(N^2) to O(min(K, G) * N), where N is the committed xid array size.
For the common production case where K << G (e.g. a long batch INSERT
during a vacuum storm), the savings are substantial.
Invalidation messages are still distributed eagerly as before, since they
are small (100K vacuum commits = 14MB) and already have overflow protection
via MAX_DISTR_INVAL_MSG_PER_TXN.
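The generation-counter idea can be sketched as a toy Python model. This is an illustration only, not the C code in the patch: the Txn and Builder classes and the event-string encoding are invented here, and the model ignores base snapshots, subtransactions and invalidations.

```python
from dataclasses import dataclass, field


@dataclass
class Txn:
    # Plays the role of ReorderBufferTXN.last_snapshot_generation.
    last_snapshot_generation: int = 0
    changes: list = field(default_factory=list)


@dataclass
class Builder:
    # Plays the role of SnapBuild.snapshot_generation.
    snapshot_generation: int = 0
    committed: list = field(default_factory=list)

    def catalog_commit(self, xid: int) -> None:
        """A catalog-modifying commit: new snapshot, new generation."""
        self.committed.append(xid)
        self.snapshot_generation += 1

    def process_change(self, txn: Txn, change: str) -> None:
        """Queue a data change; hand over a snapshot only if txn is behind."""
        if txn.last_snapshot_generation < self.snapshot_generation:
            txn.changes.append(("snapshot", tuple(self.committed)))
            txn.last_snapshot_generation = self.snapshot_generation
        txn.changes.append(("data", change))


def count_snapshots(events: str) -> int:
    """Run a string of 'C' (catalog commit) / 'D' (data change) events."""
    builder, txn = Builder(), Txn()
    for i, ev in enumerate(events):
        if ev == "C":
            builder.catalog_commit(i)
        else:
            builder.process_change(txn, f"change@{i}")
    return sum(1 for kind, _ in txn.changes if kind == "snapshot")


print(count_snapshots("D" + "C" * 1000 + "D"))  # K=2, G=1000
print(count_snapshots("CDCDCD"))                # K=3, G=3, interleaved
print(count_snapshots("CCCDDD"))                # K=3, G=3, no DDL between changes
```

In each case the number of snapshot entries stays at or below min(K, G), and consecutive data changes with no catalog commit in between share a single entry.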
== Correctness ==
The lazy approach is equivalent to eager distribution because:
1) WAL is processed strictly in LSN order. When SnapBuildProcessChange()
is called at LSN X, builder->snapshot reflects exactly all catalog-
modifying commits with LSN < X. It is neither "too new" (future commits
haven't been processed) nor "too old" (all past commits are included).
2) The committed array is append-only (only grows, never shrinks while a
long transaction holds xmin back). Snapshot N is a superset of snapshots
1..N-1. Skipping intermediate snapshots does not affect visibility.
3) In the original code, when a transaction's changes are decoded at commit
time, each data change uses the most recent snapshot preceding it in the
change list. Intermediate snapshots that have no data changes between
them are effectively unused. Lazy distribution simply avoids creating
these unused entries.
4) This works correctly with streaming mode (changes decoded before commit)
because SnapBuildProcessChange() is always called before each data change
regardless of whether the change is later streamed or buffered.
5) The lazy snapshot is distributed to the toplevel transaction (using
txn->xid), consistent with the original eager distribution. When a
data change belongs to a subtransaction, the snapshot and the data
change reside in different change lists (toplevel vs subtransaction).
This is safe because eager invalidation distribution always places an
invalidation change entry in the toplevel's list at the DDL commit
LSN (which is strictly less than the data change LSN). During binary
heap iteration, this invalidation entry ensures the toplevel is at
the heap root when the equal-LSN comparison between the snapshot and
the data change occurs, so the snapshot is always processed first.
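Points 1) to 3) can be checked on a toy model. The Python sketch below is an illustration under simplifying assumptions (a single toplevel transaction, no subtransactions, an empty base snapshot, invented function names). It builds the change list both ways for random event sequences and asserts that commit-time replay sees the same snapshot for every data change.

```python
import random


def build(events: str, lazy: bool) -> list:
    """Build one transaction's change list from 'C'/'D' events."""
    committed, generation, txn_generation, changes = [], 0, 0, []
    for i, ev in enumerate(events):
        if ev == "C":                      # catalog-modifying commit
            committed.append(i)
            generation += 1
            if not lazy:                   # eager: distribute at commit time
                changes.append(("snapshot", tuple(committed)))
        else:                              # data change of the long txn
            if lazy and txn_generation < generation:
                changes.append(("snapshot", tuple(committed)))
                txn_generation = generation
            changes.append(("data", i))
    return changes


def replay(changes: list, base: tuple) -> list:
    """Commit-time replay: pair each data change with the snapshot in effect."""
    snapshot_now, seen = base, []
    for kind, payload in changes:
        if kind == "snapshot":
            snapshot_now = payload
        else:
            seen.append((payload, snapshot_now))
    return seen


def equivalent(events: str) -> bool:
    return replay(build(events, lazy=False), ()) == replay(build(events, lazy=True), ())


rng = random.Random(0)
for _ in range(1000):
    events = "".join(rng.choice("CD") for _ in range(rng.randint(0, 40)))
    assert equivalent(events)
    assert len(build(events, lazy=True)) <= len(build(events, lazy=False))
print("eager and lazy replay agree on all sampled sequences")
```

The model does not cover point 5), where the snapshot and the data change live in different change lists and ordering depends on the binary heap.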
== Why not eliminate ReorderBufferAddSnapshot entirely? ==
One might ask: if we can defer snapshot distribution, why not skip storing
snapshots in the reorder buffer altogether and just use builder->snapshot
directly at decode time?
This doesn't work because DecodeInsert/DecodeUpdate only package the raw
WAL tuple into a ReorderBufferChange and enqueue it. The actual catalog
lookup (RelationIdGetRelation, tuple interpretation) happens later during
commit-time processing in ReorderBufferProcessTXN. At that point,
builder->snapshot has advanced past all changes and no longer reflects the
catalog state at each individual change's LSN.
The INTERNAL_SNAPSHOT entries in the change list serve as markers that tell
commit-time processing "switch to this catalog snapshot from here on."
Without them, all changes would use the base_snapshot, and an INSERT that
happened after an ALTER TABLE ADD COLUMN would be decoded with the old
schema (the new columns would not be visible). Using the final
builder->snapshot for everything fails the other way: an INSERT that
happened before the ALTER TABLE would be decoded with the new schema,
producing incorrect output (extra columns with NULL values).
Since each INTERNAL_SNAPSHOT entry serves all subsequent data changes until
the next generation bump, the total number of entries is at most min(K, G),
which is already minimal.
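To make the role of the markers concrete, here is a toy Python sketch. It is an assumption-laden illustration, not PostgreSQL code: the catalog is reduced to a column list per snapshot generation, and decode() is a made-up stand-in for tuple interpretation. It replays the same change list with markers, with only the base snapshot, and with only the final snapshot.

```python
# Hypothetical catalog: column list of one table at each snapshot generation.
COLUMNS_AT = {0: ["val1"], 1: ["val1", "c1"]}


def decode(raw: tuple, generation: int) -> dict:
    """Interpret a raw tuple using the catalog state of one generation."""
    cols = COLUMNS_AT[generation]
    padded = list(raw) + [None] * (len(cols) - len(raw))
    return dict(zip(cols, padded))  # extra raw values are dropped by zip


change_list = [
    ("data", (1,)),      # INSERT before ALTER TABLE ADD COLUMN c1
    ("snapshot", 1),     # INTERNAL_SNAPSHOT marker: switch to generation 1
    ("data", (2, 20)),   # INSERT after the ALTER TABLE
]


def replay_with_markers(changes: list, base_generation: int) -> list:
    """Commit-time replay that switches snapshot at each marker."""
    snapshot_now, out = base_generation, []
    for kind, payload in changes:
        if kind == "snapshot":
            snapshot_now = payload
        else:
            out.append(decode(payload, snapshot_now))
    return out


def replay_fixed(changes: list, generation: int) -> list:
    """Replay that ignores markers and uses one snapshot for everything."""
    return [decode(p, generation) for kind, p in changes if kind == "data"]


print(replay_with_markers(change_list, 0))  # each INSERT sees its own schema
print(replay_fixed(change_list, 0))         # base snapshot only: c1 is lost
print(replay_fixed(change_list, 1))         # final snapshot only: spurious NULL c1
```

Only the marker-driven replay decodes both INSERTs with the schema that was current when they were written.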
== Notes ==
- RBTXN_HAS_CATALOG_CHANGES is still needed: it is used by
SnapBuildCommitTxn() to decide whether to rebuild the snapshot,
ReorderBufferBuildTupleCidHash() for combo CID handling, and
SnapBuildSerialize() for serialization. This patch does not change
its semantics.
- SNAPBUILD_VERSION is bumped from 6 to 7 because snapshot_generation
is added to the SnapBuild struct which is serialized to disk. On
upgrade, existing serialized snapshots will be invalidated and rebuilt
automatically, which is the standard behavior for version mismatches.
== Changes ==
src/include/replication/snapbuild_internal.h:
- Add uint64 snapshot_generation to struct SnapBuild
src/include/replication/reorderbuffer.h:
- Add uint64 last_snapshot_generation to ReorderBufferTXN
- Declare ReorderBufferTXNByXid() as extern
src/backend/replication/logical/snapbuild.c:
- SnapBuildCommitTxn(): increment snapshot_generation after building
a new snapshot
- Rename SnapBuildDistributeSnapshotAndInval() to SnapBuildDistributeInval()
and remove snapshot distribution code; keep invalidation distribution
- SnapBuildProcessChange(): do a single ReorderBufferTXNByXid() lookup,
reuse the txn pointer for both base snapshot check and lazy catalog
snapshot distribution. The lazy snapshot is distributed to the
toplevel transaction, consistent with the original eager approach.
This also eliminates redundant hash lookups that existed in the
original code path.
- Bump SNAPBUILD_VERSION from 6 to 7
src/backend/replication/logical/reorderbuffer.c:
- Remove 'static' from ReorderBufferTXNByXid()
== Performance impact ==
Theoretical:
                          Before          After
  Snapshot distributions  G (per commit)  0 or 1 (per data change)
  Disk usage growth       O(N^2)          O(min(K, G) * N)
where N = committed xid array size at the time of the K-th data change,
      G = number of catalog-modifying commits during the long txn,
      K = number of data changes in the long txn.
The hot path (SnapBuildProcessChange for each data change) now does a single
hash lookup instead of 2-4 separate lookups in the original code, so there
is no performance regression for the common case.
Measured (single-threaded benchmark, scripts attached):
Scenario: one long-running transaction with K=1 INSERT, coexisting with
N concurrent CREATE/DROP TABLE pairs (each its own catalog-modifying
commit), then drained via pg_logical_slot_get_changes() with the
test_decoding output plugin. 3-run median per cell.
logical_decoding_work_mem = 64MB (production-like default):
      N  branch  decode_ms  spill_bytes  total_bytes
  -----  ------  ---------  -----------  -----------
   1000  master         45            0        22 MB
         patch          39            0        13 MB
   5000  master        629       207 MB       429 MB
         patch         261            0       227 MB
  10000  master       1865       813 MB      1659 MB
         patch         814            0       855 MB
At ldwm=64MB the patched code does not spill at all up to N=10000;
master spills 207 MB at N=5000 and 813 MB at N=10000, scaling
quadratically in N. Decoding time drops 2.3-2.4x at N>=5000,
mostly from avoided spill I/O. Memory residency (total_bytes)
is halved as well, because INTERNAL_SNAPSHOT entries no longer
accumulate in the long transaction's change list.
logical_decoding_work_mem = 64kB (stress, forces spill even at small N):
      N  branch  decode_ms  spill_bytes  total_bytes
  -----  ------  ---------  -----------  -----------
    500  master         28       2.6 MB       6.9 MB
         patch          24       0.4 MB       4.7 MB
   1000  master         57       9.3 MB        22 MB
         patch          42       0.9 MB        13 MB
   2000  master        143        35 MB        76 MB
         patch          82       1.8 MB        43 MB
   5000  master        607       247 MB       429 MB
         patch         289        25 MB       227 MB
Under stress, the patch reduces spill bytes by 6-19x and decoding
time by 1.2-2.1x. Master's spill grows as ~10*N^2 bytes (clean
O(N^2) signal: 5000^2 / 1000^2 = 25, spill 247 MB / 9.3 MB = 26.6).
The patch's residual spill is dominated by invalidation-message
entries (still distributed eagerly by design); these are bounded
by MAX_DISTR_INVAL_MSG_PER_TXN.
The decoded change count is identical between master and patch in
every cell (3 changes: BEGIN + INSERT + COMMIT of the long
transaction; DDL transactions emit no test_decoding output), so
the patch preserves decoding output exactly.
Extrapolating to N=100K using the observed quadratic coefficient on
master and linear on patch:
  N=100K  master  ~80 GB spill (vs the 2*N^2 lower-bound estimate of
                  20 GB; the observed coefficient is ~4x higher due to
                  per-entry ReorderBufferChange overhead and accumulated
                  invalidation entries)
          patch   0 spill at 64MB ldwm
                  <500 MB at 64kB ldwm (residual is invalidation entries,
                  bounded by MAX_DISTR_INVAL_MSG_PER_TXN)
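The master-side extrapolation can be reproduced with a few lines of Python. This is a back-of-the-envelope sketch, assuming the tables' "MB" means 10^6 bytes and that spill follows c*N^2 with a constant c; the helper names are invented for illustration.

```python
MB = 1e6  # assumption: "MB" in the tables is 10^6 bytes

# (N, master spill in MB) taken from the 64kB stress table above.
stress_master = [(500, 2.6), (1000, 9.3), (2000, 35.0), (5000, 247.0)]


def coefficient(n: int, spill_mb: float) -> float:
    """Bytes of spill per N^2, i.e. c in spill = c * N^2."""
    return spill_mb * MB / (n * n)


def extrapolate(n_from: int, spill_mb: float, n_to: int) -> float:
    """Scale a measured spill quadratically from n_from to n_to (bytes)."""
    return coefficient(n_from, spill_mb) * n_to * n_to


for n, mb in stress_master:
    print(f"N={n:>5}: c = {coefficient(n, mb):.2f} bytes per N^2")

# 64MB table: 813 MB at N=10000, scaled to N=100K.
master_100k = extrapolate(10_000, 813.0, 100_000)
lower_bound = 2 * 100_000**2  # the 2*N^2 payload-only estimate
print(f"master at N=100K: {master_100k / 1e9:.1f} GB")
print(f"2*N^2 lower bound: {lower_bound / 1e9:.1f} GB")
print(f"ratio: {master_100k / lower_bound:.1f}x")
```

The stress-table coefficients cluster around 9 to 10 bytes per N^2, consistent with the ~10*N^2 figure, and the 64MB data point scales to roughly 81 GB, about 4x the payload-only bound.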
Scripts to reproduce are in the second patch (v1-0002):
bench/setup_cluster.sh, bench/lazy_snapshot_bench.sh,
bench/run_matrix.sh, bench/aggregate.sh.
== Limitations ==
When the long-running transaction itself has continuous data changes
(e.g. COPY to many tables), K approaches G and lazy distribution
degenerates toward eager behavior:
                     K small (typical)  K~G (bulk COPY)
  Eager (before)     O(G^2)             O(G^2)
  Lazy (this patch)  O(K*N)             O(G^2)
For the bulk COPY scenario, the root cause is that VACUUM's pg_class
statistics updates (relpages, reltuples, relallvisible) are treated as
catalog changes even though they do not affect tuple decoding. A
complementary optimization could filter these non-schema-affecting
catalog modifications from triggering snapshot rebuilds. That is a
separate change with its own considerations and is left as future work.
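The two columns of the table above can be illustrated numerically. The Python sketch below is a toy byte count, assuming the 192 + 4*N snapshot size from the Problem section and ignoring every other overhead; the function name and event encoding are invented here.

```python
HEADER_BYTES = 192  # per-snapshot fixed size quoted in the Problem section
XID_BYTES = 4       # per committed xid


def snapshot_bytes(events: str, lazy: bool) -> int:
    """Bytes of snapshots queued to one long txn for 'C'/'D' events."""
    committed = generation = txn_generation = total = 0
    for ev in events:
        if ev == "C":                      # catalog-modifying commit
            committed += 1
            generation += 1
            if not lazy:                   # eager: one snapshot per commit
                total += HEADER_BYTES + XID_BYTES * committed
        elif lazy and txn_generation < generation:
            # lazy: one snapshot at the first data change after a commit
            total += HEADER_BYTES + XID_BYTES * committed
            txn_generation = generation
    return total


G = 1000
few_changes = "C" * G + "D"   # K=1: a single INSERT after G commits
bulk_copy = "CD" * G          # K=G: a data change after every commit

for name, events in (("K small", few_changes), ("K~G", bulk_copy)):
    eager = snapshot_bytes(events, lazy=False)
    lazy = snapshot_bytes(events, lazy=True)
    print(f"{name:8s} eager={eager:>9} bytes  lazy={lazy:>9} bytes")
```

With K=1 the lazy count is a single snapshot of G xids, while the fully interleaved case queues exactly as many bytes as eager distribution does.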
== Tests ==
1) Isolation test (contrib/test_decoding/specs/lazy_snapshot_distribution.spec):
Three scenarios verifying decoding correctness with lazy distribution:
- Long transaction + multiple ALTER TABLE ADD COLUMN + INSERT with
new schema
- Long transaction + many CREATE TABLE (catalog changes) + INSERT
- Subtransaction + DDL + INSERT with new schema
2) TAP test (contrib/test_decoding/t/002_lazy_snapshot_spill.pl):
Verifies that a long transaction with 200 catalog-modifying DDLs in
between its two INSERTs results in spill_bytes=0, confirming that lazy
distribution eliminates snapshot-caused spilling.
3) All existing tests pass without modification:
- test_decoding: 20 regression + 15 isolation + 1 TAP
- src/test/subscription: 39 TAP tests (581 test cases), covering
streaming, subtransactions, DDL, two-phase commit, etc.
- src/test/recovery: 51 TAP tests (635 test cases), including
006_logical_decoding and 038_save_logical_slots_shutdown
- contrib/pg_logicalinspect: 2 tests
Two patches attached:
v1-0001-Lazy-snapshot-distribution-in-logical-decoding.patch
The lazy distribution change itself plus isolation and TAP tests.
v1-0002-Benchmark-scripts-for-lazy-snapshot-distribution.patch
Reproducer scripts for the measurements above (bench/). Kept as a
separate patch since it lives outside the contrib/ tree and is not
intended for backport or for production builds.
Thanks,
Rui Zhao
Attachment: v1-0001-Lazy-snapshot-distribution-in-logical-decoding.patch (31.8K)
From 6c6df5edda0c93d6b58f409dbab9d45289abac4d Mon Sep 17 00:00:00 2001
From: Rui Zhao <[email protected]>
Date: Mon, 11 May 2026 23:52:51 +0800
Subject: [PATCH v1 1/2] Lazy snapshot distribution in logical decoding
When a long-running write transaction in logical decoding coexists with
many concurrent catalog-modifying commits (e.g., autovacuum updating
pg_class stats for many tables, or batch DDL), the reorder buffer's
spill files can grow quadratically. In measurements, master spills
207 MB at N=5000 and 813 MB at N=10000 catalog-modifying commits
(logical_decoding_work_mem = 64MB) during a single long write
transaction with K=1 INSERT; quadratic extrapolation to N=100K yields
~80 GB.
The trigger requires two conditions:
(i) Something prevents xmin from advancing. This can be the long
write transaction itself, or an unrelated xmin-holder such as
pg_dump's REPEATABLE READ session or a slow logical-replication
consumer.
(ii) A transaction with at least one data change is present in the
reorder buffer. Read-only sessions do not appear in the reorder
buffer and are not victims themselves; they only enable (i).
The root cause is that on every catalog-modifying commit, the snapshot
builder calls SnapBuildDistributeSnapshotAndInval() which appends a new
catalog snapshot to every in-progress transaction's change list. Since
xmin cannot advance, SnapBuildPurgeOlderTxn() cannot purge entries from
builder->committed.xip, and each successive snapshot is larger than the
last. Total disk usage attributed to snapshot distribution is O(N^2)
where N is the number of catalog-modifying commits.
Beyond disk usage, these accumulated snapshots also degrade decoding
performance: memory pressure forces earlier spill, every INTERNAL_SNAPSHOT
change incurs serialize/deserialize I/O, ReorderBufferProcessTXN()
performs an extra binary-heap pop and snapshot_now switch for each one,
and replication lag grows accordingly.
Defer snapshot distribution from commit time to data-change time. A
generation counter (SnapBuild.snapshot_generation) is incremented each
time a new catalog snapshot is built. Each ReorderBufferTXN tracks the
last generation it received. When SnapBuildProcessChange() is called for
a data change, it distributes the current snapshot only if the txn's
generation is behind the builder's.
Invalidation messages are still distributed eagerly because each one
carries unique information that cannot be deduplicated; missing an
invalidation can cause pgoutput to silently drop changes for newly
published tables. Snapshots, by contrast, represent absolute state, so
the latest one supersedes all earlier ones and intermediate ones can be
skipped.
A long-running write transaction with K data changes now accumulates at
most min(K, G) snapshots, where G is the number of catalog-modifying
commits during its lifetime; the generation counter deduplicates further
so consecutive data changes without intervening DDLs share one snapshot.
Disk usage drops from O(N^2) to O(min(K, G) * N).
The hot path (SnapBuildProcessChange for each data change) now performs a
single ReorderBufferTXNByXid() lookup that is reused for both the base
snapshot check and lazy distribution, eliminating redundant hash lookups
that existed in the original code.
The lazy snapshot is distributed to the toplevel transaction (using
txn->xid), consistent with the original eager distribution. When a data
change belongs to a subtransaction, eager invalidation distribution still
places an invalidation change entry in the toplevel's list at the DDL
commit LSN (strictly less than the data change LSN), which ensures the
toplevel is at the binary heap root when equal-LSN comparisons occur, so
the snapshot is processed before the data change.
SNAPBUILD_VERSION is bumped from 6 to 7 because snapshot_generation is
added to the serialized SnapBuild struct.
Tests:
- contrib/test_decoding/specs/lazy_snapshot_distribution.spec: three
isolation test scenarios covering long-tx + DDL + INSERT,
long-tx + many catalog changes + INSERT, and subtransaction + DDL +
INSERT.
- contrib/test_decoding/t/002_lazy_snapshot_spill.pl: verifies that a
long transaction with 200 catalog-modifying DDLs between its two
INSERTs results in spill_bytes = 0.
- All existing test_decoding, subscription, recovery, and
pg_logicalinspect tests pass without modification.
---
contrib/test_decoding/Makefile | 3 +-
.../expected/lazy_snapshot_distribution.out | 70 ++++++++++
contrib/test_decoding/meson.build | 2 +
.../specs/lazy_snapshot_distribution.spec | 81 +++++++++++
.../t/002_lazy_snapshot_spill.pl | 120 ++++++++++++++++
.../replication/logical/reorderbuffer.c | 4 +-
src/backend/replication/logical/snapbuild.c | 130 ++++++++++--------
src/include/replication/reorderbuffer.h | 12 ++
src/include/replication/snapbuild_internal.h | 11 ++
9 files changed, 372 insertions(+), 61 deletions(-)
create mode 100644 contrib/test_decoding/expected/lazy_snapshot_distribution.out
create mode 100644 contrib/test_decoding/specs/lazy_snapshot_distribution.spec
create mode 100644 contrib/test_decoding/t/002_lazy_snapshot_spill.pl
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 0111124399..17e342cba4 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
- skip_snapshot_restore invalidation_distribution parallel_session_origin
+ skip_snapshot_restore invalidation_distribution parallel_session_origin \
+ lazy_snapshot_distribution
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/lazy_snapshot_distribution.out b/contrib/test_decoding/expected/lazy_snapshot_distribution.out
new file mode 100644
index 0000000000..49b85bfa3a
--- /dev/null
+++ b/contrib/test_decoding/expected/lazy_snapshot_distribution.out
@@ -0,0 +1,70 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1_begin s1_insert_tbl1 s2_add_col_c1 s2_add_col_c2 s1_insert_tbl2_3col s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_add_col_c1: ALTER TABLE tbl2 ADD COLUMN c1 integer;
+step s2_add_col_c2: ALTER TABLE tbl2 ADD COLUMN c2 integer;
+step s1_insert_tbl2_3col: INSERT INTO tbl2 VALUES (1, 10, 100);
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+-------------------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1
+table public.tbl2: INSERT: val1[integer]:1 c1[integer]:10 c2[integer]:100
+COMMIT
+(4 rows)
+
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s1_begin s1_insert_tbl1 s2_create_dummy1 s2_create_dummy2 s2_create_dummy3 s1_insert_tbl1 s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_create_dummy1: CREATE TABLE dummy1 (id int);
+step s2_create_dummy2: CREATE TABLE dummy2 (id int);
+step s2_create_dummy3: CREATE TABLE dummy3 (id int);
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1
+table public.tbl1: INSERT: val1[integer]:1
+COMMIT
+(4 rows)
+
+?column?
+--------
+stop
+(1 row)
+
+
+starting permutation: s1_begin s1_savepoint s1_insert_tbl1 s2_add_col_c1 s2_add_col_c2 s1_insert_tbl2_3col s1_release s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_savepoint: SAVEPOINT sp1;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_add_col_c1: ALTER TABLE tbl2 ADD COLUMN c1 integer;
+step s2_add_col_c2: ALTER TABLE tbl2 ADD COLUMN c2 integer;
+step s1_insert_tbl2_3col: INSERT INTO tbl2 VALUES (1, 10, 100);
+step s1_release: RELEASE SAVEPOINT sp1;
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+-------------------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1
+table public.tbl2: INSERT: val1[integer]:1 c1[integer]:10 c2[integer]:100
+COMMIT
+(4 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index ac655853d2..564d82bd4e 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -66,6 +66,7 @@ tests += {
'skip_snapshot_restore',
'invalidation_distribution',
'parallel_session_origin',
+ 'lazy_snapshot_distribution',
],
'regress_args': [
'--temp-config', files('logical.conf'),
@@ -76,6 +77,7 @@ tests += {
'tap': {
'tests': [
't/001_repl_stats.pl',
+ 't/002_lazy_snapshot_spill.pl',
],
},
}
diff --git a/contrib/test_decoding/specs/lazy_snapshot_distribution.spec b/contrib/test_decoding/specs/lazy_snapshot_distribution.spec
new file mode 100644
index 0000000000..217836a019
--- /dev/null
+++ b/contrib/test_decoding/specs/lazy_snapshot_distribution.spec
@@ -0,0 +1,81 @@
+# Test lazy snapshot distribution: snapshots are no longer eagerly distributed
+# to all in-progress transactions on each catalog-modifying commit. Instead,
+# they are lazily distributed when a transaction actually decodes a data change.
+# This avoids O(N^2) snapshot disk usage when a long-running transaction
+# coexists with many catalog-modifying commits (e.g. autovacuum).
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+ DROP TABLE IF EXISTS tbl1;
+ DROP TABLE IF EXISTS tbl2;
+ DROP TABLE IF EXISTS dummy1;
+ DROP TABLE IF EXISTS dummy2;
+ DROP TABLE IF EXISTS dummy3;
+ CREATE TABLE tbl1 (val1 integer);
+ CREATE TABLE tbl2 (val1 integer);
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS tbl1;
+ DROP TABLE IF EXISTS tbl2;
+ DROP TABLE IF EXISTS dummy1;
+ DROP TABLE IF EXISTS dummy2;
+ DROP TABLE IF EXISTS dummy3;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+# Session s1: long-running transaction
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 VALUES (1); }
+step "s1_insert_tbl2_3col" { INSERT INTO tbl2 VALUES (1, 10, 100); }
+step "s1_savepoint" { SAVEPOINT sp1; }
+step "s1_release" { RELEASE SAVEPOINT sp1; }
+step "s1_commit" { COMMIT; }
+
+# Session s2: performs catalog-modifying operations
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_add_col_c1" { ALTER TABLE tbl2 ADD COLUMN c1 integer; }
+step "s2_add_col_c2" { ALTER TABLE tbl2 ADD COLUMN c2 integer; }
+step "s2_create_dummy1" { CREATE TABLE dummy1 (id int); }
+step "s2_create_dummy2" { CREATE TABLE dummy2 (id int); }
+step "s2_create_dummy3" { CREATE TABLE dummy3 (id int); }
+
+# Session s3: consumes changes
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+# Scenario 1: Long transaction + multiple DDLs + correct decoding
+#
+# s1 starts a long transaction and inserts into tbl1 (gets base snapshot).
+# s2 performs two ALTER TABLE ADD COLUMN on tbl2.
+# s1 then inserts into tbl2 using the new schema (3 columns).
+# The lazily distributed snapshot at s1's second insert must reflect both
+# ALTER TABLEs so the new columns are visible during decoding.
+permutation "s1_begin" "s1_insert_tbl1" "s2_add_col_c1" "s2_add_col_c2" "s1_insert_tbl2_3col" "s1_commit" "s3_get_changes"
+
+# Scenario 2: Long transaction + many catalog changes + correct decoding
+#
+# s1 starts a long transaction and inserts into tbl1.
+# s2 performs multiple catalog-modifying DDLs (CREATE TABLE), simulating
+# the pattern of autovacuum generating many catalog change commits.
+# s1 inserts again into tbl1 after all the DDLs.
+# Both inserts must be correctly decoded despite many catalog-modifying
+# commits in between. With eager distribution, each CREATE TABLE would
+# have distributed a snapshot to s1; with lazy distribution, only the
+# last snapshot is distributed when s1 does its second insert.
+permutation "s1_begin" "s1_insert_tbl1" "s2_create_dummy1" "s2_create_dummy2" "s2_create_dummy3" "s1_insert_tbl1" "s1_commit" "s3_get_changes"
+
+# Scenario 3: Subtransaction + catalog change + correct decoding
+#
+# s1 starts a transaction with a savepoint, inserts into tbl1 inside the
+# subtransaction. s2 performs two ALTER TABLE ADD COLUMN on tbl2. s1 then
+# inserts into tbl2 with the new columns, still inside the subtransaction.
+# Both inserts must decode correctly, verifying lazy distribution works
+# with subtransactions.
+permutation "s1_begin" "s1_savepoint" "s1_insert_tbl1" "s2_add_col_c1" "s2_add_col_c2" "s1_insert_tbl2_3col" "s1_release" "s1_commit" "s3_get_changes"
diff --git a/contrib/test_decoding/t/002_lazy_snapshot_spill.pl b/contrib/test_decoding/t/002_lazy_snapshot_spill.pl
new file mode 100644
index 0000000000..1f4551b0a6
--- /dev/null
+++ b/contrib/test_decoding/t/002_lazy_snapshot_spill.pl
@@ -0,0 +1,120 @@
+
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Test that lazy snapshot distribution reduces spill file usage.
+#
+# With the old eager distribution, each catalog-modifying commit (e.g. vacuum,
+# DDL) would distribute a snapshot to every in-progress transaction in the
+# reorder buffer. When a long-running transaction coexists with many such
+# commits, the snapshots accumulate with O(N^2) total size and spill to disk.
+#
+# With lazy distribution, snapshots are only distributed when a transaction
+# actually decodes a data change, so a long-running transaction receives at
+# most one snapshot per data change, regardless of how many catalog-modifying
+# commits happen.
+#
+# Note: invalidation messages are still eagerly distributed and they produce
+# REORDER_BUFFER_CHANGE_INVALIDATION entries that count toward memory usage.
+# We set logical_decoding_work_mem high enough to accommodate invalidation
+# messages while verifying that snapshot distribution does not cause spilling.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', qq(
+synchronous_commit = on
+logical_decoding_work_mem = 1MB
+autovacuum = off
+));
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', qq[
+ CREATE TABLE test_data (id int);
+ SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
+]);
+
+# Consume any setup-related changes
+$node->safe_psql('postgres',
+ "SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)");
+
+# Reset stats
+$node->safe_psql('postgres',
+ "SELECT pg_stat_reset_replication_slot('test_slot')");
+$node->safe_psql('postgres', "SELECT pg_stat_force_next_flush()");
+
+# Start a long-running transaction in a background session.
+# This transaction will be in-progress while many catalog changes happen.
+my $long_txn = $node->background_psql('postgres', on_error_stop => 1);
+$long_txn->query_safe("BEGIN");
+$long_txn->query_safe("INSERT INTO test_data VALUES (1)");
+
+# In the main session, perform many catalog-modifying DDLs.
+# Each DDL commit increments snapshot_generation and would have eagerly
+# distributed a growing snapshot under the old code.
+#
+# With 200 DDLs (each CREATE TABLE is a catalog-modifying commit), the old
+# eager approach would accumulate ~200 snapshots with O(N^2) total size in
+# the long transaction's reorder buffer changes.
+# With lazy distribution, only 1 snapshot is distributed when the long
+# transaction does its next data change.
+my $num_ddls = 200;
+for my $i (1 .. $num_ddls) {
+ $node->safe_psql('postgres', "CREATE TABLE dummy_$i (id int)");
+}
+
+# Insert one more row in the long transaction and commit.
+$long_txn->query_safe("INSERT INTO test_data VALUES (2)");
+$long_txn->query_safe("COMMIT");
+$long_txn->quit;
+
+# Consume the changes to trigger decoding
+my $result = $node->safe_psql('postgres',
+ "SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, " .
+ "'include-xids', '0', 'skip-empty-xacts', '1')");
+
+# Verify the decoded data is correct
+like($result, qr/INSERT: id\[integer\]:1/, 'first INSERT decoded correctly');
+like($result, qr/INSERT: id\[integer\]:2/, 'second INSERT decoded correctly');
+
+# Check spill statistics.
+# With lazy snapshot distribution and logical_decoding_work_mem=1MB, the long
+# transaction should NOT spill. It only contains 2 small INSERTs plus 1
+# lazily-distributed snapshot, well under 1MB.
+#
+# Without the optimization, the long transaction would have accumulated ~200
+# internal snapshots with a total size of ~O(N^2) bytes (~80KB for N=200),
+# plus ~200 invalidation change entries. The combined size could push the
+# transaction over 1MB with larger N.
+$node->safe_psql('postgres', "SELECT pg_stat_force_next_flush()");
+$node->poll_query_until('postgres', qq[
+ SELECT spill_bytes IS NOT NULL
+ FROM pg_stat_replication_slots
+ WHERE slot_name = 'test_slot'
+]) or die "Timed out waiting for stats";
+
+my $spill_bytes = $node->safe_psql('postgres', qq[
+ SELECT spill_bytes
+ FROM pg_stat_replication_slots
+ WHERE slot_name = 'test_slot'
+]);
+
+is($spill_bytes, '0',
+ "lazy snapshot distribution prevents spilling (spill_bytes=$spill_bytes)");
+
+# Cleanup
+for my $i (1 .. $num_ddls) {
+ $node->safe_psql('postgres', "DROP TABLE IF EXISTS dummy_$i");
+}
+$node->safe_psql('postgres', qq[
+ SELECT pg_drop_replication_slot('test_slot');
+ DROP TABLE test_data;
+]);
+
+$node->stop;
+done_testing();
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f2..c63d4e17d3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -235,7 +235,7 @@ int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
*/
static ReorderBufferTXN *ReorderBufferAllocTXN(ReorderBuffer *rb);
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
TransactionId xid, bool create, bool *is_new,
XLogRecPtr lsn, bool create_as_top);
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
@@ -650,7 +650,7 @@ ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
* (with the given LSN, and as top transaction if that's specified);
* when this happens, is_new is set to true.
*/
-static ReorderBufferTXN *
+ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c8309b96ed..1c6a94917a 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -172,7 +172,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
static void SnapBuildSnapIncRefcount(Snapshot snap);
-static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
+static void SnapBuildDistributeInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
uint32 xinfo);
@@ -655,6 +655,8 @@ SnapBuildResetExportedSnapshotState(void)
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
+ ReorderBufferTXN *txn;
+
/*
* We can't handle data in transactions if we haven't built a snapshot
* yet, so don't store them.
@@ -672,10 +674,23 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
return false;
/*
- * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
- * be needed to decode the change we're currently processing.
+ * Look up the transaction once and reuse it for both the base snapshot
+ * check and the lazy catalog snapshot distribution below. This avoids
+ * repeated hash lookups that were previously done separately by
+ * ReorderBufferXidHasBaseSnapshot(), ReorderBufferSetBaseSnapshot(),
+ * and the lazy distribution logic.
*/
- if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+ txn = ReorderBufferTXNByXid(builder->reorder, xid, true,
+ NULL, lsn, true);
+ if (rbtxn_is_known_subxact(txn))
+ txn = ReorderBufferTXNByXid(builder->reorder, txn->toplevel_xid,
+ false, NULL, InvalidXLogRecPtr, false);
+
+ /*
+ * If the reorderbuffer doesn't yet have a base snapshot, add one now,
+ * it will be needed to decode the change we're currently processing.
+ */
+ if (txn->base_snapshot == NULL)
{
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
@@ -694,6 +709,28 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
builder->snapshot);
}
+ /*
+ * Lazily distribute the catalog snapshot to this transaction if it hasn't
+ * received the latest one yet. This replaces the previous eager
+ * distribution in SnapBuildDistributeSnapshotAndInval(), avoiding O(N^2)
+ * disk usage when many catalog-modifying transactions (e.g. autovacuum)
+ * commit while a long-running transaction is in progress.
+ *
+ * We only add a snapshot when the transaction actually has a data change
+ * to decode, so idle long-running transactions won't accumulate any
+ * snapshots at all.
+ */
+ if (builder->snapshot_generation > 0 &&
+ txn->last_snapshot_generation < builder->snapshot_generation)
+ {
+ Assert(builder->snapshot != NULL);
+
+ SnapBuildSnapIncRefcount(builder->snapshot);
+ ReorderBufferAddSnapshot(builder->reorder, txn->xid,
+ lsn, builder->snapshot);
+ txn->last_snapshot_generation = builder->snapshot_generation;
+ }
+
return true;
}
@@ -737,15 +774,20 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
}
/*
- * Add a new Snapshot and invalidation messages to all transactions we're
- * decoding that currently are in-progress so they can see new catalog contents
- * made by the transaction that just committed. This is necessary because those
- * in-progress transactions will use the new catalog's contents from here on
- * (at the very least everything they do needs to be compatible with newer
- * catalog contents).
+ * Distribute invalidation messages to all in-progress transactions so they
+ * can see new catalog contents made by the transaction that just committed.
+ *
+ * Note: we no longer eagerly distribute snapshots here. Instead, snapshots
+ * are lazily distributed in SnapBuildProcessChange() when a transaction
+ * actually needs to decode a data change. This avoids O(N^2) snapshot disk
+ * usage when a long-running transaction coexists with many catalog-modifying
+ * commits (e.g. autovacuum of many tables). The snapshot_generation counter
+ * in SnapBuild tracks when a new snapshot is available, and each transaction's
+ * last_snapshot_generation in ReorderBufferTXN tracks whether it has received
+ * the latest snapshot.
*/
static void
-SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
+SnapBuildDistributeInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
{
dlist_iter txn_i;
ReorderBufferTXN *txn;
@@ -753,8 +795,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
/*
* Iterate through all toplevel transactions. This can include
* subtransactions which we just don't yet know to be that, but that's
- * fine, they will just get an unnecessary snapshot and invalidations
- * queued.
+ * fine, they will just get unnecessary invalidations queued.
*/
dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
{
@@ -764,60 +805,25 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
/*
* If we don't have a base snapshot yet, there are no changes in this
- * transaction which in turn implies we don't yet need a snapshot at
- * all. We'll add a snapshot when the first change gets queued.
- *
- * Similarly, we don't need to add invalidations to a transaction
- * whose base snapshot is not yet set. Once a base snapshot is built,
- * it will include the xids of committed transactions that have
- * modified the catalog, thus reflecting the new catalog contents. The
- * existing catalog cache will have already been invalidated after
- * processing the invalidations in the transaction that modified
- * catalogs, ensuring that a fresh cache is constructed during
- * decoding.
- *
- * NB: This works correctly even for subtransactions because
- * ReorderBufferAssignChild() takes care to transfer the base snapshot
- * to the top-level transaction, and while iterating the changequeue
- * we'll get the change from the subtxn.
+ * transaction which in turn implies we don't yet need invalidations.
+ * Once a base snapshot is built, it will include the xids of committed
+ * transactions that have modified the catalog, thus reflecting the new
+ * catalog contents.
*/
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue;
/*
- * We don't need to add snapshot or invalidations to prepared
- * transactions as they should not see the new catalog contents.
+ * We don't need to add invalidations to prepared transactions as they
+ * should not see the new catalog contents.
*/
if (rbtxn_is_prepared(txn))
continue;
- elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
- txn->xid, LSN_FORMAT_ARGS(lsn));
-
- /*
- * increase the snapshot's refcount for the transaction we are handing
- * it out to
- */
- SnapBuildSnapIncRefcount(builder->snapshot);
- ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
- builder->snapshot);
-
/*
* Add invalidation messages to the reorder buffer of in-progress
* transactions except the current committed transaction, for which we
* will execute invalidations at the end.
- *
- * It is required, otherwise, we will end up using the stale catcache
- * contents built by the current transaction even after its decoding,
- * which should have been invalidated due to concurrent catalog
- * changing transaction.
- *
- * Distribute only the invalidation messages generated by the current
- * committed transaction. Invalidation messages received from other
- * transactions would have already been propagated to the relevant
- * in-progress transactions. This transaction would have processed
- * those invalidations, ensuring that subsequent transactions observe
- * a consistent cache state.
*/
if (txn->xid != xid)
{
@@ -831,6 +837,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
{
Assert(msgs != NULL);
+ elog(DEBUG2, "adding invalidations to %u at %X/%08X",
+ txn->xid, LSN_FORMAT_ARGS(lsn));
+
ReorderBufferAddDistributedInvalidations(builder->reorder,
txn->xid, lsn,
ninvalidations, msgs);
@@ -1097,6 +1106,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
builder->snapshot = SnapBuildBuildSnapshot(builder);
+ /* Track that the catalog snapshot changed */
+ builder->snapshot_generation++;
+
/* we might need to execute invalidations, add snapshot */
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
@@ -1109,10 +1121,12 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
SnapBuildSnapIncRefcount(builder->snapshot);
/*
- * Add a new catalog snapshot and invalidations messages to all
- * currently running transactions.
+ * Distribute invalidation messages to all currently running
+ * transactions. Snapshots are distributed lazily in
+ * SnapBuildProcessChange() when a transaction decodes a data
+ * change, avoiding O(N^2) disk usage from snapshot accumulation.
*/
- SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
+ SnapBuildDistributeInval(builder, lsn, xid);
}
}
@@ -1532,7 +1546,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 6
+#define SNAPBUILD_VERSION 7
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index ff825e4b7b..c0ee7ca8ae 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -461,6 +461,14 @@ typedef struct ReorderBufferTXN
/* Size of top-transaction including sub-transactions. */
Size total_size;
+ /*
+ * Tracks the snapshot generation at which this transaction last received
+ * a catalog snapshot via lazy distribution. Compared against
+ * SnapBuild.snapshot_generation to decide if a new snapshot is needed
+ * before decoding a data change.
+ */
+ uint64 last_snapshot_generation;
+
/*
* Private data pointer of the output plugin.
*/
@@ -742,6 +750,10 @@ extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunning
extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
+extern ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+ TransactionId xid, bool create,
+ bool *is_new, XLogRecPtr lsn,
+ bool create_as_top);
extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn, Snapshot snap);
extern void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
index 363f7f5977..f8d20dd1f2 100644
--- a/src/include/replication/snapbuild_internal.h
+++ b/src/include/replication/snapbuild_internal.h
@@ -129,6 +129,17 @@ struct SnapBuild
TransactionId *xip;
} committed;
+ /*
+ * Generation counter, incremented each time a new catalog snapshot is
+ * built due to a catalog-modifying transaction commit. Used for lazy
+ * snapshot distribution: instead of distributing a snapshot to every
+ * in-progress transaction on each catalog-modifying commit, we only
+ * distribute when the transaction actually needs to decode a data change.
+ * This avoids O(N^2) snapshot disk usage when a long-running transaction
+ * coexists with many vacuum commits.
+ */
+ uint64 snapshot_generation;
+
/*
* Array of transactions and subtransactions that had modified catalogs
* and were running when the snapshot was serialized.
--
2.43.7
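
For readers skimming the diff, the generation-counter check added to SnapBuildProcessChange() can be modelled in isolation. The following is only a rough sketch, not code from the patch: ModelBuilder, ModelTxn and the model_* functions are hypothetical stand-ins for SnapBuild, ReorderBufferTXN and the real call sites, and the base snapshot is deliberately left out of the count.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins; only the two counters from the patch are modelled. */
typedef struct ModelBuilder
{
    uint64_t snapshot_generation;       /* bumped per catalog-modifying commit */
} ModelBuilder;

typedef struct ModelTxn
{
    uint64_t last_snapshot_generation;  /* generation last handed to this txn */
    size_t   snapshots_queued;          /* lazily distributed snapshots so far */
} ModelTxn;

/* A catalog-modifying commit builds a new snapshot, so bump the generation. */
static void
model_catalog_commit(ModelBuilder *b)
{
    assert(b != NULL);
    b->snapshot_generation++;
}

/* A data change queues a snapshot only if the txn is behind the builder. */
static void
model_data_change(const ModelBuilder *b, ModelTxn *t)
{
    assert(b != NULL && t != NULL);
    if (b->snapshot_generation > 0 &&
        t->last_snapshot_generation < b->snapshot_generation)
    {
        t->snapshots_queued++;
        t->last_snapshot_generation = b->snapshot_generation;
    }
}

/*
 * Replay the shape of the TAP test: one change, n catalog-modifying
 * commits, one more change. Returns the number of snapshots queued.
 */
static size_t
model_lazy_snapshots(unsigned n_commits)
{
    ModelBuilder b = {0};
    ModelTxn     t = {0, 0};

    model_data_change(&b, &t);
    for (unsigned i = 0; i < n_commits; i++)
        model_catalog_commit(&b);
    model_data_change(&b, &t);
    return t.snapshots_queued;
}
```

In this model the second data change queues exactly one snapshot however many commits happened in between, and none if there were no commits at all.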
[application/octet-stream] v1-0002-Benchmark-scripts-for-lazy-snapshot-distribution.patch (15.2K, 3-v1-0002-Benchmark-scripts-for-lazy-snapshot-distribution.patch)
From 14e88f283c6513fff07a58b301c350a3cdcc1388 Mon Sep 17 00:00:00 2001
From: Rui Zhao <[email protected]>
Date: Tue, 12 May 2026 16:35:39 +0800
Subject: [PATCH v1 2/2] Benchmark scripts for lazy snapshot distribution
Provides single-run and matrix benchmark scripts to measure the impact
of the lazy snapshot distribution patch on logical decoding spill bytes
and decoding time.
Scenario: one long-running write transaction with K=1 INSERT, coexisting
with N concurrent CREATE/DROP TABLE pairs (each its own catalog-modifying
commit), then drained via pg_logical_slot_get_changes() with the
test_decoding output plugin. Captures spill_bytes, spill_count,
total_bytes, and end-to-end decoding wall time.
Files:
bench/setup_cluster.sh
Spins up a throwaway PG cluster from a specified install directory,
configured for logical decoding (wal_level=logical, max_wal_senders,
max_replication_slots, configurable logical_decoding_work_mem).
bench/lazy_snapshot_bench.sh
Runs a single (N, K) scenario against a running cluster. The long
transaction runs in a background psql session that does K INSERTs
and then pg_sleep()s long enough for the concurrent DDL loop to
complete. DDLs are batched in a single psql session for ~100x
throughput over per-statement connections. Outputs one CSV row.
bench/run_matrix.sh
Driver that iterates over a list of N values with configurable
repeat count (default 3). Emits a CSV with all replicates.
bench/aggregate.sh
Aggregates CSVs (from one or more run_matrix.sh invocations) into
median-per-cell summary tables, plus a side-by-side master vs
patch comparison with computed speedup and bytes-saved ratios.
Usage:
# Build master and patch versions of PostgreSQL
git checkout master
./configure --prefix=/tmp/pg_master_install ... && make install
git checkout lazy-snapshot-distribution
make clean && ./configure --prefix=/tmp/pg_patch_install ... && make install
# Bring up master cluster, run matrix
eval "$(./bench/setup_cluster.sh /tmp/pg_master_install /tmp/pg_master_data 55432)"
./bench/run_matrix.sh master 500 1000 2000 5000 > master.csv
pg_ctl -D /tmp/pg_master_data stop
# Bring up patch cluster on a different port, run the same matrix
eval "$(./bench/setup_cluster.sh /tmp/pg_patch_install /tmp/pg_patch_data 55433)"
./bench/run_matrix.sh patch 500 1000 2000 5000 > patch.csv
# Aggregate
cat master.csv patch.csv | ./bench/aggregate.sh -
These scripts produced the empirical data cited in the
"Performance impact" section of the cover letter for v1-0001.
---
bench/aggregate.sh | 99 +++++++++++++++++++++
bench/lazy_snapshot_bench.sh | 161 +++++++++++++++++++++++++++++++++++
bench/run_matrix.sh | 57 +++++++++++++
bench/setup_cluster.sh | 75 ++++++++++++++++
4 files changed, 392 insertions(+)
create mode 100755 bench/aggregate.sh
create mode 100755 bench/lazy_snapshot_bench.sh
create mode 100755 bench/run_matrix.sh
create mode 100755 bench/setup_cluster.sh
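
As a back-of-the-envelope companion to the benchmark scenario, the quadratic growth can be estimated with a few lines of C. This is a simplified sketch under stated assumptions, not a measurement: it counts only the xip array of each distributed snapshot, assumes the array grows by one xid per catalog-modifying commit, and ignores the fixed snapshot header and the invalidation entries. The model_* names are hypothetical. Note that each CREATE/DROP pair in the benchmark is two commits.

```c
#include <assert.h>
#include <stdint.h>

/* TransactionId is a 32-bit value in PostgreSQL, so 4 bytes per xid. */
#define MODEL_XID_BYTES ((uint64_t) sizeof(uint32_t))

/*
 * Eager distribution: the i-th catalog-modifying commit hands the long
 * transaction a snapshot carrying i xids, so the sizes sum to O(N^2).
 */
static uint64_t
model_eager_xip_bytes(uint64_t n_commits)
{
    uint64_t total = 0;

    assert(n_commits <= 1000000000u);   /* keep the sum within 64 bits */
    for (uint64_t i = 1; i <= n_commits; i++)
        total += i * MODEL_XID_BYTES;
    return total;
}

/*
 * Lazy distribution: a single snapshot carrying all n xids is queued at
 * the long transaction's next data change, so the size is O(N).
 */
static uint64_t
model_lazy_xip_bytes(uint64_t n_commits)
{
    return n_commits * MODEL_XID_BYTES;
}
```

For 200 commits the eager estimate is 80400 bytes against 800 bytes for lazy, which lines up with the "~80KB for N=200" figure in the TAP test comment of v1-0001.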
diff --git a/bench/aggregate.sh b/bench/aggregate.sh
new file mode 100755
index 0000000000..481e00c549
--- /dev/null
+++ b/bench/aggregate.sh
@@ -0,0 +1,99 @@
+#!/bin/bash
+#
+# Aggregate the CSV from run_matrix.sh into a comparison table suitable for
+# pasting into a pgsql-hackers email.
+#
+# Usage:
+# $0 <combined.csv>
+# or
+# cat master.csv patch.csv | $0 -
+#
+# Outputs, per (N, label) combo: median of decoding_ms / spill_bytes /
+# total_bytes across iterations. Then a side-by-side comparison of master
+# vs patch.
+
+set -euo pipefail
+
+if [[ $# -lt 1 ]]; then
+ echo "Usage: $0 <csv-file> (or '-' for stdin)" >&2
+ exit 1
+fi
+
+INPUT="$1"
+[[ "$INPUT" == "-" ]] && INPUT=/dev/stdin
+
+awk -F, '
+$1 == "label" { next } # skip any header row (works for concatenated CSVs)
+{
+ key = $1 "," $2 # label,N
+ decoding[key, ++cnt_dec[key]] = $5
+ spill[key, ++cnt_sp[key]] = $9
+ total[key, ++cnt_tot[key]] = $10
+ labels[$1] = 1
+ ns[$2 + 0] = 1
+ # remember K (assume constant) and max repeat seen
+ k = $3
+ ldwm = $4
+ if (cnt_dec[key] > rep_max) rep_max = cnt_dec[key]
+}
+END {
+ # median per key via asort(), a gawk extension (needs GNU awk); small N, OK
+ for (key in cnt_dec) {
+ n = cnt_dec[key]
+ # collect into arr
+ delete arr
+ for (i = 1; i <= n; i++) arr[i] = decoding[key, i]
+ asort(arr)
+ if (n % 2) m_dec[key] = arr[(n+1)/2]
+ else m_dec[key] = (arr[n/2] + arr[n/2 + 1]) / 2
+
+ delete arr
+ for (i = 1; i <= n; i++) arr[i] = spill[key, i]
+ asort(arr)
+ if (n % 2) m_sp[key] = arr[(n+1)/2]
+ else m_sp[key] = (arr[n/2] + arr[n/2 + 1]) / 2
+
+ delete arr
+ for (i = 1; i <= n; i++) arr[i] = total[key, i]
+ asort(arr)
+ if (n % 2) m_tot[key] = arr[(n+1)/2]
+ else m_tot[key] = (arr[n/2] + arr[n/2 + 1]) / 2
+ }
+
+ # Sorted N values
+ n_count = 0
+ for (n in ns) sorted_ns[++n_count] = n
+ for (i = 1; i <= n_count; i++)
+ for (j = i + 1; j <= n_count; j++)
+ if (sorted_ns[i] + 0 > sorted_ns[j] + 0) {
+ t = sorted_ns[i]; sorted_ns[i] = sorted_ns[j]; sorted_ns[j] = t
+ }
+
+ printf "Config: K=%s, logical_decoding_work_mem=%s, REPEAT=%d (median)\n\n", k, ldwm, rep_max
+ printf "%-8s %-7s %-14s %-14s %-14s\n", "label", "N", "decode_ms", "spill_bytes", "total_bytes"
+ for (i = 1; i <= n_count; i++) {
+ N = sorted_ns[i]
+ for (lbl in labels) {
+ key = lbl "," N
+ if (key in m_dec)
+ printf "%-8s %-7d %-14d %-14d %-14d\n", lbl, N, m_dec[key], m_sp[key], m_tot[key]
+ }
+ }
+
+ # Side-by-side, if exactly master + patch are present
+ if ("master" in labels && "patch" in labels) {
+ printf "\n%-7s %-12s %-12s %-12s %-12s %-12s %-12s\n", \
+ "N", "master_dec", "patch_dec", "speedup", "master_spill", "patch_spill", "saved_x"
+ for (i = 1; i <= n_count; i++) {
+ N = sorted_ns[i]
+ mk = "master," N; pk = "patch," N
+ if ((mk in m_dec) && (pk in m_dec)) {
+ speedup = (m_dec[pk] > 0) ? m_dec[mk] / m_dec[pk] : 0
+ saved = (m_sp[pk] > 0) ? m_sp[mk] / m_sp[pk] : (m_sp[mk] > 0 ? 999 : 1)
+ printf "%-7d %-12d %-12d %-12.2f %-12d %-12d %-12.2f\n", \
+ N, m_dec[mk], m_dec[pk], speedup, m_sp[mk], m_sp[pk], saved
+ }
+ }
+ }
+}
+' "$INPUT"
diff --git a/bench/lazy_snapshot_bench.sh b/bench/lazy_snapshot_bench.sh
new file mode 100755
index 0000000000..f16a1a0c1f
--- /dev/null
+++ b/bench/lazy_snapshot_bench.sh
@@ -0,0 +1,161 @@
+#!/bin/bash
+#
+# Single benchmark run for "lazy snapshot distribution" patch.
+#
+# Simulates: one long-running transaction (with K inserts) coexists with N
+# concurrent catalog-modifying commits, then drains via pg_logical_slot_get_changes.
+#
+# Output (CSV row to stdout):
+# label,N,K,ldwm,decoding_ms,decoded_changes,spill_txns,spill_count,spill_bytes,total_bytes,ddl_loop_sec
+#
+# Assumes a running PG cluster with wal_level=logical. To compare master vs
+# patch, point this script at two different clusters built from each binary.
+
+set -euo pipefail
+
+# -------- args --------
+N=""
+K="1"
+LABEL=""
+LDWM="64kB"
+SLOT="lazy_bench"
+VERBOSE=0
+
+usage() {
+ cat <<EOF
+Usage: $0 -N <num_ddl_commits> -l <label> [-K <inserts>] [-m <ldwm>] [-s <slot>] [-v]
+
+Required:
+ -N <N> number of concurrent CREATE/DROP TABLE pairs during long txn
+ -l <label> CSV tag (e.g. "master" or "patch")
+
+Options:
+ -K <K> inserts in long txn (default 1)
+ -m <mem> logical_decoding_work_mem (default 64kB; small to amplify spill)
+ -s <slot> slot name (default lazy_bench)
+ -v verbose
+
+PG connection: standard libpq env vars (PGHOST/PGPORT/PGDATABASE/PGUSER).
+EOF
+ exit 1
+}
+
+while getopts "N:K:l:m:s:vh" opt; do
+ case $opt in
+ N) N="$OPTARG" ;;
+ K) K="$OPTARG" ;;
+ l) LABEL="$OPTARG" ;;
+ m) LDWM="$OPTARG" ;;
+ s) SLOT="$OPTARG" ;;
+ v) VERBOSE=1 ;;
+ h|*) usage ;;
+ esac
+done
+
+[[ -z "$N" || -z "$LABEL" ]] && usage
+
+log() { [[ $VERBOSE -eq 1 ]] && echo "[bench] $*" >&2 || true; }
+
+PSQL="psql -X -At -q"
+
+# Quote string for SQL.
+sqlstr() { printf "'%s'" "${1//\'/\'\'}"; }
+
+# -------- preflight --------
+
+# Verify cluster reachable and wal_level=logical.
+wal_level=$($PSQL -c "SHOW wal_level;" || { echo "cannot connect to PG" >&2; exit 1; })
+if [[ "$wal_level" != "logical" ]]; then
+ echo "wal_level must be 'logical' (got '$wal_level')" >&2
+ echo "set: ALTER SYSTEM SET wal_level = logical; then restart" >&2
+ exit 1
+fi
+
+# logical_decoding_work_mem is a backend GUC and cannot be set through a
+# slot creation parameter, so we rely on a session-level SET inside the
+# decoding query below.
+
+# -------- setup --------
+log "setup: drop old slot/tables, create fresh slot"
+$PSQL <<SQL >/dev/null
+SELECT pg_drop_replication_slot($(sqlstr "$SLOT"))
+ WHERE EXISTS (SELECT 1 FROM pg_replication_slots
+ WHERE slot_name = $(sqlstr "$SLOT"));
+DROP TABLE IF EXISTS bench_data;
+CREATE TABLE bench_data (i int);
+SELECT pg_create_logical_replication_slot($(sqlstr "$SLOT"), 'test_decoding');
+SELECT pg_stat_reset_replication_slot($(sqlstr "$SLOT"));
+SQL
+
+# -------- long-running txn (background) --------
+# It does K inserts then sleeps long enough for the DDL loop to finish.
+# Sleep budget: 0.01s per DDL + 5s safety margin. We don't poll; if DDLs
+# finish early, the long txn just sits idle (harmless for the measurement).
+# Tuned for ~1000 DDL/s in single-session mode, i.e. a conservative 10x
+# factor for slow CI machines.
+SLEEP_SEC=$(awk "BEGIN { printf \"%.2f\", $N * 0.01 + 5 }")
+log "long txn sleep budget: ${SLEEP_SEC}s"
+
+(
+ $PSQL <<SQL
+BEGIN;
+INSERT INTO bench_data SELECT generate_series(1, $K);
+SELECT pg_sleep($SLEEP_SEC);
+COMMIT;
+SQL
+) &
+LONG_PID=$!
+
+# Wait for the long txn to actually start its INSERT before kicking off DDLs.
+sleep 1
+
+# -------- concurrent DDL loop --------
+# Single psql session, one statement per line. Each unwrapped CREATE/DROP
+# auto-commits as its own transaction, so this generates N catalog-modifying
+# commits in WAL, identical to N separate psql -c calls but ~100x faster.
+log "running $N CREATE/DROP TABLE pairs in a single psql session"
+DDL_START=$(date +%s.%N)
+{
+ echo '\set ON_ERROR_STOP on'
+ for i in $(seq 1 "$N"); do
+ printf 'CREATE TABLE bench_t%d (a int); DROP TABLE bench_t%d;\n' "$i" "$i"
+ done
+} | $PSQL -f - >/dev/null
+DDL_END=$(date +%s.%N)
+DDL_LOOP_SEC=$(awk "BEGIN { printf \"%.2f\", $DDL_END - $DDL_START }")
+log "DDL loop done in ${DDL_LOOP_SEC}s"
+
+# Wait for long txn to commit.
+wait "$LONG_PID"
+log "long txn committed"
+
+# -------- decode + measure --------
+log "draining slot via pg_logical_slot_get_changes"
+DECODE_START=$(date +%s.%N)
+DECODED=$($PSQL <<SQL
+SET logical_decoding_work_mem = '$LDWM';
+SELECT count(*) FROM pg_logical_slot_get_changes(
+ $(sqlstr "$SLOT"), NULL, NULL,
+ 'include-xids', '0', 'skip-empty-xacts', '1'
+);
+SQL
+)
+DECODE_END=$(date +%s.%N)
+DECODE_MS=$(awk "BEGIN { printf \"%.0f\", ($DECODE_END - $DECODE_START) * 1000 }")
+log "decoded $DECODED changes in ${DECODE_MS}ms"
+
+# Capture stats (the slot's lifetime stats; we reset at setup, so this is just
+# the current decoding run).
+STATS=$($PSQL -c "
+SELECT spill_txns || ',' || spill_count || ',' || spill_bytes || ',' || total_bytes
+FROM pg_stat_replication_slots
+WHERE slot_name = $(sqlstr "$SLOT");")
+
+# -------- cleanup --------
+$PSQL <<SQL >/dev/null
+SELECT pg_drop_replication_slot($(sqlstr "$SLOT"));
+DROP TABLE bench_data;
+SQL
+
+# -------- output --------
+echo "$LABEL,$N,$K,$LDWM,$DECODE_MS,$DECODED,$STATS,$DDL_LOOP_SEC"
diff --git a/bench/run_matrix.sh b/bench/run_matrix.sh
new file mode 100755
index 0000000000..626941c174
--- /dev/null
+++ b/bench/run_matrix.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+#
+# Run lazy_snapshot_bench.sh across a matrix of N values and emit one CSV.
+#
+# This script runs against a SINGLE PG cluster (it does not switch binaries).
+# To compare master vs patch, run this twice with the appropriate cluster
+# running each time, then concat the two CSVs.
+#
+# Example:
+# # Cluster is built from master code
+# $0 master 500 1000 2000 5000 > master.csv
+#
+# # ... rebuild with patch applied, restart cluster ...
+# $0 patch 500 1000 2000 5000 > patch.csv
+#
+# # combine
+# cat master.csv patch.csv > both.csv
+#
+# Each row is run REPEAT times (default 3) and all replicates are output.
+# Aggregate with aggregate.sh (median) or externally with awk/Python/pandas.
+
+set -euo pipefail
+
+REPEAT=${REPEAT:-3}
+K=${K:-1}
+LDWM=${LDWM:-64kB}
+
+HERE="$(cd "$(dirname "$0")" && pwd)"
+BENCH="$HERE/lazy_snapshot_bench.sh"
+
+if [[ $# -lt 2 ]]; then
+ cat <<EOF >&2
+Usage: $0 <label> <N> [<N> ...]
+ label: "master" | "patch" | any tag
+ N: one or more values to test
+Environment:
+ REPEAT (default 3) — runs per (label, N) combo
+ K (default 1) — inserts in long txn
+ LDWM (default 64kB) — logical_decoding_work_mem (small to amplify spill)
+EOF
+ exit 1
+fi
+
+LABEL="$1"
+shift
+N_VALUES=("$@")
+
+# header
+echo "label,N,K,ldwm,decoding_ms,decoded_changes,spill_txns,spill_count,spill_bytes,total_bytes,ddl_loop_sec,iter"
+
+for N in "${N_VALUES[@]}"; do
+ for ((i = 1; i <= REPEAT; i++)); do
+ echo "==> $LABEL N=$N iter=$i/$REPEAT" >&2
+ row=$("$BENCH" -N "$N" -K "$K" -m "$LDWM" -l "$LABEL")
+ echo "$row,$i"
+ done
+done
diff --git a/bench/setup_cluster.sh b/bench/setup_cluster.sh
new file mode 100755
index 0000000000..fdc2fcf7ed
--- /dev/null
+++ b/bench/setup_cluster.sh
@@ -0,0 +1,75 @@
+#!/bin/bash
+#
+# Spin up a throwaway PG cluster from a specified install dir, configured for
+# logical decoding, and print env vars to source.
+#
+# Usage:
+# eval "$(./setup_cluster.sh /tmp/pg_master_install /tmp/pg_master_data 55432)"
+# # now PGPORT, PGHOST etc. point at the new cluster
+#
+# Stop with:
+# $PGBINDIR/pg_ctl -D $PGDATA stop
+
+set -euo pipefail
+
+if [[ $# -lt 1 ]]; then
+ cat <<EOF >&2
+Usage: $0 <pg_install_dir> [datadir] [port]
+ pg_install_dir: e.g. /tmp/pg_master_install (contains bin/initdb etc.)
+ datadir: default \$pg_install_dir/data
+ port: default 55432
+EOF
+ exit 1
+fi
+
+PGBINDIR="$1/bin"
+PGDATA="${2:-$1/data}"
+PGPORT="${3:-55432}"
+
+[[ ! -x "$PGBINDIR/initdb" ]] && { echo "no initdb at $PGBINDIR" >&2; exit 1; }
+
+# Stop if a previous cluster is running here.
+if [[ -f "$PGDATA/postmaster.pid" ]]; then
+ echo "stopping existing cluster at $PGDATA" >&2
+ "$PGBINDIR/pg_ctl" -D "$PGDATA" stop -m fast || true
+ sleep 1
+fi
+
+if [[ ! -d "$PGDATA/base" ]]; then
+ echo "initdb at $PGDATA" >&2
+ "$PGBINDIR/initdb" -D "$PGDATA" --auth=trust >/dev/null
+fi
+
+# Apply logical-decoding-friendly settings.
+cat >> "$PGDATA/postgresql.auto.conf" <<EOF
+# bench/setup_cluster.sh
+wal_level = logical
+max_wal_senders = 10
+max_replication_slots = 10
+shared_buffers = 256MB
+logical_decoding_work_mem = 64kB
+port = $PGPORT
+unix_socket_directories = '$PGDATA'
+EOF
+
+echo "starting cluster" >&2
+"$PGBINDIR/pg_ctl" -D "$PGDATA" -l "$PGDATA/server.log" start
+
+# Wait until ready.
+for i in {1..20}; do
+ if "$PGBINDIR/pg_isready" -h "$PGDATA" -p "$PGPORT" >/dev/null 2>&1; then
+ break
+ fi
+ sleep 0.5
+done
+
+# Emit env-vars to eval.
+cat <<EOF
+export PGBINDIR=$PGBINDIR
+export PGDATA=$PGDATA
+export PGHOST=$PGDATA
+export PGPORT=$PGPORT
+export PGDATABASE=postgres
+export PGUSER=$USER
+export PATH=$PGBINDIR:\$PATH
+EOF
--
2.43.7