From d93e1b9be047f6ba484e1f105a94505b7d261c5b Mon Sep 17 00:00:00 2001 From: Shawn Wang Date: Mon, 23 Mar 2026 20:46:08 +0800 Subject: [PATCH] Add logical_decoding_spill_limit GUC to cap spill file disk usage. When logical decoding exceeds logical_decoding_work_mem, changes are spilled to disk under pg_replslot/<slot_name>/. Currently there is no limit on the total size of these spill files, which can lead to uncontrolled disk space consumption for large or numerous concurrent transactions. Add a new GUC logical_decoding_spill_limit (integer, unit kB, default 0 meaning unlimited) that limits the total on-disk spill file size per replication slot. The implementation tracks the current on-disk footprint in ReorderBuffer.spillBytesOnDisk (incremented after each successful write in ReorderBufferSerializeChange, decremented when spill files are removed in ReorderBufferRestoreCleanup) and per-transaction in ReorderBufferTXN.serialized_size. Before serializing a transaction to disk, ReorderBufferCheckMemoryLimit checks whether the projected write would exceed the configured limit. If so, it raises an ERROR. The replication slot state (restart_lsn, confirmed_flush) is preserved, so the subscriber can reconnect after the administrator increases the limit, raises logical_decoding_work_mem, or switches to a streaming-capable output plugin. The check is only on the spill-to-disk path (not the streaming path, which involves no disk I/O) and only when the limit is set (> 0), so there is zero overhead on the normal hot path and full backward compatibility with existing configurations.
--- doc/src/sgml/config.sgml | 24 ++++++++ .../replication/logical/reorderbuffer.c | 56 +++++++++++++++++++ src/backend/utils/misc/guc_parameters.dat | 10 ++++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/include/replication/reorderbuffer.h | 17 ++++++ 5 files changed, 109 insertions(+) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 8cdd826fbd3..e6fc9653fef 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2120,6 +2120,30 @@ include_dir 'conf.d' + + logical_decoding_spill_limit (integer) + + logical_decoding_spill_limit configuration parameter + + + + + Specifies the maximum total size of on-disk spill files that logical + decoding may use per replication slot. When changes exceed the memory + limit set by logical_decoding_work_mem, they are + spilled to disk. This parameter limits the total amount of disk space + used by those spill files. If the limit would be exceeded, the + walsender process raises an error and disconnects. The replication + slot's state is preserved, so the subscriber can reconnect after the + administrator increases this limit or switches to a streaming-capable + output plugin. + If this value is specified without units, it is taken as kilobytes. + The default value is 0, which means there is no + limit on spill file disk usage. + + + + commit_timestamp_buffers (integer) diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 682d13c9f22..d9f82c2ac48 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -72,6 +72,15 @@ * counter is updated; however transactions with size 0 are not stored in * the heap, because they have no changes to evict. * + * To prevent spill files from consuming unbounded disk space, the + * logical_decoding_spill_limit GUC can be set to limit the total size + * of spill files per replication slot. 
We track the current on-disk + * footprint in ReorderBuffer.spillBytesOnDisk (incremented on each + * successful write, decremented when spill files are cleaned up) and + * per-transaction in ReorderBufferTXN.serialized_size. Before + * serializing a transaction, we check whether the projected write would + * exceed the configured limit and raise an ERROR if so. + * * We still rely on max_changes_in_memory when loading serialized changes * back into memory. At that point we can't use the memory limit directly * as we load the subxacts independently. One option to deal with this @@ -224,6 +233,7 @@ typedef struct ReorderBufferDiskChange * like. */ int logical_decoding_work_mem; +int logical_decoding_spill_limit; static const Size max_changes_in_memory = 4096; /* XXX for restore only */ /* GUC variable */ @@ -3885,6 +3895,10 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) * If debug_logical_replication_streaming is set to "immediate", stream or * serialize the changes immediately. * + * When spilling to disk, if logical_decoding_spill_limit is set (> 0), + * we check whether the projected write would exceed the configured limit + * and raise an ERROR if so, to prevent unbounded disk usage. + * * XXX At this point we select the transactions until we reach under the memory * limit, but we might also adapt a more elaborate eviction strategy - for example * evicting enough transactions to free certain fraction (e.g. 50%) of the memory @@ -3964,6 +3978,36 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn)) continue; + /* + * Check the spill-to-disk size limit before actually serializing. + * We use the transaction's in-memory size as an estimate of how + * much will be written, which is a reasonable approximation. + * + * We only check when logical_decoding_spill_limit is set (> 0). + * When the limit would be exceeded, raise an ERROR. 
The + * walsender will exit, but the replication slot's restart_lsn is + * preserved so the subscriber can reconnect after the DBA + * increases the limit or switches to a streaming-capable output + * plugin. + */ + if (logical_decoding_spill_limit > 0 && + rb->spillBytesOnDisk + txn->size > + (Size) logical_decoding_spill_limit * 1024) + { + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("logical decoding spill file size limit exceeded"), + errdetail("Current on-disk spill size is %zu bytes, " + "transaction to spill is %zu bytes, " + "limit is %d kB.", + rb->spillBytesOnDisk, txn->size, + logical_decoding_spill_limit), + errhint("Consider increasing %s, %s, " + "or using a streaming-capable output plugin.", + "logical_decoding_spill_limit", + "logical_decoding_work_mem"))); + } + ReorderBufferSerializeTXN(rb, txn); } @@ -4286,6 +4330,13 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } pgstat_report_wait_end(); + /* + * Update the on-disk spill size accounting for both the transaction and + * the reorder buffer. This is used to enforce logical_decoding_spill_limit. + */ + txn->serialized_size += ondisk->size; + rb->spillBytesOnDisk += ondisk->size; + /* * Keep the transaction's final_lsn up to date with each change we send to * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to @@ -4872,6 +4923,11 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", path))); } + + /* Update the on-disk spill size accounting. 
*/ + Assert(rb->spillBytesOnDisk >= txn->serialized_size); + rb->spillBytesOnDisk -= txn->serialized_size; + txn->serialized_size = 0; } /* diff --git a/src/backend/utils/misc/guc_parameters.dat b/src/backend/utils/misc/guc_parameters.dat index 0c9854ad8fc..3168d23f387 100644 --- a/src/backend/utils/misc/guc_parameters.dat +++ b/src/backend/utils/misc/guc_parameters.dat @@ -1849,6 +1849,16 @@ boot_val => 'false', }, +{ name => 'logical_decoding_spill_limit', type => 'int', context => 'PGC_USERSET', group => 'RESOURCES_MEM', + short_desc => 'Sets the maximum total size of spill files for logical decoding.', + long_desc => 'Limits the total disk space used by spill files per replication slot during logical decoding. 0 means no limit.', + flags => 'GUC_UNIT_KB', + variable => 'logical_decoding_spill_limit', + boot_val => '0', + min => '0', + max => 'MAX_KILOBYTES', +}, + { name => 'logical_decoding_work_mem', type => 'int', context => 'PGC_USERSET', group => 'RESOURCES_MEM', short_desc => 'Sets the maximum memory to be used for logical decoding.', long_desc => 'This much memory can be used by each internal reorder buffer before spilling to disk.', diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index e4abe6c0077..04b4fb73737 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -149,6 +149,8 @@ #maintenance_work_mem = 64MB # min 64kB #autovacuum_work_mem = -1 # min 64kB, or -1 to use maintenance_work_mem #logical_decoding_work_mem = 64MB # min 64kB +#logical_decoding_spill_limit = 0 # limits spill file disk usage per slot; + # 0 means no limit #max_stack_depth = 2MB # min 100kB #shared_memory_type = mmap # the default is the first option # supported by the operating system: diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 2d717a9e152..04249a99d82 100644 --- a/src/include/replication/reorderbuffer.h 
+++ b/src/include/replication/reorderbuffer.h @@ -25,6 +25,7 @@ /* GUC variables */ extern PGDLLIMPORT int logical_decoding_work_mem; +extern PGDLLIMPORT int logical_decoding_spill_limit; extern PGDLLIMPORT int debug_logical_replication_streaming; /* possible values for debug_logical_replication_streaming */ @@ -461,6 +462,14 @@ typedef struct ReorderBufferTXN /* Size of top-transaction including sub-transactions. */ Size total_size; + /* + * Size of this transaction's changes currently serialized to disk (in + * bytes). Tracked per-transaction so that we can accurately update the + * ReorderBuffer-level spillBytesOnDisk counter when spill files are + * cleaned up. + */ + Size serialized_size; + /* * Private data pointer of the output plugin. */ @@ -685,6 +694,14 @@ struct ReorderBuffer int64 spillCount; /* spill-to-disk invocation counter */ int64 spillBytes; /* amount of data spilled to disk */ + /* + * Current total size of spill files on disk for this reorder buffer (in + * bytes). Unlike spillBytes which is a cumulative statistic counter, this + * tracks the actual on-disk footprint right now and is decremented when + * spill files are cleaned up. Used to enforce logical_decoding_spill_limit. + */ + Size spillBytesOnDisk; + /* Statistics about transactions streamed to the decoding output plugin */ int64 streamTxns; /* number of transactions streamed */ int64 streamCount; /* streaming invocation counter */ -- 2.43.7