public inbox for [email protected]
help / color / mirror / Atom feed
From: Álvaro Herrera <[email protected]>
Subject: [PATCH v47 4/9] repack code cleanups
Date: Tue, 31 Mar 2026 16:48:25 +0200
---
src/backend/commands/cluster.c | 163 +++++++++++----------------------
1 file changed, 51 insertions(+), 112 deletions(-)
diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c
index fd0bfacaae6..c0220cb3380 100644
--- a/src/backend/commands/cluster.c
+++ b/src/backend/commands/cluster.c
@@ -2459,18 +2459,8 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
TupleTableSlot *spilled_tuple;
TupleTableSlot *old_update_tuple;
TupleTableSlot *ondisk_tuple;
- MemoryContext apply_cxt;
bool have_old_tuple = false;
-
- /*
- * Use a separate memory context for the tuples and any memory needed to
- * process them.
- *
- * XXX would this be better with GenerationContextCreate?
- */
- apply_cxt = AllocSetContextCreate(TopTransactionContext,
- "REPACK - apply",
- ALLOCSET_DEFAULT_SIZES);
+ MemoryContext oldcxt;
spilled_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
&TTSOpsVirtual);
@@ -2479,6 +2469,8 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
old_update_tuple = MakeSingleTupleTableSlot(RelationGetDescr(rel),
&TTSOpsVirtual);
+ oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(chgcxt->cc_estate));
+
while (true)
{
size_t nread;
@@ -2570,14 +2562,15 @@ apply_concurrent_changes(BufFile *file, ChangeContext *chgcxt)
else
elog(ERROR, "unrecognized kind of change: %d", kind);
- MemoryContextReset(apply_cxt);
+ ResetPerTupleExprContext(chgcxt->cc_estate);
}
/* Cleanup. */
ExecDropSingleTupleTableSlot(spilled_tuple);
ExecDropSingleTupleTableSlot(ondisk_tuple);
ExecDropSingleTupleTableSlot(old_update_tuple);
- MemoryContextDelete(apply_cxt);
+
+ MemoryContextSwitchTo(oldcxt);
}
/*
@@ -2588,27 +2581,17 @@ static void
apply_concurrent_insert(Relation rel, TupleTableSlot *slot,
ChangeContext *chgcxt)
{
- List *recheck;
-
/* Put the tuple in the table, but make sure it won't be decoded */
table_tuple_insert(rel, slot, GetCurrentCommandId(true),
HEAP_INSERT_NO_LOGICAL, NULL);
- /*
- * Update indexes with this new tuple. Because we're merely replaying an
- * action that already happened, we have no use for the recheck list of
- * indexes returned, so just free it. XXX or maybe just leave it?
- */
- recheck = ExecInsertIndexTuples(chgcxt->cc_rri,
- chgcxt->cc_estate,
- 0,
- slot,
- NIL, NULL);
- list_free(recheck);
-
+ /* Update indexes with this new tuple. */
+ ExecInsertIndexTuples(chgcxt->cc_rri,
+ chgcxt->cc_estate,
+ 0,
+ slot,
+ NIL, NULL);
pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_INSERTED, 1);
-
- ResetPerTupleExprContext(chgcxt->cc_estate);
}
/*
@@ -2624,10 +2607,9 @@ apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
TM_FailureData tmfd;
TU_UpdateIndexes update_indexes;
TM_Result res;
- List *recheck;
/*
- * Carry out the update, avoiding logical decoding info.
+ * Carry out the update, skipping logical decoding for it.
*/
res = table_tuple_update(rel, &(ondisk_tuple->tts_tid), spilled_tuple,
GetCurrentCommandId(true),
@@ -2646,13 +2628,11 @@ apply_concurrent_update(Relation rel, TupleTableSlot *spilled_tuple,
if (update_indexes == TU_Summarizing)
flags |= EIIT_ONLY_SUMMARIZING;
- recheck = ExecInsertIndexTuples(chgcxt->cc_rri,
- chgcxt->cc_estate,
- flags,
- spilled_tuple,
- NIL, NULL);
- list_free(recheck);
- ResetPerTupleExprContext(chgcxt->cc_estate);
+ ExecInsertIndexTuples(chgcxt->cc_rri,
+ chgcxt->cc_estate,
+ flags,
+ spilled_tuple,
+ NIL, NULL);
}
pgstat_progress_incr_param(PROGRESS_REPACK_HEAP_TUPLES_UPDATED, 1);
@@ -2665,10 +2645,7 @@ apply_concurrent_delete(Relation rel, TupleTableSlot *slot)
TM_FailureData tmfd;
/*
- * Delete tuple from the new heap.
- *
- * Do it like in simple_heap_delete(), except for 'wal_logical' (and
- * except for 'wait').
+ * Delete tuple from the new heap, skipping logical decoding for it.
*/
res = table_tuple_delete(rel, &(slot->tts_tid),
GetCurrentCommandId(true),
@@ -3009,12 +2986,8 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
char relpersistence;
bool is_system_catalog;
Oid ident_idx_new;
- XLogRecPtr wal_insert_ptr,
- end_of_wal;
- char dummy_rec_data = '\0';
- Relation *ind_refs,
- *ind_refs_p;
- int nind;
+ XLogRecPtr end_of_wal;
+ List *indexrels;
ChangeContext chgcxt;
/* Like in cluster_rel(). */
@@ -3041,24 +3014,23 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
*/
ind_oids_new = build_new_indexes(NewHeap, OldHeap, ind_oids_old);
- /* Find "identity index" on the new relation. */
+ /*
+ * The identity index in the new relation appears in the same relative
+ * position as the corresponding index in the old relation. Find it.
+ */
ident_idx_new = InvalidOid;
- forboth(lc, ind_oids_old, lc2, ind_oids_new)
+ foreach_oid(ind_old, ind_oids_old)
{
- Oid ind_old = lfirst_oid(lc);
- Oid ind_new = lfirst_oid(lc2);
-
if (identIdx == ind_old)
{
- ident_idx_new = ind_new;
+ ident_idx_new = list_nth_oid(ind_oids_new,
+ foreach_current_index(ind_old));
break;
}
}
-
- /* Should not happen, given our lock on the old relation. */
if (!OidIsValid(ident_idx_new))
- ereport(ERROR,
- errmsg("identity index missing on the new relation"));
+ elog(ERROR, "could not find index matching \"%s\" at the new relation",
+ get_rel_name(identIdx));
/* Gather information to apply concurrent changes. */
initialize_change_context(&chgcxt, NewHeap, ident_idx_new);
@@ -3074,8 +3046,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
* incomplete page; see GetInsertRecPtr), to minimize the amount of data
* we need to flush while holding exclusive lock on the source table.
*/
- wal_insert_ptr = GetInsertRecPtr();
- XLogFlush(wal_insert_ptr);
+ XLogFlush(GetXLogInsertRecPtr());
end_of_wal = GetFlushRecPtr(NULL);
/*
@@ -3094,10 +3065,9 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
/*
* Lock all indexes now, not only the clustering one: all indexes need to
* have their files swapped. While doing that, store their relation
- * references in an array, to handle predicate locks below.
+ * references in a list, to handle predicate locks below.
*/
- ind_refs_p = ind_refs = palloc_array(Relation, list_length(ind_oids_old));
- nind = 0;
+ indexrels = NIL;
foreach_oid(ind_oid, ind_oids_old)
{
Relation index;
@@ -3105,16 +3075,11 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
index = index_open(ind_oid, AccessExclusiveLock);
/*
- * TODO 1) Do we need to check if ALTER INDEX was executed since the
- * new index was created in build_new_indexes()? 2) Specifically for
- * the clustering index, should check_index_is_clusterable() be called
- * here? (Not sure about the latter: ShareUpdateExclusiveLock on the
- * table probably blocks all commands that affect the result of
- * check_index_is_clusterable().)
+ * Some things about the index may have changed before we locked the
+ * index, such as ALTER INDEX RENAME. We don't need to do anything
+ * here to absorb those changes in the new index.
*/
- *ind_refs_p = index;
- ind_refs_p++;
- nind++;
+ indexrels = lappend(indexrels, index);
}
/*
@@ -3128,40 +3093,18 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
* Tuples and pages of the old heap will be gone, but the heap will stay.
*/
TransferPredicateLocksToHeapRelation(OldHeap);
- /* The same for indexes. */
- for (int i = 0; i < nind; i++)
+ foreach_ptr(RelationData, index, indexrels)
{
- Relation index = ind_refs[i];
-
TransferPredicateLocksToHeapRelation(index);
-
- /*
- * References to indexes on the old relation are not needed anymore,
- * however locks stay till the end of the transaction.
- */
index_close(index, NoLock);
}
- pfree(ind_refs);
+ list_free(indexrels);
/*
- * Flush anything we see in WAL, to make sure that all changes committed
- * while we were waiting for the exclusive lock are available for
- * decoding. This should not be necessary if all backends had
- * synchronous_commit set, but we can't rely on this setting.
- *
- * Unfortunately, GetInsertRecPtr() may lag behind the actual insert
- * position, and GetLastImportantRecPtr() points at the start of the last
- * record rather than at the end. Thus the simplest way to determine the
- * insert position is to insert a dummy record and use its LSN.
- *
- * XXX Consider using GetLastImportantRecPtr() and adding the size of the
- * last record (plus the total size of all the page headers the record
- * spans)?
+ * Flush WAL again, to make sure that all changes committed while we were
+ * waiting for the exclusive lock are available for decoding.
*/
- XLogBeginInsert();
- XLogRegisterData(&dummy_rec_data, 1);
- wal_insert_ptr = XLogInsert(RM_XLOG_ID, XLOG_NOOP);
- XLogFlush(wal_insert_ptr);
+ XLogFlush(GetXLogInsertRecPtr());
end_of_wal = GetFlushRecPtr(NULL);
/*
@@ -3186,10 +3129,7 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
{
Oid ind_old = lfirst_oid(lc);
Oid ind_new = lfirst_oid(lc2);
- Oid mapped_tables[4];
-
- /* Zero out possible results from swapped_relation_files */
- memset(mapped_tables, 0, sizeof(mapped_tables));
+ Oid mapped_tables[4] = {0};
swap_relation_files(ind_old, ind_new,
(old_table_oid == RelationRelationId),
@@ -3200,13 +3140,12 @@ rebuild_relation_finish_concurrent(Relation NewHeap, Relation OldHeap,
mapped_tables);
#ifdef USE_ASSERT_CHECKING
-
/*
* Concurrent processing is not supported for system relations, so
* there should be no mapped tables.
*/
for (int i = 0; i < 4; i++)
- Assert(mapped_tables[i] == 0);
+ Assert(!OidIsValid(mapped_tables[i]));
#endif
}
@@ -3256,22 +3195,22 @@ build_new_indexes(Relation NewHeap, Relation OldHeap, List *OldIndexes)
pgstat_progress_update_param(PROGRESS_REPACK_PHASE,
PROGRESS_REPACK_PHASE_REBUILD_INDEX);
- foreach_oid(ind_oid, OldIndexes)
+ foreach_oid(oldindex, OldIndexes)
{
- Oid ind_oid_new;
+ Oid newindex;
char *newName;
Relation ind;
- ind = index_open(ind_oid, ShareUpdateExclusiveLock);
+ ind = index_open(oldindex, ShareUpdateExclusiveLock);
- newName = ChooseRelationName(get_rel_name(ind_oid),
+ newName = ChooseRelationName(get_rel_name(oldindex),
NULL,
"repacknew",
get_rel_namespace(ind->rd_index->indrelid),
false);
- ind_oid_new = index_create_copy(NewHeap, false, ind_oid,
- ind->rd_rel->reltablespace, newName);
- result = lappend_oid(result, ind_oid_new);
+ newindex = index_create_copy(NewHeap, false, oldindex,
+ ind->rd_rel->reltablespace, newName);
+ result = lappend_oid(result, newindex);
index_close(ind, NoLock);
}
--
2.47.3
--2fxmamo6mu2qbxgv
Content-Type: text/x-diff; charset=utf-8
Content-Disposition: attachment;
filename="v47-0005-Use-BulkInsertState-when-copying-data-to-the-new.patch"
view thread (698+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH v47 4/9] repack code cleanups
In-Reply-To: <no-message-id-1205030@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox