public inbox for [email protected]
help / color / mirror / Atom feed
From: Álvaro Herrera <[email protected]>
Subject: [PATCH v44 07/10] Split cluster.h to create repack_internal.h
Date: Tue, 24 Mar 2026 15:09:03 +0100
Most of the details of concurrent repack are only needed by
pgoutput_repack, but declaring them in cluster.h forced that header --
which is included from several high-profile places -- to pull in many
other headers.  Move them to a new header, repack_internal.h, so that
cluster.h stays lean.
Also, move the declaration of change_useless_for_repack() to decode.h,
where it fits better.
---
src/backend/commands/cluster.c | 128 ++++++++++--------
src/backend/postmaster/bgworker.c | 2 +-
.../pgoutput_repack/pgoutput_repack.c | 13 +-
src/include/commands/cluster.h | 57 +-------
src/include/commands/repack_internal.h | 59 ++++++++
src/include/replication/decode.h | 4 +
6 files changed, 147 insertions(+), 116 deletions(-)
create mode 100644 src/include/commands/repack_internal.h
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index 75556cbdafb..2c3058ba10d 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -59,6 +59,7 @@
#include "commands/cluster.h"
#include "commands/defrem.h"
#include "commands/progress.h"
+#include "commands/repack_internal.h"
#include "commands/tablecmds.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
@@ -205,17 +206,11 @@ typedef struct DecodingWorkerShared
char error_queue[FLEXIBLE_ARRAY_MEMBER];
} DecodingWorkerShared;
-/*
- * Generate worker's output file name. If relations of the same 'relid' happen
- * to be processed at the same time, they must be from different databases and
- * therefore different backends must be involved. (PID is already present in
- * the fileset name.)
- */
-static inline void
-DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
-{
- snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
-}
+/* Is this process a REPACK worker? */
+static bool is_repack_worker = false;
+
+static pid_t backend_pid;
+static ProcNumber backend_proc_number;
/*
* Backend-local information to control the decoding worker.
@@ -241,6 +236,7 @@ static DecodingWorker *decoding_worker = NULL;
*/
volatile sig_atomic_t RepackMessagePending = false;
+static LOCKMODE RepackLockLevel(bool concurrent);
static bool cluster_rel_recheck(RepackCommand cmd, Relation OldHeap,
Oid indexOid, Oid userid, LOCKMODE lmode,
int options);
@@ -298,12 +294,16 @@ static Relation process_single_relation(RepackStmt *stmt,
ClusterParams *params);
static Oid determine_clustered_index(Relation rel, bool usingindex,
const char *indexname);
-static void start_decoding_worker(Oid relid);
-static void stop_decoding_worker(void);
+
+static void start_repack_decoding_worker(Oid relid);
+static void stop_repack_decoding_worker(void);
static void repack_worker_internal(dsm_segment *seg);
static void export_initial_snapshot(Snapshot snapshot,
DecodingWorkerShared *shared);
static Snapshot get_initial_snapshot(DecodingWorker *worker);
+extern bool am_decoding_for_repack(void);
+static void DecodingWorkerFileName(char *fname, Oid relid, uint32 seq);
+
static void ProcessRepackMessage(StringInfo msg);
static const char *RepackCommandAsString(RepackCommand cmd);
@@ -369,17 +369,8 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
parser_errposition(pstate, opt->location));
}
- /*
- * Determine the lock mode expected by cluster_rel().
- *
- * In the exclusive case, we obtain AccessExclusiveLock right away to
- * avoid lock-upgrade hazard in the single-transaction case. In the
- * CONCURRENTLY case, the AccessExclusiveLock will only be used at the end
- * of processing, supposedly for very short time. Until then, we'll have
- * to unlock the relation temporarily, so there's no lock-upgrade hazard.
- */
- lockmode = (params.options & CLUOPT_CONCURRENT) == 0 ?
- AccessExclusiveLock : ShareUpdateExclusiveLock;
+ /* Determine the lock mode to use. */
+ lockmode = RepackLockLevel((params.options & CLUOPT_CONCURRENT) != 0);
/*
* If a single relation is specified, process it and we're done ... unless
@@ -434,6 +425,12 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
"Repack",
ALLOCSET_DEFAULT_SIZES);
+ /*
+ * Since we open a new transaction for each relation, we have to check
+ * that the relation is still what we think it is.
+ *
+ * In single-transaction CLUSTER, we don't need the overhead.
+ */
params.options |= CLUOPT_RECHECK;
/*
@@ -544,6 +541,22 @@ ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel)
MemoryContextDelete(repack_context);
}
+/*
+ * In the non-concurrent case, we hold AccessExclusiveLock throughout the
+ * operation to avoid any lock-upgrade hazards. In the concurrent case, we
+ * grab ShareUpdateExclusiveLock (just like VACUUM) for most of the
+ * processing and only acquire AccessExclusiveLock at the end, to swap the
+ * relation -- supposedly for a short time.
+ */
+static LOCKMODE
+RepackLockLevel(bool concurrent)
+{
+ if (concurrent)
+ return ShareUpdateExclusiveLock;
+ else
+ return AccessExclusiveLock;
+}
+
/*
* cluster_rel
*
@@ -583,25 +596,22 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
bool concurrent = ((params->options & CLUOPT_CONCURRENT) != 0);
Oid ident_idx = InvalidOid;
- /*
- * The lock mode is AccessExclusiveLock for normal processing and
- * ShareUpdateExclusiveLock for concurrent processing (so that SELECT,
- * INSERT, UPDATE and DELETE commands work, but cluster_rel() cannot be
- * called concurrently for the same relation).
- */
- lmode = !concurrent ? AccessExclusiveLock : ShareUpdateExclusiveLock;
+ /* Determine the lock mode to use. */
+ lmode = RepackLockLevel(concurrent);
- /* There are specific requirements on concurrent processing. */
+ /*
+ * Check some preconditions in the concurrent case. This also obtains the
+ * replica index OID.
+ */
if (concurrent)
{
/*
- * Make sure we have no XID assigned, otherwise call of
- * repack_setup_logical_decoding() can cause a deadlock.
+ * Make sure we're not in a transaction block.
*
- * The existence of transaction block actually does not imply that XID
- * was already assigned, but it very likely is. We might want to check
- * the result of GetCurrentTransactionIdIfAny() instead, but that
- * would be less clear from user's perspective.
+ * The reason is that repack_setup_logical_decoding() could deadlock
+ * if there's an XID already assigned. It would be possible to run in
+ * a transaction block if we had no XID, but this restriction is
+ * simpler for users to understand and we don't lose anything.
*/
PreventInTransactionBlock(isTopLevel, "REPACK (CONCURRENTLY)");
@@ -626,15 +636,11 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
RestrictSearchPath();
/*
- * Since we may open a new transaction for each relation, we have to check
- * that the relation still is what we think it is.
+ * Recheck that the relation is still what it was when we started.
*
- * If this is a single-transaction CLUSTER, we can skip these tests. We
- * *must* skip the one on indisclustered since it would reject an attempt
- * to cluster a not-previously-clustered index.
- *
- * XXX move [some of] these comments to where the RECHECK flag is
- * determined?
+ * Note that it's critical to skip this in single-relation CLUSTER;
+ * otherwise, we would reject an attempt to cluster using a
+ * not-previously-clustered index.
*/
if (recheck &&
!cluster_rel_recheck(cmd, OldHeap, indexOid, save_userid,
@@ -754,7 +760,7 @@ cluster_rel(RepackCommand cmd, Relation OldHeap, Oid indexOid,
* However it still seems a good practice to make sure that the
* worker never survives the REPACK command.
*/
- stop_decoding_worker();
+ stop_repack_decoding_worker();
}
}
PG_END_TRY();
@@ -1093,7 +1099,7 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose,
* clustering index) and checking again if it's still eligible for
* REPACK CONCURRENTLY.
*/
- start_decoding_worker(tableOid);
+ start_repack_decoding_worker(tableOid);
/*
* Wait until the worker has the initial snapshot and retrieve it.
@@ -2538,7 +2544,6 @@ RepackCommandAsString(RepackCommand cmd)
return "???"; /* keep compiler quiet */
}
-
/*
* Is this backend performing logical decoding on behalf of REPACK
* (CONCURRENTLY) ?
@@ -3688,7 +3693,7 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
* contents to a new table.
*/
static void
-start_decoding_worker(Oid relid)
+start_repack_decoding_worker(Oid relid)
{
Size size;
dsm_segment *seg;
@@ -3779,7 +3784,7 @@ start_decoding_worker(Oid relid)
* we need to stop it explicitly at least on ERROR in the launching backend.
*/
static void
-stop_decoding_worker(void)
+stop_repack_decoding_worker(void)
{
BgwHandleStatus status;
@@ -3817,12 +3822,6 @@ stop_decoding_worker(void)
decoding_worker = NULL;
}
-/* Is this process a REPACK worker? */
-static bool is_repack_worker = false;
-
-static pid_t backend_pid;
-static ProcNumber backend_proc_number;
-
/*
* See ParallelWorkerShutdown for details.
*/
@@ -3869,7 +3868,7 @@ RepackWorkerMain(Datum main_arg)
/*
* Join locking group - see the comments around the call of
- * start_decoding_worker().
+ * start_repack_decoding_worker().
*/
if (!BecomeLockGroupMember(shared->backend_proc, backend_pid))
/* The leader is not running anymore. */
@@ -4039,6 +4038,18 @@ get_initial_snapshot(DecodingWorker *worker)
return snapshot;
}
+/*
+ * Generate worker's output file name. If relations of the same 'relid' happen
+ * to be processed at the same time, they must be from different databases and
+ * therefore different backends must be involved. (PID is already present in
+ * the fileset name.)
+ */
+static void
+DecodingWorkerFileName(char *fname, Oid relid, uint32 seq)
+{
+ snprintf(fname, MAXPGPATH, "%u-%u", relid, seq);
+}
+
bool
IsRepackWorker(void)
{
@@ -4067,7 +4078,6 @@ void
ProcessRepackMessages(void)
{
MemoryContext oldcontext;
-
static MemoryContext hpm_context = NULL;
/*
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c
index f8a8d1681e9..9e876d55e27 100644
--- a/src/backend/postmaster/bgworker.c
+++ b/src/backend/postmaster/bgworker.c
@@ -13,7 +13,7 @@
#include "postgres.h"
#include "access/parallel.h"
-#include "commands/cluster.h"
+#include "commands/repack_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
index 032fbd0e5b0..cc9ce615b18 100644
--- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c
+++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c
@@ -13,7 +13,7 @@
#include "postgres.h"
#include "access/detoast.h"
-#include "commands/cluster.h"
+#include "commands/repack_internal.h"
#include "replication/snapbuild.h"
#include "utils/memutils.h"
@@ -176,7 +176,7 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation,
ConcurrentChangeKind kind, HeapTuple tuple)
{
RepackDecodingState *dstate;
- MemoryContext oldcxt;
+ MemoryContext oldcxt;
BufFile *file;
List *attrs_ext = NIL;
int natt_ext;
@@ -226,7 +226,10 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation,
slot_getsomeattrs(slot, i + 1);
- /* This is a non-null varlena datum, but we only care if it's out-of-line */
+ /*
+ * This is a non-null varlena datum, but we only care if it's
+ * out-of-line.
+ */
varlen = (varlena *) DatumGetPointer(slot->tts_values[i]);
if (!VARATT_IS_EXTERNAL(varlen))
continue;
@@ -244,8 +247,8 @@ repack_store_change(LogicalDecodingContext *ctx, Relation relation,
* attributes (those actually should never appear on disk), so
* only TOASTed attribute can be seen here.
*
- * FIXME in what circumstances can an ONDISK attr appear?
- * Why aren't these written separately?
+ * FIXME in what circumstances can an ONDISK attr appear? Why
+ * aren't these written separately?
*/
Assert(VARATT_IS_EXTERNAL_ONDISK(varlen));
}
diff --git a/src/include/commands/cluster.h b/src/include/commands/cluster.h
index 1c0ac3ab4f5..1528d34fa42 100644
--- a/src/include/commands/cluster.h
+++ b/src/include/commands/cluster.h
@@ -13,17 +13,12 @@
#ifndef CLUSTER_H
#define CLUSTER_H
-#include "nodes/execnodes.h"
+#include <signal.h>
+
#include "nodes/parsenodes.h"
#include "parser/parse_node.h"
-#include "replication/decode.h"
-#include "postmaster/bgworker.h"
-#include "replication/logical.h"
-#include "storage/buffile.h"
#include "storage/lockdefs.h"
-#include "storage/shm_mq.h"
#include "utils/relcache.h"
-#include "utils/resowner.h"
/* flag bits for ClusterParams->options */
@@ -34,55 +29,14 @@
#define CLUOPT_ANALYZE 0x08 /* do an ANALYZE */
#define CLUOPT_CONCURRENT 0x10 /* allow concurrent data changes */
-
/* options for CLUSTER */
typedef struct ClusterParams
{
bits32 options; /* bitmask of CLUOPT_* */
} ClusterParams;
-
-/*
- * The following definitions are used by REPACK CONCURRENTLY.
- */
-
-/*
- * Stored as a single byte in the output file.
- */
-#define CHANGE_INSERT 'i'
-#define CHANGE_UPDATE_OLD 'u'
-#define CHANGE_UPDATE_NEW 'U'
-#define CHANGE_DELETE 'd'
-typedef char ConcurrentChangeKind;
-
-/*
- * Logical decoding state.
- *
- * The output plugin uses it to store the data changes that it decodes from
- * WAL while the table contents is being copied to a new storage.
- */
-typedef struct RepackDecodingState
-{
-#ifdef USE_ASSERT_CHECKING
- /* The relation whose changes we're decoding. */
- Oid relid;
-#endif
-
- /* Per-change memory context. */
- MemoryContext change_cxt;
-
- /* A tuple slot used to pass tuples back and forth */
- TupleTableSlot *slot;
-
- /* The current output file. */
- BufFile *file;
-} RepackDecodingState;
-
extern PGDLLIMPORT volatile sig_atomic_t RepackMessagePending;
-extern bool IsRepackWorker(void);
-extern void HandleRepackMessageInterrupt(void);
-extern void ProcessRepackMessages(void);
extern void ExecRepack(ParseState *pstate, RepackStmt *stmt, bool isTopLevel);
@@ -104,8 +58,9 @@ extern void finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
MultiXactId cutoffMulti,
char newrelpersistence);
-extern bool am_decoding_for_repack(void);
-extern bool change_useless_for_repack(XLogRecordBuffer *buf);
+extern bool IsRepackWorker(void);
+extern void HandleRepackMessageInterrupt(void);
+extern void ProcessRepackMessages(void);
+
-extern void RepackWorkerMain(Datum main_arg);
#endif /* CLUSTER_H */
diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h
new file mode 100644
index 00000000000..f90c973f5a0
--- /dev/null
+++ b/src/include/commands/repack_internal.h
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * repack_internal.h
+ * header for REPACK internals
+ *
+ * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994-5, Regents of the University of California
+ *
+ * src/include/commands/repack_internal.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef REPACK_INTERNAL_H
+#define REPACK_INTERNAL_H
+
+#include "nodes/execnodes.h"
+#include "replication/decode.h"
+#include "postmaster/bgworker.h"
+#include "replication/logical.h"
+#include "storage/buffile.h"
+#include "storage/shm_mq.h"
+#include "utils/resowner.h"
+
+/*
+ * Stored as a single byte in the output file.
+ */
+#define CHANGE_INSERT 'i'
+#define CHANGE_UPDATE_OLD 'u'
+#define CHANGE_UPDATE_NEW 'U'
+#define CHANGE_DELETE 'd'
+typedef char ConcurrentChangeKind;
+
+/*
+ * Logical decoding state.
+ *
+ * The output plugin uses it to store the data changes that it decodes from
+ * WAL while the table contents are being copied to new storage.
+ */
+typedef struct RepackDecodingState
+{
+#ifdef USE_ASSERT_CHECKING
+ /* The relation whose changes we're decoding. */
+ Oid relid;
+#endif
+
+ /* Per-change memory context. */
+ MemoryContext change_cxt;
+
+ /* A tuple slot used to pass tuples back and forth */
+ TupleTableSlot *slot;
+
+ /* The current output file. */
+ BufFile *file;
+} RepackDecodingState;
+
+
+extern void RepackWorkerMain(Datum main_arg);
+
+#endif /* REPACK_INTERNAL_H */
diff --git a/src/include/replication/decode.h b/src/include/replication/decode.h
index 49f00fc48b8..02b5393474c 100644
--- a/src/include/replication/decode.h
+++ b/src/include/replication/decode.h
@@ -31,4 +31,8 @@ extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
+/* in commands/cluster.c */
+extern bool change_useless_for_repack(XLogRecordBuffer *buf);
+
+
#endif
--
2.47.3
--gwom7bl7ogtszo4k
Content-Type: text/x-diff; charset=utf-8
Content-Disposition: attachment;
filename="v44-0008-Use-BulkInsertState-when-copying-data-to-the-new.patch"
view thread (721+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v44 07/10] Split cluster.h to create repack_internal.h
In-Reply-To: <no-message-id-238679@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox