From: =?UTF-8?q?=C3=81lvaro=20Herrera?= Date: Mon, 23 Mar 2026 21:27:57 +0100 Subject: [PATCH v44 05/10] Rename ChangeDest to ChangeContext, and absorb IndexInsertState This makes the data structure simpler, as we only need a single struct which can be passed down to all functions. Also, the inititalization and release sequences are a bit simpler and there's a single location in charge. --- src/backend/commands/cluster.c | 307 ++++++++++++++----------------- src/tools/pgindent/typedefs.list | 2 +- 2 files changed, 135 insertions(+), 174 deletions(-) diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index b4acaeb6b93..b86e600af41 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -113,15 +113,6 @@ typedef struct static RelFileLocator repacked_rel_locator = {.relNumber = InvalidOid}; static RelFileLocator repacked_rel_toast_locator = {.relNumber = InvalidOid}; -/* - * Everything we need to call ExecInsertIndexTuples(). - */ -typedef struct IndexInsertState -{ - ResultRelInfo *rri; - EState *estate; -} IndexInsertState; - /* The WAL segment being decoded. */ static XLogSegNo repack_current_segment = 0; @@ -134,31 +125,27 @@ static XLogSegNo repack_current_segment = 0; /* * Information needed to apply concurrent data changes. */ -typedef struct ChangeDest +typedef struct ChangeContext { /* The relation the changes are applied to. */ - Relation rel; - - /* - * The following is needed to find the existing tuple if the change is - * UPDATE or DELETE. 'ident_key' should have all the fields except for - * 'sk_argument' initialized. - */ - Relation ident_index; - ScanKey ident_key; - int ident_key_nentries; + Relation cc_rel; /* Needed to update indexes of rel_dst. */ - IndexInsertState *iistate; + ResultRelInfo *cc_rri; + EState *cc_estate; /* - * Sequential number of the file containing the changes. - * - * TODO This field makes the structure name less descriptive. Should we - * rename it, e.g. to ChangeApplyInfo? + * Existing tuples to UPDATE and DELETE are located via this index. We + * keep the scankey in partially initialized state to avoid repeated work. + * sk_argument is completed on the fly. */ - int file_seq; -} ChangeDest; + Relation cc_ident_index; + ScanKey cc_ident_key; + int cc_ident_key_nentries; + + /* Sequential number of the file containing the changes. */ + int cc_file_seq; +} ChangeContext; /* * Layout of shared memory used for communication between backend and the @@ -278,29 +265,27 @@ static bool repack_is_permitted_for_relation(RepackCommand cmd, static LogicalDecodingContext *repack_setup_logical_decoding(Oid relid); static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared); -static void apply_concurrent_changes(BufFile *file, ChangeDest *dest); +static void apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt); static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot, - IndexInsertState *iistate); + ChangeContext *chgcxt); static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple, TupleTableSlot *ondisk_tuple, - IndexInsertState *iistate); + ChangeContext *chgcxt); static void apply_concurrent_delete(Relation rel, TupleTableSlot *slot); static void restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot); static void adjust_toast_pointers(Relation relation, TupleTableSlot *dest, - TupleTableSlot *src); -static bool find_target_tuple(Relation rel, ChangeDest *dest, + TupleTableSlot *src); +static bool find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator, TupleTableSlot *received); static void process_concurrent_changes(XLogRecPtr end_of_wal, - ChangeDest *dest, + ChangeContext *chgcxt, bool done); -static IndexInsertState *get_index_insert_state(Relation relation, - Oid ident_index_id, - Relation *ident_index_p); -static ScanKey build_identity_key(Oid ident_idx_oid, Relation rel_src, - int *nentries); -static void free_index_insert_state(IndexInsertState *iistate); +static void initialize_change_context(ChangeContext *chgcxt, + Relation relation, + Oid ident_index_id); +static void release_change_context(ChangeContext *chgcxt); static void cleanup_logical_decoding(LogicalDecodingContext *ctx); static void rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Oid identIdx, @@ -2871,13 +2856,13 @@ decode_concurrent_changes(LogicalDecodingContext *ctx, * Apply changes stored in 'file'. */ static void -apply_concurrent_changes(BufFile *file, ChangeDest *dest) +apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt) { - ConcurrentChangeKind kind = '\0'; - Relation rel = dest->rel; - TupleTableSlot *spilled_tuple; - TupleTableSlot *old_update_tuple; - TupleTableSlot *ondisk_tuple; + ConcurrentChangeKind kind = '\0'; + Relation rel = chgcxt->cc_rel; + TupleTableSlot *spilled_tuple; + TupleTableSlot *old_update_tuple; + TupleTableSlot *ondisk_tuple; MemoryContext apply_cxt; bool have_old_tuple = false; @@ -2924,12 +2909,12 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) /* * Just before an UPDATE or DELETE, we must update the command - * counter, because the change could refer to a tuple that we - * have just inserted; and before an INSERT, we have to do this - * also if the previous command was either update or delete. + * counter, because the change could refer to a tuple that we have + * just inserted; and before an INSERT, we have to do this also if the + * previous command was either update or delete. * - * With this approach we don't spend so many CCIs for long - * strings of only INSERTs, which can't affect one another. + * With this approach we don't spend so many CCIs for long strings of + * only INSERTs, which can't affect one another. */ if (kind == CHANGE_UPDATE_NEW || kind == CHANGE_DELETE || (kind == CHANGE_INSERT && (prevkind == CHANGE_UPDATE_NEW || @@ -2946,22 +2931,22 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) if (kind == CHANGE_INSERT) { - apply_concurrent_insert(rel, spilled_tuple, dest->iistate); + apply_concurrent_insert(rel, spilled_tuple, chgcxt); } else if (kind == CHANGE_DELETE) { - bool found; + bool found; /* Find the tuple to be deleted */ - found = find_target_tuple(rel, dest, spilled_tuple, ondisk_tuple); + found = find_target_tuple(rel, chgcxt, spilled_tuple, ondisk_tuple); if (!found) elog(ERROR, "failed to find target tuple"); apply_concurrent_delete(rel, ondisk_tuple); } else if (kind == CHANGE_UPDATE_NEW) { - TupleTableSlot *key; - bool found; + TupleTableSlot *key; + bool found; if (have_old_tuple) key = old_update_tuple; @@ -2969,20 +2954,20 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) key = spilled_tuple; /* Find the tuple to be updated or deleted. */ - found = find_target_tuple(rel, dest, key, ondisk_tuple); + found = find_target_tuple(rel, chgcxt, key, ondisk_tuple); if (!found) elog(ERROR, "failed to find target tuple"); /* * If 'tup' contains TOAST pointers, they point to the old - * relation's toast. Copy the corresponding TOAST pointers for - * the new relation from the existing tuple. (The fact that we - * received a TOAST pointer here implies that the attribute - * hasn't changed.) + * relation's toast. Copy the corresponding TOAST pointers for the + * new relation from the existing tuple. (The fact that we + * received a TOAST pointer here implies that the attribute hasn't + * changed.) */ adjust_toast_pointers(rel, spilled_tuple, ondisk_tuple); - apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, dest->iistate); + apply_concurrent_update(rel, spilled_tuple, ondisk_tuple, chgcxt); ExecClearTuple(old_update_tuple); have_old_tuple = false; @@ -3006,7 +2991,7 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) */ static void apply_concurrent_insert(Relation rel, TupleTableSlot *slot, - IndexInsertState *iistate) + ChangeContext *chgcxt) { List *recheck; @@ -3019,8 +3004,8 @@ apply_concurrent_insert(Relation rel, TupleTableSlot *slot, * action that already happened, we have no use for the recheck list of * indexes returned, so just free it. XXX or maybe just leave it? */ - recheck = ExecInsertIndexTuples(iistate->rri, - iistate->estate, + recheck = ExecInsertIndexTuples(chgcxt->cc_rri, + chgcxt->cc_estate, 0, slot, NIL, NULL); @@ -3028,7 +3013,7 @@ apply_concurrent_insert(Relation rel, TupleTableSlot *slot, pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1); - ResetPerTupleExprContext(iistate->estate); + ResetPerTupleExprContext(chgcxt->cc_estate); } /* @@ -3038,7 +3023,7 @@ apply_concurrent_insert(Relation rel, TupleTableSlot *slot, static void apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple, TupleTableSlot *ondisk_tuple, - IndexInsertState *iistate) + ChangeContext *chgcxt) { LockTupleMode lockmode; TM_FailureData tmfd; @@ -3065,13 +3050,13 @@ apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple, if (update_indexes == TU_Summarizing) flags |= EIIT_ONLY_SUMMARIZING; - recheck = ExecInsertIndexTuples(iistate->rri, - iistate->estate, + recheck = ExecInsertIndexTuples(chgcxt->cc_rri, + chgcxt->cc_estate, flags, spilled_tuple, NIL, NULL); list_free(recheck); - ResetPerTupleExprContext(iistate->estate); + ResetPerTupleExprContext(chgcxt->cc_estate); } pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1); @@ -3135,19 +3120,19 @@ restore_tuple(BufFile *file, Relation relation, TupleTableSlot *slot) ExecForceStoreHeapTuple(tup, slot, false); /* - * Next, read any attributes we stored separately into the tts_values array - * elements expecting them, if any. This matches store_change. + * Next, read any attributes we stored separately into the tts_values + * array elements expecting them, if any. This matches store_change. */ BufFileReadExact(file, &natt_ext, sizeof(natt_ext)); if (natt_ext > 0) { - TupleDesc desc = slot->tts_tupleDescriptor; + TupleDesc desc = slot->tts_tupleDescriptor; for (int i = 0; i < desc->natts; i++) { CompactAttribute *attr = TupleDescCompactAttr(desc, i); - varlena *varlen; - alignas(uint32) varlena varhdr; + varlena *varlen; + alignas(uint32) varlena varhdr; void *value; Size varlensz; @@ -3219,10 +3204,10 @@ adjust_toast_pointers(Relation relation, TupleTableSlot *dest, TupleTableSlot *s * not found, return false. */ static bool -find_target_tuple(Relation rel, ChangeDest *dest, TupleTableSlot *locator, +find_target_tuple(Relation rel, ChangeContext *chgcxt, TupleTableSlot *locator, TupleTableSlot *retrieved) { - Form_pg_index idx = dest->ident_index->rd_index; + Form_pg_index idx = chgcxt->cc_ident_index->rd_index; IndexScanDesc scan; bool retval; @@ -3233,9 +3218,9 @@ find_target_tuple(Relation rel, ChangeDest *dest, TupleTableSlot *locator, * * Use the incoming tuple to finalize the scan key. */ - for (int i = 0; i < dest->ident_key_nentries; i++) + for (int i = 0; i < chgcxt->cc_ident_key_nentries; i++) { - ScanKey entry = &dest->ident_key[i]; + ScanKey entry = &chgcxt->cc_ident_key[i]; AttrNumber attno = idx->indkey.values[i]; entry->sk_argument = locator->tts_values[attno - 1]; @@ -3243,9 +3228,9 @@ find_target_tuple(Relation rel, ChangeDest *dest, TupleTableSlot *locator, } /* XXX no instrumentation for now */ - scan = index_beginscan(rel, dest->ident_index, GetActiveSnapshot(), - NULL, dest->ident_key_nentries, 0); - index_rescan(scan, dest->ident_key, dest->ident_key_nentries, NULL, 0); + scan = index_beginscan(rel, chgcxt->cc_ident_index, GetActiveSnapshot(), + NULL, chgcxt->cc_ident_key_nentries, 0); + index_rescan(scan, chgcxt->cc_ident_key, chgcxt->cc_ident_key_nentries, NULL, 0); retval = index_getnext_slot(scan, ForwardScanDirection, retrieved); index_endscan(scan); @@ -3260,7 +3245,7 @@ find_target_tuple(Relation rel, ChangeDest *dest, TupleTableSlot *locator, * are far too similar to each other. */ static void -process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) +process_concurrent_changes(XLogRecPtr end_of_wal, ChangeContext *chgcxt, bool done) { DecodingWorkerShared *shared; char fname[MAXPGPATH]; @@ -3293,7 +3278,7 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) /* * Has the worker exported the file we are waiting for? */ - if (last_exported == dest->file_seq) + if (last_exported == chgcxt->cc_file_seq) break; ConditionVariableSleep(&shared->cv, WAIT_EVENT_REPACK_WORKER_EXPORT); @@ -3301,117 +3286,99 @@ process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, bool done) ConditionVariableCancelSleep(); /* Open the file. */ - DecodingWorkerFileName(fname, shared->relid, dest->file_seq); + DecodingWorkerFileName(fname, shared->relid, chgcxt->cc_file_seq); file = BufFileOpenFileSet(&shared->sfs.fs, fname, O_RDONLY, false); - apply_concurrent_changes(file, dest); + apply_concurrent_changes(file, chgcxt); BufFileClose(file); /* Get ready for the next file. */ - dest->file_seq++; + chgcxt->cc_file_seq++; } /* - * Initialize IndexInsertState for index specified by ident_index_id. - * - * While doing that, also return the identity index in *ident_index_p. + * Initialize the ChangeContext struct for the given relation, with + * the given index as identity index. */ -static IndexInsertState * -get_index_insert_state(Relation relation, Oid ident_index_id, - Relation *ident_index_p) +static void +initialize_change_context(ChangeContext *chgcxt, + Relation relation, Oid ident_index_id) { - EState *estate; - IndexInsertState *result; - Relation ident_index = NULL; + chgcxt->cc_rel = relation; - result = (IndexInsertState *) palloc0(sizeof(IndexInsertState)); - estate = CreateExecutorState(); + /* Only initialize fields needed by ExecInsertIndexTuples(). */ + chgcxt->cc_estate = CreateExecutorState(); - result->rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo)); - InitResultRelInfo(result->rri, relation, 0, 0, 0); - ExecOpenIndices(result->rri, false); + chgcxt->cc_rri = (ResultRelInfo *) palloc(sizeof(ResultRelInfo)); + InitResultRelInfo(chgcxt->cc_rri, relation, 0, 0, 0); + ExecOpenIndices(chgcxt->cc_rri, false); /* - * Find the relcache entry of the identity index so that we spend no extra - * effort to open / close it. + * The table's relcache entry already has the relcache entry for the + * identity index; find that. */ - for (int i = 0; i < result->rri->ri_NumIndices; i++) + chgcxt->cc_ident_index = NULL; + for (int i = 0; i < chgcxt->cc_rri->ri_NumIndices; i++) { Relation ind_rel; - ind_rel = result->rri->ri_IndexRelationDescs[i]; + ind_rel = chgcxt->cc_rri->ri_IndexRelationDescs[i]; if (ind_rel->rd_id == ident_index_id) - ident_index = ind_rel; + { + chgcxt->cc_ident_index = ind_rel; + break; + } } - if (ident_index == NULL) + if (chgcxt->cc_ident_index == NULL) elog(ERROR, "failed to find identity index"); - /* Only initialize fields needed by ExecInsertIndexTuples(). */ - result->estate = estate; - - *ident_index_p = ident_index; - return result; -} - -/* - * Build scan key to process logical changes. - */ -static ScanKey -build_identity_key(Oid ident_idx_oid, Relation rel_src, int *nentries) -{ - Relation ident_idx_rel; - Form_pg_index ident_idx; - int n, - i; - ScanKey result; - - Assert(OidIsValid(ident_idx_oid)); - ident_idx_rel = index_open(ident_idx_oid, AccessShareLock); - ident_idx = ident_idx_rel->rd_index; - n = ident_idx->indnkeyatts; - result = (ScanKey) palloc(sizeof(ScanKeyData) * n); - for (i = 0; i < n; i++) + /* Set up for scanning said identity index */ { - ScanKey entry; - Oid opfamily, - opcintype, - opno, - opcode; + Form_pg_index indexForm; - entry = &result[i]; + indexForm = chgcxt->cc_ident_index->rd_index; + chgcxt->cc_ident_key_nentries = indexForm->indnkeyatts; + chgcxt->cc_ident_key = (ScanKey) palloc_array(ScanKeyData, indexForm->indnkeyatts); + for (int i = 0; i < indexForm->indnkeyatts; i++) + { + ScanKey entry; + Oid opfamily, + opcintype, + opno, + opcode; - opfamily = ident_idx_rel->rd_opfamily[i]; - opcintype = ident_idx_rel->rd_opcintype[i]; - opno = get_opfamily_member(opfamily, opcintype, opcintype, - BTEqualStrategyNumber); + entry = &chgcxt->cc_ident_key[i]; - if (!OidIsValid(opno)) - elog(ERROR, "failed to find = operator for type %u", opcintype); + opfamily = chgcxt->cc_ident_index->rd_opfamily[i]; + opcintype = chgcxt->cc_ident_index->rd_opcintype[i]; + opno = get_opfamily_member(opfamily, opcintype, opcintype, + BTEqualStrategyNumber); + if (!OidIsValid(opno)) + elog(ERROR, "failed to find = operator for type %u", opcintype); + opcode = get_opcode(opno); + if (!OidIsValid(opcode)) + elog(ERROR, "failed to find = operator for operator %u", opno); - opcode = get_opcode(opno); - if (!OidIsValid(opcode)) - elog(ERROR, "failed to find = operator for operator %u", opno); - - /* Initialize everything but argument. */ - ScanKeyInit(entry, - i + 1, - BTEqualStrategyNumber, opcode, - (Datum) NULL); - entry->sk_collation = ident_idx_rel->rd_indcollation[i]; + /* Initialize everything but argument. */ + ScanKeyInit(entry, + i + 1, + BTEqualStrategyNumber, opcode, + (Datum) NULL); + entry->sk_collation = chgcxt->cc_ident_index->rd_indcollation[i]; + } } - index_close(ident_idx_rel, AccessShareLock); - *nentries = n; - return result; + chgcxt->cc_file_seq = WORKER_FILE_SNAPSHOT + 1; } static void -free_index_insert_state(IndexInsertState *iistate) +release_change_context(ChangeContext *chgcxt) { - ExecCloseIndices(iistate->rri); - FreeExecutorState(iistate->estate); - pfree(iistate->rri); - pfree(iistate); + ExecCloseIndices(chgcxt->cc_rri); + FreeExecutorState(chgcxt->cc_estate); + /* XXX are these pfrees necessary? */ + pfree(chgcxt->cc_rri); + pfree(chgcxt->cc_ident_key); } static void @@ -3450,7 +3417,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, Relation *ind_refs, *ind_refs_p; int nind; - ChangeDest chgdst; + ChangeContext chgcxt; /* Like in cluster_rel(). */ lockmode_old = ShareUpdateExclusiveLock; @@ -3496,12 +3463,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, errmsg("identity index missing on the new relation")); /* Gather information to apply concurrent changes. */ - chgdst.rel = NewHeap; - chgdst.iistate = get_index_insert_state(NewHeap, ident_idx_new, - &chgdst.ident_index); - chgdst.ident_key = build_identity_key(ident_idx_new, OldHeap, - &chgdst.ident_key_nentries); - chgdst.file_seq = WORKER_FILE_SNAPSHOT + 1; + initialize_change_context(&chgcxt, NewHeap, ident_idx_new); /* * During testing, wait for another backend to perform concurrent data @@ -3523,7 +3485,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, * hold AccessExclusiveLock. (Quite some amount of WAL could have been * written during the data copying and index creation.) */ - process_concurrent_changes(end_of_wal, &chgdst, false); + process_concurrent_changes(end_of_wal, &chgcxt, false); /* * Acquire AccessExclusiveLock on the table, its TOAST relation (if there @@ -3608,7 +3570,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, * Apply the concurrent changes again. Indicate that the decoding worker * won't be needed anymore. */ - process_concurrent_changes(end_of_wal, &chgdst, true); + process_concurrent_changes(end_of_wal, &chgcxt, true); /* Remember info about rel before closing OldHeap */ relpersistence = OldHeap->rd_rel->relpersistence; @@ -3659,8 +3621,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap, table_close(NewHeap, NoLock); /* Cleanup what we don't need anymore. (And close the identity index.) */ - pfree(chgdst.ident_key); - free_index_insert_state(chgdst.iistate); + release_change_context(&chgcxt); /* * Swap the relations and their TOAST relations and TOAST indexes. This @@ -3980,7 +3941,7 @@ repack_worker_internal(dsm_segment *seg) for (;;) { - bool stop = decode_concurrent_changes(decoding_ctx, shared); + bool stop = decode_concurrent_changes(decoding_ctx, shared); if (stop) break; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 72f7c0b992b..12bfd50d25b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -426,7 +426,7 @@ CatCacheHeader CatalogId CatalogIdMapEntry CatalogIndexState -ChangeDest +ChangeContext ChangeVarNodes_callback ChangeVarNodes_context ChannelName -- 2.47.3 --gwom7bl7ogtszo4k Content-Type: text/x-diff; charset=utf-8 Content-Disposition: attachment; filename="v44-0006-rename-routines-on-the-logical-output-plugin-sid.patch"