From: Rui Zhao
Subject: Lazy snapshot distribution in logical decoding
Message-Id: <578D6ABA-FC6B-4E08-955F-3493BA6FACA1@gmail.com>
Date: Tue, 12 May 2026 16:55:23 +0800
To: pgsql-hackers@lists.postgresql.org

Hi hackers,

When a long-running write transaction coexists with many concurrent
catalog-modifying commits (e.g., autovacuum updating pg_class stats for
many tables, or batch DDL), the reorder buffer's spill files for logical
decoding can grow quadratically. In our production scenario this
reaches ~80 GB when extrapolated to N=100K. This patch eliminates the
problem by deferring snapshot distribution from commit time to
data-change time.

== Problem ==

The trigger requires two conditions:

(i)  Something prevents xmin from advancing. This can be the long write
     transaction itself (its own XID holds xmin back), or an unrelated
     xmin-holder such as pg_dump's REPEATABLE READ session or a slow
     logical-replication consumer.

(ii) A transaction that has at least one data change is present in the
     reorder buffer. Pure read-only sessions (pg_dump, long SELECT) do
     not appear in the reorder buffer and are not victims themselves;
     they only enable condition (i).

When both are met, the chain reaction runs across four stages:

1) VACUUM updates pg_class statistics via
   heap_inplace_update_and_unlock(), which triggers
   CacheInvalidateHeapTuple(). At command end, LogLogicalInvalidations()
   emits an XLOG_XACT_INVALIDATIONS WAL record.

2) During logical decoding, xact_decode() processes
   XLOG_XACT_INVALIDATIONS by calling
   ReorderBufferXidSetCatalogChanges(), marking the VACUUM transaction
   as having catalog changes.

3) When the VACUUM transaction commits, SnapBuildCommitTxn() sees that
   it has catalog changes. It builds a new snapshot via
   SnapBuildBuildSnapshot() and calls
   SnapBuildDistributeSnapshotAndInval() to distribute that snapshot to
   ALL in-progress transactions.

4) The snapshot includes the entire builder->committed.xip array.
   A long-running transaction prevents SnapBuildPurgeOlderTxn() from
   cleaning old entries (builder->xmin cannot advance). The committed
   array therefore grows monotonically, and each successive snapshot is
   larger than the last:

     Snapshot i contains i xids, size = 192 + 4*i bytes
     Total disk usage = sum(i=1..N)(192 + 4*i)
                      = 192*N + 2*N*(N+1)
                      ~ 2*N^2 bytes for large N

In our measurements (see Performance impact below), master spills
207 MB at N=5000 and 813 MB at N=10000 with logical_decoding_work_mem
= 64MB. That is a clean O(N^2) scaling. Extrapolating to N=100K gives
~80 GB.

Beyond disk usage, these accumulated snapshots also degrade decoding
performance:

a) Memory pressure: the snapshots consume logical_decoding_work_mem
   quickly. This forces the reorder buffer to spill to disk much earlier
   than necessary.

b) Spill/restore I/O: each INTERNAL_SNAPSHOT change is serialized during
   spill and deserialized during commit-time processing. That adds
   significant disk I/O for what is essentially dead data.

c) Commit-time iteration overhead: ReorderBufferProcessTXN() iterates
   over all changes via a binary heap. 100K extra INTERNAL_SNAPSHOT
   entries mean 100K additional heap pop operations and snapshot_now
   switches (refcount increment/decrement, possibly freeing the old
   snapshot).

d) Replication lag: the combined CPU and I/O overhead directly increases
   the time between a transaction's commit and its decoded output
   reaching subscribers.

== Why not filter VACUUM? ==

One might consider filtering out VACUUM-generated
XLOG_XACT_INVALIDATIONS records so they don't trigger snapshot rebuilds.
However, the O(N^2) problem is not specific to VACUUM. Any workload that
generates many catalog-modifying commits triggers it, for example:

- batch partition creation,
- online migration tools running thousands of DDLs,
- extension installations creating many functions and types.

A general solution at the distribution layer is needed.

== Prior work ==

The distribution function modified by this patch was last touched by the
"long-standing data loss bug in initial sync of logical replication"
thread. Tomas Vondra started that thread in 2023 [1], and Amit Kapila
committed the fix in 2025. That fix renamed
SnapBuildDistributeNewCatalogSnapshot to
SnapBuildDistributeSnapshotAndInval. It also added invalidation
distribution alongside the existing snapshot distribution, so that
in-progress transactions invalidate their relcache when concurrent
publication DDL changes the set of published tables.

The commit message acknowledged "some performance regression ...
primarily during frequent execution of publication DDL statements". The
present patch addresses the snapshot-distribution side of that overhead
and keeps the invalidation-distribution semantics unchanged. Invalidations
cannot be deduplicated the way snapshots can (see "Correctness" below).

Related but orthogonal work in the same area:

- Sawada's "Using per-transaction memory contexts for storing decoded
  tuples" [2] addresses a different root cause of decoding memory bloat
  under long-running transactions: GenerationContext fragmentation in
  the reorder buffer's tuple context. His patch is complementary; both
  help keep decoding memory usage within logical_decoding_work_mem.

- Tachoires's "Compress ReorderBuffer spill files using LZ4" [3] reduces
  the disk cost of spilled changes via compression. The two approaches
  are complementary. This patch reduces what needs to be spilled in the
  first place, by eliminating O(N^2) INTERNAL_SNAPSHOT entries for write
  transactions with few data changes. [3] reduces the cost of whatever
  does spill.

[1] https://www.postgresql.org/message-id/flat/de52b282-1166-1180-45a2-8d8917ca74c6%40enterprisedb.com
[2] https://www.postgresql.org/message-id/flat/CAD21AoBTY1LATZUmvSXEssvq07qDZufV4AF-OHh9VD2pC0VY2A%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CAA4eK1Kd6vP1g5pJXoiPgfLQsn54hecsc06hxpNWV-9_xMa%2B5Q%40mail.gmail.com

== Solution: Lazy Snapshot Distribution ==

Today, every catalog-modifying commit distributes a snapshot to every
in-progress transaction. Instead, we defer distribution until a
transaction actually needs to decode a data change (in
SnapBuildProcessChange()).

A generation counter (SnapBuild.snapshot_generation) is incremented each
time a new catalog snapshot is built. Each transaction tracks the last
generation it received (ReorderBufferTXN.last_snapshot_generation).

When SnapBuildProcessChange() is called for a data change, it checks
whether the transaction's generation is behind the builder's. It
distributes the current snapshot only if needed.

This means:

- A long-running write transaction with K data changes accumulates at
  most min(K, G) snapshots in its change list, where G is the number of
  catalog-modifying commits during its lifetime. Previously it
  accumulated G snapshots, each of growing size. The generation counter
  deduplicates further: consecutive data changes without intervening
  DDLs share one snapshot.

- Total disk usage attributable to snapshot distribution drops from
  O(N^2) to O(min(K, G) * N), where N is the size of the committed xid
  array. The savings are substantial in the common production case
  where K << G (e.g., a long batch INSERT during a vacuum storm).

Invalidation messages are still distributed eagerly as before. They are
small (100K vacuum commits = 14 MB) and already have overflow protection
via MAX_DISTR_INVAL_MSG_PER_TXN.

== Correctness ==

The lazy approach is equivalent to eager distribution because:

1) WAL is processed strictly in LSN order.
   When SnapBuildProcessChange() is called at LSN X, builder->snapshot
   reflects exactly the catalog-modifying commits with LSN < X. It is
   neither "too new" (future commits haven't been processed) nor "too
   old" (all past commits are included).

2) The committed array is append-only: it only grows, and never shrinks
   while a long transaction holds xmin back. Snapshot N is therefore a
   superset of snapshots 1..N-1, so skipping intermediate snapshots does
   not affect visibility.

3) In the original code, when a transaction's changes are decoded at
   commit time, each data change uses the most recent snapshot preceding
   it in the change list. Intermediate snapshots with no data changes
   between them are effectively unused. Lazy distribution simply avoids
   creating these unused entries.

4) This works correctly with streaming mode (changes decoded before
   commit). SnapBuildProcessChange() is always called before each data
   change, whether the change is later streamed or buffered.

5) The lazy snapshot is distributed to the toplevel transaction (using
   txn->xid), consistent with the original eager distribution. When a
   data change belongs to a subtransaction, the snapshot and the data
   change reside in different change lists (toplevel vs.
   subtransaction).

   This is safe because eager invalidation distribution always places an
   invalidation change entry in the toplevel's list at the DDL commit
   LSN, which is strictly less than the data change LSN. During binary
   heap iteration, this invalidation entry ensures the toplevel is at
   the heap root when the snapshot and the data change are compared at
   equal LSN. The snapshot is therefore always processed first.

== Why not eliminate ReorderBufferAddSnapshot entirely? ==

One might ask: if we can defer snapshot distribution, why not skip
storing snapshots in the reorder buffer altogether and just use
builder->snapshot directly at decode time?

This doesn't work because DecodeInsert/DecodeUpdate only package the raw
WAL tuple into a ReorderBufferChange and enqueue it. The actual catalog
lookup (RelationIdGetRelation, tuple interpretation) happens later,
during commit-time processing in ReorderBufferProcessTXN. By then,
builder->snapshot has advanced past all changes. It no longer reflects
the catalog state at each individual change's LSN.

The INTERNAL_SNAPSHOT entries in the change list serve as markers that
tell commit-time processing "switch to this catalog snapshot from here
on." Without them, every change would be decoded with a single snapshot:

- With the base_snapshot, an INSERT that happened after an ALTER TABLE
  ADD COLUMN would be decoded with the old schema.
- With the advanced builder->snapshot, an INSERT that happened before
  the ALTER would be decoded with the new schema. That produces
  incorrect output (extra columns with NULL values).

Each INTERNAL_SNAPSHOT entry serves all subsequent data changes until
the next generation bump. The total number of entries is therefore at
most min(K, G), which is already minimal.

== Notes ==

- RBTXN_HAS_CATALOG_CHANGES is still needed. It is used by:
  - SnapBuildCommitTxn(), to decide whether to rebuild the snapshot;
  - ReorderBufferBuildTupleCidHash(), for combo CID handling;
  - SnapBuildSerialize(), for serialization.
  This patch does not change its semantics.

- SNAPBUILD_VERSION is bumped from 6 to 7 because snapshot_generation is
  added to the SnapBuild struct, which is serialized to disk. On
  upgrade, existing serialized snapshots will be invalidated and rebuilt
  automatically, which is the standard behavior for version mismatches.

== Changes ==

src/include/replication/snapbuild_internal.h:
  - Add uint64 snapshot_generation to struct SnapBuild

src/include/replication/reorderbuffer.h:
  - Add uint64 last_snapshot_generation to ReorderBufferTXN
  - Declare ReorderBufferTXNByXid() as extern

src/backend/replication/logical/snapbuild.c:
  - SnapBuildCommitTxn(): increment snapshot_generation after building a
    new snapshot
  - Rename SnapBuildDistributeSnapshotAndInval() to
    SnapBuildDistributeInval() and remove the snapshot distribution
    code; keep invalidation distribution
  - SnapBuildProcessChange(): do a single ReorderBufferTXNByXid() lookup
    and reuse the txn pointer for both the base snapshot check and lazy
    catalog snapshot distribution. The lazy snapshot is distributed to
    the toplevel transaction, consistent with the original eager
    approach. This also eliminates redundant hash lookups that existed
    in the original code path.
  - Bump SNAPBUILD_VERSION from 6 to 7

src/backend/replication/logical/reorderbuffer.c:
  - Remove 'static' from ReorderBufferTXNByXid()

== Performance impact ==

Theoretical:

                           Before            After
  Snapshot distributions   G (per commit)    0 or 1 (per data change)
  Disk usage growth        O(N^2)            O(min(K, G) * N)

where:
  N = committed xid array size at the time of the K-th data change,
  G = number of catalog-modifying commits during the long transaction,
  K = number of data changes in the long transaction.

The hot path (SnapBuildProcessChange for each data change) now does a
single hash lookup instead of the 2-4 separate lookups in the original
code, so there is no performance regression for the common case.

Measured (single-threaded benchmark, scripts attached):

Scenario: one long-running transaction with K=1 INSERT runs alongside N
concurrent CREATE/DROP TABLE pairs, each its own catalog-modifying
commit. The slot is then drained via pg_logical_slot_get_changes() with
the test_decoding output plugin. Each cell is the median of 3 runs.

logical_decoding_work_mem = 64MB (production-like default):

      N  build   decode_ms  spill_bytes  total_bytes
  -----  ------  ---------  -----------  -----------
   1000  master         45            0        22 MB
         patch          39            0        13 MB
   5000  master        629       207 MB       429 MB
         patch         261            0       227 MB
  10000  master       1865       813 MB      1659 MB
         patch         814            0       855 MB

At 64MB the patched code does not spill at all up to N=10000. Master
spills 207 MB at N=5000 and 813 MB at N=10000, scaling quadratically in
N. Decoding time drops 2.3-2.4x at N>=5000, mainly from avoided spill
I/O. The amount of decoded transaction data (total_bytes) is also
roughly halved, because INTERNAL_SNAPSHOT entries no longer accumulate
in the long transaction's change list.

logical_decoding_work_mem = 64kB (stress test; forces spilling even at
small N):

      N  build   decode_ms  spill_bytes  total_bytes
  -----  ------  ---------  -----------  -----------
    500  master         28       2.6 MB       6.9 MB
         patch          24       0.4 MB       4.7 MB
   1000  master         57       9.3 MB        22 MB
         patch          42       0.9 MB        13 MB
   2000  master        143        35 MB        76 MB
         patch          82       1.8 MB        43 MB
   5000  master        607       247 MB       429 MB
         patch         289        25 MB       227 MB

Under stress, the patch reduces spill bytes by 6-19x and decoding time
by 1.2-2.1x.

Master's spill grows as ~10*N^2 bytes, a clean O(N^2) signal:
5000^2 / 1000^2 = 25, while 247 MB / 9.3 MB = 26.6.

The patch's residual spill is dominated by invalidation-message entries,
which are still distributed eagerly by design. These are bounded by
MAX_DISTR_INVAL_MSG_PER_TXN.

The decoded change count is identical between master and patch in every
cell: 3 changes (BEGIN + INSERT + COMMIT of the long transaction), since
DDL transactions emit no test_decoding output. The patch therefore
preserves decoding output exactly.

Extrapolating to N=100K, using the observed quadratic coefficient on
master and a linear fit on patch:

  N=100K  master  ~80 GB spill
                  (vs. the 2*N^2 lower-bound estimate of 20 GB; the
                  observed coefficient is ~4x higher due to per-entry
                  ReorderBufferChange overhead and accumulated
                  invalidation entries)
          patch   no spill at 64MB ldwm
                  <500 MB spill at 64kB ldwm
                  (the residual is invalidation entries, bounded by
                  MAX_DISTR_INVAL_MSG_PER_TXN)

Scripts to reproduce are in the second patch (v1-0002):
  bench/setup_cluster.sh
  bench/lazy_snapshot_bench.sh
  bench/run_matrix.sh
  bench/aggregate.sh

== Limitations ==

When the long-running transaction itself has continuous data changes
(e.g., COPY to many tables), K approaches G and lazy distribution
degenerates toward eager behavior:

                       K small (typical)   K~G (bulk COPY)
  Eager (before)       O(G^2)              O(G^2)
  Lazy (this patch)    O(K*N)              O(G^2)

For the bulk COPY scenario, the root cause is that VACUUM's pg_class
statistics updates (relpages, reltuples, relallvisible) are treated as
catalog changes, even though they do not affect tuple decoding. A
complementary optimization could stop these non-schema-affecting
catalog modifications from triggering snapshot rebuilds. That is a
separate change with its own considerations and is left as future work.

== Tests ==

1) Isolation test
   (contrib/test_decoding/specs/lazy_snapshot_distribution.spec): three
   scenarios verifying decoding correctness with lazy distribution:
   - Long transaction + multiple ALTER TABLE ADD COLUMN + INSERT with
     the new schema
   - Long transaction + many CREATE TABLE (catalog changes) + INSERT
   - Subtransaction + DDL + INSERT with the new schema

2) TAP test (contrib/test_decoding/t/002_lazy_snapshot_spill.pl):
   verifies that a long transaction with 200 catalog-modifying DDLs
   between its two INSERTs results in spill_bytes = 0. This confirms
   that lazy distribution eliminates snapshot-caused spilling.
3) All existing tests pass without modification: - test_decoding: 20 regression + 15 isolation + 1 TAP - src/test/subscription: 39 TAP tests (581 test cases), covering streaming, subtransactions, DDL, two-phase commit, etc. - src/test/recovery: 51 TAP tests (635 test cases), including 006_logical_decoding and 038_save_logical_slots_shutdown - contrib/pg_logicalinspect: 2 tests Two patches attached: v1-0001-Lazy-snapshot-distribution-in-logical-decoding.patch The lazy distribution change itself plus isolation and TAP tests. v1-0002-Benchmark-scripts-for-lazy-snapshot-distribution.patch Reproducer scripts for the measurements above (bench/). Kept as a separate patch since it lives outside the contrib/ tree and is not intended for backport or for production builds. Thanks, Rui Zhao --Apple-Mail=_A216317C-5AD6-4376-83E2-C97A11AF6F22 Content-Disposition: attachment; filename=v1-0001-Lazy-snapshot-distribution-in-logical-decoding.patch Content-Type: application/octet-stream; x-unix-mode=0644; name="v1-0001-Lazy-snapshot-distribution-in-logical-decoding.patch" Content-Transfer-Encoding: quoted-printable =46rom=206c6df5edda0c93d6b58f409dbab9d45289abac4d=20Mon=20Sep=2017=20= 00:00:00=202001=0AFrom:=20Rui=20Zhao=20=0ADate:=20= Mon,=2011=20May=202026=2023:52:51=20+0800=0ASubject:=20[PATCH=20v1=20= 1/2]=20Lazy=20snapshot=20distribution=20in=20logical=20decoding=0A=0A= When=20a=20long-running=20write=20transaction=20in=20logical=20decoding=20= coexists=20with=0Amany=20concurrent=20catalog-modifying=20commits=20= (e.g.,=20autovacuum=20updating=0Apg_class=20stats=20for=20many=20tables,=20= or=20batch=20DDL),=20the=20reorder=20buffer's=0Aspill=20files=20can=20= grow=20quadratically.=20=20In=20measurements,=20master=20spills=0A247=20= MB=20at=20N=3D5000=20and=20813=20MB=20at=20N=3D10000=20catalog-modifying=20= commits=0Aduring=20a=20single=20long=20write=20transaction=20with=20K=3D1=20= INSERT;=20linear=0Aextrapolation=20to=20N=3D100K=20yields=20~80=20GB.=0A=0A= 
The=20trigger=20requires=20two=20conditions:=0A=0A=20=20(i)=20=20= Something=20prevents=20xmin=20from=20advancing.=20=20This=20can=20be=20= the=20long=0A=20=20=20=20=20=20=20write=20transaction=20itself,=20or=20= an=20unrelated=20xmin-holder=20such=20as=0A=20=20=20=20=20=20=20= pg_dump's=20REPEATABLE=20READ=20session=20or=20a=20slow=20= logical-replication=0A=20=20=20=20=20=20=20consumer.=0A=20=20(ii)=20A=20= transaction=20with=20at=20least=20one=20data=20change=20is=20present=20= in=20the=0A=20=20=20=20=20=20=20reorder=20buffer.=20=20Read-only=20= sessions=20do=20not=20appear=20in=20the=20reorder=0A=20=20=20=20=20=20=20= buffer=20and=20are=20not=20victims=20themselves;=20they=20only=20enable=20= (i).=0A=0AThe=20root=20cause=20is=20that=20on=20every=20= catalog-modifying=20commit,=20the=20snapshot=0Abuilder=20calls=20= SnapBuildDistributeSnapshotAndInval()=20which=20appends=20a=20new=0A= catalog=20snapshot=20to=20every=20in-progress=20transaction's=20change=20= list.=20=20Since=0Axmin=20cannot=20advance,=20SnapBuildPurgeOlderTxn()=20= cannot=20purge=20entries=20from=0Abuilder->committed.xip,=20and=20each=20= successive=20snapshot=20is=20larger=20than=20the=0Alast.=20=20Total=20= disk=20usage=20attributed=20to=20snapshot=20distribution=20is=20O(N^2)=0A= where=20N=20is=20the=20number=20of=20catalog-modifying=20commits.=0A=0A= Beyond=20disk=20usage,=20these=20accumulated=20snapshots=20also=20= degrade=20decoding=0Aperformance:=20memory=20pressure=20forces=20earlier=20= spill,=20every=20INTERNAL_SNAPSHOT=0Achange=20incurs=20= serialize/deserialize=20I/O,=20ReorderBufferProcessTXN()=0Aperforms=20an=20= extra=20binary-heap=20pop=20and=20snapshot_now=20switch=20for=20each=20= one,=0Aand=20replication=20lag=20grows=20accordingly.=0A=0ADefer=20= snapshot=20distribution=20from=20commit=20time=20to=20data-change=20= time.=20=20A=0Ageneration=20counter=20(SnapBuild.snapshot_generation)=20= is=20incremented=20each=0Atime=20a=20new=20catalog=20snapshot=20is=20= 
built.=20=20Each=20ReorderBufferTXN=20tracks=20the=0Alast=20generation=20= it=20received.=20=20When=20SnapBuildProcessChange()=20is=20called=20for=0A= a=20data=20change,=20it=20distributes=20the=20current=20snapshot=20only=20= if=20the=20txn's=0Ageneration=20is=20behind=20the=20builder's.=0A=0A= Invalidation=20messages=20are=20still=20distributed=20eagerly=20because=20= each=20one=0Acarries=20unique=20information=20that=20cannot=20be=20= deduplicated;=20missing=20an=0Ainvalidation=20can=20cause=20pgoutput=20= to=20silently=20drop=20changes=20for=20newly=0Apublished=20tables.=20=20= Snapshots,=20by=20contrast,=20represent=20absolute=20state,=20so=0Athe=20= latest=20one=20supersedes=20all=20earlier=20ones=20and=20intermediate=20= ones=20can=20be=0Askipped.=0A=0AA=20long-running=20write=20transaction=20= with=20K=20data=20changes=20now=20accumulates=20at=0Amost=20min(K,=20G)=20= snapshots,=20where=20G=20is=20the=20number=20of=20catalog-modifying=0A= commits=20during=20its=20lifetime;=20the=20generation=20counter=20= deduplicates=20further=0Aso=20consecutive=20data=20changes=20without=20= intervening=20DDLs=20share=20one=20snapshot.=0ADisk=20usage=20drops=20= from=20O(N^2)=20to=20O(min(K,=20G)=20*=20N).=0A=0AThe=20hot=20path=20= (SnapBuildProcessChange=20for=20each=20data=20change)=20now=20performs=20= a=0Asingle=20ReorderBufferTXNByXid()=20lookup=20that=20is=20reused=20for=20= both=20the=20base=0Asnapshot=20check=20and=20lazy=20distribution,=20= eliminating=20redundant=20hash=20lookups=0Athat=20existed=20in=20the=20= original=20code.=0A=0AThe=20lazy=20snapshot=20is=20distributed=20to=20= the=20toplevel=20transaction=20(using=0Atxn->xid),=20consistent=20with=20= the=20original=20eager=20distribution.=20=20When=20a=20data=0Achange=20= belongs=20to=20a=20subtransaction,=20eager=20invalidation=20distribution=20= still=0Aplaces=20an=20invalidation=20change=20entry=20in=20the=20= toplevel's=20list=20at=20the=20DDL=0Acommit=20LSN=20(strictly=20less=20= 
than=20the=20data=20change=20LSN),=20which=20ensures=20the=0Atoplevel=20= is=20at=20the=20binary=20heap=20root=20when=20equal-LSN=20comparisons=20= occur,=20so=0Athe=20snapshot=20is=20processed=20before=20the=20data=20= change.=0A=0ASNAPBUILD_VERSION=20is=20bumped=20from=206=20to=207=20= because=20snapshot_generation=20is=0Aadded=20to=20the=20serialized=20= SnapBuild=20struct.=0A=0ATests:=0A=20=20-=20= contrib/test_decoding/specs/lazy_snapshot_distribution.spec:=20three=0A=20= =20=20=20isolation=20test=20scenarios=20covering=20long-tx=20+=20DDL=20+=20= INSERT,=0A=20=20=20=20long-tx=20+=20many=20catalog=20changes=20+=20= INSERT,=20and=20subtransaction=20+=20DDL=20+=0A=20=20=20=20INSERT.=0A=20=20= -=20contrib/test_decoding/t/002_lazy_snapshot_spill.pl:=20verifies=20= that=20a=0A=20=20=20=20long=20transaction=20with=20200=20= catalog-modifying=20DDLs=20between=20its=20two=0A=20=20=20=20INSERTs=20= results=20in=20spill_bytes=20=3D=200.=0A=20=20-=20All=20existing=20= test_decoding,=20subscription,=20recovery,=20and=0A=20=20=20=20= pg_logicalinspect=20tests=20pass=20without=20modification.=0A---=0A=20= contrib/test_decoding/Makefile=20=20=20=20=20=20=20=20=20=20=20=20=20=20=20= =20|=20=20=203=20+-=0A=20.../expected/lazy_snapshot_distribution.out=20=20= =20|=20=2070=20++++++++++=0A=20contrib/test_decoding/meson.build=20=20=20= =20=20=20=20=20=20=20=20=20=20|=20=20=202=20+=0A=20= .../specs/lazy_snapshot_distribution.spec=20=20=20=20=20|=20=2081=20= +++++++++++=0A=20.../t/002_lazy_snapshot_spill.pl=20=20=20=20=20=20=20=20= =20=20=20=20=20=20|=20120=20++++++++++++++++=0A=20= .../replication/logical/reorderbuffer.c=20=20=20=20=20=20=20|=20=20=204=20= +-=0A=20src/backend/replication/logical/snapbuild.c=20=20=20|=20130=20= ++++++++++--------=0A=20src/include/replication/reorderbuffer.h=20=20=20=20= =20=20=20|=20=2012=20++=0A=20= src/include/replication/snapbuild_internal.h=20=20|=20=2011=20++=0A=209=20= files=20changed,=20372=20insertions(+),=2061=20deletions(-)=0A=20create=20= 
mode 100644 contrib/test_decoding/expected/lazy_snapshot_distribution.out
 create mode 100644 contrib/test_decoding/specs/lazy_snapshot_distribution.spec
 create mode 100644 contrib/test_decoding/t/002_lazy_snapshot_spill.pl

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 0111124399..17e342cba4 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,8 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore invalidation_distribution parallel_session_origin
+	skip_snapshot_restore invalidation_distribution parallel_session_origin \
+	lazy_snapshot_distribution
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/lazy_snapshot_distribution.out b/contrib/test_decoding/expected/lazy_snapshot_distribution.out
new file mode 100644
index 0000000000..49b85bfa3a
--- /dev/null
+++ b/contrib/test_decoding/expected/lazy_snapshot_distribution.out
@@ -0,0 +1,70 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s1_begin s1_insert_tbl1 s2_add_col_c1 s2_add_col_c2 s1_insert_tbl2_3col s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_add_col_c1: ALTER TABLE tbl2 ADD COLUMN c1 integer;
+step s2_add_col_c2: ALTER TABLE tbl2 ADD COLUMN c2 integer;
+step s1_insert_tbl2_3col: INSERT INTO tbl2 VALUES (1, 10, 100);
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data                                                                     
+-------------------------------------------------------------------------
+BEGIN                                                                    
+table public.tbl1: INSERT: val1[integer]:1                               
+table public.tbl2: INSERT: val1[integer]:1 c1[integer]:10 c2[integer]:100
+COMMIT                                                                   
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_begin s1_insert_tbl1 s2_create_dummy1 s2_create_dummy2 s2_create_dummy3 s1_insert_tbl1 s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_create_dummy1: CREATE TABLE dummy1 (id int);
+step s2_create_dummy2: CREATE TABLE dummy2 (id int);
+step s2_create_dummy3: CREATE TABLE dummy3 (id int);
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data                                      
+------------------------------------------
+BEGIN                                     
+table public.tbl1: INSERT: val1[integer]:1
+table public.tbl1: INSERT: val1[integer]:1
+COMMIT                                    
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
+
+starting permutation: s1_begin s1_savepoint s1_insert_tbl1 s2_add_col_c1 s2_add_col_c2 s1_insert_tbl2_3col s1_release s1_commit s3_get_changes
+step s1_begin: BEGIN;
+step s1_savepoint: SAVEPOINT sp1;
+step s1_insert_tbl1: INSERT INTO tbl1 VALUES (1);
+step s2_add_col_c1: ALTER TABLE tbl2 ADD COLUMN c1 integer;
+step s2_add_col_c2: ALTER TABLE tbl2 ADD COLUMN c2 integer;
+step s1_insert_tbl2_3col: INSERT INTO tbl2 VALUES (1, 10, 100);
+step s1_release: RELEASE SAVEPOINT sp1;
+step s1_commit: COMMIT;
+step s3_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data                                                                     
+-------------------------------------------------------------------------
+BEGIN                                                                    
+table public.tbl1: INSERT: val1[integer]:1                               
+table public.tbl2: INSERT: val1[integer]:1 c1[integer]:10 c2[integer]:100
+COMMIT                                                                   
+(4 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index ac655853d2..564d82bd4e 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -66,6 +66,7 @@ tests += {
       'skip_snapshot_restore',
       'invalidation_distribution',
       'parallel_session_origin',
+      'lazy_snapshot_distribution',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
@@ -76,6 +77,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_repl_stats.pl',
+      't/002_lazy_snapshot_spill.pl',
     ],
   },
 }
diff --git a/contrib/test_decoding/specs/lazy_snapshot_distribution.spec b/contrib/test_decoding/specs/lazy_snapshot_distribution.spec
new file mode 100644
index 0000000000..217836a019
--- /dev/null
+++ b/contrib/test_decoding/specs/lazy_snapshot_distribution.spec
@@ -0,0 +1,81 @@
+# Test lazy snapshot distribution: snapshots are no longer eagerly distributed
+# to all in-progress transactions on each catalog-modifying commit. Instead,
+# they are lazily distributed when a transaction actually decodes a data change.
+# This avoids O(N^2) snapshot disk usage when a long-running transaction
+# coexists with many catalog-modifying commits (e.g. autovacuum).
+
+setup
+{
+    SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+    DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl2;
+    DROP TABLE IF EXISTS dummy1;
+    DROP TABLE IF EXISTS dummy2;
+    DROP TABLE IF EXISTS dummy3;
+    CREATE TABLE tbl1 (val1 integer);
+    CREATE TABLE tbl2 (val1 integer);
+}
+
+teardown
+{
+    DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl2;
+    DROP TABLE IF EXISTS dummy1;
+    DROP TABLE IF EXISTS dummy2;
+    DROP TABLE IF EXISTS dummy3;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+# Session s1: long-running transaction
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_begin" { BEGIN; }
+step "s1_insert_tbl1" { INSERT INTO tbl1 VALUES (1); }
+step "s1_insert_tbl2_3col" { INSERT INTO tbl2 VALUES (1, 10, 100); }
+step "s1_savepoint" { SAVEPOINT sp1; }
+step "s1_release" { RELEASE SAVEPOINT sp1; }
+step "s1_commit" { COMMIT; }
+
+# Session s2: performs catalog-modifying operations
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_add_col_c1" { ALTER TABLE tbl2 ADD COLUMN c1 integer; }
+step "s2_add_col_c2" { ALTER TABLE tbl2 ADD COLUMN c2 integer; }
+step "s2_create_dummy1" { CREATE TABLE dummy1 (id int); }
+step "s2_create_dummy2" { CREATE TABLE dummy2 (id int); }
+step "s2_create_dummy3" { CREATE TABLE dummy3 (id int); }
+
+# Session s3: consumes changes
+session "s3"
+setup { SET synchronous_commit=on; }
+step "s3_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+# Scenario 1: Long transaction + multiple DDLs + correct decoding
+#
+# s1 starts a long transaction and inserts into tbl1 (gets base snapshot).
+# s2 performs two ALTER TABLE ADD COLUMN on tbl2.
+# s1 then inserts into tbl2 using the new schema (3 columns).
+# The lazily distributed snapshot at s1's second insert must reflect both
+# ALTER TABLEs so the new columns are visible during decoding.
+permutation "s1_begin" "s1_insert_tbl1" "s2_add_col_c1" "s2_add_col_c2" "s1_insert_tbl2_3col" "s1_commit" "s3_get_changes"
+
+# Scenario 2: Long transaction + many catalog changes + correct decoding
+#
+# s1 starts a long transaction and inserts into tbl1.
+# s2 performs multiple catalog-modifying DDLs (CREATE TABLE), simulating
+# the pattern of autovacuum generating many catalog change commits.
+# s1 inserts again into tbl1 after all the DDLs.
+# Both inserts must be correctly decoded despite many catalog-modifying
+# commits in between.  With eager distribution, each CREATE TABLE would
+# have distributed a snapshot to s1; with lazy distribution, only the
+# last snapshot is distributed when s1 does its second insert.
+permutation "s1_begin" "s1_insert_tbl1" "s2_create_dummy1" "s2_create_dummy2" "s2_create_dummy3" "s1_insert_tbl1" "s1_commit" "s3_get_changes"
+
+# Scenario 3: Subtransaction + catalog change + correct decoding
+#
+# s1 starts a transaction with a savepoint, inserts into tbl1 inside the
+# subtransaction.  s2 performs ALTER TABLE ADD COLUMN on tbl2.  s1 then
+# inserts into tbl2 with the new column, still inside the subtransaction.
+# Both inserts must decode correctly, verifying lazy distribution works
+# with subtransactions.
+permutation "s1_begin" "s1_savepoint" "s1_insert_tbl1" "s2_add_col_c1" "s2_add_col_c2" "s1_insert_tbl2_3col" "s1_release" "s1_commit" "s3_get_changes"
diff --git a/contrib/test_decoding/t/002_lazy_snapshot_spill.pl b/contrib/test_decoding/t/002_lazy_snapshot_spill.pl
new file mode 100644
index 0000000000..1f4551b0a6
--- /dev/null
+++ b/contrib/test_decoding/t/002_lazy_snapshot_spill.pl
@@ -0,0 +1,120 @@
+
+# Copyright (c) 2024-2026, PostgreSQL Global Development Group
+
+# Test that lazy snapshot distribution reduces spill file usage.
+#
+# With the old eager distribution, each catalog-modifying commit (e.g. vacuum,
+# DDL) would distribute a snapshot to every in-progress transaction in the
+# reorder buffer.  When a long-running transaction coexists with many such
+# commits, the snapshots accumulate with O(N^2) total size and spill to disk.
+#
+# With lazy distribution, snapshots are only distributed when a transaction
+# actually decodes a data change, so a long-running transaction with few or
+# no data changes receives at most one snapshot regardless of how many
+# catalog-modifying commits happen.
+#
+# Note: invalidation messages are still eagerly distributed and they produce
+# REORDER_BUFFER_CHANGE_INVALIDATION entries that count toward memory usage.
+# We set logical_decoding_work_mem high enough to accommodate invalidation
+# messages while verifying that snapshot distribution does not cause spilling.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', qq(
+synchronous_commit = on
+logical_decoding_work_mem = 1MB
+autovacuum = off
+));
+$node->start;
+
+# Setup
+$node->safe_psql('postgres', qq[
+    CREATE TABLE test_data (id int);
+    SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding');
+]);
+
+# Consume any setup-related changes
+$node->safe_psql('postgres',
+    "SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)");
+
+# Reset stats
+$node->safe_psql('postgres',
+    "SELECT pg_stat_reset_replication_slot('test_slot')");
+$node->safe_psql('postgres', "SELECT pg_stat_force_next_flush()");
+
+# Start a long-running transaction in a background session.
+# This transaction will be in-progress while many catalog changes happen.
+my $long_txn = $node->background_psql('postgres', on_error_stop => 1);
+$long_txn->query_safe("BEGIN");
+$long_txn->query_safe("INSERT INTO test_data VALUES (1)");
+
+# In the main session, perform many catalog-modifying DDLs.
+# Each DDL commit increments snapshot_generation and would have eagerly
+# distributed a growing snapshot under the old code.
+#
+# With 200 DDLs (each CREATE TABLE is a catalog-modifying commit), the old
+# eager approach would accumulate ~200 snapshots with O(N^2) total size in
+# the long transaction's reorder buffer changes.
+# With lazy distribution, only 1 snapshot is distributed when the long
+# transaction does its next data change.
+my $num_ddls = 200;
+for my $i (1 .. $num_ddls) {
+    $node->safe_psql('postgres', "CREATE TABLE dummy_$i (id int)");
+}
+
+# Insert one more row in the long transaction and commit.
+$long_txn->query_safe("INSERT INTO test_data VALUES (2)");
+$long_txn->query_safe("COMMIT");
+$long_txn->quit;
+
+# Consume the changes to trigger decoding
+my $result = $node->safe_psql('postgres',
+    "SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, " .
+    "'include-xids', '0', 'skip-empty-xacts', '1')");
+
+# Verify the decoded data is correct
+like($result, qr/INSERT: id\[integer\]:1/, 'first INSERT decoded correctly');
+like($result, qr/INSERT: id\[integer\]:2/, 'second INSERT decoded correctly');
+
+# Check spill statistics.
+# With lazy snapshot distribution and logical_decoding_work_mem=1MB, the long
+# transaction should NOT spill.  It only contains 2 small INSERTs plus 1
+# lazily-distributed snapshot, well under 1MB.
+#
+# Without the optimization, the long transaction would have accumulated ~200
+# internal snapshots with a total size of ~O(N^2) bytes (~80KB for N=200),
+# plus ~200 invalidation change entries.  The combined size could push the
+# transaction over 1MB with larger N.
+$node->safe_psql('postgres', "SELECT pg_stat_force_next_flush()");
+$node->poll_query_until('postgres', qq[
+    SELECT spill_bytes IS NOT NULL
+    FROM pg_stat_replication_slots
+    WHERE slot_name = 'test_slot'
+]) or die "Timed out waiting for stats";
+
+my $spill_bytes = $node->safe_psql('postgres', qq[
+    SELECT spill_bytes
+    FROM pg_stat_replication_slots
+    WHERE slot_name = 'test_slot'
+]);
+
+is($spill_bytes, '0',
+    "lazy snapshot distribution prevents spilling (spill_bytes=$spill_bytes)");
+
+# Cleanup
+for my $i (1 .. $num_ddls) {
+    $node->safe_psql('postgres', "DROP TABLE IF EXISTS dummy_$i");
+}
+$node->safe_psql('postgres', qq[
+    SELECT pg_drop_replication_slot('test_slot');
+    DROP TABLE test_data;
+]);
+
+$node->stop;
+done_testing();
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 682d13c9f2..c63d4e17d3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -235,7 +235,7 @@ int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED
  */
 static ReorderBufferTXN *ReorderBufferAllocTXN(ReorderBuffer *rb);
 static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
-static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 											   TransactionId xid, bool create, bool *is_new,
 											   XLogRecPtr lsn, bool create_as_top);
 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
@@ -650,7 +650,7 @@ ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
  * (with the given LSN, and as top transaction if that's specified);
  * when this happens, is_new is set to true.
  */
-static ReorderBufferTXN *
+ReorderBufferTXN *
 ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
 {
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index c8309b96ed..1c6a94917a 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -172,7 +172,7 @@ static void SnapBuildFreeSnapshot(Snapshot snap);
 
 static void SnapBuildSnapIncRefcount(Snapshot snap);
 
-static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
+static void SnapBuildDistributeInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
 
 static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
 												 uint32 xinfo);
@@ -655,6 +655,8 @@ SnapBuildResetExportedSnapshotState(void)
 bool
 SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 {
+	ReorderBufferTXN *txn;
+
 	/*
 	 * We can't handle data in transactions if we haven't built a snapshot
 	 * yet, so don't store them.
@@ -672,10 +674,23 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 		return false;
 
 	/*
-	 * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
-	 * be needed to decode the change we're currently processing.
+	 * Look up the transaction once and reuse it for both the base snapshot
+	 * check and the lazy catalog snapshot distribution below.  This avoids
+	 * repeated hash lookups that were previously done separately by
+	 * ReorderBufferXidHasBaseSnapshot(), ReorderBufferSetBaseSnapshot(),
+	 * and the lazy distribution logic.
 	 */
-	if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+	txn = ReorderBufferTXNByXid(builder->reorder, xid, true,
+								NULL, lsn, true);
+	if (rbtxn_is_known_subxact(txn))
+		txn = ReorderBufferTXNByXid(builder->reorder, txn->toplevel_xid,
+									false, NULL, InvalidXLogRecPtr, false);
+
+	/*
+	 * If the reorderbuffer doesn't yet have a base snapshot, add one now,
+	 * it will be needed to decode the change we're currently processing.
+	 */
+	if (txn->base_snapshot == NULL)
 	{
 		/* only build a new snapshot if we don't have a prebuilt one */
 		if (builder->snapshot == NULL)
@@ -694,6 +709,28 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
 									 builder->snapshot);
 	}
 
+	/*
+	 * Lazily distribute the catalog snapshot to this transaction if it hasn't
+	 * received the latest one yet.  This replaces the previous eager
+	 * distribution in SnapBuildDistributeSnapshotAndInval(), avoiding O(N^2)
+	 * disk usage when many catalog-modifying transactions (e.g. autovacuum)
+	 * commit while a long-running transaction is in progress.
+	 *
+	 * We only add a snapshot when the transaction actually has a data change
+	 * to decode, so idle long-running transactions won't accumulate any
+	 * snapshots at all.
+	 */
+	if (builder->snapshot_generation > 0 &&
+		txn->last_snapshot_generation < builder->snapshot_generation)
+	{
+		Assert(builder->snapshot != NULL);
+
+		SnapBuildSnapIncRefcount(builder->snapshot);
+		ReorderBufferAddSnapshot(builder->reorder, txn->xid,
+								 lsn, builder->snapshot);
+		txn->last_snapshot_generation = builder->snapshot_generation;
+	}
+
 	return true;
 }
 
@@ -737,15 +774,20 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 }
 
 /*
- * Add a new Snapshot and invalidation messages to all transactions we're
- * decoding that currently are in-progress so they can see new catalog contents
- * made by the transaction that just committed. This is necessary because those
- * in-progress transactions will use the new catalog's contents from here on
- * (at the very least everything they do needs to be compatible with newer
- * catalog contents).
+ * Distribute invalidation messages to all in-progress transactions so they
+ * can see new catalog contents made by the transaction that just committed.
+ *
+ * Note: we no longer eagerly distribute snapshots here.  Instead, snapshots
+ * are lazily distributed in SnapBuildProcessChange() when a transaction
+ * actually needs to decode a data change.  This avoids O(N^2) snapshot disk
+ * usage when a long-running transaction coexists with many catalog-modifying
+ * commits (e.g. autovacuum of many tables).  The snapshot_generation counter
+ * in SnapBuild tracks when a new snapshot is available, and each transaction's
+ * last_snapshot_generation in ReorderBufferTXN tracks whether it has received
+ * the latest snapshot.
  */
 static void
-SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
+SnapBuildDistributeInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
 {
 	dlist_iter	txn_i;
 	ReorderBufferTXN *txn;
@@ -753,8 +795,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 	/*
 	 * Iterate through all toplevel transactions. This can include
 	 * subtransactions which we just don't yet know to be that, but that's
-	 * fine, they will just get an unnecessary snapshot and invalidations
-	 * queued.
+	 * fine, they will just get unnecessary invalidations queued.
 	 */
 	dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
 	{
@@ -764,60 +805,25 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 
 		/*
 		 * If we don't have a base snapshot yet, there are no changes in this
-		 * transaction which in turn implies we don't yet need a snapshot at
-		 * all. We'll add a snapshot when the first change gets queued.
-		 *
-		 * Similarly, we don't need to add invalidations to a transaction
-		 * whose base snapshot is not yet set. Once a base snapshot is built,
-		 * it will include the xids of committed transactions that have
-		 * modified the catalog, thus reflecting the new catalog contents. The
-		 * existing catalog cache will have already been invalidated after
-		 * processing the invalidations in the transaction that modified
-		 * catalogs, ensuring that a fresh cache is constructed during
-		 * decoding.
-		 *
-		 * NB: This works correctly even for subtransactions because
-		 * ReorderBufferAssignChild() takes care to transfer the base snapshot
-		 * to the top-level transaction, and while iterating the changequeue
-		 * we'll get the change from the subtxn.
+		 * transaction which in turn implies we don't yet need invalidations.
+		 * Once a base snapshot is built, it will include the xids of committed
+		 * transactions that have modified the catalog, thus reflecting the new
+		 * catalog contents.
 		 */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
 			continue;
 
 		/*
-		 * We don't need to add snapshot or invalidations to prepared
-		 * transactions as they should not see the new catalog contents.
+		 * We don't need to add invalidations to prepared transactions as they
+		 * should not see the new catalog contents.
 		 */
 		if (rbtxn_is_prepared(txn))
 			continue;
 
-		elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
-			 txn->xid, LSN_FORMAT_ARGS(lsn));
-
-		/*
-		 * increase the snapshot's refcount for the transaction we are handing
-		 * it out to
-		 */
-		SnapBuildSnapIncRefcount(builder->snapshot);
-		ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
-								 builder->snapshot);
-
 		/*
 		 * Add invalidation messages to the reorder buffer of in-progress
 		 * transactions except the current committed transaction, for which we
 		 * will execute invalidations at the end.
-		 *
-		 * It is required, otherwise, we will end up using the stale catcache
-		 * contents built by the current transaction even after its decoding,
-		 * which should have been invalidated due to concurrent catalog
-		 * changing transaction.
-		 *
-		 * Distribute only the invalidation messages generated by the current
-		 * committed transaction. Invalidation messages received from other
-		 * transactions would have already been propagated to the relevant
-		 * in-progress transactions. This transaction would have processed
-		 * those invalidations, ensuring that subsequent transactions observe
-		 * a consistent cache state.
 		 */
 		if (txn->xid != xid)
 		{
@@ -831,6 +837,9 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
 			{
 				Assert(msgs != NULL);
 
+				elog(DEBUG2, "adding invalidations to %u at %X/%08X",
+					 txn->xid, LSN_FORMAT_ARGS(lsn));
+
 				ReorderBufferAddDistributedInvalidations(builder->reorder,
 														 txn->xid, lsn,
 														 ninvalidations, msgs);
@@ -1097,6 +1106,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 
 		builder->snapshot = SnapBuildBuildSnapshot(builder);
 
+		/* Track that the catalog snapshot changed */
+		builder->snapshot_generation++;
+
 		/* we might need to execute invalidations, add snapshot */
 		if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
 		{
@@ -1109,10 +1121,12 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		SnapBuildSnapIncRefcount(builder->snapshot);
 
 		/*
-		 * Add a new catalog snapshot and invalidations messages to all
-		 * currently running transactions.
+		 * Distribute invalidation messages to all currently running
+		 * transactions.  Snapshots are distributed lazily in
+		 * SnapBuildProcessChange() when a transaction decodes a data
+		 * change, avoiding O(N^2) disk usage from snapshot accumulation.
 		 */
-		SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
+		SnapBuildDistributeInval(builder, lsn, xid);
 	}
 }
 
@@ -1532,7 +1546,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 	offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 6
+#define SNAPBUILD_VERSION 7
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index ff825e4b7b..c0ee7ca8ae 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -461,6 +461,14 @@ typedef struct ReorderBufferTXN
 	/* Size of top-transaction including sub-transactions. */
 	Size		total_size;
 
+	/*
+	 * Tracks the snapshot generation at which this transaction last received
+	 * a catalog snapshot via lazy distribution.  Compared against
+	 * SnapBuild.snapshot_generation to decide if a new snapshot is needed
+	 * before decoding a data change.
+	 */
+	uint64		last_snapshot_generation;
+
 	/*
 	 * Private data pointer of the output plugin.
 	 */
@@ -742,6 +750,10 @@ extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunning
 extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 
+extern ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
+											   TransactionId xid, bool create,
+											   bool *is_new, XLogRecPtr lsn,
+											   bool create_as_top);
 extern void ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
 										 XLogRecPtr lsn, Snapshot snap);
 extern void ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
diff --git a/src/include/replication/snapbuild_internal.h b/src/include/replication/snapbuild_internal.h
index 363f7f5977..f8d20dd1f2 100644
--- a/src/include/replication/snapbuild_internal.h
+++ b/src/include/replication/snapbuild_internal.h
@@ -129,6 +129,17 @@ struct SnapBuild
 		TransactionId *xip;
 	}			committed;
 
+	/*
+	 * Generation counter, incremented each time a new catalog snapshot is
+	 * built due to a catalog-modifying transaction commit.  Used for lazy
+	 * snapshot distribution: instead of distributing a snapshot to every
+	 * in-progress transaction on each catalog-modifying commit, we only
+	 * distribute when the transaction actually needs to decode a data change.
+	 * This avoids O(N^2) snapshot disk usage when a long-running transaction
+	 * coexists with many vacuum commits.
+	 */
+	uint64		snapshot_generation;
+
 	/*
 	 * Array of transactions and subtransactions that had modified catalogs
 	 * and were running when the snapshot was serialized.
-- 
2.43.7

From 14e88f283c6513fff07a58b301c350a3cdcc1388 Mon Sep 17 00:00:00 2001
From: Rui Zhao
Date: Tue, 12 May 2026 16:35:39 +0800
Subject: [PATCH v1 2/2] Benchmark scripts for lazy snapshot distribution

Provides single-run and matrix benchmark scripts to measure the impact
of the lazy snapshot distribution patch on logical decoding spill bytes
and decoding time.

Scenario: one long-running write transaction with K=1 INSERT, coexisting
with N concurrent CREATE/DROP TABLE pairs (each its own catalog-modifying
commit), then drained via pg_logical_slot_get_changes() with the
test_decoding output plugin.  Captures spill_bytes, spill_count,
total_bytes, and end-to-end decoding wall time.

Files:

  bench/setup_cluster.sh
    Spins up a throwaway PG cluster from a specified install directory,
    configured for logical decoding (wal_level=logical, max_wal_senders,
    max_replication_slots, configurable logical_decoding_work_mem).

  bench/lazy_snapshot_bench.sh
    Runs a single (N, K) scenario against a running cluster.  The long
    transaction runs in a background psql session that does K INSERTs
    and then pg_sleep()s long enough for the concurrent DDL loop to
    complete.  DDLs are batched in a single psql session for ~100x
    throughput over per-statement connections.  Outputs one CSV row.

  bench/run_matrix.sh
    Driver that iterates over a list of N values with configurable
    repeat count (default 3).  Emits a CSV with all replicates.

  bench/aggregate.sh
    Aggregates CSVs (from one or more run_matrix.sh invocations) into
    median-per-cell summary tables, plus a side-by-side master vs
    patch comparison with computed speedup and bytes-saved ratios.

Usage:

  # Build master and patch versions of PostgreSQL
  git checkout master
  ./configure --prefix=/tmp/pg_master_install ... && make install
  git checkout lazy-snapshot-distribution
  make clean && ./configure --prefix=/tmp/pg_patch_install ... && make install

  # Bring up master cluster, run matrix
  eval "$(./bench/setup_cluster.sh /tmp/pg_master_install /tmp/pg_master_data 55432)"
  ./bench/run_matrix.sh master 500 1000 2000 5000 > master.csv
  pg_ctl -D /tmp/pg_master_data stop

  # Bring up patch cluster on a different port, run matrix in parallel
  eval "$(./bench/setup_cluster.sh /tmp/pg_patch_install /tmp/pg_patch_data 55433)"
  ./bench/run_matrix.sh patch 500 1000 2000 5000 > patch.csv

  # Aggregate
  cat master.csv patch.csv | ./bench/aggregate.sh -

These scripts produced the empirical data cited in the
"Performance impact" section of the cover letter for v1-0001.
---
 bench/aggregate.sh           |  99 +++++++++++++++++++++
 bench/lazy_snapshot_bench.sh | 161 +++++++++++++++++++++++++++++++++++
 bench/run_matrix.sh          |  57 +++++++++++++
 bench/setup_cluster.sh       |  75 ++++++++++++++++
 4 files changed, 392 insertions(+)
 create mode 100755 bench/aggregate.sh
 create mode 100755 bench/lazy_snapshot_bench.sh
 create mode 100755 bench/run_matrix.sh
 create mode 100755 bench/setup_cluster.sh

diff --git a/bench/aggregate.sh b/bench/aggregate.sh
new file mode 100755
index 0000000000..481e00c549
--- /dev/null
+++ b/bench/aggregate.sh
@@ -0,0 +1,99 @@
+#!/bin/bash
+#
+# Aggregate the CSV from run_matrix.sh into a comparison table suitable for
+# pasting into a pgsql-hackers email.
+#
+# Usage:
+#   $0
+# or
+#   cat master.csv patch.csv | $0 -
+#
+# Outputs, per (N, label) combo: median of decoding_ms / spill_bytes /
+# total_bytes across iterations. Then a side-by-side comparison of master
+# vs patch.
+
+set -euo pipefail
+
+if [[ $# -lt 1 ]]; then
+  echo "Usage: $0   (or '-' for stdin)" >&2
+  exit 1
+fi
+
+INPUT="$1"
+[[ "$INPUT" == "-" ]] && INPUT=/dev/stdin
+
+awk -F, '
+$1 == "label" { next }  # skip any header row (works for concatenated CSVs)
+{
+  key = $1 "," $2  # label,N
+  decoding[key, ++cnt_dec[key]] = $5
+  spill[key, ++cnt_sp[key]]     = $9
+  total[key, ++cnt_tot[key]]    = $10
+  labels[$1] = 1
+  ns[$2 + 0] = 1
+  # remember K (assume constant) and max repeat seen
+  k = $3
+  ldwm = $4
+  if (cnt_dec[key] > rep_max) rep_max = cnt_dec[key]
+}
+END {
+  # median helper baked into END via re-sort per-key (small N, OK)
+  for (key in cnt_dec) {
+    n = cnt_dec[key]
+    # collect into arr
+    delete arr
+    for (i = 1; i <= n; i++) arr[i] = decoding[key, i]
+    asort(arr)
+    if (n % 2) m_dec[key] = arr[(n+1)/2]
+    else       m_dec[key] = (arr[n/2] + arr[n/2 + 1]) / 2
+
+    delete arr
+    for (i = 1; i <= n; i++) arr[i] = spill[key, i]
+    asort(arr)
+    if (n % 2) m_sp[key] = arr[(n+1)/2]
+    else       m_sp[key] = (arr[n/2] + arr[n/2 + 1]) / 2
+
+    delete arr
+    for (i = 1; i <= n; i++) arr[i] = total[key, i]
+    asort(arr)
+    if (n % 2) m_tot[key] = arr[(n+1)/2]
+    else       m_tot[key] = (arr[n/2] + arr[n/2 + 1]) / 2
+  }
+
+  # Sorted N values
+  n_count = 0
+  for (n in ns) sorted_ns[++n_count] = n
+  for (i = 1; i <= n_count; i++)
+    for (j = i + 1; j <= n_count; j++)
+      if (sorted_ns[i] + 0 > sorted_ns[j] + 0) {
+        t = sorted_ns[i]; sorted_ns[i] = sorted_ns[j]; sorted_ns[j] = t
+      }
+
+  printf "Config: K=%s, logical_decoding_work_mem=%s, REPEAT=%d (median)\n\n", k, ldwm, rep_max
+  printf "%-8s %-7s %-14s %-14s %-14s\n", "label", "N", "decode_ms", "spill_bytes", "total_bytes"
+  for (i = 1; i <= n_count; i++) {
+    N = sorted_ns[i]
+    for (lbl in labels) {
+      key = lbl "," N
+      if (key in m_dec)
+        printf "%-8s %-7d %-14d %-14d %-14d\n", lbl, N, m_dec[key], m_sp[key], m_tot[key]
+    }
+  }
+
+  # Side-by-side, if exactly master + patch are present
+  if ("master" in labels && "patch" in labels) {
+    printf "\n%-7s %-12s %-12s %-12s %-12s %-12s %-12s\n", \
+      "N", "master_dec", "patch_dec", "speedup", "master_spill", "patch_spill", "saved_x"
+    for (i = 1; i <= n_count; i++) {
+      N = sorted_ns[i]
+      mk = "master," N; pk = "patch," N
+      if ((mk in m_dec) && (pk in m_dec)) {
+        speedup = (m_dec[pk] > 0) ? m_dec[mk] / m_dec[pk] : 0
+        saved   = (m_sp[pk] > 0) ? m_sp[mk] / m_sp[pk] : (m_sp[mk] > 0 ? 999 : 1)
+        printf "%-7d %-12d %-12d %-12.2f %-12d %-12d %-12.2f\n", \
+          N, m_dec[mk], m_dec[pk], speedup, m_sp[mk], m_sp[pk], saved
+      }
+    }
+  }
+}
+' "$INPUT"
diff --git a/bench/lazy_snapshot_bench.sh b/bench/lazy_snapshot_bench.sh
new file mode 100755
index 0000000000..f16a1a0c1f
--- /dev/null
+++ b/bench/lazy_snapshot_bench.sh
@@ -0,0 +1,161 @@
+#!/bin/bash
+#
+# Single benchmark run for "lazy snapshot distribution" patch.
+#
+# Simulates: one long-running transaction (with K inserts) coexists with N
+# concurrent catalog-modifying commits, then drains via pg_logical_slot_get_changes.
+#
+# Output (CSV row to stdout):
+#   label,N,K,ldwm,decoding_ms,decoded_changes,spill_txns,spill_count,spill_bytes,total_bytes,ddl_loop_sec
+#
+# Assumes a running PG cluster with wal_level=logical.  To compare master vs
+# patch, point this script at two different clusters built from each binary.
+
+set -euo pipefail
+
+# -------- args --------
+N=""
+K="1"
+LABEL=""
+LDWM="64kB"
+SLOT="lazy_bench"
+VERBOSE=0
+
+usage() {
+  cat < -l