From: Greg Burd <[email protected]>
To: Jeff Davis <[email protected]>
Cc: pgsql-hackers <[email protected]>
Subject: Re: Expanding HOT updates for expression and partial indexes
Date: Mon, 02 Mar 2026 14:08:50 -0500
Message-ID: <[email protected]>
In-Reply-To: <[email protected]>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<akciabcu3b2hchj7adxhu4kovfaozp2pcn2z7sdljfthxcyg4o@7e6sfyzipvyy>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>

Hello Jeff, hackers,

In v33 I've updated a test in triggers.sql to address differences across platforms identified by the cf-bot and rebased the work.

I thought it prudent to add tests that validate every corner case of HOT I could come up with, maybe too many (you tell me).  In addition, because the code that affects HOT also determines what is WAL-logged for logical replication, I've added tests that explore the corners of that too.  The goal of these first few patches is NOT to change the behavior of these things, only to move the logic out of heap and into the executor, so it makes sense to validate that explicitly.

At some point when I get back to $subject I'll want to document how things changed.  The best way to do that is by changing tests along with code.  So, that is "0001" in this v33 patch set.

I've also run longer performance tests, which show minimal performance differences between master and patched: at 600s every workload is within 2.0% of master, while the shorter runs vary more.

Workload                60s     300s    600s
jsonb_write_batch       +14.8%  -7.0%   +0.1%
jsonb_write_single      +0.3%   +0.2%   -0.0%
license_write_single    +0.4%   +0.2%   -0.1%
gin_write_single        -0.3%   -0.2%   -0.4%
pgbench_simple-update   +3.2%   +6.2%   +0.9%
pgbench_tpcb-like       -1.1%   +0.8%   -2.0%

Changing tests isn't something I take lightly, so I dug into this quite a bit.  I ran an analysis of ALL regression tests, comparing master vs patched after instrumenting the code (see below), so that I could record the HOT and replica identity decisions and where each tuple landed on the page.

Patched code produced:
  simple_heap_update: 17,028 calls (72.5% - catalog updates, direct heap ops)
  heapam_tuple_update: 6,462 calls (27.5% - executor path via table AM)
  Total entry points: 23,490

This matched master's log line output for the same tests.

Replica identity decisions were identical: 342 unique patterns with 0 differences.

HOT eligibility was also identical: 398 unique patterns matched, again with 0 differences.

The physical placement of tuples on pages was 99.991% identical: only 2 of 23,473 updates had different buffer placement.
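
As a quick sanity check, the percentages quoted above can be recomputed from the raw counts in this mail.  This is only an illustrative sketch; the helper name `pct` is an assumption of this example, not part of the patch or of the analysis tooling.

```python
# Recompute the percentages quoted above from the raw counts in this mail.
# `pct` is a hypothetical helper used for illustration only.

def pct(part: int, whole: int, digits: int) -> str:
    """Format part/whole as a percentage with a fixed number of decimals."""
    return f"{100.0 * part / whole:.{digits}f}%"

simple_heap_update_calls = 17_028
heapam_tuple_update_calls = 6_462
total_calls = simple_heap_update_calls + heapam_tuple_update_calls

updates_compared = 23_473
placement_diffs = 2

print(total_calls)                                     # 23490
print(pct(simple_heap_update_calls, total_calls, 1))   # 72.5%
print(pct(heapam_tuple_update_calls, total_calls, 1))  # 27.5%
print(pct(updates_compared - placement_diffs, updates_compared, 3))  # 99.991%
```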

Across test runs there were a few differences noted for pg_sequence, target, and wslot.  In each case master and patched agreed on hot_allowed=1 (the logic is identical), but use_hot_update sometimes differed because of buffer placement, that is, whether newbuf == buffer.  To me this reads as non-deterministic behavior, not a bug introduced in this patch.
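
To make the distinction concrete, here is a toy model of the fields in the instrumented "final HOT" log line.  It assumes, as a simplification, that the final decision is just hot_allowed combined with "the new tuple version landed on the same buffer"; the real heap_update() does more than this, and `final_hot_decision` is a hypothetical name, not PostgreSQL code.

```python
# Toy model only: `final_hot_decision` is a hypothetical name, and the rule
# below is a simplification assumed from the logged fields
# (hot_allowed, newbuf==buffer, use_hot_update), not the real heap_update().

def final_hot_decision(hot_allowed: bool, same_buffer: bool) -> bool:
    """HOT is used only if it is allowed AND the new version stays on the same page."""
    return hot_allowed and same_buffer

# Two runs with the same eligibility answer, where the page had room in one
# run but not in the other (illustrative inputs, not measured data).
run_a = final_hot_decision(hot_allowed=True, same_buffer=True)
run_b = final_hot_decision(hot_allowed=True, same_buffer=False)

print(run_a, run_b)  # True False
```

Under that assumption, two runs can agree on eligibility and still disagree on use_hot_update, purely because of where free space happened to be.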

At this point I'd say that the v33 patch set is functionally correct and performance neutral.  This set of changes isn't exactly exciting on the surface, but I feel that it opens the door to other changes that will be more interesting/valuable down the line.

Thank you for your time and interest.

best.

-greg


COMPARISON TESTING NOTES:
---------------------------------------------------------------------------------------
src/backend/access/heap/heapam.c

    elog(LOG, "PATCHED heap_update (replica check): rel=%s otid=(%u,%u) rep_id_key_required=%d",
         RelationGetRelationName(relation),
         ItemPointerGetBlockNumber(otid),
         ItemPointerGetOffsetNumber(otid),
         rep_id_key_required);

    elog(LOG, "PATCHED heap_update (final HOT): rel=%s otid=(%u,%u) hot_allowed=%d newbuf==buffer=%d use_hot_update=%d",
         RelationGetRelationName(relation),
         ItemPointerGetBlockNumber(otid),
         ItemPointerGetOffsetNumber(otid),
         hot_allowed, (newbuf == buffer), use_hot_update);

    elog(LOG, "PATCHED simple_heap_update: rel=%s otid=(%u,%u) hot_allowed=%d summarized_only=%d lockmode=%d",
         RelationGetRelationName(relation),
         ItemPointerGetBlockNumber(otid),
         ItemPointerGetOffsetNumber(otid),
         hot_allowed, summarized_only, lockmode);

src/backend/access/heap/heapam_handler.c

    elog(LOG, "PATCHED heapam_tuple_update: rel=%s otid=(%u,%u) hot_allowed=%d summarized_only=%d lockmode=%d",
         RelationGetRelationName(relation),
         ItemPointerGetBlockNumber(otid),
         ItemPointerGetOffsetNumber(otid),
         hot_allowed, summarized_only, *lockmode);
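
For anyone who wants to repeat this kind of comparison, one possible way to diff the "final HOT" log lines from two builds is sketched below.  It is not the script used for the numbers in this mail: the MASTER tag, the function names, the definition of a "pattern" as (rel, hot_allowed), and the sample lines are all assumptions made for illustration.

```python
import re
from collections import Counter

# Matches the "final HOT" line format shown above.  The leading tag
# (PATCHED here; MASTER is an assumed counterpart) is ignored on purpose.
FINAL_HOT = re.compile(
    r"heap_update \(final HOT\): rel=(?P<rel>\S+) otid=\((?P<blk>\d+),(?P<off>\d+)\) "
    r"hot_allowed=(?P<hot_allowed>\d) newbuf==buffer=(?P<same_buf>\d) "
    r"use_hot_update=(?P<use_hot>\d)"
)

def hot_patterns(log_lines):
    """Count (rel, hot_allowed) patterns; buffer placement is left out
    because it can vary between runs."""
    counts = Counter()
    for line in log_lines:
        m = FINAL_HOT.search(line)
        if m:
            counts[(m["rel"], int(m["hot_allowed"]))] += 1
    return counts

def diff_patterns(master_lines, patched_lines):
    """Return the patterns whose counts differ between the two builds."""
    a, b = hot_patterns(master_lines), hot_patterns(patched_lines)
    return {k: (a[k], b[k]) for k in a.keys() | b.keys() if a[k] != b[k]}

# Made-up sample lines, for illustration only (not taken from the real runs).
master = ["LOG:  MASTER heap_update (final HOT): rel=t_example otid=(0,1) "
          "hot_allowed=1 newbuf==buffer=1 use_hot_update=1"]
patched = ["LOG:  PATCHED heap_update (final HOT): rel=t_example otid=(0,1) "
           "hot_allowed=1 newbuf==buffer=0 use_hot_update=0"]

print(diff_patterns(master, patched))  # {}
```

Keying on hot_allowed rather than use_hot_update keeps buffer-placement noise out of the eligibility comparison; placement can then be compared separately.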

Attachments:

  [text/x-patch] v33-0001-Add-comprehensive-tests-for-HOT-updates-and-repl.patch (102.7K, 2-v33-0001-Add-comprehensive-tests-for-HOT-updates-and-repl.patch)
From 95cda56b5e9d53232c9b5f95abe423e878b1fe78 Mon Sep 17 00:00:00 2001
From: Greg Burd <[email protected]>
Date: Fri, 27 Feb 2026 12:19:06 -0500
Subject: [PATCH v33 1/2] Add comprehensive tests for HOT updates and replica
 identity

Adds regression and isolation tests covering:
- HOT update decisions across various index types (B-tree, BRIN,
  partial, expression, multi-column, unique constraints)
- Replica identity key extraction for logical replication
  (DEFAULT, FULL, USING INDEX, NOTHING modes)
- Concurrent HOT update scenarios (locking, blocking, index scans,
  HOT chains, FOR UPDATE/KEY SHARE interactions)

Regression tests:
- hot_updates.sql: 10 scenarios testing HOT eligibility
- replica_identity_logging.sql: 11 scenarios verifying replica
  identity keys logged to WAL via test_decoding

Isolation tests:
- hot_updates_concurrent.spec: concurrent updates on same/different rows
- hot_updates_index_scan.spec: interactions with index scans and row locks
- hot_updates_chain.spec: HOT chain building and snapshot isolation
---
 .../isolation/expected/hot_updates_chain.out  | 144 ++++
 .../expected/hot_updates_concurrent.out       | 143 ++++
 .../expected/hot_updates_index_scan.out       | 126 +++
 src/test/isolation/isolation_schedule         |   3 +
 .../isolation/specs/hot_updates_chain.spec    | 110 +++
 .../specs/hot_updates_concurrent.spec         | 107 +++
 .../specs/hot_updates_index_scan.spec         |  91 +++
 src/test/regress/expected/hot_updates.out     | 725 ++++++++++++++++++
 .../expected/replica_identity_logging.out     | 396 ++++++++++
 src/test/regress/parallel_schedule            |   7 +
 src/test/regress/sql/hot_updates.sql          | 553 +++++++++++++
 .../regress/sql/replica_identity_logging.sql  | 349 +++++++++
 12 files changed, 2754 insertions(+)
 create mode 100644 src/test/isolation/expected/hot_updates_chain.out
 create mode 100644 src/test/isolation/expected/hot_updates_concurrent.out
 create mode 100644 src/test/isolation/expected/hot_updates_index_scan.out
 create mode 100644 src/test/isolation/specs/hot_updates_chain.spec
 create mode 100644 src/test/isolation/specs/hot_updates_concurrent.spec
 create mode 100644 src/test/isolation/specs/hot_updates_index_scan.spec
 create mode 100644 src/test/regress/expected/hot_updates.out
 create mode 100644 src/test/regress/expected/replica_identity_logging.out
 create mode 100644 src/test/regress/sql/hot_updates.sql
 create mode 100644 src/test/regress/sql/replica_identity_logging.sql

diff --git a/src/test/isolation/expected/hot_updates_chain.out b/src/test/isolation/expected/hot_updates_chain.out
new file mode 100644
index 00000000000..503252009ea
--- /dev/null
+++ b/src/test/isolation/expected/hot_updates_chain.out
@@ -0,0 +1,144 @@
+Parsed test spec with 5 sessions
+
+starting permutation: s1_begin s1_hot_update1 s1_hot_update2 s1_hot_update3 s1_commit s1_select s1_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s1_hot_update3: UPDATE hot_test SET non_indexed_col = 'update3' WHERE id = 1;
+step s1_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|update3        
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: LP_REDIRECT or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s2_begin s2_select_before s1_begin s1_hot_update1 s1_hot_update2 s1_commit s2_select_after s2_commit
+step s2_begin: BEGIN ISOLATION LEVEL REPEATABLE READ;
+step s2_select_before: SELECT non_indexed_col FROM hot_test WHERE id = 1;
+non_indexed_col
+---------------
+initial        
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s1_commit: COMMIT;
+step s2_select_after: SELECT non_indexed_col FROM hot_test WHERE id = 1;
+non_indexed_col
+---------------
+initial        
+(1 row)
+
+step s2_commit: COMMIT;
+
+starting permutation: s1_begin s1_hot_update1 s1_hot_update2 s1_commit s3_begin s3_non_hot_update s3_commit s1_select
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s1_commit: COMMIT;
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+step s3_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|update2        
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update1 s1_commit s3_begin s3_non_hot_update s3_commit s4_begin s4_hot_after_non_hot s4_commit s4_select s4_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_commit: COMMIT;
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+step s3_commit: COMMIT;
+step s4_begin: BEGIN;
+step s4_hot_after_non_hot: UPDATE hot_test SET non_indexed_col = 'after_non_hot' WHERE id = 1;
+step s4_commit: COMMIT;
+step s4_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|after_non_hot  
+(1 row)
+
+step s4_verify_hot: 
+    -- Check for new HOT chain after non-HOT update broke the previous chain
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update1 s1_hot_update2 s5_begin s5_hot_update_row2_1 s5_hot_update_row2_2 s1_commit s5_commit s1_select s5_select s1_verify_hot s5_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update1: UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1;
+step s1_hot_update2: UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1;
+step s5_begin: BEGIN;
+step s5_hot_update_row2_1: UPDATE hot_test SET non_indexed_col = 'row2_update1' WHERE id = 2;
+step s5_hot_update_row2_2: UPDATE hot_test SET non_indexed_col = 'row2_update2' WHERE id = 2;
+step s1_commit: COMMIT;
+step s5_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|update2        
+(1 row)
+
+step s5_select: SELECT * FROM hot_test WHERE id = 2;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 2|        200|row2_update2   
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: LP_REDIRECT or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+step s5_verify_hot: 
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
diff --git a/src/test/isolation/expected/hot_updates_concurrent.out b/src/test/isolation/expected/hot_updates_concurrent.out
new file mode 100644
index 00000000000..b1a8b0cb7b2
--- /dev/null
+++ b/src/test/isolation/expected/hot_updates_concurrent.out
@@ -0,0 +1,143 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s1_begin s1_hot_update s2_begin s2_hot_update s1_commit s2_commit s1_select s2_select s2_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1;
+step s2_begin: BEGIN;
+step s2_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s2' WHERE id = 1; <waiting ...>
+step s1_commit: COMMIT;
+step s2_hot_update: <... completed>
+step s2_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|updated_s2     
+(1 row)
+
+step s2_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|updated_s2     
+(1 row)
+
+step s2_verify_hot: 
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update s3_begin s3_non_hot_update s1_commit s3_commit s3_select s3_verify_index
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1;
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1; <waiting ...>
+step s1_commit: COMMIT;
+step s3_non_hot_update: <... completed>
+step s3_commit: COMMIT;
+step s3_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|updated_s1     
+(1 row)
+
+step s3_verify_index: 
+    -- Verify index was updated (proves non-HOT)
+    SELECT COUNT(*) = 1 AS index_updated FROM hot_test WHERE indexed_col = 150;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 100;
+
+index_updated
+-------------
+t            
+(1 row)
+
+old_value_gone
+--------------
+t             
+(1 row)
+
+
+starting permutation: s3_begin s3_non_hot_update s1_begin s1_hot_update s3_commit s1_commit s1_select s1_verify_hot
+step s3_begin: BEGIN;
+step s3_non_hot_update: UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1; <waiting ...>
+step s3_commit: COMMIT;
+step s1_hot_update: <... completed>
+step s1_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        150|updated_s1     
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update s4_begin s4_hot_update_row2 s1_commit s4_commit s1_select s4_select s1_verify_hot s4_verify_hot
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1;
+step s4_begin: BEGIN;
+step s4_hot_update_row2: UPDATE hot_test SET non_indexed_col = 'updated_s4' WHERE id = 2;
+step s1_commit: COMMIT;
+step s4_commit: COMMIT;
+step s1_select: SELECT * FROM hot_test WHERE id = 1;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 1|        100|updated_s1     
+(1 row)
+
+step s4_select: SELECT * FROM hot_test WHERE id = 2;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+ 2|        200|updated_s4     
+(1 row)
+
+step s1_verify_hot: 
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+step s4_verify_hot: 
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
diff --git a/src/test/isolation/expected/hot_updates_index_scan.out b/src/test/isolation/expected/hot_updates_index_scan.out
new file mode 100644
index 00000000000..d72322b2146
--- /dev/null
+++ b/src/test/isolation/expected/hot_updates_index_scan.out
@@ -0,0 +1,126 @@
+Parsed test spec with 4 sessions
+
+starting permutation: s1_begin s1_hot_update s2_begin s2_index_scan s1_commit s2_commit
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50;
+step s2_begin: BEGIN;
+step s2_index_scan: SELECT * FROM hot_test WHERE indexed_col = 500;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_commit: COMMIT;
+step s2_commit: COMMIT;
+
+starting permutation: s1_begin s1_non_hot_update s1_commit s2_begin s2_index_scan_new s2_commit s2_verify_index
+step s1_begin: BEGIN;
+step s1_non_hot_update: UPDATE hot_test SET indexed_col = 555 WHERE id = 50;
+step s1_commit: COMMIT;
+step s2_begin: BEGIN;
+step s2_index_scan_new: SELECT * FROM hot_test WHERE indexed_col = 555;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        555|initial50      
+(1 row)
+
+step s2_commit: COMMIT;
+step s2_verify_index: 
+    -- After non-HOT update, verify index reflects the change
+    SELECT COUNT(*) = 1 AS found_new_value FROM hot_test WHERE indexed_col = 555;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 500;
+
+found_new_value
+---------------
+t              
+(1 row)
+
+old_value_gone
+--------------
+t             
+(1 row)
+
+
+starting permutation: s3_begin s3_select_for_update s1_begin s1_hot_update s3_commit s1_commit s1_verify_hot
+step s3_begin: BEGIN;
+step s3_select_for_update: SELECT * FROM hot_test WHERE id = 50 FOR UPDATE;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50; <waiting ...>
+step s3_commit: COMMIT;
+step s1_hot_update: <... completed>
+step s1_commit: COMMIT;
+step s1_verify_hot: 
+    -- Verify HOT chain exists for row with id=50
+    SELECT EXISTS (
+        SELECT 1 FROM heap_page_items(get_raw_page('hot_test', 0))
+        WHERE lp_flags = 2
+           OR (t_ctid IS NOT NULL
+               AND (t_ctid::text::point)[0]::int = 0
+               AND t_ctid != ('(0,' || lp || ')')::tid)
+    ) AS has_hot_chain;
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s1_begin s1_hot_update s3_begin s3_select_for_update s1_commit s3_commit
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50;
+step s3_begin: BEGIN;
+step s3_select_for_update: SELECT * FROM hot_test WHERE id = 50 FOR UPDATE; <waiting ...>
+step s1_commit: COMMIT;
+step s3_select_for_update: <... completed>
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|hot_updated    
+(1 row)
+
+step s3_commit: COMMIT;
+
+starting permutation: s4_begin s4_select_for_key_share s1_begin s1_hot_update s4_commit s1_commit s1_verify_hot
+step s4_begin: BEGIN;
+step s4_select_for_key_share: SELECT * FROM hot_test WHERE id = 50 FOR KEY SHARE;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_hot_update: UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50;
+step s4_commit: COMMIT;
+step s1_commit: COMMIT;
+step s1_verify_hot: 
+    -- Verify HOT chain exists for row with id=50
+    SELECT EXISTS (
+        SELECT 1 FROM heap_page_items(get_raw_page('hot_test', 0))
+        WHERE lp_flags = 2
+           OR (t_ctid IS NOT NULL
+               AND (t_ctid::text::point)[0]::int = 0
+               AND t_ctid != ('(0,' || lp || ')')::tid)
+    ) AS has_hot_chain;
+
+has_hot_chain
+-------------
+t            
+(1 row)
+
+
+starting permutation: s4_begin s4_select_for_key_share s1_begin s1_non_hot_update s4_commit s1_commit
+step s4_begin: BEGIN;
+step s4_select_for_key_share: SELECT * FROM hot_test WHERE id = 50 FOR KEY SHARE;
+id|indexed_col|non_indexed_col
+--+-----------+---------------
+50|        500|initial50      
+(1 row)
+
+step s1_begin: BEGIN;
+step s1_non_hot_update: UPDATE hot_test SET indexed_col = 555 WHERE id = 50;
+step s4_commit: COMMIT;
+step s1_commit: COMMIT;
diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule
index 4e466580cd4..46525b0a62a 100644
--- a/src/test/isolation/isolation_schedule
+++ b/src/test/isolation/isolation_schedule
@@ -19,6 +19,9 @@ test: multiple-row-versions
 test: index-only-scan
 test: index-only-bitmapscan
 test: predicate-lock-hot-tuple
+test: hot_updates_concurrent
+test: hot_updates_index_scan
+test: hot_updates_chain
 test: update-conflict-out
 test: deadlock-simple
 test: deadlock-hard
diff --git a/src/test/isolation/specs/hot_updates_chain.spec b/src/test/isolation/specs/hot_updates_chain.spec
new file mode 100644
index 00000000000..85cd2176133
--- /dev/null
+++ b/src/test/isolation/specs/hot_updates_chain.spec
@@ -0,0 +1,110 @@
+# Test HOT update chains and their interaction with VACUUM and page pruning
+#
+# This test verifies that HOT update chains are correctly maintained when
+# multiple HOT updates occur on the same row, and that VACUUM correctly
+# handles HOT chains.
+
+setup
+{
+    CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+    CREATE TABLE hot_test (
+        id int PRIMARY KEY,
+        indexed_col int,
+        non_indexed_col text
+    );
+
+    CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+    INSERT INTO hot_test VALUES (1, 100, 'initial');
+    INSERT INTO hot_test VALUES (2, 200, 'initial');
+}
+
+teardown
+{
+    DROP TABLE hot_test;
+    DROP EXTENSION pageinspect;
+}
+
+# Session 1: Create HOT chain with multiple updates
+session s1
+step s1_begin { BEGIN; }
+step s1_hot_update1 { UPDATE hot_test SET non_indexed_col = 'update1' WHERE id = 1; }
+step s1_hot_update2 { UPDATE hot_test SET non_indexed_col = 'update2' WHERE id = 1; }
+step s1_hot_update3 { UPDATE hot_test SET non_indexed_col = 'update3' WHERE id = 1; }
+step s1_commit { COMMIT; }
+step s1_select { SELECT * FROM hot_test WHERE id = 1; }
+step s1_verify_hot {
+    -- Check for HOT chain: LP_REDIRECT or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+}
+
+# Session 2: Read while HOT chain is being built
+session s2
+step s2_begin { BEGIN ISOLATION LEVEL REPEATABLE READ; }
+step s2_select_before { SELECT non_indexed_col FROM hot_test WHERE id = 1; }
+step s2_select_after { SELECT non_indexed_col FROM hot_test WHERE id = 1; }
+step s2_commit { COMMIT; }
+
+# Session 3: Break HOT chain with non-HOT update
+session s3
+step s3_begin { BEGIN; }
+step s3_non_hot_update { UPDATE hot_test SET indexed_col = 150 WHERE id = 1; }
+step s3_commit { COMMIT; }
+
+# Session 4: Try to build HOT chain after non-HOT update
+session s4
+step s4_begin { BEGIN; }
+step s4_hot_after_non_hot { UPDATE hot_test SET non_indexed_col = 'after_non_hot' WHERE id = 1; }
+step s4_commit { COMMIT; }
+step s4_select { SELECT * FROM hot_test WHERE id = 1; }
+step s4_verify_hot {
+    -- Check for new HOT chain after non-HOT update broke the previous chain
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+}
+
+# Session 5: Multiple sessions building separate HOT chains on different rows
+session s5
+step s5_begin { BEGIN; }
+step s5_hot_update_row2_1 { UPDATE hot_test SET non_indexed_col = 'row2_update1' WHERE id = 2; }
+step s5_hot_update_row2_2 { UPDATE hot_test SET non_indexed_col = 'row2_update2' WHERE id = 2; }
+step s5_commit { COMMIT; }
+step s5_select { SELECT * FROM hot_test WHERE id = 2; }
+step s5_verify_hot {
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+}
+
+# Build HOT chain within single transaction
+# All updates should form a HOT chain
+permutation s1_begin s1_hot_update1 s1_hot_update2 s1_hot_update3 s1_commit s1_select s1_verify_hot
+
+# REPEATABLE READ should see consistent snapshot across HOT chain updates
+# Session 2 starts before updates, should see 'initial' throughout
+permutation s2_begin s2_select_before s1_begin s1_hot_update1 s1_hot_update2 s1_commit s2_select_after s2_commit
+
+# HOT chain followed by non-HOT update
+# Non-HOT update breaks the HOT chain
+permutation s1_begin s1_hot_update1 s1_hot_update2 s1_commit s3_begin s3_non_hot_update s3_commit s1_select
+
+# HOT update after non-HOT update can start new HOT chain
+# After breaking chain with indexed column update, new HOT updates can start fresh chain
+permutation s1_begin s1_hot_update1 s1_commit s3_begin s3_non_hot_update s3_commit s4_begin s4_hot_after_non_hot s4_commit s4_select s4_verify_hot
+
+# Multiple sessions building separate HOT chains on different rows
+permutation s1_begin s1_hot_update1 s1_hot_update2 s5_begin s5_hot_update_row2_1 s5_hot_update_row2_2 s1_commit s5_commit s1_select s5_select s1_verify_hot s5_verify_hot
diff --git a/src/test/isolation/specs/hot_updates_concurrent.spec b/src/test/isolation/specs/hot_updates_concurrent.spec
new file mode 100644
index 00000000000..eac78d62ac5
--- /dev/null
+++ b/src/test/isolation/specs/hot_updates_concurrent.spec
@@ -0,0 +1,107 @@
+# Test concurrent HOT updates and validate HOT chains
+#
+# This test verifies that HOT updates work correctly when multiple sessions
+# are updating the same table concurrently, and validates that HOT chains
+# are actually created using heap_page_items().
+
+setup
+{
+    CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+    CREATE TABLE hot_test (
+        id int PRIMARY KEY,
+        indexed_col int,
+        non_indexed_col text
+    );
+
+    CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+    INSERT INTO hot_test VALUES (1, 100, 'initial1');
+    INSERT INTO hot_test VALUES (2, 200, 'initial2');
+    INSERT INTO hot_test VALUES (3, 300, 'initial3');
+}
+
+teardown
+{
+    DROP TABLE hot_test;
+    DROP EXTENSION pageinspect;
+}
+
+# Session 1: HOT update (modify non-indexed column)
+session s1
+step s1_begin { BEGIN; }
+step s1_hot_update { UPDATE hot_test SET non_indexed_col = 'updated_s1' WHERE id = 1; }
+step s1_commit { COMMIT; }
+step s1_select { SELECT * FROM hot_test WHERE id = 1; }
+step s1_verify_hot {
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+}
+
+# Session 2: HOT update (modify non-indexed column on same row)
+session s2
+step s2_begin { BEGIN; }
+step s2_hot_update { UPDATE hot_test SET non_indexed_col = 'updated_s2' WHERE id = 1; }
+step s2_commit { COMMIT; }
+step s2_select { SELECT * FROM hot_test WHERE id = 1; }
+step s2_verify_hot {
+    -- Check for HOT chain: look for LP_REDIRECT (lp_flags=2) or tuple with t_ctid pointing to same page
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2  -- LP_REDIRECT indicates HOT chain
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0  -- same page
+           AND t_ctid != ('(0,' || lp || ')')::tid);    -- different offset
+}
+
+# Session 3: Non-HOT update (modify indexed column)
+session s3
+step s3_begin { BEGIN; }
+step s3_non_hot_update { UPDATE hot_test SET indexed_col = 150 WHERE id = 1; }
+step s3_commit { COMMIT; }
+step s3_select { SELECT * FROM hot_test WHERE id = 1; }
+step s3_verify_index {
+    -- Verify index was updated (proves non-HOT)
+    SELECT COUNT(*) = 1 AS index_updated FROM hot_test WHERE indexed_col = 150;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 100;
+}
+
+# Session 4: Concurrent HOT updates on different rows
+session s4
+step s4_begin { BEGIN; }
+step s4_hot_update_row2 { UPDATE hot_test SET non_indexed_col = 'updated_s4' WHERE id = 2; }
+step s4_commit { COMMIT; }
+step s4_select { SELECT * FROM hot_test WHERE id = 2; }
+step s4_verify_hot {
+    -- Check for HOT chain on page 0
+    SELECT COUNT(*) > 0 AS has_hot_chain
+    FROM heap_page_items(get_raw_page('hot_test', 0))
+    WHERE lp_flags = 2
+       OR (t_ctid IS NOT NULL
+           AND (t_ctid::text::point)[0]::int = 0
+           AND t_ctid != ('(0,' || lp || ')')::tid);
+}
+
+# Two sessions both doing HOT updates on same row
+# Second session should block until first commits
+# Both should create HOT chains
+permutation s1_begin s1_hot_update s2_begin s2_hot_update s1_commit s2_commit s1_select s2_select s2_verify_hot
+
+# HOT update followed by non-HOT update
+# Non-HOT update should wait for HOT update to commit
+# First update is HOT, second is non-HOT (index updated)
+permutation s1_begin s1_hot_update s3_begin s3_non_hot_update s1_commit s3_commit s3_select s3_verify_index
+
+# Non-HOT update followed by HOT update
+# HOT update should wait for non-HOT update to commit
+# First update is non-HOT (index), second is HOT
+permutation s3_begin s3_non_hot_update s1_begin s1_hot_update s3_commit s1_commit s1_select s1_verify_hot
+
+# Concurrent HOT updates on different rows (should not block)
+# Both sessions should be able to create HOT chains independently
+permutation s1_begin s1_hot_update s4_begin s4_hot_update_row2 s1_commit s4_commit s1_select s4_select s1_verify_hot s4_verify_hot
diff --git a/src/test/isolation/specs/hot_updates_index_scan.spec b/src/test/isolation/specs/hot_updates_index_scan.spec
new file mode 100644
index 00000000000..39db07cc80f
--- /dev/null
+++ b/src/test/isolation/specs/hot_updates_index_scan.spec
@@ -0,0 +1,91 @@
+# Test HOT updates interaction with index scans and SELECT FOR UPDATE
+#
+# This test verifies that HOT updates are correctly handled when concurrent
+# sessions are performing index scans, using SELECT FOR UPDATE, and validates
+# HOT chains using heap_page_items().
+
+setup
+{
+    CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+    CREATE TABLE hot_test (
+        id int PRIMARY KEY,
+        indexed_col int,
+        non_indexed_col text
+    );
+
+    CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+    INSERT INTO hot_test SELECT i, i * 10, 'initial' || i FROM generate_series(1, 100) i;
+}
+
+teardown
+{
+    DROP TABLE hot_test;
+    DROP EXTENSION pageinspect;
+}
+
+# Session 1: Perform HOT update
+session s1
+step s1_begin { BEGIN; }
+step s1_hot_update { UPDATE hot_test SET non_indexed_col = 'hot_updated' WHERE id = 50; }
+step s1_non_hot_update { UPDATE hot_test SET indexed_col = 555 WHERE id = 50; }
+step s1_commit { COMMIT; }
+step s1_verify_hot {
+    -- Verify HOT chain exists for row with id=50
+    SELECT EXISTS (
+        SELECT 1 FROM heap_page_items(get_raw_page('hot_test', 0))
+        WHERE lp_flags = 2
+           OR (t_ctid IS NOT NULL
+               AND (t_ctid::text::point)[0]::int = 0
+               AND t_ctid != ('(0,' || lp || ')')::tid)
+    ) AS has_hot_chain;
+}
+
+# Session 2: Index scan while HOT update in progress
+session s2
+step s2_begin { BEGIN; }
+step s2_index_scan { SELECT * FROM hot_test WHERE indexed_col = 500; }
+step s2_index_scan_new { SELECT * FROM hot_test WHERE indexed_col = 555; }
+step s2_commit { COMMIT; }
+step s2_verify_index {
+    -- After non-HOT update, verify index reflects the change
+    SELECT COUNT(*) = 1 AS found_new_value FROM hot_test WHERE indexed_col = 555;
+    SELECT COUNT(*) = 0 AS old_value_gone FROM hot_test WHERE indexed_col = 500;
+}
+
+# Session 3: SELECT FOR UPDATE
+session s3
+step s3_begin { BEGIN; }
+step s3_select_for_update { SELECT * FROM hot_test WHERE id = 50 FOR UPDATE; }
+step s3_commit { COMMIT; }
+
+# Session 4: SELECT FOR KEY SHARE (should not block HOT update of non-key column)
+session s4
+step s4_begin { BEGIN; }
+step s4_select_for_key_share { SELECT * FROM hot_test WHERE id = 50 FOR KEY SHARE; }
+step s4_commit { COMMIT; }
+
+# Index scan should see consistent snapshot during HOT update
+# Index scan starts before HOT update commits
+permutation s1_begin s1_hot_update s2_begin s2_index_scan s1_commit s2_commit
+
+# Index scan after non-HOT update should see new index entry
+# Index scan starts after non-HOT update commits
+permutation s1_begin s1_non_hot_update s1_commit s2_begin s2_index_scan_new s2_commit s2_verify_index
+
+# SELECT FOR UPDATE blocks HOT update
+# The UPDATE should block until the transaction holding the FOR UPDATE lock commits
+permutation s3_begin s3_select_for_update s1_begin s1_hot_update s3_commit s1_commit s1_verify_hot
+
+# HOT update blocks SELECT FOR UPDATE
+# SELECT FOR UPDATE should wait for HOT update to commit
+permutation s1_begin s1_hot_update s3_begin s3_select_for_update s1_commit s3_commit
+
+# SELECT FOR KEY SHARE should not block HOT update (non-key column)
+# HOT update of non-indexed column should not conflict with FOR KEY SHARE
+permutation s4_begin s4_select_for_key_share s1_begin s1_hot_update s4_commit s1_commit s1_verify_hot
+
+# Non-HOT update (indexed, non-key column) should not block after FOR KEY SHARE
+# indexed_col is in no unique index, so the update does not conflict with FOR KEY SHARE
+permutation s4_begin s4_select_for_key_share s1_begin s1_non_hot_update s4_commit s1_commit
diff --git a/src/test/regress/expected/hot_updates.out b/src/test/regress/expected/hot_updates.out
new file mode 100644
index 00000000000..04fb86755db
--- /dev/null
+++ b/src/test/regress/expected/hot_updates.out
@@ -0,0 +1,725 @@
+--
+-- HOT_UPDATES
+-- Test Heap-Only Tuple (HOT) update decisions
+--
+-- This test systematically verifies that HOT updates are used when appropriate
+-- and avoided when necessary (e.g., when indexed columns are modified).
+--
+-- We use multiple validation methods:
+-- 1. Index verification (index still works = proves no index update for HOT)
+-- 2. Statistics functions (pg_stat_get_tuples_hot_updated)
+-- 3. pageinspect extension for HOT chain examination
+--
+-- Load required extensions
+CREATE EXTENSION IF NOT EXISTS pageinspect;
+-- Clean up from prior runs
+DROP TABLE IF EXISTS hot_test CASCADE;
+NOTICE:  table "hot_test" does not exist, skipping
+-- Function to get HOT update count
+CREATE OR REPLACE FUNCTION get_hot_count(rel_name text)
+RETURNS TABLE (
+    updates BIGINT,
+    hot BIGINT
+) AS $$
+DECLARE
+  rel_oid oid;
+BEGIN
+  rel_oid := rel_name::regclass::oid;
+  PERFORM pg_stat_force_next_flush();
+
+  updates := COALESCE(pg_stat_get_tuples_updated(rel_oid), 0) +
+             COALESCE(pg_stat_get_xact_tuples_updated(rel_oid), 0);
+  hot := COALESCE(pg_stat_get_tuples_hot_updated(rel_oid), 0) +
+         COALESCE(pg_stat_get_xact_tuples_hot_updated(rel_oid), 0);
+
+  RETURN NEXT;
+END;
+$$ LANGUAGE plpgsql;
+-- Check if a tuple is part of a HOT chain (has a predecessor on same page)
+CREATE OR REPLACE FUNCTION has_hot_chain(rel_name text, target_ctid tid)
+RETURNS boolean AS $$
+DECLARE
+  block_num int;
+  page_item record;
+BEGIN
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Look for a different tuple on the same page that points to our target tuple
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid IS NOT NULL
+      AND t_ctid = target_ctid
+      AND ('(' || block_num::text || ',' || lp::text || ')')::tid != target_ctid
+  LOOP
+    RETURN true;
+  END LOOP;
+
+  RETURN false;
+END;
+$$ LANGUAGE plpgsql;
+-- Print the HOT chain starting from a given tuple
+CREATE OR REPLACE FUNCTION print_hot_chain(rel_name text, start_ctid tid)
+RETURNS TABLE(chain_position int, ctid tid, lp_flags text, t_ctid tid, chain_end boolean) AS
+$$
+#variable_conflict use_column
+DECLARE
+  block_num int;
+  line_ptr int;
+  current_ctid tid := start_ctid;
+  next_ctid tid;
+  position int := 0;
+  max_iterations int := 100;
+  page_item record;
+  found_predecessor boolean := false;
+  flags_name text;
+BEGIN
+  block_num := (start_ctid::text::point)[0]::int;
+
+  -- Find the predecessor (old tuple pointing to our start_ctid)
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid = start_ctid
+  LOOP
+    current_ctid := ('(' || block_num::text || ',' || page_item.lp::text || ')')::tid;
+    found_predecessor := true;
+    EXIT;
+  END LOOP;
+
+  -- If no predecessor found, start with the given ctid
+  IF NOT found_predecessor THEN
+    current_ctid := start_ctid;
+  END IF;
+
+  -- Follow the chain forward
+  WHILE position < max_iterations LOOP
+    line_ptr := (current_ctid::text::point)[1]::int;
+
+    FOR page_item IN
+      SELECT lp, lp_flags, t_ctid
+      FROM heap_page_items(get_raw_page(rel_name, block_num))
+      WHERE lp = line_ptr
+    LOOP
+      -- Map lp_flags to names
+      flags_name := CASE page_item.lp_flags
+        WHEN 0 THEN 'unused (0)'
+        WHEN 1 THEN 'normal (1)'
+        WHEN 2 THEN 'redirect (2)'
+        WHEN 3 THEN 'dead (3)'
+        ELSE 'unknown (' || page_item.lp_flags::text || ')'
+      END;
+
+      RETURN QUERY SELECT
+        position,
+        current_ctid,
+        flags_name,
+        page_item.t_ctid,
+        (page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid)::boolean
+      ;
+
+      IF page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid THEN
+        RETURN;
+      END IF;
+
+      next_ctid := page_item.t_ctid;
+
+      IF (next_ctid::text::point)[0]::int != block_num THEN
+        RETURN;
+      END IF;
+
+      current_ctid := next_ctid;
+      position := position + 1;
+    END LOOP;
+
+    IF position = 0 THEN
+      RETURN;
+    END IF;
+  END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+-- Trigger page pruning via table scan
+CREATE OR REPLACE FUNCTION heap_prune_page(rel_name text, target_ctid tid)
+RETURNS void AS $$
+DECLARE
+  block_num int;
+BEGIN
+  -- Extract block number from ctid
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Scan only the specific page to trigger pruning on that page
+  EXECUTE 'SELECT COUNT(*) FROM ' || quote_ident(rel_name) ||
+           ' WHERE ctid >= (' || block_num || ',0) AND ctid < (' || (block_num + 1) || ',0)';
+END;
+$$ LANGUAGE plpgsql;
+-- Basic HOT update (update non-indexed column)
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    non_indexed_col text
+) USING heap;
+CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+INSERT INTO hot_test VALUES (1, 100, 'initial');
+INSERT INTO hot_test VALUES (2, 200, 'initial');
+INSERT INTO hot_test VALUES (3, 300, 'initial');
+-- Get baseline and initial ctid
+WITH initial_state AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Initial State' AS phase,
+  initial_state.ctid,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot
+FROM initial_state;
+     phase     | ctid  | updates | hot 
+---------------+-------+---------+-----
+ Initial State | (0,1) |       0 |   0
+(1 row)
+
+-- Should be HOT updates (only non-indexed column modified)
+UPDATE hot_test SET non_indexed_col = 'updated1' WHERE id = 1;
+UPDATE hot_test SET non_indexed_col = 'updated2' WHERE id = 2;
+UPDATE hot_test SET non_indexed_col = 'updated3' WHERE id = 3;
+-- Verify HOT updates occurred
+SELECT
+  'After Updates' AS phase,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot;
+     phase     | updates | hot 
+---------------+---------+-----
+ After Updates |       3 |   3
+(1 row)
+
+-- Dump the HOT chain before pruning
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Before VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+     phase     | has_chain | chain_position | ctid  |  lp_flags  | t_ctid 
+---------------+-----------+----------------+-------+------------+--------
+ Before VACUUM | t         |              0 | (0,1) | normal (1) | (0,4)
+ Before VACUUM | t         |              1 | (0,4) | normal (1) | (0,4)
+(2 rows)
+
+SET SESSION enable_seqscan = OFF;
+SET SESSION enable_bitmapscan = OFF;
+-- Verify indexes still work
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 100)
+(2 rows)
+
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+ id | indexed_col 
+----+-------------
+  1 |         100
+(1 row)
+
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 200)
+(2 rows)
+
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+ id | indexed_col 
+----+-------------
+  2 |         200
+(1 row)
+
+-- Vacuum the relation, expect the HOT chain to collapse
+VACUUM hot_test;
+-- Show that there is no chain after vacuum
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'After VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+    phase     | has_chain | chain_position | ctid  |  lp_flags  | t_ctid 
+--------------+-----------+----------------+-------+------------+--------
+ After VACUUM | f         |              0 | (0,4) | normal (1) | (0,4)
+(1 row)
+
+-- Non-HOT update (update indexed column)
+UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (4,3)
+(1 row)
+
+-- Verify index was updated (new value findable)
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 150)
+(2 rows)
+
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+ id | indexed_col 
+----+-------------
+  1 |         150
+(1 row)
+
+-- Verify old value no longer in index
+EXPLAIN (COSTS OFF) SELECT id FROM hot_test WHERE indexed_col = 100;
+                    QUERY PLAN                     
+---------------------------------------------------
+ Index Scan using hot_test_indexed_idx on hot_test
+   Index Cond: (indexed_col = 100)
+(2 rows)
+
+SELECT id FROM hot_test WHERE indexed_col = 100;
+ id 
+----
+(0 rows)
+
+SET SESSION enable_seqscan = ON;
+SET SESSION enable_bitmapscan = ON;
+-- All-or-none property: updating one indexed column requires ALL index updates
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    non_indexed text
+) USING heap;
+CREATE INDEX hot_test_a_idx ON hot_test(col_a);
+CREATE INDEX hot_test_b_idx ON hot_test(col_b);
+CREATE INDEX hot_test_c_idx ON hot_test(col_c);
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'initial');
+-- Update only col_a - should NOT be HOT because an indexed column changed
+-- This means ALL indexes must be updated (all-or-none property)
+UPDATE hot_test SET col_a = 15 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,0)
+(1 row)
+
+-- Verify all three indexes still work correctly
+SELECT id, col_a FROM hot_test WHERE col_a = 15;  -- updated index
+ id | col_a 
+----+-------
+  1 |    15
+(1 row)
+
+SELECT id, col_b FROM hot_test WHERE col_b = 20;  -- unchanged index
+ id | col_b 
+----+-------
+  1 |    20
+(1 row)
+
+SELECT id, col_c FROM hot_test WHERE col_c = 30;  -- unchanged index
+ id | col_c 
+----+-------
+  1 |    30
+(1 row)
+
+-- Now update only non-indexed column - should be HOT
+UPDATE hot_test SET non_indexed = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,1)
+(1 row)
+
+-- Verify all indexes still work
+SELECT id FROM hot_test WHERE col_a = 15 AND col_b = 20 AND col_c = 30;
+ id 
+----
+  1
+(1 row)
+
+-- Partial index: both old and new outside predicate (conservative = non-HOT)
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    status text,
+    data text
+) USING heap;
+-- Partial index only covers status = 'active'
+CREATE INDEX hot_test_active_idx ON hot_test(status) WHERE status = 'active';
+INSERT INTO hot_test VALUES (1, 'active', 'data1');
+INSERT INTO hot_test VALUES (2, 'inactive', 'data2');
+INSERT INTO hot_test VALUES (3, 'deleted', 'data3');
+-- Update non-indexed column on 'active' row (in predicate, status unchanged)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated1' WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Update non-indexed column on 'inactive' row (outside predicate)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated2' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- Update status from 'inactive' to 'deleted' (both outside predicate)
+-- PostgreSQL is conservative: heap insert happens before predicate check
+-- So this is NON-HOT even though both values are outside predicate
+UPDATE hot_test SET status = 'deleted' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,2)
+(1 row)
+
+-- Verify index still works for 'active' rows
+SELECT id, status FROM hot_test WHERE status = 'active';
+ id | status 
+----+--------
+  1 | active
+(1 row)
+
+-- Expression index with JSONB
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    metadata jsonb
+) USING heap;
+-- Index on JSONB expression
+CREATE INDEX hot_test_user_id_idx ON hot_test((metadata->>'user_id'));
+CREATE INDEX hot_test_status_idx ON hot_test((metadata->>'status'));
+INSERT INTO hot_test VALUES (1, '{"user_id": "123", "status": "active"}'::jsonb);
+-- Update a JSONB field used in an expression index to the same value.
+-- This will be HOT because the entire JSONB value is observed to be
+-- unchanged.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{user_id}', '"123"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Update a JSONB field that is not used in any index to a new value. This
+-- prevents a HOT update even though nothing used to form the index key
+-- changed. That is counterintuitive, causes index bloat, and slows down
+-- updates on JSONB data, because any change triggers an update of all
+-- indexes.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{food}', '"apple"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,1)
+(1 row)
+
+-- Use a few different methods for mutating JSONB data, but don't modify
+-- indexed portions of the document.  None of these will be HOT.
+UPDATE hot_test SET metadata = jsonb_set(
+  jsonb_set(metadata, '{food}', '"pear"'),
+  '{timestamp}',
+  to_jsonb(now())
+)
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,1)
+(1 row)
+
+UPDATE hot_test
+SET metadata = metadata || '{"user_id": "123", "timestamp": "2024-01-01"}'::jsonb
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (4,1)
+(1 row)
+
+UPDATE hot_test SET metadata =
+  jsonb_set(
+    jsonb_set(metadata, '{user_id}', '"123"'),
+    '{fruit}',
+    '"plumb"'
+  );
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (5,1)
+(1 row)
+
+UPDATE hot_test SET metadata = metadata || jsonb_build_object(
+  'user_id', '123',
+  'timestamp', now(),
+  'fruit', 'honeydew'
+);
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (6,1)
+(1 row)
+
+-- Only BRIN (summarizing) indexes on non-PK columns
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    ts timestamp,
+    value int,
+    brin_col int
+) USING heap;
+CREATE INDEX hot_test_ts_brin ON hot_test USING brin(ts);
+CREATE INDEX hot_test_brin_col_brin ON hot_test USING brin(brin_col);
+INSERT INTO hot_test VALUES (1, '2024-01-01', 100, 1000);
+-- Update both BRIN columns - should still be HOT (only summarizing indexes)
+UPDATE hot_test SET ts = '2024-01-02', brin_col = 2000 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Verify BRIN indexes work
+SELECT id FROM hot_test WHERE ts >= '2024-01-02';
+ id 
+----
+  1
+(1 row)
+
+SELECT id FROM hot_test WHERE brin_col >= 2000;
+ id 
+----
+  1
+(1 row)
+
+-- Update non-indexed column - should also be HOT
+UPDATE hot_test SET value = 200 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- TOAST and HOT: TOASTed columns can participate in HOT
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    large_text text,
+    small_text text
+) USING heap;
+CREATE INDEX hot_test_idx ON hot_test(indexed_col);
+-- Insert row with TOASTed column (> 2KB)
+INSERT INTO hot_test VALUES (1, 100, repeat('x', 3000), 'small');
+-- Update non-indexed, non-TOASTed column - should be HOT
+UPDATE hot_test SET small_text = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Update TOASTed column - should be HOT if indexed column unchanged
+UPDATE hot_test SET large_text = repeat('y', 3000);
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- Verify index still works
+SELECT id FROM hot_test WHERE indexed_col = 100;
+ id 
+----
+  1
+(1 row)
+
+-- Update indexed column - should NOT be HOT
+UPDATE hot_test SET indexed_col = 200;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,2)
+(1 row)
+
+-- Verify index was updated
+SELECT id FROM hot_test WHERE indexed_col = 200;
+ id 
+----
+  1
+(1 row)
+
+-- Unique constraint (unique index) behaves like regular index
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    unique_col int UNIQUE,
+    data text
+) USING heap;
+INSERT INTO hot_test VALUES (1, 100, 'data1');
+INSERT INTO hot_test VALUES (2, 200, 'data2');
+-- Update data (non-indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (2,2)
+(1 row)
+
+-- Verify unique constraint still enforced
+SELECT id, unique_col, data FROM hot_test ORDER BY id;
+ id | unique_col |  data   
+----+------------+---------
+  1 |        100 | updated
+  2 |        200 | updated
+(2 rows)
+
+-- This should fail (unique violation)
+UPDATE hot_test SET unique_col = 100 WHERE id = 2;
+ERROR:  duplicate key value violates unique constraint "hot_test_unique_col_key"
+DETAIL:  Key (unique_col)=(100) already exists.
+-- Multi-column index: any column change = non-HOT
+DROP TABLE hot_test;
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    data text
+) USING heap;
+CREATE INDEX hot_test_ab_idx ON hot_test(col_a, col_b);
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'data');
+-- Update col_a (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_a = 15;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (1,0)
+(1 row)
+
+-- Reset
+UPDATE hot_test SET col_a = 10;
+-- Update col_b (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_b = 25;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (3,0)
+(1 row)
+
+-- Reset
+UPDATE hot_test SET col_b = 20;
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (4,0)
+(1 row)
+
+-- Update col_c (not indexed) - should be HOT
+UPDATE hot_test SET col_c = 35;
+-- Update data (not indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+ get_hot_count 
+---------------
+ (6,2)
+(1 row)
+
+-- Verify multi-column index works
+SELECT id FROM hot_test WHERE col_a = 10 AND col_b = 20;
+ id 
+----
+  1
+(1 row)
+
+-- Partitioned tables: HOT works within partitions
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+NOTICE:  table "hot_test_partitioned" does not exist, skipping
+CREATE TABLE hot_test_partitioned (
+    id int,
+    partition_key int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id, partition_key)
+) PARTITION BY RANGE (partition_key) USING heap;
+CREATE TABLE hot_test_part1 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (1) TO (100) USING heap;
+CREATE TABLE hot_test_part2 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (100) TO (200) USING heap;
+CREATE INDEX hot_test_part_idx ON hot_test_partitioned(indexed_col);
+INSERT INTO hot_test_partitioned VALUES (1, 50, 100, 'initial1');
+INSERT INTO hot_test_partitioned VALUES (2, 150, 200, 'initial2');
+-- Update in partition 1 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated1' WHERE id = 1;
+-- Update in partition 2 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated2' WHERE id = 2;
+SELECT get_hot_count('hot_test_part1');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+SELECT get_hot_count('hot_test_part2');
+ get_hot_count 
+---------------
+ (1,1)
+(1 row)
+
+-- Verify indexes work on partitions
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 100;
+ id 
+----
+  1
+(1 row)
+
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 200;
+ id 
+----
+  2
+(1 row)
+
+-- Update indexed column in partition - should NOT be HOT
+UPDATE hot_test_partitioned SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test_part1');
+ get_hot_count 
+---------------
+ (2,1)
+(1 row)
+
+-- Verify index was updated
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 150;
+ id 
+----
+  1
+(1 row)
+
+-- Cleanup
+DROP TABLE IF EXISTS hot_test;
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+DROP FUNCTION IF EXISTS has_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS print_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS heap_prune(text);
+NOTICE:  function heap_prune(text) does not exist, skipping
+DROP FUNCTION IF EXISTS get_hot_count(text);
+DROP EXTENSION pageinspect;
diff --git a/src/test/regress/expected/replica_identity_logging.out b/src/test/regress/expected/replica_identity_logging.out
new file mode 100644
index 00000000000..2096510b924
--- /dev/null
+++ b/src/test/regress/expected/replica_identity_logging.out
@@ -0,0 +1,396 @@
+--
+-- REPLICA_IDENTITY_LOGGING
+-- Test that replica identity keys are correctly extracted and logged for logical replication
+--
+-- This test verifies that the correct old key columns are included in WAL records
+-- for logical replication, based on the table's replica identity setting.
+--
+-- Clean up from prior runs
+DROP TABLE IF EXISTS repid_test CASCADE;
+NOTICE:  table "repid_test" does not exist, skipping
+-- Drop replication slot if it exists from prior run
+SELECT pg_drop_replication_slot('repid_test_slot') FROM pg_replication_slots WHERE slot_name = 'repid_test_slot';
+ pg_drop_replication_slot 
+--------------------------
+(0 rows)
+
+-- Enable logical decoding to verify what gets logged
+SELECT 'init' FROM pg_create_logical_replication_slot('repid_test_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- REPLICA IDENTITY DEFAULT (primary key columns only)
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+-- Advance slot to skip inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 indexed_col[integer]:100 data[text]:'initial'
+ table public.repid_test: INSERT: id[integer]:2 indexed_col[integer]:200 data[text]:'initial'
+(2 rows)
+
+-- Update non-key column - key unchanged, so no old key is logged
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Update indexed non-key column - key still unchanged, so no old key is logged
+UPDATE repid_test SET indexed_col = 150 WHERE id = 2;
+-- Check logical decoding output - should see only new tuples, without old-key
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 indexed_col[integer]:100 data[text]:'updated'
+ table public.repid_test: UPDATE: id[integer]:2 indexed_col[integer]:150 data[text]:'initial'
+(2 rows)
+
+-- REPLICA IDENTITY FULL (all columns in old key)
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 indexed_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update any column - should log ALL columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - should see old key with all columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                     data                                                                                     
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 indexed_col[integer]:100 data[text]:'initial' new-tuple: id[integer]:1 indexed_col[integer]:100 data[text]:'updated'
+(1 row)
+
+-- REPLICA IDENTITY USING INDEX (index columns only)
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int,
+    unique_col int UNIQUE NOT NULL,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_unique_col_key;
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+-- Advance slot past inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                            data                                             
+---------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 unique_col[integer]:100 data[text]:'initial'
+ table public.repid_test: INSERT: id[integer]:2 unique_col[integer]:200 data[text]:'initial'
+(2 rows)
+
+-- Update non-indexed column - identity key unchanged, so no old key is logged
+UPDATE repid_test SET data = 'updated' WHERE unique_col = 100;
+-- Update id (not in replica identity index) - still no old key is logged
+UPDATE repid_test SET id = 10 WHERE unique_col = 200;
+-- Check logical decoding output - should see only new tuples, without old-key
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                             data                                             
+----------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 unique_col[integer]:100 data[text]:'updated'
+ table public.repid_test: UPDATE: id[integer]:10 unique_col[integer]:200 data[text]:'initial'
+(2 rows)
+
+-- REPLICA IDENTITY NOTHING (no old key logged)
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY NOTHING;
+INSERT INTO repid_test VALUES (1, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                data                                 
+---------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 data[text]:'initial'
+(1 row)
+
+-- Update - should log no old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - should see update with no old key
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                data                                 
+---------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 data[text]:'updated'
+(1 row)
+
+-- Multi-column index replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int,
+    col_a int NOT NULL,
+    col_b int NOT NULL,
+    col_c int,
+    data text,
+    UNIQUE (col_a, col_b)
+);
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_col_a_col_b_key;
+INSERT INTO repid_test VALUES (1, 10, 20, 30, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                           data                                                            
+---------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 col_a[integer]:10 col_b[integer]:20 col_c[integer]:30 data[text]:'initial'
+(1 row)
+
+-- Update non-indexed columns - should log col_a and col_b in old key
+UPDATE repid_test SET data = 'updated', col_c = 35 WHERE col_a = 10 AND col_b = 20;
+-- Check logical decoding output - should see old key with col_a and col_b
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                           data                                                            
+---------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id[integer]:1 col_a[integer]:10 col_b[integer]:20 col_c[integer]:35 data[text]:'updated'
+(1 row)
+
+-- TOAST/external columns in replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    large_text text,
+    data text
+);
+-- REPLICA IDENTITY FULL includes toasted columns
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+-- Insert a large value (large enough to show the concept without excessive output)
+INSERT INTO repid_test VALUES (1, repeat('x', 100), 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                                                            data                                                                                             
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 large_text[text]:'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx' data[text]:'initial'
+(1 row)
+
+-- Update small column - should still log large_text column in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - verify both old and new values are logged
+-- Just check that UPDATE happened and includes both large_text and data columns
+SELECT COUNT(*) as update_count FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%large_text%' AND data LIKE '%old-key%';
+ update_count 
+--------------
+            1
+(1 row)
+
+-- Test TOAST columns with REPLICA IDENTITY USING INDEX
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_large_text text NOT NULL,
+    data text
+);
+-- Create unique index on the large text column
+CREATE UNIQUE INDEX repid_test_large_idx ON repid_test(indexed_large_text);
+-- Set replica identity to use the index (not FULL)
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_large_idx;
+-- Insert a large value (large enough to be TOASTed)
+INSERT INTO repid_test VALUES (1, repeat('x', 100000), 'initial');
+-- Advance slot past inserts
+SELECT COUNT(*) FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+ count 
+-------
+     1
+(1 row)
+
+-- Update non-indexed column - should still log indexed_large_text in old key
+-- despite being unmodified because it is TOASTed and in the replica key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Verify TOASTed indexed column, part of the replica identity, is logged in old key
+SELECT COUNT(*) AS toasted_index_logged FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%indexed_large_text%';
+ toasted_index_logged 
+----------------------
+                    1
+(1 row)
+
+-- Dropped columns and replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    dropped_col int,
+    kept_col int,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test VALUES (1, 999, 100, 'initial');
+-- Drop a column
+ALTER TABLE repid_test DROP COLUMN dropped_col;
+-- Advance slot past insert and DDL (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                        data                                                        
+--------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 dropped_col[integer]:999 kept_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update - old key should handle dropped column
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                  data                                                                                  
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 kept_col[integer]:100 data[text]:'initial' new-tuple: id[integer]:1 kept_col[integer]:100 data[text]:'updated'
+(1 row)
+
+-- DEFAULT replica identity with composite primary key
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id_a int,
+    id_b int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id_a, id_b)
+);
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+INSERT INTO repid_test VALUES (1, 10, 100, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                      data                                                       
+-----------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id_a[integer]:1 id_b[integer]:10 indexed_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update non-key columns - should log both id_a and id_b in old key
+UPDATE repid_test SET data = 'updated', indexed_col = 150 WHERE id_a = 1 AND id_b = 10;
+-- Check logical decoding output - should see old key with both primary key columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                      data                                                       
+-----------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: id_a[integer]:1 id_b[integer]:10 indexed_col[integer]:150 data[text]:'updated'
+(1 row)
+
+-- Expression index and replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    email text NOT NULL,
+    data text
+);
+-- Create unique expression index
+CREATE UNIQUE INDEX repid_test_lower_email_idx ON repid_test(lower(email));
+-- Cannot use expression index for replica identity (should fail)
+-- PostgreSQL requires the index to be on simple column references
+-- This should produce an error
+DO $$
+BEGIN
+    ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_lower_email_idx;
+    RAISE EXCEPTION 'Should have failed - expression indexes cannot be used for replica identity';
+EXCEPTION
+    WHEN feature_not_supported THEN
+        RAISE NOTICE 'Correctly rejected expression index for replica identity';
+END$$;
+NOTICE:  Correctly rejected expression index for replica identity
+-- Use FULL instead
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test VALUES (1, '[email protected]', 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                data                                                
+----------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 email[text]:'[email protected]' data[text]:'initial'
+(1 row)
+
+-- Update - should log all columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                           data                                                                                           
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 email[text]:'[email protected]' data[text]:'initial' new-tuple: id[integer]:1 email[text]:'[email protected]' data[text]:'updated'
+(1 row)
+
+-- NULL values in replica identity columns
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    nullable_col int,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test VALUES (1, NULL, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                              data                                              
+------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 nullable_col[integer]:null data[text]:'initial'
+(1 row)
+
+-- Update - old key should include NULL value
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+-- Check logical decoding output - should see old key with NULL
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                         data                                                                          
+-------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 data[text]:'initial' new-tuple: id[integer]:1 nullable_col[integer]:null data[text]:'updated'
+(1 row)
+
+-- Generated columns and replica identity
+DROP TABLE repid_test;
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    base_col int,
+    generated_col int GENERATED ALWAYS AS (base_col * 2) STORED,
+    data text
+);
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+INSERT INTO repid_test (id, base_col, data) VALUES (1, 50, 'initial');
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+                                                        data                                                         
+---------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: INSERT: id[integer]:1 base_col[integer]:50 generated_col[integer]:100 data[text]:'initial'
+(1 row)
+
+-- Update base_col - generated_col will change automatically
+UPDATE repid_test SET base_col = 60 WHERE id = 1;
+-- Check logical decoding output - should include old generated_col value
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+                                                                                                            data                                                                                                            
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ table public.repid_test: UPDATE: old-key: id[integer]:1 base_col[integer]:50 generated_col[integer]:100 data[text]:'initial' new-tuple: id[integer]:1 base_col[integer]:60 generated_col[integer]:120 data[text]:'initial'
+(1 row)
+
+-- Cleanup
+SELECT pg_drop_replication_slot('repid_test_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
+DROP TABLE repid_test;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 549e9b2d7be..01ed43eba18 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -137,6 +137,13 @@ test: event_trigger_login
 # this test also uses event triggers, so likewise run it by itself
 test: fast_default
 
+# ----------
+# HOT updates and replica identity logging tests
+# Run these sequentially to avoid logical replication slot interference
+# ----------
+test: hot_updates
+test: replica_identity_logging
+
 # run tablespace test at the end because it drops the tablespace created during
 # setup that other tests may use.
 test: tablespace
diff --git a/src/test/regress/sql/hot_updates.sql b/src/test/regress/sql/hot_updates.sql
new file mode 100644
index 00000000000..7030f4fc6db
--- /dev/null
+++ b/src/test/regress/sql/hot_updates.sql
@@ -0,0 +1,553 @@
+--
+-- HOT_UPDATES
+-- Test Heap-Only Tuple (HOT) update decisions
+--
+-- This test systematically verifies that HOT updates are used when appropriate
+-- and avoided when necessary (e.g., when indexed columns are modified).
+--
+-- We use multiple validation methods:
+-- 1. Index verification (index still works = proves no index update for HOT)
+-- 2. Statistics functions (pg_stat_get_tuples_hot_updated)
+-- 3. pageinspect extension for HOT chain examination
+--
+
+-- Load required extensions
+CREATE EXTENSION IF NOT EXISTS pageinspect;
+
+-- Clean up from prior runs
+DROP TABLE IF EXISTS hot_test CASCADE;
+
+-- Function to get HOT update count
+CREATE OR REPLACE FUNCTION get_hot_count(rel_name text)
+RETURNS TABLE (
+    updates BIGINT,
+    hot BIGINT
+) AS $$
+DECLARE
+  rel_oid oid;
+BEGIN
+  rel_oid := rel_name::regclass::oid;
+  PERFORM pg_stat_force_next_flush();
+
+  updates := COALESCE(pg_stat_get_tuples_updated(rel_oid), 0) +
+             COALESCE(pg_stat_get_xact_tuples_updated(rel_oid), 0);
+  hot := COALESCE(pg_stat_get_tuples_hot_updated(rel_oid), 0) +
+         COALESCE(pg_stat_get_xact_tuples_hot_updated(rel_oid), 0);
+
+  RETURN NEXT;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Check if a tuple is part of a HOT chain (has a predecessor on same page)
+CREATE OR REPLACE FUNCTION has_hot_chain(rel_name text, target_ctid tid)
+RETURNS boolean AS $$
+DECLARE
+  block_num int;
+  page_item record;
+BEGIN
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Look for a different tuple on the same page that points to our target tuple
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid IS NOT NULL
+      AND t_ctid = target_ctid
+      AND ('(' || block_num::text || ',' || lp::text || ')')::tid != target_ctid
+  LOOP
+    RETURN true;
+  END LOOP;
+
+  RETURN false;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Print the HOT chain starting from a given tuple
+CREATE OR REPLACE FUNCTION print_hot_chain(rel_name text, start_ctid tid)
+RETURNS TABLE(chain_position int, ctid tid, lp_flags text, t_ctid tid, chain_end boolean) AS
+$$
+#variable_conflict use_column
+DECLARE
+  block_num int;
+  line_ptr int;
+  current_ctid tid := start_ctid;
+  next_ctid tid;
+  position int := 0;
+  max_iterations int := 100;
+  page_item record;
+  found_predecessor boolean := false;
+  flags_name text;
+BEGIN
+  block_num := (start_ctid::text::point)[0]::int;
+
+  -- Find the predecessor (old tuple pointing to our start_ctid)
+  FOR page_item IN
+    SELECT lp, lp_flags, t_ctid
+    FROM heap_page_items(get_raw_page(rel_name, block_num))
+    WHERE lp_flags = 1
+      AND t_ctid = start_ctid
+  LOOP
+    current_ctid := ('(' || block_num::text || ',' || page_item.lp::text || ')')::tid;
+    found_predecessor := true;
+    EXIT;
+  END LOOP;
+
+  -- If no predecessor found, start with the given ctid
+  IF NOT found_predecessor THEN
+    current_ctid := start_ctid;
+  END IF;
+
+  -- Follow the chain forward
+  WHILE position < max_iterations LOOP
+    line_ptr := (current_ctid::text::point)[1]::int;
+
+    FOR page_item IN
+      SELECT lp, lp_flags, t_ctid
+      FROM heap_page_items(get_raw_page(rel_name, block_num))
+      WHERE lp = line_ptr
+    LOOP
+      -- Map lp_flags to names
+      flags_name := CASE page_item.lp_flags
+        WHEN 0 THEN 'unused (0)'
+        WHEN 1 THEN 'normal (1)'
+        WHEN 2 THEN 'redirect (2)'
+        WHEN 3 THEN 'dead (3)'
+        ELSE 'unknown (' || page_item.lp_flags::text || ')'
+      END;
+
+      RETURN QUERY SELECT
+        position,
+        current_ctid,
+        flags_name,
+        page_item.t_ctid,
+        (page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid)::boolean
+      ;
+
+      IF page_item.t_ctid IS NULL OR page_item.t_ctid = current_ctid THEN
+        RETURN;
+      END IF;
+
+      next_ctid := page_item.t_ctid;
+
+      IF (next_ctid::text::point)[0]::int != block_num THEN
+        RETURN;
+      END IF;
+
+      current_ctid := next_ctid;
+      position := position + 1;
+    END LOOP;
+
+    IF position = 0 THEN
+      RETURN;
+    END IF;
+  END LOOP;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Trigger page pruning via table scan
+CREATE OR REPLACE FUNCTION heap_prune_page(rel_name text, target_ctid tid)
+RETURNS void AS $$
+DECLARE
+  block_num int;
+BEGIN
+  -- Extract block number from ctid
+  block_num := (target_ctid::text::point)[0]::int;
+
+  -- Scan only the specific page to trigger pruning on that page
+  EXECUTE format('SELECT COUNT(*) FROM %I WHERE ctid >= ''(%s,0)''::tid AND ctid < ''(%s,0)''::tid',
+                 rel_name, block_num, block_num + 1);
+END;
+$$ LANGUAGE plpgsql;
+
+-- Basic HOT update (update non-indexed column)
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    non_indexed_col text
+) USING heap;
+
+CREATE INDEX hot_test_indexed_idx ON hot_test(indexed_col);
+
+INSERT INTO hot_test VALUES (1, 100, 'initial');
+INSERT INTO hot_test VALUES (2, 200, 'initial');
+INSERT INTO hot_test VALUES (3, 300, 'initial');
+
+-- Get baseline and initial ctid
+WITH initial_state AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Initial State' AS phase,
+  initial_state.ctid,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot
+FROM initial_state;
+
+-- Should be HOT updates (only non-indexed column modified)
+UPDATE hot_test SET non_indexed_col = 'updated1' WHERE id = 1;
+UPDATE hot_test SET non_indexed_col = 'updated2' WHERE id = 2;
+UPDATE hot_test SET non_indexed_col = 'updated3' WHERE id = 3;
+
+-- Verify HOT updates occurred
+SELECT
+  'After Updates' AS phase,
+  (get_hot_count('hot_test')).updates,
+  (get_hot_count('hot_test')).hot;
+
+-- Dump the HOT chain before pruning
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'Before VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+
+SET SESSION enable_seqscan = OFF;
+SET SESSION enable_bitmapscan = OFF;
+
+-- Verify indexes still work
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 100;
+
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 200;
+
+-- Vacuum the relation, expect the HOT chain to collapse
+VACUUM hot_test;
+
+-- Show that there is no chain after vacuum
+WITH current_tuple AS (
+  SELECT ctid FROM hot_test WHERE id = 1
+)
+SELECT
+  'After VACUUM' AS phase,
+  has_hot_chain('hot_test', current_tuple.ctid) AS has_chain,
+  chain_position,
+  print_hot_chain.ctid,
+  lp_flags,
+  t_ctid
+FROM current_tuple,
+LATERAL print_hot_chain('hot_test', current_tuple.ctid);
+
+-- Non-HOT update (update indexed column)
+UPDATE hot_test SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Verify index was updated (new value findable)
+EXPLAIN (COSTS OFF) SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+SELECT id, indexed_col FROM hot_test WHERE indexed_col = 150;
+
+-- Verify old value no longer in index
+EXPLAIN (COSTS OFF) SELECT id FROM hot_test WHERE indexed_col = 100;
+SELECT id FROM hot_test WHERE indexed_col = 100;
+
+SET SESSION enable_seqscan = ON;
+SET SESSION enable_bitmapscan = ON;
+
+-- All-or-none property: updating one indexed column requires ALL index updates
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    non_indexed text
+) USING heap;
+
+CREATE INDEX hot_test_a_idx ON hot_test(col_a);
+CREATE INDEX hot_test_b_idx ON hot_test(col_b);
+CREATE INDEX hot_test_c_idx ON hot_test(col_c);
+
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'initial');
+
+-- Update only col_a - should NOT be HOT because an indexed column changed
+-- This means ALL indexes must be updated (all-or-none property)
+UPDATE hot_test SET col_a = 15 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Verify all three indexes still work correctly
+SELECT id, col_a FROM hot_test WHERE col_a = 15;  -- updated index
+SELECT id, col_b FROM hot_test WHERE col_b = 20;  -- unchanged index
+SELECT id, col_c FROM hot_test WHERE col_c = 30;  -- unchanged index
+
+-- Now update only non-indexed column - should be HOT
+UPDATE hot_test SET non_indexed = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Verify all indexes still work
+SELECT id FROM hot_test WHERE col_a = 15 AND col_b = 20 AND col_c = 30;
+
+-- Partial index: both old and new outside predicate (conservative = non-HOT)
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    status text,
+    data text
+) USING heap;
+
+-- Partial index only covers status = 'active'
+CREATE INDEX hot_test_active_idx ON hot_test(status) WHERE status = 'active';
+
+INSERT INTO hot_test VALUES (1, 'active', 'data1');
+INSERT INTO hot_test VALUES (2, 'inactive', 'data2');
+INSERT INTO hot_test VALUES (3, 'deleted', 'data3');
+
+-- Update non-indexed column on 'active' row (in predicate, status unchanged)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated1' WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Update non-indexed column on 'inactive' row (outside predicate)
+-- Should be HOT
+UPDATE hot_test SET data = 'updated2' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+
+-- Update status from 'inactive' to 'deleted' (both outside predicate)
+-- PostgreSQL is conservative: heap insert happens before predicate check
+-- So this is NON-HOT even though both values are outside predicate
+UPDATE hot_test SET status = 'deleted' WHERE id = 2;
+SELECT get_hot_count('hot_test');
+
+-- Verify index still works for 'active' rows
+SELECT id, status FROM hot_test WHERE status = 'active';
+
+-- Expression index with JSONB
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    metadata jsonb
+) USING heap;
+
+-- Index on JSONB expression
+CREATE INDEX hot_test_user_id_idx ON hot_test((metadata->>'user_id'));
+CREATE INDEX hot_test_status_idx ON hot_test((metadata->>'status'));
+
+INSERT INTO hot_test VALUES (1, '{"user_id": "123", "status": "active"}'::jsonb);
+
+-- Update JSONB field used in expression index to the same value.
+-- This will be HOT because the entire JSONB value is observed to
+-- be unchanged.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{user_id}', '"123"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Update a JSONB field that is not used in any index to a new value. This
+-- prevents a HOT update even though nothing used to form the index keys
+-- changed. That is counterintuitive: it causes index bloat and slows down
+-- updates on JSONB data, because any change to the document triggers
+-- updates of all indexes.
+UPDATE hot_test SET metadata = jsonb_set(metadata, '{food}', '"apple"')
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Use a few different methods for mutating JSONB data, but don't modify
+-- indexed portions of the document.  None of these will be HOT.
+UPDATE hot_test SET metadata = jsonb_set(
+  jsonb_set(metadata, '{food}', '"pear"'),
+  '{timestamp}',
+  to_jsonb(now())
+)
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+UPDATE hot_test
+SET metadata = metadata || '{"user_id": "123", "timestamp": "2024-01-01"}'::jsonb
+WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+UPDATE hot_test SET metadata =
+  jsonb_set(
+    jsonb_set(metadata, '{user_id}', '"123"'),
+    '{fruit}',
+    '"plumb"'
+  );
+SELECT get_hot_count('hot_test');
+
+UPDATE hot_test SET metadata = metadata || jsonb_build_object(
+  'user_id', '123',
+  'timestamp', now(),
+  'fruit', 'honeydew'
+);
+SELECT get_hot_count('hot_test');
+
+-- Only BRIN (summarizing) indexes on non-PK columns
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    ts timestamp,
+    value int,
+    brin_col int
+) USING heap;
+
+CREATE INDEX hot_test_ts_brin ON hot_test USING brin(ts);
+CREATE INDEX hot_test_brin_col_brin ON hot_test USING brin(brin_col);
+
+INSERT INTO hot_test VALUES (1, '2024-01-01', 100, 1000);
+
+-- Update both BRIN columns - should still be HOT (only summarizing indexes)
+UPDATE hot_test SET ts = '2024-01-02', brin_col = 2000 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- Verify BRIN indexes work
+SELECT id FROM hot_test WHERE ts >= '2024-01-02';
+SELECT id FROM hot_test WHERE brin_col >= 2000;
+
+-- Update non-indexed column - should also be HOT
+UPDATE hot_test SET value = 200 WHERE id = 1;
+SELECT get_hot_count('hot_test');
+
+-- TOAST and HOT: TOASTed columns can participate in HOT
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    large_text text,
+    small_text text
+) USING heap;
+
+CREATE INDEX hot_test_idx ON hot_test(indexed_col);
+
+-- Insert row with TOASTed column (> 2KB)
+INSERT INTO hot_test VALUES (1, 100, repeat('x', 3000), 'small');
+
+-- Update non-indexed, non-TOASTed column - should be HOT
+UPDATE hot_test SET small_text = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Update TOASTed column - should be HOT if indexed column unchanged
+UPDATE hot_test SET large_text = repeat('y', 3000);
+SELECT get_hot_count('hot_test');
+
+-- Verify index still works
+SELECT id FROM hot_test WHERE indexed_col = 100;
+
+-- Update indexed column - should NOT be HOT
+UPDATE hot_test SET indexed_col = 200;
+SELECT get_hot_count('hot_test');
+
+-- Verify index was updated
+SELECT id FROM hot_test WHERE indexed_col = 200;
+
+-- Unique constraint (unique index) behaves like regular index
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    unique_col int UNIQUE,
+    data text
+) USING heap;
+
+INSERT INTO hot_test VALUES (1, 100, 'data1');
+INSERT INTO hot_test VALUES (2, 200, 'data2');
+
+-- Update data (non-indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Verify unique constraint still enforced
+SELECT id, unique_col, data FROM hot_test ORDER BY id;
+
+-- This should fail (unique violation)
+UPDATE hot_test SET unique_col = 100 WHERE id = 2;
+
+-- Multi-column index: any column change = non-HOT
+DROP TABLE hot_test;
+
+CREATE TABLE hot_test (
+    id int PRIMARY KEY,
+    col_a int,
+    col_b int,
+    col_c int,
+    data text
+) USING heap;
+
+CREATE INDEX hot_test_ab_idx ON hot_test(col_a, col_b);
+
+INSERT INTO hot_test VALUES (1, 10, 20, 30, 'data');
+
+-- Update col_a (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_a = 15;
+SELECT get_hot_count('hot_test');
+
+-- Reset
+UPDATE hot_test SET col_a = 10;
+
+-- Update col_b (part of multi-column index) - should NOT be HOT
+UPDATE hot_test SET col_b = 25;
+SELECT get_hot_count('hot_test');
+
+-- Reset
+UPDATE hot_test SET col_b = 20;
+SELECT get_hot_count('hot_test');
+
+-- Update col_c (not indexed) - should be HOT
+UPDATE hot_test SET col_c = 35;
+
+-- Update data (not indexed) - should be HOT
+UPDATE hot_test SET data = 'updated';
+SELECT get_hot_count('hot_test');
+
+-- Verify multi-column index works
+SELECT id FROM hot_test WHERE col_a = 10 AND col_b = 20;
+
+-- Partitioned tables: HOT works within partitions
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+
+CREATE TABLE hot_test_partitioned (
+    id int,
+    partition_key int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id, partition_key)
+) PARTITION BY RANGE (partition_key) USING heap;
+
+CREATE TABLE hot_test_part1 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (1) TO (100) USING heap;
+CREATE TABLE hot_test_part2 PARTITION OF hot_test_partitioned
+    FOR VALUES FROM (100) TO (200) USING heap;
+
+CREATE INDEX hot_test_part_idx ON hot_test_partitioned(indexed_col);
+
+INSERT INTO hot_test_partitioned VALUES (1, 50, 100, 'initial1');
+INSERT INTO hot_test_partitioned VALUES (2, 150, 200, 'initial2');
+
+-- Update in partition 1 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated1' WHERE id = 1;
+
+-- Update in partition 2 (non-indexed column) - should be HOT
+UPDATE hot_test_partitioned SET data = 'updated2' WHERE id = 2;
+
+SELECT get_hot_count('hot_test_part1');
+SELECT get_hot_count('hot_test_part2');
+
+-- Verify indexes work on partitions
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 100;
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 200;
+
+-- Update indexed column in partition - should NOT be HOT
+UPDATE hot_test_partitioned SET indexed_col = 150 WHERE id = 1;
+SELECT get_hot_count('hot_test_part1');
+
+-- Verify index was updated
+SELECT id FROM hot_test_partitioned WHERE indexed_col = 150;
+
+-- Cleanup
+DROP TABLE IF EXISTS hot_test;
+DROP TABLE IF EXISTS hot_test_partitioned CASCADE;
+DROP FUNCTION IF EXISTS has_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS print_hot_chain(text, tid);
+DROP FUNCTION IF EXISTS heap_prune(text);
+DROP FUNCTION IF EXISTS get_hot_count(text);
+DROP EXTENSION pageinspect;
diff --git a/src/test/regress/sql/replica_identity_logging.sql b/src/test/regress/sql/replica_identity_logging.sql
new file mode 100644
index 00000000000..4c45e76e15d
--- /dev/null
+++ b/src/test/regress/sql/replica_identity_logging.sql
@@ -0,0 +1,349 @@
+--
+-- REPLICA_IDENTITY_LOGGING
+-- Test that replica identity keys are correctly extracted and logged for logical replication
+--
+-- This test verifies that the correct old key columns are included in WAL records
+-- for logical replication, based on the table's replica identity setting.
+--
+
+-- Clean up from prior runs
+DROP TABLE IF EXISTS repid_test CASCADE;
+
+-- Drop replication slot if it exists from prior run
+SELECT pg_drop_replication_slot('repid_test_slot') FROM pg_replication_slots WHERE slot_name = 'repid_test_slot';
+
+-- Enable logical decoding to verify what gets logged
+SELECT 'init' FROM pg_create_logical_replication_slot('repid_test_slot', 'test_decoding');
+
+-- REPLICA IDENTITY DEFAULT (primary key columns only)
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+
+-- Advance slot to skip inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-key column - should log only id in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Update indexed non-key column - should still log only id in old key
+UPDATE repid_test SET indexed_col = 150 WHERE id = 2;
+
+-- Check logical decoding output - should see old key with only id
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- REPLICA IDENTITY FULL (all columns in old key)
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_col int,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update any column - should log ALL columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - should see old key with all columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- REPLICA IDENTITY USING INDEX (index columns only)
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int,
+    unique_col int UNIQUE NOT NULL,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_unique_col_key;
+
+INSERT INTO repid_test VALUES (1, 100, 'initial');
+INSERT INTO repid_test VALUES (2, 200, 'initial');
+
+-- Advance slot past inserts (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-indexed column - should log only unique_col in old key
+UPDATE repid_test SET data = 'updated' WHERE unique_col = 100;
+
+-- Update id (not in replica identity index) - should still log only unique_col
+UPDATE repid_test SET id = 10 WHERE unique_col = 200;
+
+-- Check logical decoding output - should see old key with only unique_col
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- REPLICA IDENTITY NOTHING (no old key logged)
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY NOTHING;
+
+INSERT INTO repid_test VALUES (1, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - should log no old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - should see update with no old key
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Multi-column index replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int,
+    col_a int NOT NULL,
+    col_b int NOT NULL,
+    col_c int,
+    data text,
+    UNIQUE (col_a, col_b)
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_col_a_col_b_key;
+
+INSERT INTO repid_test VALUES (1, 10, 20, 30, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-indexed columns - should log col_a and col_b in old key
+UPDATE repid_test SET data = 'updated', col_c = 35 WHERE col_a = 10 AND col_b = 20;
+
+-- Check logical decoding output - should see old key with col_a and col_b
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- TOAST/external columns in replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    large_text text,
+    data text
+);
+
+-- REPLICA IDENTITY FULL includes toasted columns
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+-- Insert a large value (large enough to show the concept without excessive output)
+INSERT INTO repid_test VALUES (1, repeat('x', 100), 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update small column - should still log large_text column in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - verify both old and new values are logged
+-- Just check that UPDATE happened and includes both large_text and data columns
+SELECT COUNT(*) as update_count FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%large_text%' AND data LIKE '%old-key%';
+
+-- Test TOAST columns with REPLICA IDENTITY USING INDEX
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    indexed_large_text text NOT NULL,
+    data text
+);
+
+-- Create unique index on the large text column
+CREATE UNIQUE INDEX repid_test_large_idx ON repid_test(indexed_large_text);
+
+-- Set replica identity to use the index (not FULL)
+ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_large_idx;
+
+-- Insert a large value (large enough to be TOASTed)
+INSERT INTO repid_test VALUES (1, repeat('x', 100000), 'initial');
+
+-- Advance slot past inserts
+SELECT COUNT(*) FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-indexed column - should still log indexed_large_text in old key
+-- despite being unmodified because it is TOASTed and in the replica key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Verify the TOASTed indexed column in the replica identity is logged in old key
+SELECT COUNT(*) AS toasted_index_logged FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%' AND data LIKE '%indexed_large_text%';
+-- Dropped columns and replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    dropped_col int,
+    kept_col int,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test VALUES (1, 999, 100, 'initial');
+
+-- Drop a column
+ALTER TABLE repid_test DROP COLUMN dropped_col;
+
+-- Advance slot past insert and DDL (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - old key should handle dropped column
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- DEFAULT replica identity with composite primary key
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id_a int,
+    id_b int,
+    indexed_col int,
+    data text,
+    PRIMARY KEY (id_a, id_b)
+);
+
+CREATE INDEX repid_test_idx ON repid_test(indexed_col);
+
+INSERT INTO repid_test VALUES (1, 10, 100, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update non-key columns - should log both id_a and id_b in old key
+UPDATE repid_test SET data = 'updated', indexed_col = 150 WHERE id_a = 1 AND id_b = 10;
+
+-- Check logical decoding output - should see old key with both primary key columns
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Expression index and replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    email text NOT NULL,
+    data text
+);
+
+-- Create unique expression index
+CREATE UNIQUE INDEX repid_test_lower_email_idx ON repid_test(lower(email));
+
+-- Cannot use expression index for replica identity (should fail)
+-- PostgreSQL requires the index to be on simple column references
+-- This should produce an error
+DO $$
+BEGIN
+    ALTER TABLE repid_test REPLICA IDENTITY USING INDEX repid_test_lower_email_idx;
+    RAISE EXCEPTION 'Should have failed - expression indexes cannot be used for replica identity';
+EXCEPTION
+    WHEN feature_not_supported THEN
+        RAISE NOTICE 'Correctly rejected expression index for replica identity';
+END$$;
+
+-- Use FULL instead
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test VALUES (1, '[email protected]', 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - should log all columns in old key
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- NULL values in replica identity columns
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    nullable_col int,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test VALUES (1, NULL, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update - old key should include NULL value
+UPDATE repid_test SET data = 'updated' WHERE id = 1;
+
+-- Check logical decoding output - should see old key with NULL
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Generated columns and replica identity
+DROP TABLE repid_test;
+
+CREATE TABLE repid_test (
+    id int PRIMARY KEY,
+    base_col int,
+    generated_col int GENERATED ALWAYS AS (base_col * 2) STORED,
+    data text
+);
+
+ALTER TABLE repid_test REPLICA IDENTITY FULL;
+
+INSERT INTO repid_test (id, base_col, data) VALUES (1, 50, 'initial');
+
+-- Advance slot past insert (filter out transaction boundaries for stability)
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data NOT LIKE 'BEGIN %' AND data NOT LIKE 'COMMIT %';
+
+-- Update base_col - generated_col will change automatically
+UPDATE repid_test SET base_col = 60 WHERE id = 1;
+
+-- Check logical decoding output - should include old generated_col value
+SELECT data FROM pg_logical_slot_get_changes('repid_test_slot', NULL, NULL)
+WHERE data LIKE '%UPDATE%';
+
+-- Cleanup
+SELECT pg_drop_replication_slot('repid_test_slot');
+DROP TABLE repid_test;
-- 
2.51.2



  [text/x-patch] v33-0002-Idenfity-modified-indexed-attributes-in-the-exec.patch (59.5K, 3-v33-0002-Idenfity-modified-indexed-attributes-in-the-exec.patch)
From 0ef7260343415f98201108778d0793af5ebcf1b3 Mon Sep 17 00:00:00 2001
From: Greg Burd <[email protected]>
Date: Sun, 2 Nov 2025 11:36:20 -0500
Subject: [PATCH v33 2/2] Identify modified indexed attributes in the executor
 on UPDATE

Refactor executor update logic to determine which indexed columns have
actually changed during an UPDATE operation rather than leaving this up
to HeapDetermineColumnsInfo() in heap_update().  Finding this set of
attributes is not heap-specific, but more general to all table AMs and
having this information in the executor could inform other decisions
about when index inserts are required and when they are not regardless
of the table AM's MVCC implementation strategy.

The heap-only tuple decision (HOT) in heap functions as it always has,
but the determination of the "modified indexed attributes" (mix_attrs,
was known as modified_attrs) now happens outside the buffer lock and can
inform other decisions unrelated to heap.

ExecUpdateModIdxAttrs() replaces HeapDetermineColumnsInfo() and is
called before table_tuple_update() crucially without the need for an
exclusive buffer lock on the page that holds the tuple being updated.
This reduces the time the lock is held later within
heapam_tuple_update() and heap_update().

ExecUpdateModIdxAttrs() in turn uses ExecCompareSlotAttrs() to identify
which attributes have changed and then intersects that with the set of
indexed attributes to identify the modified indexed set, the mix_attrs.
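
For illustration only, the intersection step can be modeled with a plain
bitmask standing in for a Bitmapset.  This is a sketch, not the patch's
code: attrset, attr_bit() and sketch_mod_idx_attrs() are hypothetical
names, and attribute numbers are assumed to fit in 1..63.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a Bitmapset: bit N set means attribute N. */
typedef uint64_t attrset;

/* Map a user attribute number (assumed 1..63) to its bit. */
static attrset
attr_bit(int attnum)
{
	assert(attnum >= 1 && attnum <= 63);
	return (attrset) 1 << attnum;
}

/*
 * Model of the "modified indexed attributes" computation: the attributes
 * whose values differ between the old and new tuple, restricted to those
 * referenced by at least one index.
 */
static attrset
sketch_mod_idx_attrs(attrset changed_attrs, attrset indexed_attrs)
{
	return changed_attrs & indexed_attrs;
}
```

With an index on attributes 2 and 3, changing only attribute 5 yields an
empty set, while changing attributes 2 and 5 yields just attribute 2.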

Besides identifying the set of modified indexed attributes,
HeapDetermineColumnsInfo() was also responsible for part of the logic
involved in the decision to include the replica identity key or not.
This moved into heap_update() and out of HeapDetermineColumnsInfo()
which has been renamed to HeapUpdateModIdxAttrs() as it is still
required within simple_heap_update() to be able to identify mix_attrs
given only an old TID and a new HeapTuple.
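
As a rough model of the replica identity decision that now lives in
heap_update(): the old key is required when a modified indexed attribute
belongs to the replica identity, or when an unmodified, indexed replica
identity attribute is stored externally in the old tuple.  The struct and
function below are hypothetical and only mirror that rule; they are not
PostgreSQL structures.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-attribute summary; not a PostgreSQL structure. */
typedef struct
{
	bool		modified_indexed;		/* member of mix_attrs */
	bool		indexed;				/* referenced by some index */
	bool		in_replica_identity;	/* member of the replica identity */
	bool		stored_externally;		/* old value is TOASTed out of line */
} sketch_attr;

static bool
sketch_rep_id_key_required(const sketch_attr *attrs, size_t nattrs)
{
	assert(attrs != NULL || nattrs == 0);

	/* First rule: a modified indexed attribute is in the replica identity. */
	for (size_t i = 0; i < nattrs; i++)
		if (attrs[i].modified_indexed && attrs[i].in_replica_identity)
			return true;

	/* Second rule: an unmodified, indexed identity attribute is external. */
	for (size_t i = 0; i < nattrs; i++)
		if (attrs[i].indexed && !attrs[i].modified_indexed &&
			attrs[i].in_replica_identity && attrs[i].stored_externally)
			return true;

	return false;
}
```

The second rule is the case the TOAST test in replica_identity_logging.sql
is aimed at: only a non-indexed column is updated, yet the externally
stored identity column still has to appear in the old key.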

Updates stemming from logical replication also use the new
ExecUpdateModIdxAttrs() in ExecSimpleRelationUpdate().

This patch also introduces a few helper functions: HeapUpdateHotAllowable(),
HeapUpdateDetermineLockmode().  These are used in both heap_update() and
simple_heap_update().

The heap_update() function is called now with lockmode pre-determined
and a boolean indicating whether the update allows HOT updates or not.
If during heap_update() the new tuple will fit on the same page and that
boolean is true, the update is HOT.  None of the logic related to when
HOT is allowed has changed.
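
A minimal sketch of how these caller-side decisions fit together, assuming
the helpers mirror the bms_overlap() checks that this patch removes from
heap_update().  All names are hypothetical, a bitmask stands in for a
Bitmapset, and the separate handling of summarizing indexes is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for a Bitmapset: bit N set means attribute N. */
typedef uint64_t attrset;

typedef enum
{
	SKETCH_LOCK_NO_KEY_EXCLUSIVE,
	SKETCH_LOCK_EXCLUSIVE
} sketch_lockmode;

/* A weaker lock suffices when no "key" attribute was modified. */
static sketch_lockmode
sketch_determine_lockmode(attrset mix_attrs, attrset key_attrs)
{
	if ((mix_attrs & key_attrs) == 0)
		return SKETCH_LOCK_NO_KEY_EXCLUSIVE;
	return SKETCH_LOCK_EXCLUSIVE;
}

/* HOT is allowable when no HOT-blocking indexed attribute was modified. */
static bool
sketch_hot_allowable(attrset mix_attrs, attrset hot_blocking_attrs)
{
	return (mix_attrs & hot_blocking_attrs) == 0;
}

/* Inside the update: the key is intact only under the weaker lock mode. */
static bool
sketch_key_intact(sketch_lockmode lockmode)
{
	if (lockmode == SKETCH_LOCK_NO_KEY_EXCLUSIVE)
		return true;
	assert(lockmode == SKETCH_LOCK_EXCLUSIVE);
	return false;
}

/* The final choice still depends on the new tuple fitting on the same page. */
static bool
sketch_use_hot_update(bool same_page, bool hot_allowed)
{
	return same_page && hot_allowed;
}
```

The point of the split is that the first two functions need only the
attribute sets, so they can run before the buffer lock is taken; only the
last check depends on where the new tuple ends up.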

Triggers are free to use heap_modify_tuple() and update attributes not
found in the UPDATE statement or triggers that fire due to an UPDATE.
When that happens the executor has no knowledge of those changes.  This
forced HeapDetermineColumnsInfo() to scan all indexed attributes on a
relation rather than only the intersection of indexed and those
identified by ExecGetAllUpdatedCols().  This occurs in at least one test
that uses the tsvector_update_trigger() function (tsearch.sql).
ExecBRUpdateTriggers() has been changed to identify changes to indexed
columns not found by ExecGetAllUpdatedCols() and add those attributes to
ri_extraUpdatedCols.

Three tests were adjusted to avoid instability due to tuple ordering
during heap page scans.  This avoids nondeterministic results.
---
 src/backend/access/heap/heapam.c              | 481 +++++++++++-------
 src/backend/access/heap/heapam_handler.c      |  32 +-
 src/backend/access/table/tableam.c            |   5 +-
 src/backend/commands/trigger.c                |  20 +-
 src/backend/executor/execReplication.c        |   7 +-
 src/backend/executor/execTuples.c             |  78 +++
 src/backend/executor/nodeModifyTable.c        |  93 +++-
 src/backend/utils/cache/relcache.c            |  44 +-
 src/include/access/heapam.h                   |  13 +-
 src/include/access/tableam.h                  |   8 +-
 src/include/executor/executor.h               |   9 +
 src/include/utils/rel.h                       |   2 +-
 src/include/utils/relcache.h                  |   2 +-
 .../regress/expected/generated_virtual.out    |   2 +-
 src/test/regress/expected/triggers.out        |  16 +-
 src/test/regress/expected/updatable_views.out |   4 +-
 src/test/regress/sql/generated_virtual.sql    |   2 +-
 src/test/regress/sql/triggers.sql             |   4 +-
 src/test/regress/sql/updatable_views.sql      |   2 +-
 19 files changed, 591 insertions(+), 233 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index a231563f0df..18961d714a3 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -37,14 +37,19 @@
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/syncscan.h"
+#include "access/sysattr.h"
+#include "access/tableam.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_database_d.h"
 #include "commands/vacuum.h"
+#include "executor/tuptable.h"
+#include "nodes/lockoptions.h"
 #include "pgstat.h"
 #include "port/pg_bitutils.h"
+#include "storage/buf.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -52,6 +57,7 @@
 #include "utils/datum.h"
 #include "utils/injection_point.h"
 #include "utils/inval.h"
+#include "utils/relcache.h"
 #include "utils/spccache.h"
 #include "utils/syscache.h"
 
@@ -68,11 +74,8 @@ static void check_lock_if_inplace_updateable_rel(Relation relation,
 												 HeapTuple newtup);
 static void check_inplace_rel_lock(HeapTuple oldtup);
 #endif
-static Bitmapset *HeapDetermineColumnsInfo(Relation relation,
-										   Bitmapset *interesting_cols,
-										   Bitmapset *external_cols,
-										   HeapTuple oldtup, HeapTuple newtup,
-										   bool *has_external);
+static Bitmapset *HeapUpdateModIdxAttrs(Relation relation,
+										HeapTuple oldtup, HeapTuple newtup);
 static bool heap_acquire_tuplock(Relation relation, const ItemPointerData *tid,
 								 LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool *have_tuple_lock);
@@ -3302,7 +3305,7 @@ simple_heap_delete(Relation relation, const ItemPointerData *tid)
  *	heap_update - replace a tuple
  *
  * See table_tuple_update() for an explanation of the parameters, except that
- * this routine directly takes a tuple rather than a slot.
+ * this routine directly takes a heap tuple rather than a slot.
  *
  * In the failure cases, the routine fills *tmfd with the tuple's t_ctid,
  * t_xmax (resolving a possible MultiXact, if necessary), and t_cmax (the last
@@ -3312,17 +3315,13 @@ simple_heap_delete(Relation relation, const ItemPointerData *tid)
 TM_Result
 heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			TM_FailureData *tmfd, LockTupleMode *lockmode,
-			TU_UpdateIndexes *update_indexes)
+			TM_FailureData *tmfd, const LockTupleMode lockmode,
+			const Bitmapset *mix_attrs, const bool hot_allowed)
 {
 	TM_Result	result;
 	TransactionId xid = GetCurrentTransactionId();
-	Bitmapset  *hot_attrs;
-	Bitmapset  *sum_attrs;
-	Bitmapset  *key_attrs;
-	Bitmapset  *id_attrs;
-	Bitmapset  *interesting_attrs;
-	Bitmapset  *modified_attrs;
+	Bitmapset  *idx_attrs,
+			   *rid_attrs;
 	ItemId		lp;
 	HeapTupleData oldtup;
 	HeapTuple	heaptup;
@@ -3341,13 +3340,12 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	bool		have_tuple_lock = false;
 	bool		iscombo;
 	bool		use_hot_update = false;
-	bool		summarized_update = false;
 	bool		key_intact;
 	bool		all_visible_cleared = false;
 	bool		all_visible_cleared_new = false;
 	bool		checked_lockers;
 	bool		locker_remains;
-	bool		id_has_external = false;
+	bool		rep_id_key_required = false;
 	TransactionId xmax_new_tuple,
 				xmax_old_tuple;
 	uint16		infomask_old_tuple,
@@ -3378,33 +3376,14 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 #endif
 
 	/*
-	 * Fetch the list of attributes to be checked for various operations.
-	 *
-	 * For HOT considerations, this is wasted effort if we fail to update or
-	 * have to put the new tuple on a different page.  But we must compute the
-	 * list before obtaining buffer lock --- in the worst case, if we are
-	 * doing an update on one of the relevant system catalogs, we could
-	 * deadlock if we try to fetch the list later.  In any case, the relcache
-	 * caches the data so this is usually pretty cheap.
-	 *
-	 * We also need columns used by the replica identity and columns that are
-	 * considered the "key" of rows in the table.
+	 * Fetch the attributes used across all indexes on this relation as well
+	 * as the replica identity columns.
 	 *
-	 * Note that we get copies of each bitmap, so we need not worry about
-	 * relcache flush happening midway through.
-	 */
-	hot_attrs = RelationGetIndexAttrBitmap(relation,
-										   INDEX_ATTR_BITMAP_HOT_BLOCKING);
-	sum_attrs = RelationGetIndexAttrBitmap(relation,
-										   INDEX_ATTR_BITMAP_SUMMARIZED);
-	key_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_KEY);
-	id_attrs = RelationGetIndexAttrBitmap(relation,
-										  INDEX_ATTR_BITMAP_IDENTITY_KEY);
-	interesting_attrs = NULL;
-	interesting_attrs = bms_add_members(interesting_attrs, hot_attrs);
-	interesting_attrs = bms_add_members(interesting_attrs, sum_attrs);
-	interesting_attrs = bms_add_members(interesting_attrs, key_attrs);
-	interesting_attrs = bms_add_members(interesting_attrs, id_attrs);
+	 * NOTE: relcache returns copies of each bitmap, so we need not worry
+	 * about relcache flush happening midway through.
+	 */
+	idx_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_INDEXED);
+	rid_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_IDENTITY_KEY);
 
 	block = ItemPointerGetBlockNumber(otid);
 	INJECTION_POINT("heap_update-before-pin", NULL);
@@ -3458,20 +3437,17 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 		tmfd->ctid = *otid;
 		tmfd->xmax = InvalidTransactionId;
 		tmfd->cmax = InvalidCommandId;
-		*update_indexes = TU_None;
 
-		bms_free(hot_attrs);
-		bms_free(sum_attrs);
-		bms_free(key_attrs);
-		bms_free(id_attrs);
-		/* modified_attrs not yet initialized */
-		bms_free(interesting_attrs);
+		bms_free(rid_attrs);
+		bms_free(idx_attrs);
+		/* mix_attrs is owned by the caller, don't free it */
+
 		return TM_Deleted;
 	}
 
 	/*
-	 * Fill in enough data in oldtup for HeapDetermineColumnsInfo to work
-	 * properly.
+	 * Fill in enough data in oldtup to determine replica identity attribute
+	 * requirements.
 	 */
 	oldtup.t_tableOid = RelationGetRelid(relation);
 	oldtup.t_data = (HeapTupleHeader) PageGetItem(page, lp);
@@ -3482,16 +3458,59 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	newtup->t_tableOid = RelationGetRelid(relation);
 
 	/*
-	 * Determine columns modified by the update.  Additionally, identify
-	 * whether any of the unmodified replica identity key attributes in the
-	 * old tuple is externally stored or not.  This is required because for
-	 * such attributes the flattened value won't be WAL logged as part of the
-	 * new tuple so we must include it as part of the old_key_tuple.  See
-	 * ExtractReplicaIdentity.
+	 * ExtractReplicaIdentity() needs to know if a modified indexed attribute
+	 * is used as a replica identity or if any of the replica identity
+	 * attributes are referenced in an index, unmodified, and are stored
+	 * externally in the old tuple being replaced.  In those cases it may be
+	 * necessary to WAL log them so they are available to replicas.
 	 */
-	modified_attrs = HeapDetermineColumnsInfo(relation, interesting_attrs,
-											  id_attrs, &oldtup,
-											  newtup, &id_has_external);
+	rep_id_key_required = bms_overlap(mix_attrs, rid_attrs);
+	if (!rep_id_key_required)
+	{
+		Bitmapset  *attrs;
+		TupleDesc	tupdesc = RelationGetDescr(relation);
+		int			attidx = -1;
+
+		/*
+		 * Reduce the set under review to only the unmodified indexed replica
+		 * identity key attributes.  idx_attrs is copied (by bms_difference())
+		 * not modified here.
+		 */
+		attrs = bms_difference(idx_attrs, mix_attrs);
+		attrs = bms_int_members(attrs, rid_attrs);
+
+		while ((attidx = bms_next_member(attrs, attidx)) >= 0)
+		{
+			/*
+			 * attidx is zero-based, attrnum is the normal attribute number
+			 */
+			AttrNumber	attrnum = attidx + FirstLowInvalidHeapAttributeNumber;
+			Datum		value;
+			bool		isnull;
+
+			/*
+			 * System attributes are not added into INDEX_ATTR_BITMAP_INDEXED
+			 * bitmap by relcache.
+			 */
+			Assert(attrnum > 0);
+
+			value = heap_getattr(&oldtup, attrnum, tupdesc, &isnull);
+
+			/* No need to check attributes that can't be stored externally */
+			if (isnull ||
+				TupleDescCompactAttr(tupdesc, attrnum - 1)->attlen != -1)
+				continue;
+
+			/* Check if the old tuple's attribute is stored externally */
+			if (VARATT_IS_EXTERNAL((struct varlena *) DatumGetPointer(value)))
+			{
+				rep_id_key_required = true;
+				break;
+			}
+		}
+
+		bms_free(attrs);
+	}
 
 	/*
 	 * If we're not updating any "key" column, we can grab a weaker lock type.
@@ -3504,9 +3523,8 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	 * is updates that don't manipulate key columns, not those that
 	 * serendipitously arrive at the same key values.
 	 */
-	if (!bms_overlap(modified_attrs, key_attrs))
+	if (lockmode == LockTupleNoKeyExclusive)
 	{
-		*lockmode = LockTupleNoKeyExclusive;
 		mxact_status = MultiXactStatusNoKeyUpdate;
 		key_intact = true;
 
@@ -3523,7 +3541,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	}
 	else
 	{
-		*lockmode = LockTupleExclusive;
+		Assert(lockmode == LockTupleExclusive);
 		mxact_status = MultiXactStatusUpdate;
 		key_intact = false;
 	}
@@ -3534,7 +3552,6 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup,
 	 * with the new tuple's location, so there's great risk of confusion if we
 	 * use otid anymore.
 	 */
-
 l2:
 	checked_lockers = false;
 	locker_remains = false;
@@ -3602,7 +3619,7 @@ l2:
 			bool		current_is_member = false;
 
 			if (DoesMultiXactIdConflict((MultiXactId) xwait, infomask,
-										*lockmode, &current_is_member))
+										lockmode, &current_is_member))
 			{
 				LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
@@ -3611,7 +3628,7 @@ l2:
 				 * requesting a lock and already have one; avoids deadlock).
 				 */
 				if (!current_is_member)
-					heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
+					heap_acquire_tuplock(relation, &(oldtup.t_self), lockmode,
 										 LockWaitBlock, &have_tuple_lock);
 
 				/* wait for multixact */
@@ -3696,7 +3713,7 @@ l2:
 			 * lock.
 			 */
 			LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-			heap_acquire_tuplock(relation, &(oldtup.t_self), *lockmode,
+			heap_acquire_tuplock(relation, &(oldtup.t_self), lockmode,
 								 LockWaitBlock, &have_tuple_lock);
 			XactLockTableWait(xwait, relation, &oldtup.t_self,
 							  XLTW_Update);
@@ -3756,17 +3773,14 @@ l2:
 			tmfd->cmax = InvalidCommandId;
 		UnlockReleaseBuffer(buffer);
 		if (have_tuple_lock)
-			UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
+			UnlockTupleTuplock(relation, &(oldtup.t_self), lockmode);
 		if (vmbuffer != InvalidBuffer)
 			ReleaseBuffer(vmbuffer);
-		*update_indexes = TU_None;
 
-		bms_free(hot_attrs);
-		bms_free(sum_attrs);
-		bms_free(key_attrs);
-		bms_free(id_attrs);
-		bms_free(modified_attrs);
-		bms_free(interesting_attrs);
+		bms_free(rid_attrs);
+		bms_free(idx_attrs);
+		/* mix_attrs is owned by the caller, don't free it */
+
 		return result;
 	}
 
@@ -3796,7 +3810,7 @@ l2:
 	compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
 							  oldtup.t_data->t_infomask,
 							  oldtup.t_data->t_infomask2,
-							  xid, *lockmode, true,
+							  xid, lockmode, true,
 							  &xmax_old_tuple, &infomask_old_tuple,
 							  &infomask2_old_tuple);
 
@@ -3913,7 +3927,7 @@ l2:
 		compute_new_xmax_infomask(HeapTupleHeaderGetRawXmax(oldtup.t_data),
 								  oldtup.t_data->t_infomask,
 								  oldtup.t_data->t_infomask2,
-								  xid, *lockmode, false,
+								  xid, lockmode, false,
 								  &xmax_lock_old_tuple, &infomask_lock_old_tuple,
 								  &infomask2_lock_old_tuple);
 
@@ -4073,37 +4087,16 @@ l2:
 
 	/*
 	 * At this point newbuf and buffer are both pinned and locked, and newbuf
-	 * has enough space for the new tuple.  If they are the same buffer, only
-	 * one pin is held.
+	 * has enough space for the new tuple so we can use the HOT update path if
+	 * the caller determined that it is allowable.
+	 *
+	 * NOTE: If newbuf == buffer then only one pin is held.
 	 */
-
-	if (newbuf == buffer)
-	{
-		/*
-		 * Since the new tuple is going into the same page, we might be able
-		 * to do a HOT update.  Check if any of the index columns have been
-		 * changed.
-		 */
-		if (!bms_overlap(modified_attrs, hot_attrs))
-		{
-			use_hot_update = true;
-
-			/*
-			 * If none of the columns that are used in hot-blocking indexes
-			 * were updated, we can apply HOT, but we do still need to check
-			 * if we need to update the summarizing indexes, and update those
-			 * indexes if the columns were updated, or we may fail to detect
-			 * e.g. value bound changes in BRIN minmax indexes.
-			 */
-			if (bms_overlap(modified_attrs, sum_attrs))
-				summarized_update = true;
-		}
-	}
+	if ((newbuf == buffer) && hot_allowed)
+		use_hot_update = true;
 	else
-	{
 		/* Set a hint that the old page could use prune/defrag */
 		PageSetFull(page);
-	}
 
 	/*
 	 * Compute replica identity tuple before entering the critical section so
@@ -4113,8 +4106,7 @@ l2:
 	 * columns are modified or it has external data.
 	 */
 	old_key_tuple = ExtractReplicaIdentity(relation, &oldtup,
-										   bms_overlap(modified_attrs, id_attrs) ||
-										   id_has_external,
+										   rep_id_key_required,
 										   &old_key_copied);
 
 	/* NO EREPORT(ERROR) from here till changes are logged */
@@ -4243,7 +4235,7 @@ l2:
 	 * Release the lmgr tuple lock, if we had it.
 	 */
 	if (have_tuple_lock)
-		UnlockTupleTuplock(relation, &(oldtup.t_self), *lockmode);
+		UnlockTupleTuplock(relation, &(oldtup.t_self), lockmode);
 
 	pgstat_count_heap_update(relation, use_hot_update, newbuf != buffer);
 
@@ -4257,31 +4249,12 @@ l2:
 		heap_freetuple(heaptup);
 	}
 
-	/*
-	 * If it is a HOT update, the update may still need to update summarized
-	 * indexes, lest we fail to update those summaries and get incorrect
-	 * results (for example, minmax bounds of the block may change with this
-	 * update).
-	 */
-	if (use_hot_update)
-	{
-		if (summarized_update)
-			*update_indexes = TU_Summarizing;
-		else
-			*update_indexes = TU_None;
-	}
-	else
-		*update_indexes = TU_All;
-
 	if (old_key_tuple != NULL && old_key_copied)
 		heap_freetuple(old_key_tuple);
 
-	bms_free(hot_attrs);
-	bms_free(sum_attrs);
-	bms_free(key_attrs);
-	bms_free(id_attrs);
-	bms_free(modified_attrs);
-	bms_free(interesting_attrs);
+	bms_free(rid_attrs);
+	bms_free(idx_attrs);
+	/* mix_attrs is owned by the caller, don't free it */
 
 	return TM_Ok;
 }
@@ -4454,28 +4427,113 @@ heap_attr_equals(TupleDesc tupdesc, int attrnum, Datum value1, Datum value2,
 }
 
 /*
- * Check which columns are being updated.
- *
- * Given an updated tuple, determine (and return into the output bitmapset),
- * from those listed as interesting, the set of columns that changed.
- *
- * has_external indicates if any of the unmodified attributes (from those
- * listed as interesting) of the old tuple is a member of external_cols and is
- * stored externally.
+ * HOT updates are possible when either: a) there are no modified indexed
+ * attributes, or b) the modified attributes are all on summarizing indexes.
+ * Later, in heap_update(), we can choose to perform a HOT update if there is
+ * space on the page for the new tuple and the following code has determined
+ * that HOT is allowed.
+ */
+bool
+HeapUpdateHotAllowable(Relation relation, const Bitmapset *mix_attrs,
+					   bool *summarized_only)
+{
+	bool		hot_allowed;
+
+	/*
+	 * Let's be optimistic and start off by assuming the best case, no indexes
+	 * need updating and HOT is allowable.
+	 */
+	hot_allowed = true;
+	*summarized_only = false;
+
+	/*
+	 * Check for case (a); when there are no modified index attributes HOT is
+	 * allowed.
+	 */
+	if (bms_is_empty(mix_attrs))
+		hot_allowed = true;
+	else
+	{
+		Bitmapset  *sum_attrs = RelationGetIndexAttrBitmap(relation,
+														   INDEX_ATTR_BITMAP_SUMMARIZED);
+
+		/*
+		 * At least one index attribute was modified, but is this case (b)
+		 * where all the modified index attributes are only used by
+		 * summarizing indexes?  If that's the case we need to update those
+		 * indexes, but this can be a HOT update.
+		 */
+		if (bms_is_subset(mix_attrs, sum_attrs))
+		{
+			hot_allowed = true;
+			*summarized_only = true;
+		}
+		else
+		{
+			/*
+			 * Now we know that one or more indexed attributes were updated
+			 * and that at least one of those attributes is referenced by a
+			 * non-summarizing index.  HOT is not allowed.
+			 */
+			hot_allowed = false;
+		}
+
+		bms_free(sum_attrs);
+	}
+
+	return hot_allowed;
+}
+
+/*
+ * If we're not updating any "key" attributes, we can grab a weaker lock type.
+ * This allows for more concurrency when we are running simultaneously with
+ * foreign key checks.
+ */
+LockTupleMode
+HeapUpdateDetermineLockmode(Relation relation, const Bitmapset *mix_attrs)
+{
+	LockTupleMode lockmode = LockTupleExclusive;
+
+	Bitmapset  *key_attrs = RelationGetIndexAttrBitmap(relation,
+													   INDEX_ATTR_BITMAP_KEY);
+
+	if (!bms_overlap(mix_attrs, key_attrs))
+		lockmode = LockTupleNoKeyExclusive;
+
+	bms_free(key_attrs);
+
+	return lockmode;
+}
+
+/*
+ * Return a Bitmapset that contains the set of modified (changed) indexed
+ * attributes between oldtup and newtup.
  */
 static Bitmapset *
-HeapDetermineColumnsInfo(Relation relation,
-						 Bitmapset *interesting_cols,
-						 Bitmapset *external_cols,
-						 HeapTuple oldtup, HeapTuple newtup,
-						 bool *has_external)
+HeapUpdateModIdxAttrs(Relation relation, HeapTuple oldtup, HeapTuple newtup)
 {
 	int			attidx;
-	Bitmapset  *modified = NULL;
+	Bitmapset  *attrs,
+			   *mix_attrs = NULL;
 	TupleDesc	tupdesc = RelationGetDescr(relation);
 
+	/* Get the set of all attributes across all indexes for this relation */
+	attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_INDEXED);
+
+	/* No indexed attributes, we're done */
+	if (bms_is_empty(attrs))
+		return NULL;
+
+	/*
+	 * This heap update function is used outside the executor and so unlike
+	 * heapam_tuple_update() where there is ResultRelInfo and EState to
+	 * provide the concise set of attributes that might have been modified
+	 * (via ExecGetAllUpdatedCols()) we simply check all indexed attributes to
+	 * find the subset that changed value.  That's the "modified indexed
+	 * attributes" or "mix_attrs".
+	 */
 	attidx = -1;
-	while ((attidx = bms_next_member(interesting_cols, attidx)) >= 0)
+	while ((attidx = bms_next_member(attrs, attidx)) >= 0)
 	{
 		/* attidx is zero-based, attrnum is the normal attribute number */
 		AttrNumber	attrnum = attidx + FirstLowInvalidHeapAttributeNumber;
@@ -4491,7 +4549,7 @@ HeapDetermineColumnsInfo(Relation relation,
 		 */
 		if (attrnum == 0)
 		{
-			modified = bms_add_member(modified, attidx);
+			mix_attrs = bms_add_member(mix_attrs, attidx);
 			continue;
 		}
 
@@ -4504,7 +4562,7 @@ HeapDetermineColumnsInfo(Relation relation,
 		{
 			if (attrnum != TableOidAttributeNumber)
 			{
-				modified = bms_add_member(modified, attidx);
+				mix_attrs = bms_add_member(mix_attrs, attidx);
 				continue;
 			}
 		}
@@ -4520,29 +4578,12 @@ HeapDetermineColumnsInfo(Relation relation,
 
 		if (!heap_attr_equals(tupdesc, attrnum, value1,
 							  value2, isnull1, isnull2))
-		{
-			modified = bms_add_member(modified, attidx);
-			continue;
-		}
-
-		/*
-		 * No need to check attributes that can't be stored externally. Note
-		 * that system attributes can't be stored externally.
-		 */
-		if (attrnum < 0 || isnull1 ||
-			TupleDescCompactAttr(tupdesc, attrnum - 1)->attlen != -1)
-			continue;
-
-		/*
-		 * Check if the old tuple's attribute is stored externally and is a
-		 * member of external_cols.
-		 */
-		if (VARATT_IS_EXTERNAL((varlena *) DatumGetPointer(value1)) &&
-			bms_is_member(attidx, external_cols))
-			*has_external = true;
+			mix_attrs = bms_add_member(mix_attrs, attidx);
 	}
 
-	return modified;
+	bms_free(attrs);
+
+	return mix_attrs;
 }
 
 /*
@@ -4554,17 +4595,108 @@ HeapDetermineColumnsInfo(Relation relation,
  * via ereport().
  */
 void
-simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tup,
+simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tuple,
 				   TU_UpdateIndexes *update_indexes)
 {
 	TM_Result	result;
 	TM_FailureData tmfd;
 	LockTupleMode lockmode;
+	TupleTableSlot *slot;
+	BufferHeapTupleTableSlot *bslot;
+	HeapTuple	oldtup;
+	bool		shouldFree = true;
+	Bitmapset  *idx_attrs,
+			   *mix_attrs;
+	bool		hot_allowed,
+				summarized_only;
+	Buffer		buffer;
 
-	result = heap_update(relation, otid, tup,
-						 GetCurrentCommandId(true), InvalidSnapshot,
-						 true /* wait for commit */ ,
-						 &tmfd, &lockmode, update_indexes);
+	Assert(ItemPointerIsValid(otid));
+
+	/*
+	 * Fetch this bitmap of interesting attributes from relcache before
+	 * obtaining a buffer lock because if we are doing an update on one of the
+	 * relevant system catalogs we could deadlock if we try to fetch them
+	 * later on. Relcache will return copies of each bitmap, so we need not
+	 * worry about relcache flush happening midway through this operation.
+	 */
+	idx_attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_INDEXED);
+
+	INJECTION_POINT("heap_update-before-pin", NULL);
+
+	/*
+	 * To update a heap tuple we need to find the set of modified indexed
+	 * attributes ("mix_attrs") so as to see if a HOT update is allowable or
+	 * not.  When updating heap tuples via execution of UPDATE statements this
+	 * set is constructed before calling into the table AM's tuple_update()
+	 * function by the function ExecUpdateModIdxAttrs() which compares the
+	 * old/new TupleTableSlots.  However, here we have the old TID and the new
+	 * tuple, not two TupleTableSlots, but we still need to construct a similar
+	 * bitmap so as to be able to know if HOT updates are allowed or not.  To
+	 * do that we first have to fetch the old tuple itself.  Because
+	 * heapam_fetch_row_version() is static, we have to replicate that code
+	 * here. This is a bit repetitive because heap_update() will again find
+	 * and form the old HeapTuple from the old TID and in most cases the
+	 * callers (ignoring extensions, always catalog tuple updates) already had
+	 * the set of changed attributes (e.g. the "replaces" array), but for now
+	 * this minor repetition of work is necessary.
+	 */
+
+	slot = MakeTupleTableSlot(RelationGetDescr(relation), &TTSOpsBufferHeapTuple);
+	bslot = (BufferHeapTupleTableSlot *) slot;
+
+	/*
+	 * Set the TID in the slot and then fetch the old tuple so we can examine
+	 * it.
+	 */
+	bslot->base.tupdata.t_self = *otid;
+	if (!heap_fetch(relation, SnapshotAny, &bslot->base.tupdata, &buffer, false))
+	{
+		/*
+		 * heap_update() checks for !ItemIdIsNormal(lp) and will return false
+		 * in those cases.
+		 */
+		Assert(RelationSupportsSysCache(RelationGetRelid(relation)));
+
+		*update_indexes = TU_None;
+
+		/* mix_attrs not yet initialized */
+		bms_free(idx_attrs);
+		ExecDropSingleTupleTableSlot(slot);
+
+		elog(ERROR, "tuple concurrently deleted");
+
+		return;
+	}
+
+	Assert(buffer != InvalidBuffer);
+
+	/* Store in slot, transferring existing pin */
+	ExecStorePinnedBufferHeapTuple(&bslot->base.tupdata, slot, buffer);
+	oldtup = ExecFetchSlotHeapTuple(slot, false, &shouldFree);
+
+	mix_attrs = HeapUpdateModIdxAttrs(relation, oldtup, tuple);
+	lockmode = HeapUpdateDetermineLockmode(relation, mix_attrs);
+	hot_allowed = HeapUpdateHotAllowable(relation, mix_attrs, &summarized_only);
+
+	result = heap_update(relation, otid, tuple, GetCurrentCommandId(true),
+						 InvalidSnapshot, true /* wait for commit */ ,
+						 &tmfd, lockmode, mix_attrs, hot_allowed);
+
+	if (shouldFree)
+		heap_freetuple(oldtup);
+
+	ExecDropSingleTupleTableSlot(slot);
+	bms_free(idx_attrs);
+
+	/*
+	 * Decide whether new index entries are needed for the tuple
+	 *
+	 * If the update is not HOT, we must update all indexes. If the update is
+	 * HOT, it could be that we updated summarized columns, so we either
+	 * update only summarized indexes, or none at all.
+	 */
+	*update_indexes = TU_None;
 	switch (result)
 	{
 		case TM_SelfModified:
@@ -4574,6 +4706,10 @@ simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tup
 
 		case TM_Ok:
 			/* done successfully */
+			if (!HeapTupleIsHeapOnly(tuple))
+				*update_indexes = TU_All;
+			else if (summarized_only)
+				*update_indexes = TU_Summarizing;
 			break;
 
 		case TM_Updated:
@@ -4590,7 +4726,6 @@ simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tup
 	}
 }
 
-
 /*
  * Return the MultiXactStatus corresponding to the given tuple lock mode.
  */
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c
index 3ff36f59bf8..4600af61793 100644
--- a/src/backend/access/heap/heapam_handler.c
+++ b/src/backend/access/heap/heapam_handler.c
@@ -27,7 +27,6 @@
 #include "access/syncscan.h"
 #include "access/tableam.h"
 #include "access/tsmapi.h"
-#include "access/visibilitymap.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
@@ -44,6 +43,7 @@
 #include "storage/procarray.h"
 #include "storage/smgr.h"
 #include "utils/builtins.h"
+#include "utils/injection_point.h"
 #include "utils/rel.h"
 
 static void reform_and_rewrite_tuple(HeapTuple tuple,
@@ -316,19 +316,26 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
 static TM_Result
 heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
-					bool wait, TM_FailureData *tmfd,
-					LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes)
+					bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
+					const Bitmapset *mix_attrs, TU_UpdateIndexes *update_indexes)
 {
 	bool		shouldFree = true;
 	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+	bool		hot_allowed;
+	bool		summarized_only;
 	TM_Result	result;
 
+	Assert(ItemPointerIsValid(otid));
+
+	hot_allowed = HeapUpdateHotAllowable(relation, mix_attrs, &summarized_only);
+	*lockmode = HeapUpdateDetermineLockmode(relation, mix_attrs);
+
 	/* Update the tuple with table oid */
 	slot->tts_tableOid = RelationGetRelid(relation);
 	tuple->t_tableOid = slot->tts_tableOid;
 
 	result = heap_update(relation, otid, tuple, cid, crosscheck, wait,
-						 tmfd, lockmode, update_indexes);
+						 tmfd, *lockmode, mix_attrs, hot_allowed);
 	ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
 
 	/*
@@ -341,16 +348,17 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
 	 * HOT, it could be that we updated summarized columns, so we either
 	 * update only summarized indexes, or none at all.
 	 */
-	if (result != TM_Ok)
+	*update_indexes = TU_None;
+	if (result == TM_Ok)
 	{
-		Assert(*update_indexes == TU_None);
-		*update_indexes = TU_None;
+		if (HeapTupleIsHeapOnly(tuple))
+		{
+			if (summarized_only)
+				*update_indexes = TU_Summarizing;
+		}
+		else
+			*update_indexes = TU_All;
 	}
-	else if (!HeapTupleIsHeapOnly(tuple))
-		Assert(*update_indexes == TU_All);
-	else
-		Assert((*update_indexes == TU_Summarizing) ||
-			   (*update_indexes == TU_None));
 
 	if (shouldFree)
 		pfree(tuple);
diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c
index dfda1af412e..42acd5b17a9 100644
--- a/src/backend/access/table/tableam.c
+++ b/src/backend/access/table/tableam.c
@@ -359,6 +359,7 @@ void
 simple_table_tuple_update(Relation rel, ItemPointer otid,
 						  TupleTableSlot *slot,
 						  Snapshot snapshot,
+						  const Bitmapset *mix_attrs,
 						  TU_UpdateIndexes *update_indexes)
 {
 	TM_Result	result;
@@ -369,7 +370,9 @@ simple_table_tuple_update(Relation rel, ItemPointer otid,
 								GetCurrentCommandId(true),
 								snapshot, InvalidSnapshot,
 								true /* wait for commit */ ,
-								&tmfd, &lockmode, update_indexes);
+								&tmfd, &lockmode,
+								mix_attrs,
+								update_indexes);
 
 	switch (result)
 	{
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 98d402c0a3b..64efa55dfe3 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -2978,6 +2978,7 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
 					 bool is_merge_update)
 {
 	TriggerDesc *trigdesc = relinfo->ri_TrigDesc;
+	TupleDesc	tupdesc = RelationGetDescr(relinfo->ri_RelationDesc);
 	TupleTableSlot *oldslot = ExecGetTriggerOldSlot(estate, relinfo);
 	HeapTuple	newtuple = NULL;
 	HeapTuple	trigtuple;
@@ -2985,7 +2986,9 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
 	bool		should_free_new = false;
 	TriggerData LocTriggerData = {0};
 	int			i;
-	Bitmapset  *updatedCols;
+	Bitmapset  *updatedCols = NULL;
+	Bitmapset  *remainingCols = NULL;
+	Bitmapset  *modifiedCols;
 	LockTupleMode lockmode;
 
 	/* Determine lock mode to use */
@@ -3127,6 +3130,21 @@ ExecBRUpdateTriggers(EState *estate, EPQState *epqstate,
 	if (should_free_trig)
 		heap_freetuple(trigtuple);
 
+	/*
+	 * BEFORE UPDATE triggers may have updated attributes not known to
+	 * ExecGetAllUpdatedCols() using heap_modify_tuple() or
+	 * heap_modify_tuple_by_cols().  Find and record those now.
+	 */
+	remainingCols = bms_add_range(NULL, 1 - FirstLowInvalidHeapAttributeNumber,
+							   tupdesc->natts - FirstLowInvalidHeapAttributeNumber);
+	remainingCols = bms_del_members(remainingCols, updatedCols);
+	modifiedCols = ExecCompareSlotAttrs(tupdesc, remainingCols, oldslot, newslot);
+	relinfo->ri_extraUpdatedCols =
+		bms_add_members(relinfo->ri_extraUpdatedCols, modifiedCols);
+
+	bms_free(remainingCols);
+	bms_free(modifiedCols);
+
 	return true;
 }
 
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 2497ee7edc5..c2e77740e76 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -33,6 +33,7 @@
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "utils/relcache.h"
 #include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
@@ -906,6 +907,7 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 	bool		skip_tuple = false;
 	Relation	rel = resultRelInfo->ri_RelationDesc;
 	ItemPointer tid = &(searchslot->tts_tid);
+	Bitmapset  *mix_attrs;
 
 	/*
 	 * We support only non-system tables, with
@@ -944,8 +946,11 @@ ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 		if (rel->rd_rel->relispartition)
 			ExecPartitionCheck(resultRelInfo, slot, estate, true);
 
+		mix_attrs = ExecUpdateModIdxAttrs(resultRelInfo,
+										  estate, searchslot, slot);
+
 		simple_table_tuple_update(rel, tid, slot, estate->es_snapshot,
-								  &update_indexes);
+								  mix_attrs, &update_indexes);
 
 		conflictindexes = resultRelInfo->ri_onConflictArbiterIndexes;
 
diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c
index b768eae9e53..1064ebe845b 100644
--- a/src/backend/executor/execTuples.c
+++ b/src/backend/executor/execTuples.c
@@ -66,6 +66,7 @@
 #include "nodes/nodeFuncs.h"
 #include "storage/bufmgr.h"
 #include "utils/builtins.h"
+#include "utils/datum.h"
 #include "utils/expandeddatum.h"
 #include "utils/lsyscache.h"
 #include "utils/typcache.h"
@@ -1929,6 +1930,83 @@ ExecFetchSlotHeapTupleDatum(TupleTableSlot *slot)
 	return ret;
 }
 
+/*
+ * ExecCompareSlotAttrs
+ *
+ * Compare the subset of attributes in attrs between TupleTableSlots to detect
+ * which attributes have changed.
+ *
+ * Returns a Bitmapset of attribute indices (using
+ * FirstLowInvalidHeapAttributeNumber convention) that differ between the two
+ * slots.
+ */
+Bitmapset *
+ExecCompareSlotAttrs(TupleDesc tupdesc, const Bitmapset *attrs,
+					 TupleTableSlot *s1, TupleTableSlot *s2)
+{
+	int			attidx = -1;
+	Bitmapset  *modified = NULL;
+
+	/* XXX what if slots don't share the same tupleDescriptor... */
+	/* Assert(s1->tts_tupleDescriptor == s2->tts_tupleDescriptor); */
+
+	while ((attidx = bms_next_member(attrs, attidx)) >= 0)
+	{
+		/* attidx is zero-based, attrnum is the normal attribute number */
+		AttrNumber	attrnum = attidx + FirstLowInvalidHeapAttributeNumber;
+		Datum		value1,
+					value2;
+		bool		null1,
+					null2;
+		CompactAttribute *att;
+
+		/*
+		 * If it's a whole-tuple reference, say "not equal".  It's not really
+		 * worth supporting this case, since it could only succeed after a
+		 * no-op update, which is hardly a case worth optimizing for.
+		 */
+		if (attrnum == 0)
+		{
+			modified = bms_add_member(modified, attidx);
+			continue;
+		}
+
+		/*
+		 * Likewise, automatically say "not equal" for any system attribute
+		 * other than tableOID; we cannot expect these to be consistent in a
+		 * HOT chain, or even to be set correctly yet in the new tuple.
+		 */
+		if (attrnum < 0)
+		{
+			if (attrnum != TableOidAttributeNumber)
+			{
+				modified = bms_add_member(modified, attidx);
+				continue;
+			}
+		}
+
+		att = TupleDescCompactAttr(tupdesc, attrnum - 1);
+		value1 = slot_getattr(s1, attrnum, &null1);
+		value2 = slot_getattr(s2, attrnum, &null2);
+
+		/* A change to/from NULL, so not equal */
+		if (null1 != null2)
+		{
+			modified = bms_add_member(modified, attidx);
+			continue;
+		}
+
+		/* Both NULL, no change/unmodified */
+		if (null2)
+			continue;
+
+		if (!datum_image_eq(value1, value2, att->attbyval, att->attlen))
+			modified = bms_add_member(modified, attidx);
+	}
+
+	return modified;
+}
+
 /* ----------------------------------------------------------------
  *				convenience initialization routines
  * ----------------------------------------------------------------
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 793c76d4f82..4927fc88e61 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -17,6 +17,7 @@
  *		ExecModifyTable		- retrieve the next tuple from the node
  *		ExecEndModifyTable	- shut down the ModifyTable node
  *		ExecReScanModifyTable - rescan the ModifyTable node
+ *		ExecUpdateModIdxAttrs - find set of updated indexed columns
  *
  *	 NOTES
  *		The ModifyTable node receives input from its outerPlan, which is
@@ -54,6 +55,7 @@
 
 #include "access/htup_details.h"
 #include "access/tableam.h"
+#include "access/tupdesc.h"
 #include "access/xact.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
@@ -188,6 +190,68 @@ static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context,
 										   ResultRelInfo *resultRelInfo,
 										   bool canSetTag);
 
+/*
+ * ExecUpdateModIdxAttrs
+ *
+ * Find the set of attributes referenced by this relation and used in this
+ * UPDATE that now differ in value.  This is done by reviewing slot datum that
+ * are in the UPDATE statement and are known to be referenced by at least one
+ * index in some way.  This set is called the "modified indexed attributes" or
+ * "mix_attrs".  An overlap of a single index's attributes and this "mix" set
+ * signals that the attributes in the new_tts used to form the index datum have
+ * changed.
+ *
+ * Return a Bitmapset that contains the set of modified (changed) indexed
+ * attributes between old_tts and new_tts.
+ *
+ * NOTE: There is a similar function called HeapUpdateModIdxAttrs() that operates
+ * on the old TID and new HeapTuple rather than the old/new TupleTableSlots as
+ * this function does.  These two functions should mirror one another until
+ * someday when catalog tuple updates track their changes avoiding the need to
+ * re-discover them in simple_heap_update().
+ */
+Bitmapset *
+ExecUpdateModIdxAttrs(ResultRelInfo *resultRelInfo,
+					  EState *estate,
+					  TupleTableSlot *old_tts,
+					  TupleTableSlot *new_tts)
+{
+	Relation	relation = resultRelInfo->ri_RelationDesc;
+	TupleDesc	tupdesc = RelationGetDescr(relation);
+	Bitmapset  *attrs,
+			   *mix_attrs = NULL;
+
+	/* If no indexes, we're done */
+	if (resultRelInfo->ri_NumIndices == 0)
+		return NULL;
+
+	/*
+	 * Get the set of all attributes across all indexes for this relation from
+	 * the relcache; it returns a copy of the bitmap, so we can modify it.
+	 */
+	attrs = RelationGetIndexAttrBitmap(relation, INDEX_ATTR_BITMAP_INDEXED);
+
+	/*
+	 * Fetch the set of attributes explicitly SET in the UPDATE statement or
+	 * set by a before row trigger (even if not mentioned in the SQL) from the
+	 * executor state and then find the intersection with the indexed
+	 * attributes.  Attributes that are SET might not change value, so we have
+	 * to examine them for changes.
+	 */
+	attrs = bms_int_members(attrs, ExecGetAllUpdatedCols(resultRelInfo, estate));
+
+	/*
+	 * When there are indexed attributes mentioned in the UPDATE then we need
+	 * to find the subset that changed value.  That's the "modified indexed
+	 * attributes" or "mix_attrs".
+	 */
+	if (!bms_is_empty(attrs))
+		mix_attrs = ExecCompareSlotAttrs(tupdesc, attrs, old_tts, new_tts);
+
+	bms_free(attrs);
+
+	return mix_attrs;
+}
 
 /*
  * Verify that the tuples to be produced by INSERT match the
@@ -2195,14 +2259,17 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo,
  */
 static TM_Result
 ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
-			  ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot,
-			  bool canSetTag, UpdateContext *updateCxt)
+			  ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *oldSlot,
+			  TupleTableSlot *slot, bool canSetTag, UpdateContext *updateCxt)
 {
 	EState	   *estate = context->estate;
 	Relation	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 	bool		partition_constraint_failed;
 	TM_Result	result;
 
+	/* The set of modified indexed attributes that trigger new index entries */
+	Bitmapset  *mix_attrs = NULL;
+
 	updateCxt->crossPartUpdate = false;
 
 	/*
@@ -2319,7 +2386,16 @@ lreplace:
 		ExecConstraints(resultRelInfo, slot, estate);
 
 	/*
-	 * replace the heap tuple
+	 * Next up we need to find out the set of indexed attributes that have
+	 * changed in value and should trigger a new index tuple.  We could start
+	 * with the set of updated columns via ExecGetUpdatedCols(), but if we do
+	 * we will overlook attributes directly modified by heap_modify_tuple()
+	 * which are not known to ExecGetUpdatedCols().
+	 */
+	mix_attrs = ExecUpdateModIdxAttrs(resultRelInfo, estate, oldSlot, slot);
+
+	/*
+	 * Call into the table AM to update the heap tuple.
 	 *
 	 * Note: if es_crosscheck_snapshot isn't InvalidSnapshot, we check that
 	 * the row to be updated is visible to that snapshot, and throw a
@@ -2333,6 +2409,7 @@ lreplace:
 								estate->es_crosscheck_snapshot,
 								true /* wait for commit */ ,
 								&context->tmfd, &updateCxt->lockmode,
+								mix_attrs,
 								&updateCxt->updateIndexes);
 
 	return result;
@@ -2555,8 +2632,8 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo,
 		 */
 redo_act:
 		lockedtid = *tupleid;
-		result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot,
-							   canSetTag, &updateCxt);
+		result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, oldSlot,
+							   slot, canSetTag, &updateCxt);
 
 		/*
 		 * If ExecUpdateAct reports that a cross-partition update was done,
@@ -3406,8 +3483,8 @@ lmerge_matched:
 					Assert(oldtuple == NULL);
 
 					result = ExecUpdateAct(context, resultRelInfo, tupleid,
-										   NULL, newslot, canSetTag,
-										   &updateCxt);
+										   NULL, resultRelInfo->ri_oldTupleSlot,
+										   newslot, canSetTag, &updateCxt);
 
 					/*
 					 * As in ExecUpdate(), if ExecUpdateAct() reports that a
@@ -4539,7 +4616,7 @@ ExecModifyTable(PlanState *pstate)
 		 * For UPDATE/DELETE/MERGE, fetch the row identity info for the tuple
 		 * to be updated/deleted/merged.  For a heap relation, that's a TID;
 		 * otherwise we may have a wholerow junk attr that carries the old
-		 * tuple in toto.  Keep this in step with the part of
+		 * tuple in total.  Keep this in step with the part of
 		 * ExecInitModifyTable that sets up ri_RowIdAttNo.
 		 */
 		if (operation == CMD_UPDATE || operation == CMD_DELETE ||
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 6b634c9fff1..f30505d8ae3 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -2475,8 +2475,8 @@ RelationDestroyRelation(Relation relation, bool remember_tupdesc)
 	bms_free(relation->rd_keyattr);
 	bms_free(relation->rd_pkattr);
 	bms_free(relation->rd_idattr);
-	bms_free(relation->rd_hotblockingattr);
 	bms_free(relation->rd_summarizedattr);
+	bms_free(relation->rd_indexedattr);
 	if (relation->rd_pubdesc)
 		pfree(relation->rd_pubdesc);
 	if (relation->rd_options)
@@ -5276,8 +5276,8 @@ RelationGetIndexPredicate(Relation relation)
  *									(beware: even if PK is deferrable!)
  *	INDEX_ATTR_BITMAP_IDENTITY_KEY	Columns in the table's replica identity
  *									index (empty if FULL)
- *	INDEX_ATTR_BITMAP_HOT_BLOCKING	Columns that block updates from being HOT
- *	INDEX_ATTR_BITMAP_SUMMARIZED	Columns included in summarizing indexes
+ *	INDEX_ATTR_BITMAP_SUMMARIZED	Columns only included in summarizing indexes
+ *	INDEX_ATTR_BITMAP_INDEXED		Columns referenced by indexes
  *
  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
  * we can include system attributes (e.g., OID) in the bitmap representation.
@@ -5300,8 +5300,8 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
 	Bitmapset  *uindexattrs;	/* columns in unique indexes */
 	Bitmapset  *pkindexattrs;	/* columns in the primary index */
 	Bitmapset  *idindexattrs;	/* columns in the replica identity */
-	Bitmapset  *hotblockingattrs;	/* columns with HOT blocking indexes */
-	Bitmapset  *summarizedattrs;	/* columns with summarizing indexes */
+	Bitmapset  *summarizedattrs;	/* columns only in summarizing indexes */
+	Bitmapset  *indexedattrs;	/* columns referenced by indexes */
 	List	   *indexoidlist;
 	List	   *newindexoidlist;
 	Oid			relpkindex;
@@ -5320,10 +5320,10 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
 				return bms_copy(relation->rd_pkattr);
 			case INDEX_ATTR_BITMAP_IDENTITY_KEY:
 				return bms_copy(relation->rd_idattr);
-			case INDEX_ATTR_BITMAP_HOT_BLOCKING:
-				return bms_copy(relation->rd_hotblockingattr);
 			case INDEX_ATTR_BITMAP_SUMMARIZED:
 				return bms_copy(relation->rd_summarizedattr);
+			case INDEX_ATTR_BITMAP_INDEXED:
+				return bms_copy(relation->rd_indexedattr);
 			default:
 				elog(ERROR, "unknown attrKind %u", attrKind);
 		}
@@ -5366,8 +5366,8 @@ restart:
 	uindexattrs = NULL;
 	pkindexattrs = NULL;
 	idindexattrs = NULL;
-	hotblockingattrs = NULL;
 	summarizedattrs = NULL;
+	indexedattrs = NULL;
 	foreach(l, indexoidlist)
 	{
 		Oid			indexOid = lfirst_oid(l);
@@ -5426,7 +5426,7 @@ restart:
 		if (indexDesc->rd_indam->amsummarizing)
 			attrs = &summarizedattrs;
 		else
-			attrs = &hotblockingattrs;
+			attrs = &indexedattrs;
 
 		/* Collect simple attribute references */
 		for (i = 0; i < indexDesc->rd_index->indnatts; i++)
@@ -5435,9 +5435,9 @@ restart:
 
 			/*
 			 * Since we have covering indexes with non-key columns, we must
-			 * handle them accurately here. non-key columns must be added into
-			 * hotblockingattrs or summarizedattrs, since they are in index,
-			 * and update shouldn't miss them.
+			 * handle them accurately here. Non-key columns must be added into
+			 * indexedattrs or summarizedattrs, since they are in index, and
+			 * update shouldn't miss them.
 			 *
 			 * Summarizing indexes do not block HOT, but do need to be updated
 			 * when the column value changes, thus require a separate
@@ -5498,12 +5498,20 @@ restart:
 		bms_free(uindexattrs);
 		bms_free(pkindexattrs);
 		bms_free(idindexattrs);
-		bms_free(hotblockingattrs);
 		bms_free(summarizedattrs);
+		bms_free(indexedattrs);
 
 		goto restart;
 	}
 
+	/*
+	 * Record which attributes are only referenced by summarizing indexes. Then
+	 * add that into the other indexed attributes to track all referenced
+	 * attributes.
+	 */
+	summarizedattrs = bms_del_members(summarizedattrs, indexedattrs);
+	indexedattrs = bms_add_members(indexedattrs, summarizedattrs);
+
 	/* Don't leak the old values of these bitmaps, if any */
 	relation->rd_attrsvalid = false;
 	bms_free(relation->rd_keyattr);
@@ -5512,10 +5520,10 @@ restart:
 	relation->rd_pkattr = NULL;
 	bms_free(relation->rd_idattr);
 	relation->rd_idattr = NULL;
-	bms_free(relation->rd_hotblockingattr);
-	relation->rd_hotblockingattr = NULL;
 	bms_free(relation->rd_summarizedattr);
 	relation->rd_summarizedattr = NULL;
+	bms_free(relation->rd_indexedattr);
+	relation->rd_indexedattr = NULL;
 
 	/*
 	 * Now save copies of the bitmaps in the relcache entry.  We intentionally
@@ -5528,8 +5536,8 @@ restart:
 	relation->rd_keyattr = bms_copy(uindexattrs);
 	relation->rd_pkattr = bms_copy(pkindexattrs);
 	relation->rd_idattr = bms_copy(idindexattrs);
-	relation->rd_hotblockingattr = bms_copy(hotblockingattrs);
 	relation->rd_summarizedattr = bms_copy(summarizedattrs);
+	relation->rd_indexedattr = bms_copy(indexedattrs);
 	relation->rd_attrsvalid = true;
 	MemoryContextSwitchTo(oldcxt);
 
@@ -5542,10 +5550,10 @@ restart:
 			return pkindexattrs;
 		case INDEX_ATTR_BITMAP_IDENTITY_KEY:
 			return idindexattrs;
-		case INDEX_ATTR_BITMAP_HOT_BLOCKING:
-			return hotblockingattrs;
 		case INDEX_ATTR_BITMAP_SUMMARIZED:
 			return summarizedattrs;
+		case INDEX_ATTR_BITMAP_INDEXED:
+			return indexedattrs;
 		default:
 			elog(ERROR, "unknown attrKind %u", attrKind);
 			return NULL;
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 3c0961ab36b..7abc8e24f21 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -365,10 +365,9 @@ extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid,
 extern void heap_finish_speculative(Relation relation, const ItemPointerData *tid);
 extern void heap_abort_speculative(Relation relation, const ItemPointerData *tid);
 extern TM_Result heap_update(Relation relation, const ItemPointerData *otid,
-							 HeapTuple newtup,
-							 CommandId cid, Snapshot crosscheck, bool wait,
-							 TM_FailureData *tmfd, LockTupleMode *lockmode,
-							 TU_UpdateIndexes *update_indexes);
+							 HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait,
+							 TM_FailureData *tmfd, const LockTupleMode lockmode,
+							 const Bitmapset *mix_attrs, const bool hot_allowed);
 extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple,
 								 CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy,
 								 bool follow_updates,
@@ -430,6 +429,12 @@ extern void log_heap_prune_and_freeze(Relation relation, Buffer buffer,
 									  OffsetNumber *dead, int ndead,
 									  OffsetNumber *unused, int nunused);
 
+/* in heap/heapam.c */
+extern bool HeapUpdateHotAllowable(Relation relation, const Bitmapset *mix_attrs,
+								   bool *summarized_only);
+extern LockTupleMode HeapUpdateDetermineLockmode(Relation relation,
+												 const Bitmapset *mix_attrs);
+
 /* in heap/vacuumlazy.c */
 extern void heap_vacuum_rel(Relation rel,
 							const VacuumParams params, BufferAccessStrategy bstrategy);
diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h
index 06084752245..19c58a76854 100644
--- a/src/include/access/tableam.h
+++ b/src/include/access/tableam.h
@@ -549,6 +549,7 @@ typedef struct TableAmRoutine
 								 bool wait,
 								 TM_FailureData *tmfd,
 								 LockTupleMode *lockmode,
+								 const Bitmapset *mix_attrs,
 								 TU_UpdateIndexes *update_indexes);
 
 	/* see table_tuple_lock() for reference about parameters */
@@ -1523,12 +1524,12 @@ static inline TM_Result
 table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot,
 				   CommandId cid, Snapshot snapshot, Snapshot crosscheck,
 				   bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode,
-				   TU_UpdateIndexes *update_indexes)
+				   const Bitmapset *mix_attrs, TU_UpdateIndexes *update_indexes)
 {
 	return rel->rd_tableam->tuple_update(rel, otid, slot,
 										 cid, snapshot, crosscheck,
-										 wait, tmfd,
-										 lockmode, update_indexes);
+										 wait, tmfd, lockmode,
+										 mix_attrs, update_indexes);
 }
 
 /*
@@ -2009,6 +2010,7 @@ extern void simple_table_tuple_delete(Relation rel, ItemPointer tid,
 									  Snapshot snapshot);
 extern void simple_table_tuple_update(Relation rel, ItemPointer otid,
 									  TupleTableSlot *slot, Snapshot snapshot,
+									  const Bitmapset *mix_attrs,
 									  TU_UpdateIndexes *update_indexes);
 
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index d46ba59895d..266d5309103 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -17,6 +17,7 @@
 #include "datatype/timestamp.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
+#include "nodes/execnodes.h"
 #include "nodes/lockoptions.h"
 #include "nodes/parsenodes.h"
 #include "utils/memutils.h"
@@ -606,6 +607,10 @@ extern TupleDesc ExecCleanTypeFromTL(List *targetList);
 extern TupleDesc ExecTypeFromExprList(List *exprList);
 extern void ExecTypeSetColNames(TupleDesc typeInfo, List *namesList);
 extern void UpdateChangedParamSet(PlanState *node, Bitmapset *newchg);
+extern Bitmapset *ExecCompareSlotAttrs(TupleDesc tupdesc,
+									   const Bitmapset *attrs,
+									   TupleTableSlot *old_tts,
+									   TupleTableSlot *new_tts);
 
 typedef struct TupOutputState
 {
@@ -803,5 +808,9 @@ extern ResultRelInfo *ExecLookupResultRelByOid(ModifyTableState *node,
 											   Oid resultoid,
 											   bool missing_ok,
 											   bool update_cache);
+extern Bitmapset *ExecUpdateModIdxAttrs(ResultRelInfo *relinfo,
+										EState *estate,
+										TupleTableSlot *old_tts,
+										TupleTableSlot *new_tts);
 
 #endif							/* EXECUTOR_H  */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 236830f6b93..10e5e9044ee 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -162,8 +162,8 @@ typedef struct RelationData
 	Bitmapset  *rd_keyattr;		/* cols that can be ref'd by foreign keys */
 	Bitmapset  *rd_pkattr;		/* cols included in primary key */
 	Bitmapset  *rd_idattr;		/* included in replica identity index */
-	Bitmapset  *rd_hotblockingattr; /* cols blocking HOT update */
 	Bitmapset  *rd_summarizedattr;	/* cols indexed by summarizing indexes */
+	Bitmapset  *rd_indexedattr; /* all cols referenced by indexes */
 
 	PublicationDesc *rd_pubdesc;	/* publication descriptor, or NULL */
 
diff --git a/src/include/utils/relcache.h b/src/include/utils/relcache.h
index 2700224939a..57b46ee54e5 100644
--- a/src/include/utils/relcache.h
+++ b/src/include/utils/relcache.h
@@ -69,8 +69,8 @@ typedef enum IndexAttrBitmapKind
 	INDEX_ATTR_BITMAP_KEY,
 	INDEX_ATTR_BITMAP_PRIMARY_KEY,
 	INDEX_ATTR_BITMAP_IDENTITY_KEY,
-	INDEX_ATTR_BITMAP_HOT_BLOCKING,
 	INDEX_ATTR_BITMAP_SUMMARIZED,
+	INDEX_ATTR_BITMAP_INDEXED,
 } IndexAttrBitmapKind;
 
 extern Bitmapset *RelationGetIndexAttrBitmap(Relation relation,
diff --git a/src/test/regress/expected/generated_virtual.out b/src/test/regress/expected/generated_virtual.out
index 6dab60c937b..7ebb7890d96 100644
--- a/src/test/regress/expected/generated_virtual.out
+++ b/src/test/regress/expected/generated_virtual.out
@@ -287,7 +287,7 @@ DETAIL:  Column "b" is a generated column.
 INSERT INTO gtest1v VALUES (8, DEFAULT), (9, DEFAULT);  -- error
 ERROR:  cannot insert a non-DEFAULT value into column "b"
 DETAIL:  Column "b" is a generated column.
-SELECT * FROM gtest1v;
+SELECT * FROM gtest1v ORDER BY a;
  a | b  
 ---+----
  3 |  6
diff --git a/src/test/regress/expected/triggers.out b/src/test/regress/expected/triggers.out
index 98dee63b50a..ef98fd0cccf 100644
--- a/src/test/regress/expected/triggers.out
+++ b/src/test/regress/expected/triggers.out
@@ -959,16 +959,24 @@ NOTICE:  main_view BEFORE UPDATE STATEMENT (before_view_upd_stmt)
 NOTICE:  main_view AFTER UPDATE STATEMENT (after_view_upd_stmt)
 UPDATE 0
 -- Delete from view using trigger
-DELETE FROM main_view WHERE a IN (20,21);
+DELETE FROM main_view WHERE a = 20 AND b = 31;
 NOTICE:  main_view BEFORE DELETE STATEMENT (before_view_del_stmt)
 NOTICE:  main_view INSTEAD OF DELETE ROW (instead_of_del)
-NOTICE:  OLD: (21,10)
-NOTICE:  main_view INSTEAD OF DELETE ROW (instead_of_del)
 NOTICE:  OLD: (20,31)
+NOTICE:  main_view AFTER DELETE STATEMENT (after_view_del_stmt)
+DELETE 1
+DELETE FROM main_view WHERE a = 21 AND b = 10;
+NOTICE:  main_view BEFORE DELETE STATEMENT (before_view_del_stmt)
+NOTICE:  main_view INSTEAD OF DELETE ROW (instead_of_del)
+NOTICE:  OLD: (21,10)
+NOTICE:  main_view AFTER DELETE STATEMENT (after_view_del_stmt)
+DELETE 1
+DELETE FROM main_view WHERE a = 21 AND b = 32;
+NOTICE:  main_view BEFORE DELETE STATEMENT (before_view_del_stmt)
 NOTICE:  main_view INSTEAD OF DELETE ROW (instead_of_del)
 NOTICE:  OLD: (21,32)
 NOTICE:  main_view AFTER DELETE STATEMENT (after_view_del_stmt)
-DELETE 3
+DELETE 1
 DELETE FROM main_view WHERE a = 31 RETURNING a, b;
 NOTICE:  main_view BEFORE DELETE STATEMENT (before_view_del_stmt)
 NOTICE:  main_view INSTEAD OF DELETE ROW (instead_of_del)
diff --git a/src/test/regress/expected/updatable_views.out b/src/test/regress/expected/updatable_views.out
index 9cea538b8e8..4877a1ddce9 100644
--- a/src/test/regress/expected/updatable_views.out
+++ b/src/test/regress/expected/updatable_views.out
@@ -372,15 +372,15 @@ INSERT INTO rw_view16 (a, b) VALUES (3, 'Row 3'); -- should be OK
 UPDATE rw_view16 SET a=3, aa=-3 WHERE a=3; -- should fail
 ERROR:  multiple assignments to same column "a"
 UPDATE rw_view16 SET aa=-3 WHERE a=3; -- should be OK
-SELECT * FROM base_tbl;
+SELECT * FROM base_tbl ORDER BY a;
  a  |   b    
 ----+--------
+ -3 | Row 3
  -2 | Row -2
  -1 | Row -1
   0 | Row 0
   1 | Row 1
   2 | Row 2
- -3 | Row 3
 (6 rows)
 
 DELETE FROM rw_view16 WHERE a=-3; -- should be OK
diff --git a/src/test/regress/sql/generated_virtual.sql b/src/test/regress/sql/generated_virtual.sql
index e750866d2d8..877152d6d69 100644
--- a/src/test/regress/sql/generated_virtual.sql
+++ b/src/test/regress/sql/generated_virtual.sql
@@ -127,7 +127,7 @@ ALTER VIEW gtest1v ALTER COLUMN b SET DEFAULT 100;
 INSERT INTO gtest1v VALUES (8, DEFAULT);  -- error
 INSERT INTO gtest1v VALUES (8, DEFAULT), (9, DEFAULT);  -- error
 
-SELECT * FROM gtest1v;
+SELECT * FROM gtest1v ORDER BY a;
 DELETE FROM gtest1v WHERE a >= 5;
 DROP VIEW gtest1v;
 
diff --git a/src/test/regress/sql/triggers.sql b/src/test/regress/sql/triggers.sql
index ea39817ee3d..6ceb61608ae 100644
--- a/src/test/regress/sql/triggers.sql
+++ b/src/test/regress/sql/triggers.sql
@@ -660,7 +660,9 @@ UPDATE main_view SET b = 32 WHERE a = 21 AND b = 31 RETURNING a, b;
 UPDATE main_view SET b = 0 WHERE false;
 
 -- Delete from view using trigger
-DELETE FROM main_view WHERE a IN (20,21);
+DELETE FROM main_view WHERE a = 20 AND b = 31;
+DELETE FROM main_view WHERE a = 21 AND b = 10;
+DELETE FROM main_view WHERE a = 21 AND b = 32;
 DELETE FROM main_view WHERE a = 31 RETURNING a, b;
 
 \set QUIET true
diff --git a/src/test/regress/sql/updatable_views.sql b/src/test/regress/sql/updatable_views.sql
index 1635adde2d4..160e7799715 100644
--- a/src/test/regress/sql/updatable_views.sql
+++ b/src/test/regress/sql/updatable_views.sql
@@ -125,7 +125,7 @@ INSERT INTO rw_view16 VALUES (3, 'Row 3', 3); -- should fail
 INSERT INTO rw_view16 (a, b) VALUES (3, 'Row 3'); -- should be OK
 UPDATE rw_view16 SET a=3, aa=-3 WHERE a=3; -- should fail
 UPDATE rw_view16 SET aa=-3 WHERE a=3; -- should be OK
-SELECT * FROM base_tbl;
+SELECT * FROM base_tbl ORDER BY a;
 DELETE FROM rw_view16 WHERE a=-3; -- should be OK
 -- Read-only views
 INSERT INTO ro_view17 VALUES (3, 'ROW 3');
-- 
2.51.2


