From 7ee50f8c99355ae022bb0b87e2cc561a3628c899 Mon Sep 17 00:00:00 2001
From: Greg Burd <greg@burd.me>
Date: Sun, 2 Nov 2025 11:36:20 -0500
Subject: [PATCH v29 1/4] Prepare heapam_tuple_update() and
 simple_heap_update() for divergence

This commit lays the foundation for larger changes to come by taking the
first portion of heap_update() through the HeapDeterminColumnsInfo() and
replicating that logic in both heapam_tuple_update() and
simple_heap_upate().  This is done so that these two paths might diverge
in implementation later on.  The simple_heap_update() path deals solely
with updates to catalog tuples which could record their modified
attributes rather than relearn them.  The remaining calls from the
executor into the table AM update API could include the set of updated
attributes.  This is foreshadowing... of course, as that's what the next
commit will start to do.

As part of this reorganization, the handling of replica identity key
attributes has been adjusted. Instead of fetching a second copy of
the bitmap during an update operation, the caller is now required to
provide it. This change applies to both heap_update() and
heap_delete().
---
 src/backend/access/heap/heapam.c         | 568 +++++++++++------------
 src/backend/access/heap/heapam_handler.c | 117 ++++-
 src/include/access/heapam.h              |  24 +-
 3 files changed, 410 insertions(+), 299 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 3004964ab7f..73c1d3d2cb8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -39,18 +39,24 @@
 #include "access/syncscan.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
+#include "access/xact.h"
 #include "access/xloginsert.h"
+#include "catalog/catalog.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
+#include "nodes/bitmapset.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
+#include "storage/bufmgr.h"
+#include "storage/itemptr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/procarray.h"
 #include "utils/datum.h"
 #include "utils/injection_point.h"
 #include "utils/inval.h"
+#include "utils/relcache.h"
 #include "utils/spccache.h"
 #include "utils/syscache.h"
 
@@ -62,16 +68,8 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
 								  HeapTuple newtup, HeapTuple old_key_tuple,
 								  bool all_visible_cleared, bool new_all_visible_cleared);
 #ifdef USE_ASSERT_CHECKING
-static void check_lock_if_inplace_updateable_rel(Relation relation,
-												 const ItemPointerData *otid,
-												 HeapTuple newtup);
 static void check_inplace_rel_lock(HeapTuple oldtup);
 #endif
-static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
-										   Bitmapset *interesting_cols,
-										   Bitmapset *external_cols,
-										   HeapTuple oldtup, HeapTuple newtup,
-										   bool *has_external);
 static bool heap_acquire_tuplock(Relation relation, const ItemPointerData *tid,
 								 LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool *have_tuple_lock);
@@ -106,10 +104,10 @@ static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status
 static void index_delete_sort(TM_IndexDeleteOp *delstate);
 static int	bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
-static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
+static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp,
+										Bitmapset *rid_attrs, bool key_required,
 										bool *copy);
 
-
 /*
  * This table lists the heavyweight lock mode that corresponds to each tuple
  * lock mode, as well as one or two corresponding MultiXactStatus values:
@@ -2852,6 +2850,7 @@ heap_delete(Relation relation, const ItemPointerData *tid,
 	Buffer		buffer;
 	Buffer		vmbuffer = InvalidBuffer;
 	TransactionId new_xmax;
+	Bitmapset  *rid_attrs;
 	uint16		new_infomask,
 				new_infomask2;
 	bool		have_tuple_lock = false;
@@ -2864,6 +2863,8 @@ heap_delete(Relation relation, const ItemPointerData *tid,
 
 	AssertHasSnapshotForToast(relation);
 
+	rid_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
 	/*
 	 * Forbid this during a parallel operation, lest it allocate a combo CID.
 	 * Other workers might need that combo CID for visibility checks, and we
@@ -3067,6 +3068,7 @@ l1:
 			UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive);
 		if (vmbuffer != InvalidBuffer)
 			ReleaseBuffer(vmbuffer);
+		bms_free(rid_attrs);
 		return result;
 	}
 
@@ -3088,7 +3090,10 @@ l1:
 	 * Compute replica identity tuple before entering the critical section so
 	 * we don't PANIC upon a memory allocation failure.
 	 */
-	old_key_tuple = ExtractReplicaIdentity(relation, &tp, true, &old_key_copied);
+	old_key_tuple = ExtractReplicaIdentity(relation, &tp, rid_attrs,
+										   true, &old_key_copied);
+	bms_free(rid_attrs);
+	rid_attrs = NULL;
 
 	/*
 	 * If this is the first possibly-multixact-able operation in the current
@@ -3300,7 +3305,10 @@ simple_heap_delete(Relation relation, const ItemPointerData *tid)
  *	heap_update - replace a tuple
  *
  * See table_tuple_update() for an explanation of the parameters, except that
- * this routine directly takes a tuple rather than a slot.
+ * this routine directly takes a heap tuple rather than a slot.
+ *
+ * It's required that the caller has acquired the pin and lock on the buffer.
+ * That lock and pin will be managed here, not in the caller.
  *
  * In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
  * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax (the last
@@ -3308,30 +3316,21 @@ simple_heap_delete(Relation relation, const ItemPointerData *tid)
  * generated by another transaction).
  */
 TM_Result
-heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
-			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode,
-			TU_UpdateIndexes *update_indexes)
+heap_update(Relation relation, HeapTupleData *oldtup,
+			HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait,
+			TM_FailureData *tmfd, LockTupleMode *lockmode, Buffer buffer,
+			Page page, BlockNumber block, ItemId lp, Bitmapset *hot_attrs,
+			Bitmapset *sum_attrs, Bitmapset *pk_attrs, Bitmapset *rid_attrs,
+			Bitmapset *mix_attrs, Buffer *vmbuffer,
+			bool rep_id_key_required, TU_UpdateIndexes *update_indexes)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
-	Bitmapset  *hot_attrs;
-	Bitmapset  *sum_attrs;
-	Bitmapset  *key_attrs;
-	Bitmapset  *id_attrs;
-	Bitmapset  *interesting_attrs;
-	Bitmapset  *modified_attrs;
-	ItemId		lp;
-	HeapTupleData oldtup;
 	HeapTuple	heaptup;
 	HeapTuple	old_key_tuple = NULL;
 	bool		old_key_copied = false;
-	Page		page;
-	BlockNumber block;
 	MultiXactStatus mxact_status;
-	Buffer		buffer,
-				newbuf,
-				vmbuffer = InvalidBuffer,
+	Buffer		newbuf,
 				vmbuffer_new = InvalidBuffer;
 	bool		need_toast;
 	Size		newtupsize,
@@ -3345,7 +3344,6 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	bool		all_visible_cleared_new = false;
 	bool		checked_lockers;
 	bool		locker_remains;
-	bool		id_has_external = false;
 	TransactionId xmax_new_tuple,
 				xmax_old_tuple;
 	uint16		infomask_old_tuple,
@@ -3353,144 +3351,13 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 				infomask_new_tuple,
 				infomask2_new_tuple;
 
-	Assert(ItemPointerIsValid(otid));
-
-	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
-	Assert(HeapTupleHeaderGetNatts(newtup->t_data) <=
-		   RelationGetNumberOfAttributes(relation));
-
+	Assert(BufferIsLockedByMe(buffer));
+	Assert(ItemIdIsNormal(lp));
 	AssertHasSnapshotForToast(relation);
 
-	/*
-	 * Forbid this during a parallel operation, lest it allocate a combo CID.
-	 * Other workers might need that combo CID for visibility checks, and we
-	 * have no provision for broadcasting it to them.
-	 */
-	if (IsInParallelMode())
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
-				 errmsg("cannot update tuples during a parallel operation")));
-
-#ifdef USE_ASSERT_CHECKING
-	check_lock_if_inplace_updateable_rel(relation, otid, newtup);
-#endif
-
-	/*
-	 * Fetch the list of attributes to be checked for various operations.
-	 *
-	 * For HOT considerations, this is wasted effort if we fail to update or
-	 * have to put the new tuple on a different page.  But we must compute the
-	 * list before obtaining buffer lock --- in the worst case, if we are
-	 * doing an update on one of the relevant system catalogs, we could
-	 * deadlock if we try to fetch the list later.  In any case, the relcache
-	 * caches the data so this is usually pretty cheap.
-	 *
-	 * We also need columns used by the replica identity and columns that are
-	 * considered the "key" of rows in the table.
-	 *
-	 * Note that we get copies of each bitmap, so we need not worry about
-	 * relcache flush happening midway through.
-	 */
-	hot_attrs = RelationGetIndexAttrBitmap(relation,
-										   INDEX_ATTR_BITMAP_HOT_BLOCKING);
-	sum_attrs = RelationGetIndexAttrBitmap(relation,
-										   INDEX_ATTR_BITMAP_SUMMARIZED);
-	key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
-	id_attrs = RelationGetIndexAttrBitmap(relation,
-										  INDEX_ATTR_BITMAP_IDENTITY_KEY);
-	interesting_attrs = NULL;
-	interesting_attrs = bms_add_members(interesting_attrs, hot_attrs);
-	interesting_attrs = bms_add_members(interesting_attrs, sum_attrs);
-	interesting_attrs = bms_add_members(interesting_attrs, key_attrs);
-	interesting_attrs = bms_add_members(interesting_attrs, id_attrs);
-
-	block = ItemPointerGetBlockNumber(otid);
-	INJECTION_POINT("heap_update-before-pin", NULL);
-	buffer = ReadBuffer(relation, block);
-	page = BufferGetPage(buffer);
-
-	/*
-	 * Before locking the buffer, pin the visibility map page if it appears to
-	 * be necessary.  Since we haven't got the lock yet, someone else might be
-	 * in the middle of changing this, so we'll need to recheck after we have
-	 * the lock.
-	 */
-	if (PageIsAllVisible(page))
-		visibilitymap_pin(relation, block, &vmbuffer);
-
-	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-
-	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
-
-	/*
-	 * Usually, a buffer pin and/or snapshot blocks pruning of otid, ensuring
-	 * we see LP_NORMAL here.  When the otid origin is a syscache, we may have
-	 * neither a pin nor a snapshot.  Hence, we may see other LP_ states, each
-	 * of which indicates concurrent pruning.
-	 *
-	 * Failing with TM_Updated would be most accurate.  However, unlike other
-	 * TM_Updated scenarios, we don't know the successor ctid in LP_UNUSED and
-	 * LP_DEAD cases.  While the distinction between TM_Updated and TM_Deleted
-	 * does matter to SQL statements UPDATE and MERGE, those SQL statements
-	 * hold a snapshot that ensures LP_NORMAL.  Hence, the choice between
-	 * TM_Updated and TM_Deleted affects only the wording of error messages.
-	 * Settle on TM_Deleted, for two reasons.  First, it avoids complicating
-	 * the specification of when tmfd->ctid is valid.  Second, it creates
-	 * error log evidence that we took this branch.
-	 *
-	 * Since it's possible to see LP_UNUSED at otid, it's also possible to see
-	 * LP_NORMAL for a tuple that replaced LP_UNUSED.  If it's a tuple for an
-	 * unrelated row, we'll fail with "duplicate key value violates unique".
-	 * XXX if otid is the live, newer version of the newtup row, we'll discard
-	 * changes originating in versions of this catalog row after the version
-	 * the caller got from syscache.  See syscache-update-pruned.spec.
-	 */
-	if (!ItemIdIsNormal(lp))
-	{
-		Assert(RelationSupportsSysCache(RelationGetRelid(relation)));
-
-		UnlockReleaseBuffer(buffer);
-		Assert(!have_tuple_lock);
-		if (vmbuffer != InvalidBuffer)
-			ReleaseBuffer(vmbuffer);
-		tmfd->ctid = *otid;
-		tmfd->xmax = InvalidTransactionId;
-		tmfd->cmax = InvalidCommandId;
-		*update_indexes = TU_None;
-
-		bms_free(hot_attrs);
-		bms_free(sum_attrs);
-		bms_free(key_attrs);
-		bms_free(id_attrs);
-		/* modified_attrs not yet initialized */
-		bms_free(interesting_attrs);
-		return TM_Deleted;
-	}
-
-	/*
-	 * Fill in enough data in oldtup for HeapDetermineColumnsInfo to work
-	 * properly.
-	 */
-	oldtup.t_tableOid = RelationGetRelid(relation);
-	oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
-	oldtup.t_len = ItemIdGetLength(lp);
-	oldtup.t_self = *otid;
-
-	/* the new tuple is ready, except for this: */
+	/* The new tuple is ready, except for this */
 	newtup->t_tableOid = RelationGetRelid(relation);
 
-	/*
-	 * Determine columns modified by the update.  Additionally, identify
-	 * whether any of the unmodified replica identity key attributes in the
-	 * old tuple is externally stored or not.  This is required because for
-	 * such attributes the flattened value won't be WAL logged as part of the
-	 * new tuple so we must include it as part of the old_key_tuple.  See
-	 * ExtractReplicaIdentity.
-	 */
-	modified_attrs = HeapDetermineColumnsInfo(relation, interesting_attrs,
-											  id_attrs, &oldtup,
-											  newtup, &id_has_external);
-
 	/*
 	 * If we're not updating any "key" column, we can grab a weaker lock type.
 	 * This allows for more concurrency when we are running simultaneously
@@ -3502,7 +3369,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	 * is updates that don't manipulate key columns, not those that
 	 * serendipitously arrive at the same key values.
 	 */
-	if (!bms_overlap(modified_attrs, key_attrs))
+	if (!bms_overlap(mix_attrs, pk_attrs))
 	{
 		*lockmode = LockTupleNoKeyExclusive;
 		mxact_status = MultiXactStatusNoKeyUpdate;
@@ -3526,17 +3393,10 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 		key_intact = false;
 	}
 
-	/*
-	 * Note: beyond this point, use oldtup not otid to refer to old tuple.
-	 * otid may very well point at newtup->t_self, which we will overwrite
-	 * with the new tuple's location, so there's great risk of confusion if we
-	 * use otid anymore.
-	 */
-
 l2:
 	checked_lockers = false;
 	locker_remains = false;
-	result = HeapTupleSatisfiesUpdate(&oldtup, cid, buffer);
+	result = HeapTupleSatisfiesUpdate(oldtup, cid, buffer);
 
 	/* see below about the "no wait" case */
 	Assert(result != TM_BeingModified || wait);
@@ -3568,8 +3428,8 @@ l2:
 		 */
 
 		/* must copy state data before unlocking buffer */
-		xwait = HeapTupleHeaderGetRawXmax(oldtup.t_data);
-		infomask = oldtup.t_data->t_infomask;
+		xwait = HeapTupleHeaderGetRawXmax(oldtup->t_data);
+		infomask = oldtup->t_data->t_infomask;
 
 		/*
 		 * Now we have to do something about the existing locker.  If it's a
@@ -3609,13 +3469,12 @@ l2:
 				 * requesting a lock and already have one; avoids deadlock).
 				 */
 				if (!current_is_member)
-					heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
+					heap_acquire_tuplock(relation, &oldtup->t_self, *lockmode,
 										 LockWaitBlock, &have_tuple_lock);
 
 				/* wait for multixact */
 				MultiXactIdWait((MultiXactId) xwait, mxact_status, infomask,
-								relation, &oldtup.t_self, XLTW_Update,
-								&remain);
+								relation, &oldtup->t_self, XLTW_Update, &remain);
 				checked_lockers = true;
 				locker_remains = remain != 0;
 				LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -3625,9 +3484,9 @@ l2:
 				 * could update this tuple before we get to this point.  Check
 				 * for xmax change, and start over if so.
 				 */
-				if (xmax_infomask_changed(oldtup.t_data->t_infomask,
+				if (xmax_infomask_changed(oldtup->t_data->t_infomask,
 										  infomask) ||
-					!TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup.t_data),
+					!TransactionIdEquals(HeapTupleHeaderGetRawXmax(oldtup->t_data),
 										 xwait))
 					goto l2;
 			}
@@ -3652,8 +3511,8 @@ l2:
 			 * before this one, which are important to keep in case this
 			 * subxact aborts.
 			 */
-			if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup.t_data->t_infomask))
-				update_xact = HeapTupleGetUpdateXid(oldtup.t_data);
+			if (!HEAP_XMAX_IS_LOCKED_ONLY(oldtup->t_data->t_infomask))
+				update_xact = HeapTupleGetUpdateXid(oldtup->t_data);
 			else
 				update_xact = InvalidTransactionId;
 
@@ -3694,9 +3553,9 @@ l2:
 			 * lock.
 			 */
 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-			heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
+			heap_acquire_tuplock(relation, &oldtup->t_self, *lockmode,
 								 LockWaitBlock, &have_tuple_lock);
-			XactLockTableWait(xwait, relation, &oldtup.t_self,
+			XactLockTableWait(xwait, relation, &oldtup->t_self,
 							  XLTW_Update);
 			checked_lockers = true;
 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -3706,20 +3565,20 @@ l2:
 			 * other xact could update this tuple before we get to this point.
 			 * Check for xmax change, and start over if so.
 			 */
-			if (xmax_infomask_changed(oldtup.t_data->t_infomask, infomask) ||
+			if (xmax_infomask_changed(oldtup->t_data->t_infomask, infomask) ||
 				!TransactionIdEquals(xwait,
-									 HeapTupleHeaderGetRawXmax(oldtup.t_data)))
+									 HeapTupleHeaderGetRawXmax(oldtup->t_data)))
 				goto l2;
 
 			/* Otherwise check if it committed or aborted */
-			UpdateXmaxHintBits(oldtup.t_data, buffer, xwait);
-			if (oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)
+			UpdateXmaxHintBits(oldtup->t_data, buffer, xwait);
+			if (oldtup->t_data->t_infomask & HEAP_XMAX_INVALID)
 				can_continue = true;
 		}
 
 		if (can_continue)
 			result = TM_Ok;
-		else if (!ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid))
+		else if (!ItemPointerEquals(&oldtup->t_self, &oldtup->t_data->t_ctid))
 			result = TM_Updated;
 		else
 			result = TM_Deleted;
@@ -3732,39 +3591,33 @@ l2:
 			   result == TM_Updated ||
 			   result == TM_Deleted ||
 			   result == TM_BeingModified);
-		Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
+		Assert(!(oldtup->t_data->t_infomask & HEAP_XMAX_INVALID));
 		Assert(result != TM_Updated ||
-			   !ItemPointerEquals(&oldtup.t_self, &oldtup.t_data->t_ctid));
+			   !ItemPointerEquals(&oldtup->t_self, &oldtup->t_data->t_ctid));
 	}
 
 	if (crosscheck != InvalidSnapshot && result == TM_Ok)
 	{
 		/* Perform additional check for transaction-snapshot mode RI updates */
-		if (!HeapTupleSatisfiesVisibility(&oldtup, crosscheck, buffer))
+		if (!HeapTupleSatisfiesVisibility(oldtup, crosscheck, buffer))
 			result = TM_Updated;
 	}
 
 	if (result != TM_Ok)
 	{
-		tmfd->ctid = oldtup.t_data->t_ctid;
-		tmfd->xmax = HeapTupleHeaderGetUpdateXid(oldtup.t_data);
+		tmfd->ctid = oldtup->t_data->t_ctid;
+		tmfd->xmax = HeapTupleHeaderGetUpdateXid(oldtup->t_data);
 		if (result == TM_SelfModified)
-			tmfd->cmax = HeapTupleHeaderGetCmax(oldtup.t_data);
+			tmfd->cmax = HeapTupleHeaderGetCmax(oldtup->t_data);
 		else
 			tmfd->cmax = InvalidCommandId;
 		UnlockReleaseBuffer(buffer);
 		if (have_tuple_lock)
-			UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
-		if (vmbuffer != InvalidBuffer)
-			ReleaseBuffer(vmbuffer);
+			UnlockTupleTuplock(relation, &oldtup->t_self, *lockmode);
+		if (*vmbuffer != InvalidBuffer)
+			ReleaseBuffer(*vmbuffer);
 		*update_indexes = TU_None;
 
-		bms_free(hot_attrs);
-		bms_free(sum_attrs);
-		bms_free(key_attrs);
-		bms_free(id_attrs);
-		bms_free(modified_attrs);
-		bms_free(interesting_attrs);
 		return result;
 	}
 
@@ -3777,10 +3630,10 @@ l2:
 	 * tuple has been locked or updated under us, but hopefully it won't
 	 * happen very often.
 	 */
-	if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+	if (*vmbuffer == InvalidBuffer && PageIsAllVisible(page))
 	{
 		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-		visibilitymap_pin(relation, block, &vmbuffer);
+		visibilitymap_pin(relation, block, vmbuffer);
 		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 		goto l2;
 	}
@@ -3791,9 +3644,9 @@ l2:
 	 * If the tuple we're updating is locked, we need to preserve the locking
 	 * info in the old tuple's Xmax.  Prepare a new Xmax value for this.
 	 */
-	compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
-							  oldtup.t_data->t_infomask,
-							  oldtup.t_data->t_infomask2,
+	compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup->t_data),
+							  oldtup->t_data->t_infomask,
+							  oldtup->t_data->t_infomask2,
 							  xid, *lockmode, true,
 							  &xmax_old_tuple, &infomask_old_tuple,
 							  &infomask2_old_tuple);
@@ -3805,12 +3658,12 @@ l2:
 	 * tuple.  (In rare cases that might also be InvalidTransactionId and yet
 	 * not have the HEAP_XMAX_INVALID bit set; that's fine.)
 	 */
-	if ((oldtup.t_data->t_infomask & HEAP_XMAX_INVALID) ||
-		HEAP_LOCKED_UPGRADED(oldtup.t_data->t_infomask) ||
+	if ((oldtup->t_data->t_infomask & HEAP_XMAX_INVALID) ||
+		HEAP_LOCKED_UPGRADED(oldtup->t_data->t_infomask) ||
 		(checked_lockers && !locker_remains))
 		xmax_new_tuple = InvalidTransactionId;
 	else
-		xmax_new_tuple = HeapTupleHeaderGetRawXmax(oldtup.t_data);
+		xmax_new_tuple = HeapTupleHeaderGetRawXmax(oldtup->t_data);
 
 	if (!TransactionIdIsValid(xmax_new_tuple))
 	{
@@ -3825,7 +3678,7 @@ l2:
 		 * Note that since we're doing an update, the only possibility is that
 		 * the lockers had FOR KEY SHARE lock.
 		 */
-		if (oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI)
+		if (oldtup->t_data->t_infomask & HEAP_XMAX_IS_MULTI)
 		{
 			GetMultiXactIdHintBits(xmax_new_tuple, &infomask_new_tuple,
 								   &infomask2_new_tuple);
@@ -3853,7 +3706,7 @@ l2:
 	 * Replace cid with a combo CID if necessary.  Note that we already put
 	 * the plain cid into the new tuple.
 	 */
-	HeapTupleHeaderAdjustCmax(oldtup.t_data, &cid, &iscombo);
+	HeapTupleHeaderAdjustCmax(oldtup->t_data, &cid, &iscombo);
 
 	/*
 	 * If the toaster needs to be activated, OR if the new tuple will not fit
@@ -3870,12 +3723,12 @@ l2:
 		relation->rd_rel->relkind != RELKIND_MATVIEW)
 	{
 		/* toast table entries should never be recursively toasted */
-		Assert(!HeapTupleHasExternal(&oldtup));
+		Assert(!HeapTupleHasExternal(oldtup));
 		Assert(!HeapTupleHasExternal(newtup));
 		need_toast = false;
 	}
 	else
-		need_toast = (HeapTupleHasExternal(&oldtup) ||
+		need_toast = (HeapTupleHasExternal(oldtup) ||
 					  HeapTupleHasExternal(newtup) ||
 					  newtup->t_len > TOAST_TUPLE_THRESHOLD);
 
@@ -3908,9 +3761,9 @@ l2:
 		 * updating, because the potentially created multixact would otherwise
 		 * be wrong.
 		 */
-		compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
-								  oldtup.t_data->t_infomask,
-								  oldtup.t_data->t_infomask2,
+		compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup->t_data),
+								  oldtup->t_data->t_infomask,
+								  oldtup->t_data->t_infomask2,
 								  xid, *lockmode, false,
 								  &xmax_lock_old_tuple, &infomask_lock_old_tuple,
 								  &infomask2_lock_old_tuple);
@@ -3920,18 +3773,18 @@ l2:
 		START_CRIT_SECTION();
 
 		/* Clear obsolete visibility flags ... */
-		oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-		oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
-		HeapTupleClearHotUpdated(&oldtup);
+		oldtup->t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+		oldtup->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+		HeapTupleClearHotUpdated(oldtup);
 		/* ... and store info about transaction updating this tuple */
 		Assert(TransactionIdIsValid(xmax_lock_old_tuple));
-		HeapTupleHeaderSetXmax(oldtup.t_data, xmax_lock_old_tuple);
-		oldtup.t_data->t_infomask |= infomask_lock_old_tuple;
-		oldtup.t_data->t_infomask2 |= infomask2_lock_old_tuple;
-		HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
+		HeapTupleHeaderSetXmax(oldtup->t_data, xmax_lock_old_tuple);
+		oldtup->t_data->t_infomask |= infomask_lock_old_tuple;
+		oldtup->t_data->t_infomask2 |= infomask2_lock_old_tuple;
+		HeapTupleHeaderSetCmax(oldtup->t_data, cid, iscombo);
 
 		/* temporarily make it look not-updated, but locked */
-		oldtup.t_data->t_ctid = oldtup.t_self;
+		oldtup->t_data->t_ctid = oldtup->t_self;
 
 		/*
 		 * Clear all-frozen bit on visibility map if needed. We could
@@ -3940,7 +3793,7 @@ l2:
 		 * worthwhile.
 		 */
 		if (PageIsAllVisible(page) &&
-			visibilitymap_clear(relation, block, vmbuffer,
+			visibilitymap_clear(relation, block, *vmbuffer,
 								VISIBILITYMAP_ALL_FROZEN))
 			cleared_all_frozen = true;
 
@@ -3954,10 +3807,10 @@ l2:
 			XLogBeginInsert();
 			XLogRegisterBuffer(0, buffer, REGBUF_STANDARD);
 
-			xlrec.offnum = ItemPointerGetOffsetNumber(&oldtup.t_self);
+			xlrec.offnum = ItemPointerGetOffsetNumber(&oldtup->t_self);
 			xlrec.xmax = xmax_lock_old_tuple;
-			xlrec.infobits_set = compute_infobits(oldtup.t_data->t_infomask,
-												  oldtup.t_data->t_infomask2);
+			xlrec.infobits_set = compute_infobits(oldtup->t_data->t_infomask,
+												  oldtup->t_data->t_infomask2);
 			xlrec.flags =
 				cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0;
 			XLogRegisterData(&xlrec, SizeOfHeapLock);
@@ -3979,7 +3832,7 @@ l2:
 		if (need_toast)
 		{
 			/* Note we always use WAL and FSM during updates */
-			heaptup = heap_toast_insert_or_update(relation, newtup, &oldtup, 0);
+			heaptup = heap_toast_insert_or_update(relation, newtup, oldtup, 0);
 			newtupsize = MAXALIGN(heaptup->t_len);
 		}
 		else
@@ -4015,20 +3868,20 @@ l2:
 				/* It doesn't fit, must use RelationGetBufferForTuple. */
 				newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
 												   buffer, 0, NULL,
-												   &vmbuffer_new, &vmbuffer,
+												   &vmbuffer_new, vmbuffer,
 												   0);
 				/* We're all done. */
 				break;
 			}
 			/* Acquire VM page pin if needed and we don't have it. */
-			if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
-				visibilitymap_pin(relation, block, &vmbuffer);
+			if (*vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+				visibilitymap_pin(relation, block, vmbuffer);
 			/* Re-acquire the lock on the old tuple's page. */
 			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 			/* Re-check using the up-to-date free space */
 			pagefree = PageGetHeapFreeSpace(page);
 			if (newtupsize > pagefree ||
-				(vmbuffer == InvalidBuffer && PageIsAllVisible(page)))
+				(*vmbuffer == InvalidBuffer && PageIsAllVisible(page)))
 			{
 				/*
 				 * Rats, it doesn't fit anymore, or somebody just now set the
@@ -4066,7 +3919,7 @@ l2:
 	 * will include checking the relation level, there is no benefit to a
 	 * separate check for the new tuple.
 	 */
-	CheckForSerializableConflictIn(relation, &oldtup.t_self,
+	CheckForSerializableConflictIn(relation, &oldtup->t_self,
 								   BufferGetBlockNumber(buffer));
 
 	/*
@@ -4074,7 +3927,6 @@ l2:
 	 * has enough space for the new tuple.  If they are the same buffer, only
 	 * one pin is held.
 	 */
-
 	if (newbuf == buffer)
 	{
 		/*
@@ -4082,7 +3934,7 @@ l2:
 		 * to do a HOT update.  Check if any of the index columns have been
 		 * changed.
 		 */
-		if (!bms_overlap(modified_attrs, hot_attrs))
+		if (!bms_overlap(mix_attrs, hot_attrs))
 		{
 			use_hot_update = true;
 
@@ -4093,7 +3945,7 @@ l2:
 			 * indexes if the columns were updated, or we may fail to detect
 			 * e.g. value bound changes in BRIN minmax indexes.
 			 */
-			if (bms_overlap(modified_attrs, sum_attrs))
+			if (bms_overlap(mix_attrs, sum_attrs))
 				summarized_update = true;
 		}
 	}
@@ -4110,10 +3962,8 @@ l2:
 	 * logged.  Pass old key required as true only if the replica identity key
 	 * columns are modified or it has external data.
 	 */
-	old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,
-										   bms_overlap(modified_attrs, id_attrs) ||
-										   id_has_external,
-										   &old_key_copied);
+	old_key_tuple = ExtractReplicaIdentity(relation, oldtup, rid_attrs,
+										   rep_id_key_required, &old_key_copied);
 
 	/* NO EREPORT(ERROR) from here till changes are logged */
 	START_CRIT_SECTION();
@@ -4135,7 +3985,7 @@ l2:
 	if (use_hot_update)
 	{
 		/* Mark the old tuple as HOT-updated */
-		HeapTupleSetHotUpdated(&oldtup);
+		HeapTupleSetHotUpdated(oldtup);
 		/* And mark the new tuple as heap-only */
 		HeapTupleSetHeapOnly(heaptup);
 		/* Mark the caller's copy too, in case different from heaptup */
@@ -4144,7 +3994,7 @@ l2:
 	else
 	{
 		/* Make sure tuples are correctly marked as not-HOT */
-		HeapTupleClearHotUpdated(&oldtup);
+		HeapTupleClearHotUpdated(oldtup);
 		HeapTupleClearHeapOnly(heaptup);
 		HeapTupleClearHeapOnly(newtup);
 	}
@@ -4153,17 +4003,17 @@ l2:
 
 
 	/* Clear obsolete visibility flags, possibly set by ourselves above... */
-	oldtup.t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
-	oldtup.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
+	oldtup->t_data->t_infomask &= ~(HEAP_XMAX_BITS | HEAP_MOVED);
+	oldtup->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
 	/* ... and store info about transaction updating this tuple */
 	Assert(TransactionIdIsValid(xmax_old_tuple));
-	HeapTupleHeaderSetXmax(oldtup.t_data, xmax_old_tuple);
-	oldtup.t_data->t_infomask |= infomask_old_tuple;
-	oldtup.t_data->t_infomask2 |= infomask2_old_tuple;
-	HeapTupleHeaderSetCmax(oldtup.t_data, cid, iscombo);
+	HeapTupleHeaderSetXmax(oldtup->t_data, xmax_old_tuple);
+	oldtup->t_data->t_infomask |= infomask_old_tuple;
+	oldtup->t_data->t_infomask2 |= infomask2_old_tuple;
+	HeapTupleHeaderSetCmax(oldtup->t_data, cid, iscombo);
 
 	/* record address of new tuple in t_ctid of old one */
-	oldtup.t_data->t_ctid = heaptup->t_self;
+	oldtup->t_data->t_ctid = heaptup->t_self;
 
 	/* clear PD_ALL_VISIBLE flags, reset all visibilitymap bits */
 	if (PageIsAllVisible(BufferGetPage(buffer)))
@@ -4171,7 +4021,7 @@ l2:
 		all_visible_cleared = true;
 		PageClearAllVisible(BufferGetPage(buffer));
 		visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
-							vmbuffer, VISIBILITYMAP_VALID_BITS);
+							*vmbuffer, VISIBILITYMAP_VALID_BITS);
 	}
 	if (newbuf != buffer && PageIsAllVisible(BufferGetPage(newbuf)))
 	{
@@ -4196,12 +4046,12 @@ l2:
 		 */
 		if (RelationIsAccessibleInLogicalDecoding(relation))
 		{
-			log_heap_new_cid(relation, &oldtup);
+			log_heap_new_cid(relation, oldtup);
 			log_heap_new_cid(relation, heaptup);
 		}
 
 		recptr = log_heap_update(relation, buffer,
-								 newbuf, &oldtup, heaptup,
+								 newbuf, oldtup, heaptup,
 								 old_key_tuple,
 								 all_visible_cleared,
 								 all_visible_cleared_new);
@@ -4226,7 +4076,7 @@ l2:
 	 * both tuple versions in one call to inval.c so we can avoid redundant
 	 * sinval messages.)
 	 */
-	CacheInvalidateHeapTuple(relation, &oldtup, heaptup);
+	CacheInvalidateHeapTuple(relation, oldtup, heaptup);
 
 	/* Now we can release the buffer(s) */
 	if (newbuf != buffer)
@@ -4234,14 +4084,14 @@ l2:
 	ReleaseBuffer(buffer);
 	if (BufferIsValid(vmbuffer_new))
 		ReleaseBuffer(vmbuffer_new);
-	if (BufferIsValid(vmbuffer))
-		ReleaseBuffer(vmbuffer);
+	if (BufferIsValid(*vmbuffer))
+		ReleaseBuffer(*vmbuffer);
 
 	/*
 	 * Release the lmgr tuple lock, if we had it.
 	 */
 	if (have_tuple_lock)
-		UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
+		UnlockTupleTuplock(relation, &oldtup->t_self, *lockmode);
 
 	pgstat_count_heap_update(relation, use_hot_update, newbuf != buffer);
 
@@ -4274,13 +4124,6 @@ l2:
 	if (old_key_tuple != NULL && old_key_copied)
 		heap_freetuple(old_key_tuple);
 
-	bms_free(hot_attrs);
-	bms_free(sum_attrs);
-	bms_free(key_attrs);
-	bms_free(id_attrs);
-	bms_free(modified_attrs);
-	bms_free(interesting_attrs);
-
 	return TM_Ok;
 }
 
@@ -4289,7 +4132,7 @@ l2:
  * Confirm adequate lock held during heap_update(), per rules from
  * README.tuplock section "Locking to write inplace-updated tables".
  */
-static void
+void
 check_lock_if_inplace_updateable_rel(Relation relation,
 									 const ItemPointerData *otid,
 									 HeapTuple newtup)
@@ -4461,7 +4304,7 @@ heap_attr_equals(TupleDesc tupdesc, int attrnum, Datum value1, Datum value2,
  * listed as interesting) of the old tuple is a member of external_cols and is
  * stored externally.
  */
-static Bitmapset *
+Bitmapset *
 HeapDetermineColumnsInfo(Relation relation,
 						 Bitmapset *interesting_cols,
 						 Bitmapset *external_cols,
@@ -4544,25 +4387,175 @@ HeapDetermineColumnsInfo(Relation relation,
 }
 
 /*
- *	simple_heap_update - replace a tuple
- *
- * This routine may be used to update a tuple when concurrent updates of
- * the target tuple are not expected (for example, because we have a lock
- * on the relation associated with the tuple).  Any failure is reported
- * via ereport().
+ * This routine may be used to update a tuple when concurrent updates of the
+ * target tuple are not expected (for example, because we have a lock on the
+ * relation associated with the tuple).  Any failure is reported via ereport().
  */
 void
-simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tup,
+simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tuple,
 				   TU_UpdateIndexes *update_indexes)
 {
 	TM_Result	result;
 	TM_FailureData tmfd;
 	LockTupleMode lockmode;
+	Buffer		buffer;
+	Buffer		vmbuffer = InvalidBuffer;
+	Page		page;
+	BlockNumber block;
+	Bitmapset  *hot_attrs,
+			   *sum_attrs,
+			   *pk_attrs,
+			   *rid_attrs,
+			   *mix_attrs,
+			   *idx_attrs;
+	ItemId		lp;
+	HeapTupleData oldtup;
+	bool		rep_id_key_required = false;
+
+	Assert(ItemPointerIsValid(otid));
+
+	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
+	Assert(HeapTupleHeaderGetNatts(tuple->t_data) <=
+		   RelationGetNumberOfAttributes(relation));
+
+	/*
+	 * Forbid this during a parallel operation, lest it allocate a combo CID.
+	 * Other workers might need that combo CID for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot update tuples during a parallel operation")));
+
+#ifdef USE_ASSERT_CHECKING
+	check_lock_if_inplace_updateable_rel(relation, otid, tuple);
+#endif
+
+	/*
+	 * Fetch the list of attributes to be checked for various operations.
+	 *
+	 * For HOT considerations, this is wasted effort if we fail to update or
+	 * have to put the new tuple on a different page.  But we must compute the
+	 * list before obtaining buffer lock --- in the worst case, if we are
+	 * doing an update on one of the relevant system catalogs, we could
+	 * deadlock if we try to fetch the list later.  In any case, the relcache
+	 * caches the data so this is usually pretty cheap.
+	 *
+	 * We also need columns used by the replica identity and columns that are
+	 * considered the "key" of rows in the table.
+	 *
+	 * Note that we get copies of each bitmap, so we need not worry about
+	 * relcache flush happening midway through.
+	 */
+	hot_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_HOT_BLOCKING);
+	sum_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_SUMMARIZED);
+	pk_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
+	rid_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	idx_attrs = bms_copy(hot_attrs);
+	idx_attrs = bms_add_members(idx_attrs, sum_attrs);
+	idx_attrs = bms_add_members(idx_attrs, pk_attrs);
+	idx_attrs = bms_add_members(idx_attrs, rid_attrs);
+
+	block = ItemPointerGetBlockNumber(otid);
+	INJECTION_POINT("heap_update-before-pin", NULL);
+	buffer = ReadBuffer(relation, block);
+	page = BufferGetPage(buffer);
+
+	/*
+	 * Before locking the buffer, pin the visibility map page if it appears to
+	 * be necessary.  Since we haven't got the lock yet, someone else might be
+	 * in the middle of changing this, so we'll need to recheck after we have
+	 * the lock.
+	 */
+	if (PageIsAllVisible(page))
+		visibilitymap_pin(relation, block, &vmbuffer);
+
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
+
+	/*
+	 * Usually, a buffer pin and/or snapshot blocks pruning of otid, ensuring
+	 * we see LP_NORMAL here.  When the otid origin is a syscache, we may have
+	 * neither a pin nor a snapshot.  Hence, we may see other LP_ states, each
+	 * of which indicates concurrent pruning.
+	 *
+	 * Failing with TM_Updated would be most accurate.  However, unlike other
+	 * TM_Updated scenarios, we don't know the successor ctid in LP_UNUSED and
+	 * LP_DEAD cases.  While the distinction between TM_Updated and TM_Deleted
+	 * does matter to SQL statements UPDATE and MERGE, those SQL statements
+	 * hold a snapshot that ensures LP_NORMAL.  Hence, the choice between
+	 * TM_Updated and TM_Deleted affects only the wording of error messages.
+	 * Settle on TM_Deleted, for two reasons.  First, it avoids complicating
+	 * the specification of when tmfd->ctid is valid.  Second, it creates
+	 * error log evidence that we took this branch.
+	 *
+	 * Since it's possible to see LP_UNUSED at otid, it's also possible to see
+	 * LP_NORMAL for a tuple that replaced LP_UNUSED.  If it's a tuple for an
+	 * unrelated row, we'll fail with "duplicate key value violates unique".
+	 * XXX if otid is the live, newer version of the newtup row, we'll discard
+	 * changes originating in versions of this catalog row after the version
+	 * the caller got from syscache.  See syscache-update-pruned.spec.
+	 */
+	if (!ItemIdIsNormal(lp))
+	{
+		Assert(RelationSupportsSysCache(RelationGetRelid(relation)));
+
+		UnlockReleaseBuffer(buffer);
+		if (vmbuffer != InvalidBuffer)
+			ReleaseBuffer(vmbuffer);
+		*update_indexes = TU_None;
+
+		bms_free(hot_attrs);
+		bms_free(sum_attrs);
+		bms_free(pk_attrs);
+		bms_free(rid_attrs);
+		bms_free(idx_attrs);
+		/* mix_attrs not yet initialized */
+
+		elog(ERROR, "tuple concurrently deleted");
+
+		return;
+	}
+
+	/*
+	 * Partially construct the oldtup for HeapDetermineColumnsInfo to work and
+	 * then pass that on to heap_update.
+	 */
+	oldtup.t_tableOid = RelationGetRelid(relation);
+	oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
+	oldtup.t_len = ItemIdGetLength(lp);
+	oldtup.t_self = *otid;
+
+	mix_attrs = HeapDetermineColumnsInfo(relation, idx_attrs, rid_attrs,
+										 &oldtup, tuple, &rep_id_key_required);
+
+	/*
+	 * We'll need to WAL log the replica identity attributes if either they
+	 * overlap with the modified indexed attributes or, as we've checked for
+	 * just now in HeapDetermineColumnsInfo, they were unmodified external
+	 * indexed attributes.
+	 */
+	rep_id_key_required = rep_id_key_required || bms_overlap(mix_attrs, rid_attrs);
+
+	result = heap_update(relation, &oldtup, tuple, GetCurrentCommandId(true),
+						 InvalidSnapshot, true /* wait for commit */ , &tmfd, &lockmode,
+						 buffer, page, block, lp, hot_attrs, sum_attrs, pk_attrs,
+						 rid_attrs, mix_attrs, &vmbuffer, rep_id_key_required,
+						 update_indexes);
+
+	bms_free(hot_attrs);
+	bms_free(sum_attrs);
+	bms_free(pk_attrs);
+	bms_free(rid_attrs);
+	bms_free(mix_attrs);
+	bms_free(idx_attrs);
 
-	result = heap_update(relation, otid, tup,
-						 GetCurrentCommandId(true), InvalidSnapshot,
-						 true /* wait for commit */ ,
-						 &tmfd, &lockmode, update_indexes);
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -9218,12 +9211,11 @@ log_heap_new_cid(Relation relation, HeapTuple tup)
  * the same tuple that was passed in.
  */
 static HeapTuple
-ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
-					   bool *copy)
+ExtractReplicaIdentity(Relation relation, HeapTuple tp, Bitmapset *rid_attrs,
+					   bool key_required, bool *copy)
 {
 	TupleDesc	desc = RelationGetDescr(relation);
 	char		replident = relation->rd_rel->relreplident;
-	Bitmapset  *idattrs;
 	HeapTuple	key_tuple;
 	bool		nulls[MaxHeapAttributeNumber];
 	Datum		values[MaxHeapAttributeNumber];
@@ -9254,17 +9246,13 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	if (!key_required)
 		return NULL;
 
-	/* find out the replica identity columns */
-	idattrs = RelationGetIndexAttrBitmap(relation,
-										 INDEX_ATTR_BITMAP_IDENTITY_KEY);
-
 	/*
 	 * If there's no defined replica identity columns, treat as !key_required.
 	 * (This case should not be reachable from heap_update, since that should
 	 * calculate key_required accurately.  But heap_delete just passes
 	 * constant true for key_required, so we can hit this case in deletes.)
 	 */
-	if (bms_is_empty(idattrs))
+	if (bms_is_empty(rid_attrs))
 		return NULL;
 
 	/*
@@ -9277,7 +9265,7 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	for (int i = 0; i < desc->natts; i++)
 	{
 		if (bms_is_member(i + 1 - FirstLowInvalidHeapAttributeNumber,
-						  idattrs))
+						  rid_attrs))
 			Assert(!nulls[i]);
 		else
 			nulls[i] = true;
@@ -9286,8 +9274,6 @@ ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required,
 	key_tuple = heap_form_tuple(desc, values, nulls);
 	*copy = true;
 
-	bms_free(idattrs);
-
 	/*
 	 * If the tuple, which by here only contains indexed columns, still has
 	 * toasted columns, force them to be inlined. This is somewhat unlikely
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index cbef73e5d4b..54c021ff209 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -44,6 +44,7 @@
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/rel.h"
 
 static void reform_and_rewrite_tuple(HeapTuple tuple,
@@ -312,23 +313,133 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 	return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart);
 }
 
-
 static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 					bool wait, TM_FailureData *tmfd,
 					LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes)
 {
+	bool		rep_id_key_required = false;
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	HeapTupleData oldtup;
+	Buffer		buffer;
+	Buffer		vmbuffer = InvalidBuffer;
+	Page		page;
+	BlockNumber block;
+	ItemId		lp;
+	Bitmapset  *hot_attrs,
+			   *sum_attrs,
+			   *pk_attrs,
+			   *rid_attrs,
+			   *mix_attrs,
+			   *idx_attrs;
 	TM_Result	result;
 
+	Assert(ItemPointerIsValid(otid));
+
+	/* Cheap, simplistic check that the tuple matches the rel's rowtype. */
+	Assert(HeapTupleHeaderGetNatts(tuple->t_data) <=
+		   RelationGetNumberOfAttributes(relation));
+
+	/*
+	 * Forbid this during a parallel operation, lest it allocate a combo CID.
+	 * Other workers might need that combo CID for visibility checks, and we
+	 * have no provision for broadcasting it to them.
+	 */
+	if (IsInParallelMode())
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+				 errmsg("cannot update tuples during a parallel operation")));
+
+#ifdef USE_ASSERT_CHECKING
+	check_lock_if_inplace_updateable_rel(relation, otid, tuple);
+#endif
+
+	/*
+	 * Fetch the list of attributes to be checked for various operations.
+	 *
+	 * For HOT considerations, this is wasted effort if we fail to update or
+	 * have to put the new tuple on a different page.  But we must compute the
+	 * list before obtaining buffer lock --- in the worst case, if we are
+	 * doing an update on one of the relevant system catalogs, we could
+	 * deadlock if we try to fetch the list later.  In any case, the relcache
+	 * caches the data so this is usually pretty cheap.
+	 *
+	 * We also need columns used by the replica identity and columns that are
+	 * considered the "key" of rows in the table.
+	 *
+	 * Note that we get copies of each bitmap, so we need not worry about
+	 * relcache flush happening midway through.
+	 */
+	hot_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_HOT_BLOCKING);
+	sum_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_SUMMARIZED);
+	pk_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
+	rid_attrs = RelationGetIndexAttrBitmap(relation,
+										   INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+	idx_attrs = bms_copy(hot_attrs);
+	idx_attrs = bms_add_members(idx_attrs, sum_attrs);
+	idx_attrs = bms_add_members(idx_attrs, pk_attrs);
+	idx_attrs = bms_add_members(idx_attrs, rid_attrs);
+
+	block = ItemPointerGetBlockNumber(otid);
+	INJECTION_POINT("heap_update-before-pin", NULL);
+	buffer = ReadBuffer(relation, block);
+	page = BufferGetPage(buffer);
+
+	/*
+	 * Before locking the buffer, pin the visibility map page if it appears to
+	 * be necessary.  Since we haven't got the lock yet, someone else might be
+	 * in the middle of changing this, so we'll need to recheck after we have
+	 * the lock.
+	 */
+	if (PageIsAllVisible(page))
+		visibilitymap_pin(relation, block, &vmbuffer);
+
+	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+	lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
+
+	Assert(ItemIdIsNormal(lp));
+
+	/*
+	 * Partially construct the oldtup for HeapDetermineColumnsInfo to work and
+	 * then pass that on to heap_update.
+	 */
+	oldtup.t_tableOid = RelationGetRelid(relation);
+	oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
+	oldtup.t_len = ItemIdGetLength(lp);
+	oldtup.t_self = *otid;
+
+	mix_attrs = HeapDetermineColumnsInfo(relation, idx_attrs, rid_attrs,
+										 &oldtup, tuple, &rep_id_key_required);
+
+	/*
+	 * We'll need to WAL log the replica identity attributes if either they
+	 * overlap with the modified indexed attributes or, as we've checked for
+	 * just now in HeapDetermineColumnsInfo, they were unmodified external
+	 * indexed attributes.
+	 */
+	rep_id_key_required = rep_id_key_required || bms_overlap(mix_attrs, rid_attrs);
+
 	/* Update the tuple with table oid */
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
-	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode, update_indexes);
+	result = heap_update(relation, &oldtup, tuple, cid, crosscheck, wait, tmfd, lockmode,
+						 buffer, page, block, lp, hot_attrs, sum_attrs, pk_attrs,
+						 rid_attrs, mix_attrs, &vmbuffer, rep_id_key_required, update_indexes);
+
+	bms_free(hot_attrs);
+	bms_free(sum_attrs);
+	bms_free(pk_attrs);
+	bms_free(rid_attrs);
+	bms_free(mix_attrs);
+	bms_free(idx_attrs);
+
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 3c0961ab36b..38f944771db 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -364,11 +364,13 @@ extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid,
 							 TM_FailureData *tmfd, bool changingPart);
 extern void heap_finish_speculative(Relation relation, const ItemPointerData *tid);
 extern void heap_abort_speculative(Relation relation, const ItemPointerData *tid);
-extern TM_Result heap_update(Relation relation, const ItemPointerData *otid,
-							 HeapTuple newtup,
-							 CommandId cid, Snapshot crosscheck, bool wait,
-							 TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes);
+extern TM_Result heap_update(Relation relation, HeapTupleData *oldtup,
+							 HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait,
+							 TM_FailureData *tmfd, LockTupleMode *lockmode, Buffer buffer,
+							 Page page, BlockNumber block, ItemId lp, Bitmapset *hot_attrs,
+							 Bitmapset *sum_attrs, Bitmapset *pk_attrs, Bitmapset *rid_attrs,
+							 Bitmapset *mix_attrs, Buffer *vmbuffer,
+							 bool rep_id_key_required, TU_UpdateIndexes *update_indexes);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
@@ -430,6 +432,18 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 									  OffsetNumber *dead, int ndead,
 									  OffsetNumber *unused, int nunused);
 
+/* in heap/heapam.c */
+extern Bitmapset *HeapDetermineColumnsInfo(Relation relation,
+										   Bitmapset *interesting_cols,
+										   Bitmapset *external_cols,
+										   HeapTuple oldtup, HeapTuple newtup,
+										   bool *has_external);
+#ifdef USE_ASSERT_CHECKING
+extern void check_lock_if_inplace_updateable_rel(Relation relation,
+												 const ItemPointerData *otid,
+												 HeapTuple newtup);
+#endif
+
 /* in heap/vacuumlazy.c */
 extern void heap_vacuum_rel(Relation rel,
 							const VacuumParams params, BufferAccessStrategy bstrategy);
-- 
2.51.2

