From: =?UTF-8?q?=C3=81lvaro=20Herrera?= Date: Tue, 24 Mar 2026 15:09:03 +0100 Subject: [PATCH v44 07/10] Split cluster.h to create repack_internal.h Most of the details of concurrent repack are only needed by pgoutput_repack, and they pull many extra headers into cluster.h (which is included in several high-profile places), so isolate them for cleanliness. Also, change_useless_for_repack() is better declared in decode.h. While at it, factor the lock-mode choice into RepackLockLevel(), rename start/stop_decoding_worker() to start/stop_repack_decoding_worker(), and tidy up some comments. --- src/backend/commands/cluster.c | 128 ++++++++++-------- src/backend/postmaster/bgworker.c | 2 +- .../pgoutput_repack/pgoutput_repack.c | 13 +- src/include/commands/cluster.h | 57 +------- src/include/commands/repack_internal.h | 59 ++++++++ src/include/replication/decode.h | 4 + 6 files changed, 147 insertions(+), 116 deletions(-) create mode 100644 src/include/commands/repack_internal.h diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 75556cbdafb..2c3058ba10d 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -59,6 +59,7 @@ #include "commands/cluster.h" #include "commands/defrem.h" #include "commands/progress.h" +#include "commands/repack_internal.h" #include "commands/tablecmds.h" #include "commands/vacuum.h" #include "executor/executor.h" @@ -205,17 +206,11 @@ typedef struct DecodingWorkerShared char error_queue[FLEXIBLE_ARRAY_MEMBER]; } DecodingWorkerShared; -/* - * Generate worker's output file name. If relations of the same 'relid' happen - * to be processed at the same time, they must be from different databases and - * therefore different backends must be involved. (PID is already present in - * the fileset name.) - */ -static inline void -DecodingWorkerFileName(char *fname, Oid relid, uint32 seq) -{ - snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); -} +/* Is this process a REPACK worker? */ +static bool is_repack_worker = false; + +static pid_t backend_pid; +static ProcNumber backend_proc_number; /* * Backend-local information to control the decoding worker.
@@ -241,6 +236,7 @@ static DecodingWorker *decoding_worker = NULL; */ volatile sig_atomic_t RepackMessagePending = false; +static LOCKMODE RepackLockLevel(bool concurrent); static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap, Oid indexOid, Oid userid, LOCKMODE lmode, int options); @@ -298,12 +294,16 @@ static Relation process_single_relation(RepackStmt *stmt, ClusterParams *params); static Oid determine_clustered_index(Relation rel, bool usingindex, const char *indexname); -static void start_decoding_worker(Oid relid); -static void stop_decoding_worker(void); + +static void start_repack_decoding_worker(Oid relid); +static void stop_repack_decoding_worker(void); static void repack_worker_internal(dsm_segment *seg); static void export_initial_snapshot(Snapshot snapshot, DecodingWorkerShared *shared); static Snapshot get_initial_snapshot(DecodingWorker *worker); +extern bool am_decoding_for_repack(void); +static void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq); + static void ProcessRepackMessage(StringInfo msg); static const char *RepackCommandAsString(RepackCommand cmd); @@ -369,17 +369,8 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) parser_errposition(pstate, opt->location)); } - /* - * Determine the lock mode expected by cluster_rel(). - * - * In the exclusive case, we obtain AccessExclusiveLock right away to - * avoid lock-upgrade hazard in the single-transaction case. In the - * CONCURRENTLY case, the AccessExclusiveLock will only be used at the end - * of processing, supposedly for very short time. Until then, we'll have - * to unlock the relation temporarily, so there's no lock-upgrade hazard. - */ - lockmode = (params.options & CLUOPT_CONCURRENT) == 0 ? - AccessExclusiveLock : ShareUpdateExclusiveLock; + /* Determine the lock mode to use. */ + lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0); /* * If a single relation is specified, process it and we're done ... 
unless @@ -434,6 +425,12 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) "Repack", ALLOCSET_DEFAULT_SIZES); + /* + * Since we open a new transaction for each relation, we have to check + * that the relation is still what we think it is. + * + * In single-transaction CLUSTER, we don't need the overhead. + */ params.options |= CLUOPT_RECHECK; /* @@ -544,6 +541,22 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel) MemoryContextDelete(repack_context); } +/* + * In the non-concurrent case, we hold AccessExclusiveLock throughout the + * operation to avoid any lock-upgrade hazards. In the concurrent case, we + * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the + * processing and only acquire AccessExclusiveLock at the end, to swap the + * relation -- supposedly for a short time. + */ +static LOCKMODE +RepackLockLevel(bool concurrent) +{ + if (concurrent) + return ShareUpdateExclusiveLock; + else + return AccessExclusiveLock; +} + /* * cluster_rel * @@ -583,25 +596,22 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0); Oid ident_idx = InvalidOid; - /* - * The lock mode is AccessExclusiveLock for normal processing and - * ShareUpdateExclusiveLock for concurrent processing (so that SELECT, - * INSERT, UPDATE and DELETE commands work, but cluster_rel() cannot be - * called concurrently for the same relation). - */ - lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock; + /* Determine the lock mode to use. */ + lmode = RepackLockLevel(concurrent); - /* There are specific requirements on concurrent processing. */ + /* + * Check some preconditions in the concurrent case. This also obtains the + * replica index OID. + */ if (concurrent) { /* - * Make sure we have no XID assigned, otherwise call of - * repack_setup_logical_decoding() can cause a deadlock. + * Make sure we're not in a transaction block.
* - * The existence of transaction block actually does not imply that XID - * was already assigned, but it very likely is. We might want to check - * the result of GetCurrentTransactionIdIfAny() instead, but that - * would be less clear from user's perspective. + * The reason is that repack_setup_logical_decoding() could deadlock + * if there's an XID already assigned. It would be possible to run in + * a transaction block if we had no XID, but this restriction is + * simpler for users to understand and we don't lose anything. */ PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)"); @@ -626,15 +636,11 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, RestrictSearchPath(); /* - * Since we may open a new transaction for each relation, we have to check - * that the relation still is what we think it is. + * Recheck that the relation is still what it was when we started. * - * If this is a single-transaction CLUSTER, we can skip these tests. We - * *must* skip the one on indisclustered since it would reject an attempt - * to cluster a not-previously-clustered index. - * - * XXX move [some of] these comments to where the RECHECK flag is - * determined? + * Note that it's critical to skip this in single-relation CLUSTER; + * otherwise, we would reject an attempt to cluster using a + * not-previously-clustered index. */ if (recheck && !cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid, @@ -754,7 +760,7 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid, * However it still seems a good practice to make sure that the * worker never survives the REPACK command. */ - stop_decoding_worker(); + stop_repack_decoding_worker(); } } PG_END_TRY(); @@ -1093,7 +1099,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, * clustering index) and checking again if it's still eligible for * REPACK CONCURRENTLY. 
*/ - start_decoding_worker(tableOid); + start_repack_decoding_worker(tableOid); /* * Wait until the worker has the initial snapshot and retrieve it. @@ -2538,7 +2544,6 @@ RepackCommandAsString(RepackCommand cmd) return "???"; /* keep compiler quiet */ } - /* * Is this backend performing logical decoding on behalf of REPACK * (CONCURRENTLY) ? @@ -3688,7 +3693,7 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes) * contents to a new table. */ static void -start_decoding_worker(Oid relid) +start_repack_decoding_worker(Oid relid) { Size size; dsm_segment *seg; @@ -3779,7 +3784,7 @@ start_decoding_worker(Oid relid) * we need to stop it explicitly at least on ERROR in the launching backend. */ static void -stop_decoding_worker(void) +stop_repack_decoding_worker(void) { BgwHandleStatus status; @@ -3817,12 +3822,6 @@ stop_decoding_worker(void) decoding_worker = NULL; } -/* Is this process a REPACK worker? */ -static bool is_repack_worker = false; - -static pid_t backend_pid; -static ProcNumber backend_proc_number; - /* * See ParallelWorkerShutdown for details. */ @@ -3869,7 +3868,7 @@ RepackWorkerMain(Datum main_arg) /* * Join locking group - see the comments around the call of - * start_decoding_worker(). + * start_repack_decoding_worker(). */ if (!BecomeLockGroupMember(shared->backend_proc, backend_pid)) /* The leader is not running anymore. */ @@ -4039,6 +4038,18 @@ get_initial_snapshot(DecodingWorker *worker) return snapshot; } +/* + * Generate worker's output file name. If relations of the same 'relid' happen + * to be processed at the same time, they must be from different databases and + * therefore different backends must be involved. (PID is already present in + * the fileset name.) 
+ */ +static void +DecodingWorkerFileName(char *fname, Oid relid, uint32 seq) +{ + snprintf(fname, MAXPGPATH, "%u-%u", relid, seq); +} + bool IsRepackWorker(void) { @@ -4067,7 +4078,6 @@ void ProcessRepackMessages(void) { MemoryContext oldcontext; - static MemoryContext hpm_context = NULL; /* diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index f8a8d1681e9..9e876d55e27 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -13,7 +13,7 @@ #include "postgres.h" #include "access/parallel.h" -#include "commands/cluster.h" +#include "commands/repack_internal.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c index 032fbd0e5b0..cc9ce615b18 100644 --- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -13,7 +13,7 @@ #include "postgres.h" #include "access/detoast.h" -#include "commands/cluster.h" +#include "commands/repack_internal.h" #include "replication/snapbuild.h" #include "utils/memutils.h" @@ -176,7 +176,7 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation, ConcurrentChangeKind kind, HeapTuple tuple) { RepackDecodingState *dstate; - MemoryContext oldcxt; + MemoryContext oldcxt; BufFile *file; List *attrs_ext = NIL; int natt_ext; @@ -226,7 +226,10 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation, slot_getsomeattrs(slot, i + 1); - /* This is a non-null varlena datum, but we only care if it's out-of-line */ + /* + * This is a non-null varlena datum, but we only care if it's + * out-of-line + */ varlen = (varlena *) DatumGetPointer(slot->tts_values[i]); if (!VARATT_IS_EXTERNAL(varlen)) continue; @@ -244,8 +247,8 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation, * attributes (those actually should never appear on 
disk), so * only TOASTed attribute can be seen here. * - * FIXME in what circumstances can an ONDISK attr appear? - * Why aren't these written separately? + * FIXME in what circumstances can an ONDISK attr appear? Why + * aren't these written separately? */ Assert(VARATT_IS_EXTERNAL_ONDISK(varlen)); } diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h index 1c0ac3ab4f5..1528d34fa42 100644 --- a/src/include/commands/cluster.h +++ b/src/include/commands/cluster.h @@ -13,17 +13,12 @@ #ifndef CLUSTER_H #define CLUSTER_H -#include "nodes/execnodes.h" +#include <signal.h> + #include "nodes/parsenodes.h" #include "parser/parse_node.h" -#include "replication/decode.h" -#include "postmaster/bgworker.h" -#include "replication/logical.h" -#include "storage/buffile.h" #include "storage/lockdefs.h" -#include "storage/shm_mq.h" #include "utils/relcache.h" -#include "utils/resowner.h" /* flag bits for ClusterParams->options */ @@ -34,55 +29,14 @@ #define CLUOPT_ANALYZE 0x08 /* do an ANALYZE */ #define CLUOPT_CONCURRENT 0x10 /* allow concurrent data changes */ - /* options for CLUSTER */ typedef struct ClusterParams { bits32 options; /* bitmask of CLUOPT_* */ } ClusterParams; - -/* - * The following definitions are used by REPACK CONCURRENTLY. - */ - -/* - * Stored as a single byte in the output file. - */ -#define CHANGE_INSERT 'i' -#define CHANGE_UPDATE_OLD 'u' -#define CHANGE_UPDATE_NEW 'U' -#define CHANGE_DELETE 'd' -typedef char ConcurrentChangeKind; - -/* - * Logical decoding state. - * - * The output plugin uses it to store the data changes that it decodes from - * WAL while the table contents is being copied to a new storage. - */ -typedef struct RepackDecodingState -{ -#ifdef USE_ASSERT_CHECKING - /* The relation whose changes we're decoding. */ - Oid relid; -#endif - - /* Per-change memory context. */ - MemoryContext change_cxt; - - /* A tuple slot used to pass tuples back and forth */ - TupleTableSlot *slot; - - /* The current output file.
*/ - BufFile *file; -} RepackDecodingState; - extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending; -extern bool IsRepackWorker(void); -extern void HandleRepackMessageInterrupt(void); -extern void ProcessRepackMessages(void); extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel); @@ -104,8 +58,9 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap, MultiXactId cutoffMulti, char newrelpersistence); -extern bool am_decoding_for_repack(void); -extern bool change_useless_for_repack(XLogRecordBuffer *buf); +extern bool IsRepackWorker(void); +extern void HandleRepackMessageInterrupt(void); +extern void ProcessRepackMessages(void); + -extern void RepackWorkerMain(Datum main_arg); #endif /* CLUSTER_H */ diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h new file mode 100644 index 00000000000..f90c973f5a0 --- /dev/null +++ b/src/include/commands/repack_internal.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * repack_internal.h + * header for REPACK internals + * + * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group + * Portions Copyright (c) 1994-5, Regents of the University of California + * + * src/include/commands/repack_internal.h + * + *------------------------------------------------------------------------- + */ +#ifndef REPACK_INTERNAL_H +#define REPACK_INTERNAL_H + +#include "nodes/execnodes.h" +#include "replication/decode.h" +#include "postmaster/bgworker.h" +#include "replication/logical.h" +#include "storage/buffile.h" +#include "storage/shm_mq.h" +#include "utils/resowner.h" + +/* + * Stored as a single byte in the output file. + */ +#define CHANGE_INSERT 'i' +#define CHANGE_UPDATE_OLD 'u' +#define CHANGE_UPDATE_NEW 'U' +#define CHANGE_DELETE 'd' +typedef char ConcurrentChangeKind; + +/* + * Logical decoding state. 
+ * + * The output plugin uses it to store the data changes that it decodes from + * WAL while the table contents are being copied to the new storage. + */ +typedef struct RepackDecodingState +{ +#ifdef USE_ASSERT_CHECKING + /* The relation whose changes we're decoding. */ + Oid relid; +#endif + + /* Per-change memory context. */ + MemoryContext change_cxt; + + /* A tuple slot used to pass tuples back and forth */ + TupleTableSlot *slot; + + /* The current output file. */ + BufFile *file; +} RepackDecodingState; + + +extern void RepackWorkerMain(Datum main_arg); + +#endif /* REPACK_INTERNAL_H */ diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h index 49f00fc48b8..02b5393474c 100644 --- a/src/include/replication/decode.h +++ b/src/include/replication/decode.h @@ -31,4 +31,8 @@ extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record); +/* in commands/cluster.c */ +extern bool change_useless_for_repack(XLogRecordBuffer *buf); + + #endif -- 2.47.3 --gwom7bl7ogtszo4k Content-Type: text/x-diff; charset=utf-8 Content-Disposition: attachment; filename="v44-0008-Use-BulkInsertState-when-copying-data-to-the-new.patch"