From: =?UTF-8?q?=C3=81lvaro=20Herrera?= Date: Mon, 1 Jun 2026 18:43:08 +0200 Subject: [PATCH] Have RepackDecodingState carry a magic number ... which is set by the REPACK executing code and checked by the logical decoding plugin; this way, if the plugin is invoked in some way other than REPACK, we can throw an error. Assorted code rearrangement is included. Discussion: https://postgr.es/m/19500-38a02529a69353a5@postgresql.org --- src/backend/commands/repack_worker.c | 70 +++++++++------------ src/backend/replication/pgrepack/pgrepack.c | 29 ++++++++- src/include/commands/repack_internal.h | 3 + 3 files changed, 60 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/repack_worker.c b/src/backend/commands/repack_worker.c index b6b7b604b4f..f3d2720b246 100644 --- a/src/backend/commands/repack_worker.c +++ b/src/backend/commands/repack_worker.c @@ -197,7 +197,7 @@ repack_setup_logical_decoding(Oid relid) Relation rel; Oid toastrelid; LogicalDecodingContext *ctx; - NameData slotname; + char slotname[NAMEDATALEN]; RepackDecodingState *dstate; MemoryContext oldcxt; @@ -218,12 +218,30 @@ repack_setup_logical_decoding(Oid relid) * * RS_TEMPORARY so that the slot gets cleaned up on ERROR. */ - snprintf(NameStr(slotname), NAMEDATALEN, "repack_%d", MyProcPid); - ReplicationSlotCreate(NameStr(slotname), true, RS_TEMPORARY, false, true, + snprintf(slotname, NAMEDATALEN, "repack_%d", MyProcPid); + ReplicationSlotCreate(slotname, true, RS_TEMPORARY, false, true, false, false); EnsureLogicalDecodingEnabled(); + /* + * Setup repacked_rel_locator and repacked_rel_toast_locator, which we use + * to skip decoding of unrelated relations. + */ + rel = table_open(relid, AccessShareLock); + repacked_rel_locator = rel->rd_locator; + toastrelid = rel->rd_rel->reltoastrelid; + if (OidIsValid(toastrelid)) + { + Relation toastrel; + + /* Avoid logical decoding of other TOAST relations. */ + toastrel = table_open(toastrelid, AccessShareLock); + repacked_rel_toast_locator = toastrel->rd_locator; + table_close(toastrel, AccessShareLock); + } + table_close(rel, AccessShareLock); + /* * Neither prepare_write nor do_write callback nor update_progress is * useful for us. @@ -237,27 +255,16 @@ repack_setup_logical_decoding(Oid relid) .segment_open = wal_segment_open, .segment_close = wal_segment_close), NULL, NULL, NULL); + /* The pgrepack output plugin allocated this; now we can complete it */ + dstate = ctx->output_writer_private; + dstate->magic = REPACK_DECODER_MAGIC; +#ifdef USE_ASSERT_CHECKING + dstate->relid = relid; +#endif - /* - * We don't have control on setting fast_forward, so at least check it. - */ + /* We don't have control on setting fast_forward, so at least check it. */ Assert(!ctx->fast_forward); - /* Avoid logical decoding of other relations. */ - rel = table_open(relid, AccessShareLock); - repacked_rel_locator = rel->rd_locator; - toastrelid = rel->rd_rel->reltoastrelid; - if (OidIsValid(toastrelid)) - { - Relation toastrel; - - /* Avoid logical decoding of other TOAST relations. */ - toastrel = table_open(toastrelid, AccessShareLock); - repacked_rel_toast_locator = toastrel->rd_locator; - table_close(toastrel, AccessShareLock); - } - table_close(rel, AccessShareLock); - DecodingContextFindStartpoint(ctx); /* @@ -283,28 +290,9 @@ repack_setup_logical_decoding(Oid relid) * WAL. */ ctx->reader->private_data = palloc0_object(ReadLocalXLogPageNoWaitPrivate); - dstate = palloc0_object(RepackDecodingState); + MemoryContextSwitchTo(oldcxt); -#ifdef USE_ASSERT_CHECKING - dstate->relid = relid; -#endif - - dstate->change_cxt = AllocSetContextCreate(ctx->context, - "REPACK - change", - ALLOCSET_DEFAULT_SIZES); - - /* The file will be set as soon as we have it opened. */ - dstate->file = NULL; - - /* - * Memory context and resource owner for long-lived resources. - */ - dstate->worker_cxt = CurrentMemoryContext; - dstate->worker_resowner = CurrentResourceOwner; - - ctx->output_writer_private = dstate; - return ctx; } diff --git a/src/backend/replication/pgrepack/pgrepack.c b/src/backend/replication/pgrepack/pgrepack.c index a2615ce54c1..c9106882f8e 100644 --- a/src/backend/replication/pgrepack/pgrepack.c +++ b/src/backend/replication/pgrepack/pgrepack.c @@ -47,7 +47,25 @@ static void repack_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { - ctx->output_plugin_private = NULL; + RepackDecodingState *dstate; + + /* + * Set up private state, but only partially: we want + * repack_setup_logical_decoding to bless it to prevent this plugin from + * being used outside of REPACK. + */ + Assert(CurrentMemoryContext == ctx->context); + dstate = palloc0_object(RepackDecodingState); + dstate->change_cxt = AllocSetContextCreate(ctx->context, + "REPACK - change", + ALLOCSET_DEFAULT_SIZES); + /* The file will be set as soon as we have it opened. */ + dstate->file = NULL; + /* Memory context and resource owner for long-lived resources. */ + dstate->worker_cxt = CurrentMemoryContext; + dstate->worker_resowner = CurrentResourceOwner; + + ctx->output_writer_private = dstate; /* Probably unnecessary, as we don't use the SQL interface ... */ opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT; @@ -77,6 +95,15 @@ repack_shutdown(LogicalDecodingContext *ctx) static void repack_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + RepackDecodingState *private = + (RepackDecodingState *) ctx->output_writer_private; + + if (private->magic != REPACK_DECODER_MAGIC) + ereport(ERROR, + errmsg("wrong magic 0x%x in \"%s\" decoder plugin, expected 0x%x", + private->magic, "pgrepack", REPACK_DECODER_MAGIC), + errhint("The \"%s\" decoder plugin may only be called by %s.", + "pgrepack", "REPACK (CONCURRENTLY)")); } /* COMMIT callback */ diff --git a/src/include/commands/repack_internal.h b/src/include/commands/repack_internal.h index 6a85cee8910..4236f0b6fde 100644 --- a/src/include/commands/repack_internal.h +++ b/src/include/commands/repack_internal.h @@ -37,8 +37,11 @@ typedef char ConcurrentChangeKind; * The output plugin uses it to store the data changes that it decodes from * WAL while the table contents is being copied to a new storage. */ +#define REPACK_DECODER_MAGIC 0x75A91DF7 typedef struct RepackDecodingState { + uint32 magic; + #ifdef USE_ASSERT_CHECKING /* The relation whose changes we're decoding. */ Oid relid; -- 2.47.3 --g2p5afb23t36fp4l--