public inbox for [email protected]
Add tests for concurrent DML retry paths in logical replication apply 3+ messages / 2 participants
* Add tests for concurrent DML retry paths in logical replication apply
From: Bharath Rupireddy @ 2026-04-27 07:30 UTC
To: PostgreSQL Hackers <[email protected]>

Hi hackers,

While reading the logical replication apply code in execReplication.c, I noticed that the retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq for concurrent updates and deletes have no test coverage [1]. Specifically, when the same tuple is being updated/deleted on the publisher and subscriber at the same time, the dirty snapshot finds the tuple being modified by another transaction, and the apply worker waits and retries the index/sequential scan.

The attached patch adds an injection point before table_tuple_lock and a TAP test exercising these retry paths, hitting both TM_Updated and TM_Deleted.

While working on this, I also noticed two minor issues in the conflict handling code:

1/ In RelationFindReplTupleByIndex, ExecMaterializeSlot was called before checking should_refetch_tuple. If the tuple needs to be refetched due to a concurrent modification, this materialization is wasted work. I moved it after the retry check, so it only runs once the tuple has been successfully locked.

2/ In RelationFindReplTupleSeq, ExecCopySlot and a separate TupleTableSlot allocation were unnecessary. I made this function consistent with RelationFindReplTupleByIndex by using outslot directly while scanning the heap, avoiding the extra TTS allocation and copy overhead.

I'm aware that these are not major performance issues in practice, but the change keeps the two functions consistent and avoids unnecessary TTS materialize and copy costs.
I also think that we could deduplicate these two functions since the code looks mostly the same, but that would be an overkill IMHO. Please find the attached patch. Appreciate any feedback. Thank you! [1] https://coverage.postgresql.org/src/backend/executor/execReplication.c.gcov.html -- Bharath Rupireddy Amazon Web Services: https://aws.amazon.com Attachments: [application/x-patch] v1-0001-Add-tests-for-concurrent-DML-retry-paths-in-logic.patch (10.2K, 3-v1-0001-Add-tests-for-concurrent-DML-retry-paths-in-logic.patch) download | inline diff: From bcf3cb77bd1ee923f128bc4607b66a6287762743 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Tue, 21 Apr 2026 20:24:02 +0000 Subject: [PATCH v1] Add tests for concurrent DML retry paths in logical replication apply. The retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq for concurrent updates and deletes had no test coverage. When a tuple is concurrently modified on the subscriber while the apply worker is trying to lock it, table_tuple_lock returns TM_Updated or TM_Deleted, and the worker retries the scan. This commit adds an injection point and a TAP test that exercises these retry paths for both index scan and sequential scan. While here, fix minor inefficiency in the retry handling. In RelationFindReplTupleByIndex, avoid calling ExecMaterializeSlot before the retry check. In RelationFindReplTupleSeq, remove the unnecessary separate TupleTableSlot allocation and ExecCopySlot call by using outslot directly for scanning, keeping it consistent with RelationFindReplTupleByIndex. 
--- src/backend/executor/execReplication.c | 25 +-- src/test/subscription/meson.build | 1 + .../t/039_concurrent_dml_retry.pl | 152 ++++++++++++++++++ 3 files changed, 168 insertions(+), 10 deletions(-) create mode 100644 src/test/subscription/t/039_concurrent_dml_retry.pl diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index b2ca5cbf117..0d219e35daa 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -37,6 +37,8 @@ #include "utils/syscache.h" #include "utils/typcache.h" +#include "utils/injection_point.h" + static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns); @@ -229,8 +231,6 @@ retry: continue; } - ExecMaterializeSlot(outslot); - xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax; @@ -255,6 +255,8 @@ retry: TM_FailureData tmfd; TM_Result res; + INJECTION_POINT("find-repl-tuple-before-lock", NULL); + PushActiveSnapshot(GetLatestSnapshot()); res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), @@ -269,6 +271,9 @@ retry: if (should_refetch_tuple(res, &tmfd)) goto retry; + + /* Materialize the slot so it preserves pass-by-ref values. 
*/ + ExecMaterializeSlot(outslot); } index_endscan(scan); @@ -370,7 +375,6 @@ bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot) { - TupleTableSlot *scanslot; TableScanDesc scan; SnapshotData snap; TypeCacheEntry **eq; @@ -386,7 +390,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, InitDirtySnapshot(snap); scan = table_beginscan(rel, &snap, 0, NULL, SO_NONE); - scanslot = table_slot_create(rel, NULL); retry: found = false; @@ -394,14 +397,11 @@ retry: table_rescan(scan, NULL); /* Try to find the tuple */ - while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + while (table_scan_getnextslot(scan, ForwardScanDirection, outslot)) { - if (!tuples_equal(scanslot, searchslot, eq, NULL)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; - found = true; - ExecCopySlot(outslot, scanslot); - xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax; @@ -416,6 +416,7 @@ retry: } /* Found our tuple and it's not locked */ + found = true; break; } @@ -425,6 +426,8 @@ retry: TM_FailureData tmfd; TM_Result res; + INJECTION_POINT("find-repl-tuple-before-lock", NULL); + PushActiveSnapshot(GetLatestSnapshot()); res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), @@ -439,10 +442,12 @@ retry: if (should_refetch_tuple(res, &tmfd)) goto retry; + + /* Materialize the slot so it preserves pass-by-ref values. 
*/ + ExecMaterializeSlot(outslot); } table_endscan(scan); - ExecDropSingleTupleTableSlot(scanslot); return found; } diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..5df7f143721 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/039_concurrent_dml_retry.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/039_concurrent_dml_retry.pl b/src/test/subscription/t/039_concurrent_dml_retry.pl new file mode 100644 index 00000000000..074aeafbcc4 --- /dev/null +++ b/src/test/subscription/t/039_concurrent_dml_retry.pl @@ -0,0 +1,152 @@ +# Copyright (c) 2025-2026, PostgreSQL Global Development Group + +# Test concurrent update/delete retry paths in logical replication apply. +# +# Uses injection points to pause the apply worker after finding a tuple but +# before locking it, allowing a concurrent session to modify or delete the +# same row. +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Check if injection points are available. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +if ($node_publisher->check_extension('injection_points') == 0) +{ + $node_publisher->stop; + plan skip_all => 'injection_points not supported'; +} + +# Subscriber needs injection_points loaded. +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf('postgresql.conf', + "shared_preload_libraries = 'injection_points'"); +$node_subscriber->start; + +# Create tables on both publisher and subscriber, and set up replication. 
+$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_tab (a int PRIMARY KEY, b int); + CREATE TABLE test_tab_full (a int, b int); + ALTER TABLE test_tab_full REPLICA IDENTITY FULL; +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_tab (a int PRIMARY KEY, b int); + CREATE TABLE test_tab_full (a int, b int); + ALTER TABLE test_tab_full REPLICA IDENTITY FULL; + CREATE EXTENSION injection_points; +)); + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION test_pub FOR TABLE test_tab, test_tab_full;"); + +my $appname = 'test_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION test_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION test_pub;"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# Pre-insert all test data in a single batch to avoid multiple +# wait_for_catchup round trips. +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_tab VALUES (1, 10), (2, 30); + INSERT INTO test_tab_full VALUES (1, 100), (2, 300); +)); +$node_publisher->wait_for_catchup($appname); + +# Helper to run a single concurrent DML retry test using injection points. +sub test_concurrent_retry +{ + my (%args) = @_; + my $test_name = $args{name}; + my $pub_dml = $args{pub_dml}; + my $sub_dml = $args{sub_dml}; + my $expected_log = $args{expected_log}; + my $verify_query = $args{verify_query}; + my $verify_result = $args{verify_result}; + + # Attach injection point. + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('find-repl-tuple-before-lock', 'wait');"); + + # Publish the DML that will trigger the apply worker to find + lock. + $node_publisher->safe_psql('postgres', $pub_dml); + + # Wait for the apply worker to hit the injection point. 
+ $node_subscriber->wait_for_event('logical replication apply worker', + 'find-repl-tuple-before-lock'); + + # Execute concurrent DML on subscriber while apply worker is paused. + $node_subscriber->safe_psql('postgres', $sub_dml); + + my $log_offset = -s $node_subscriber->logfile; + + # Detach first so the retry loop doesn't hit the injection point again, + # then wake up the apply worker. + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_detach('find-repl-tuple-before-lock'); + SELECT injection_points_wakeup('find-repl-tuple-before-lock');"); + + # Confirm the expected log message. + $node_subscriber->wait_for_log($expected_log, $log_offset); + pass("$test_name: concurrent modification detected and retried"); + + # Wait for apply to finish and verify result. + $node_publisher->wait_for_catchup($appname); + + my $result = $node_subscriber->safe_psql('postgres', $verify_query); + is($result, $verify_result, "$test_name: data correct after retry"); +} + +# TM_Updated via index scan (PK): concurrent update on subscriber. +test_concurrent_retry( + name => 'index scan TM_Updated', + pub_dml => "UPDATE test_tab SET b = 20 WHERE a = 1;", + sub_dml => "UPDATE test_tab SET b = 99 WHERE a = 1;", + expected_log => qr/concurrent update, retrying/, + verify_query => "SELECT b FROM test_tab WHERE a = 1;", + verify_result => '20', +); + +# TM_Deleted via index scan (PK): concurrent delete on subscriber. +test_concurrent_retry( + name => 'index scan TM_Deleted', + pub_dml => "UPDATE test_tab SET b = 40 WHERE a = 2;", + sub_dml => "DELETE FROM test_tab WHERE a = 2;", + expected_log => qr/concurrent delete, retrying/, + verify_query => "SELECT count(*) FROM test_tab WHERE a = 2;", + verify_result => '0', +); + +# TM_Updated via seq scan (REPLICA IDENTITY FULL): concurrent update on +# subscriber. 
+test_concurrent_retry( + name => 'seq scan TM_Updated', + pub_dml => "UPDATE test_tab_full SET b = 200 WHERE a = 1;", + sub_dml => "UPDATE test_tab_full SET b = 999 WHERE a = 1;", + expected_log => qr/concurrent update, retrying/, + verify_query => "SELECT b FROM test_tab_full WHERE a = 1;", + verify_result => '999', +); + +# TM_Deleted via seq scan (REPLICA IDENTITY FULL): concurrent delete on +# subscriber. +test_concurrent_retry( + name => 'seq scan TM_Deleted', + pub_dml => "UPDATE test_tab_full SET b = 400 WHERE a = 2;", + sub_dml => "DELETE FROM test_tab_full WHERE a = 2;", + expected_log => qr/concurrent delete, retrying/, + verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 2;", + verify_result => '0', +); + +done_testing(); -- 2.47.3 ^ permalink raw reply [nested|flat] 3+ messages in thread
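Editorial sketch (not part of the patch): the find-then-lock retry that the test in this message exercises can be modelled in a few lines of standalone C. Everything here (ToyRow, toy_lock, toy_find_and_lock, the hook type) is hypothetical and only mimics the shape of the logic described in the message; none of it is PostgreSQL code. A hook standing in for the injection point fires once between finding the row and locking it, the lock attempt then reports a concurrent update or delete, and the scan starts over.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins for illustration only; not PostgreSQL types or APIs. */
typedef enum { TOY_LOCK_OK, TOY_LOCK_UPDATED, TOY_LOCK_DELETED } ToyLockResult;

typedef struct
{
    int  key;
    int  value;
    int  version;   /* bumped by every change to the row */
    bool deleted;
} ToyRow;

/* Hypothetical test hook, playing the role of the injection point. */
typedef void (*ToyHook) (ToyRow *row);

void toy_concurrent_update(ToyRow *row) { row->value = 99; row->version++; }
void toy_concurrent_delete(ToyRow *row) { row->deleted = true; row->version++; }

/* Locking fails if the row changed after the scan saw it. */
ToyLockResult
toy_lock(const ToyRow *row, int seen_version)
{
    if (row->deleted)
        return TOY_LOCK_DELETED;
    if (row->version != seen_version)
        return TOY_LOCK_UPDATED;
    return TOY_LOCK_OK;
}

/*
 * Find the row with the given key and lock it, rescanning whenever the lock
 * attempt reports a concurrent change. *retries counts the rescans.
 */
ToyRow *
toy_find_and_lock(ToyRow *rows, size_t nrows, int key, ToyHook hook, int *retries)
{
    assert(rows != NULL && retries != NULL);
    *retries = 0;

    for (;;)
    {
        ToyRow *found = NULL;
        int     seen_version;

        for (size_t i = 0; i < nrows; i++)
        {
            if (!rows[i].deleted && rows[i].key == key)
            {
                found = &rows[i];
                break;
            }
        }
        if (found == NULL)
            return NULL;        /* nothing left to lock */

        seen_version = found->version;

        /* Pause point between "find" and "lock"; fires only once. */
        if (hook != NULL)
        {
            hook(found);
            hook = NULL;
        }

        if (toy_lock(found, seen_version) == TOY_LOCK_OK)
            return found;

        (*retries)++;           /* concurrent update or delete: scan again */
    }
}
```

With toy_concurrent_update as the hook, the first lock attempt fails, the rescan finds the changed row, and one retry is recorded. With toy_concurrent_delete, the rescan finds nothing and the function returns NULL, which loosely corresponds to the TM_Deleted cases in the test.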
* RE: Add tests for concurrent DML retry paths in logical replication apply
From: Hayato Kuroda (Fujitsu) @ 2026-04-28 07:47 UTC
To: 'Bharath Rupireddy' <[email protected]>; +Cc: PostgreSQL Hackers <[email protected]>

Dear Bharath,

> While reading the logical replication apply code in execReplication.c, I noticed
> that the retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq
> for concurrent updates and deletes have no test coverage [1]. Specifically,
> when the same tuple is being updated/deleted on the publisher and subscriber at
> the same time, the dirty snapshot finds the tuple being modified by another
> transaction, and the apply worker waits and retries the index/sequential scan.

Good catch.

> The attached patch adds an injection point before table_tuple_lock and a TAP test
> exercising these retry paths, hitting both TM_Updated and TM_Deleted.

I read the code briefly. Here are my questions:

1. The test appears to cover the retry when acquiring the tuple lock. But there is another retry path, which waits until xwait finishes. Is there a reason to skip testing it?

2. Is it OK to use the same injection point name twice? I cannot find an existing case.

Best regards,
Hayato Kuroda
FUJITSU LIMITED
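Editorial sketch (not part of the thread): the second retry path raised in question 1, where the scan meets a row touched by a still-running transaction and waits for it before rescanning, might be pictured as follows. This is a toy model with hypothetical names (WaitRow, wait_for_writer, find_after_waiting); the blocking wait is simulated by letting the writer finish immediately, either committing or aborting, and it makes no claim about how the real wait is implemented.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Toy row; illustrative only, not a PostgreSQL structure. */
typedef struct
{
    int  key;
    int  value;
    int  pending_value;  /* uncommitted value written by an in-progress writer */
    bool writer_active;  /* true while that writer has not finished */
} WaitRow;

/* How the hypothetical in-progress writer ends while the worker waits. */
typedef enum { WRITER_COMMITS, WRITER_ABORTS } WriterOutcome;

/* Simulated blocking wait: returns once the writer has finished. */
void
wait_for_writer(WaitRow *row, WriterOutcome outcome)
{
    if (outcome == WRITER_COMMITS)
        row->value = row->pending_value;
    row->writer_active = false;
}

/*
 * Scan for the key. If the matching row is being modified by a writer that
 * has not finished, wait for it and restart the scan from the beginning,
 * since the row may look different afterwards. *waits counts the restarts.
 */
WaitRow *
find_after_waiting(WaitRow *rows, size_t nrows, int key,
                   WriterOutcome outcome, int *waits)
{
    assert(rows != NULL && waits != NULL);
    *waits = 0;

restart:
    for (size_t i = 0; i < nrows; i++)
    {
        if (rows[i].key != key)
            continue;

        if (rows[i].writer_active)
        {
            wait_for_writer(&rows[i], outcome);
            (*waits)++;
            goto restart;
        }
        return &rows[i];    /* found, and nobody is modifying it */
    }
    return NULL;
}
```

The restart after the wait is the point of interest: the worker does not assume the row it saw is still current once the other transaction has ended.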
* Re: Add tests for concurrent DML retry paths in logical replication apply
From: Bharath Rupireddy @ 2026-04-29 01:45 UTC
To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: PostgreSQL Hackers <[email protected]>

Hi,

On Tue, Apr 28, 2026 at 12:47 AM Hayato Kuroda (Fujitsu) <[email protected]> wrote:
>
> > While reading the logical replication apply code in execReplication.c, I noticed
> > that the retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq
> > for concurrent updates and deletes have no test coverage [1]. Specifically,
> > when the same tuple is being updated/deleted on the publisher and subscriber at
> > the same time, the dirty snapshot finds the tuple being modified by another
> > transaction, and the apply worker waits and retries the index/sequential scan.
>
> Good catch.
>
> > The attached patch adds an injection point before table_tuple_lock and a TAP test
> > exercising these retry paths, hitting both TM_Updated and TM_Deleted.
>
> I read the code briefly. Here are my questions:

Thank you for reviewing!

> 1. The test appears to cover the retry when acquiring the tuple lock. But there is
> another retry path, which waits until xwait finishes. Is there a reason to skip
> testing it?

There are two retry paths, and I added TAP tests covering both. The XactLockTableWait path is tested by holding an in-progress transaction on the subscriber, which naturally blocks the apply worker until the transaction finishes; the worker then retries the scan.
The table_tuple_lock retry path (TM_Updated / TM_Deleted) is tested using an injection point that pauses the worker between finding and locking the tuple, allowing concurrent DML to intervene. > 2. > Is it OK to use the same injection point name twice? I cannot find the existing > case. Good catch. I used separate injection points for each path. Please find the attached v2 patch for further review. Thank you! -- Bharath Rupireddy Amazon Web Services: https://aws.amazon.com Attachments: [application/x-patch] v2-0001-Add-tests-for-concurrent-DML-retry-paths-in-logic.patch (13.5K, 2-v2-0001-Add-tests-for-concurrent-DML-retry-paths-in-logic.patch) download | inline diff: From 886da74509f5f885e76767b994ed1074d464d071 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy <[email protected]> Date: Tue, 28 Apr 2026 18:20:34 +0000 Subject: [PATCH v2] Add tests for concurrent DML retry paths in logical replication apply. The retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq for concurrent updates and deletes had no test coverage. The apply worker uses a dirty snapshot to find the target tuple on the subscriber (index scan when available, sequential scan for REPLICA IDENTITY FULL). When it detects an in-progress transaction on the target tuple, it waits via XactLockTableWait, then retries the scan. Once found, it locks the tuple with table_tuple_lock and applies the update or delete from the publisher. This commit adds TAP tests covering both retry paths. The XactLockTableWait path is tested by holding an in-progress transaction on the subscriber, which naturally blocks the apply worker. The table_tuple_lock retry path (TM_Updated / TM_Deleted) is tested using an injection point that pauses the worker between finding and locking the tuple, allowing concurrent DML to intervene. While here, fix minor inefficiencies in the retry handling. In RelationFindReplTupleByIndex, avoid calling ExecMaterializeSlot before the retry check. 
In RelationFindReplTupleSeq, remove the unnecessary separate TupleTableSlot allocation and ExecCopySlot call by using outslot directly for scanning, keeping it consistent with RelationFindReplTupleByIndex. Author: Bharath Rupireddy <[email protected]> Reviewed-by: Hayato Kuroda <[email protected]> Discussion: https://www.postgresql.org/message-id/CALj2ACV6ESpggn2Az%3DOdZBzVx7jiBG9O_EdKdgbk2chAJusC2w%40mail.gmail.com --- src/backend/executor/execReplication.c | 25 +- src/test/subscription/meson.build | 1 + .../t/039_concurrent_dml_retry.pl | 225 ++++++++++++++++++ 3 files changed, 241 insertions(+), 10 deletions(-) create mode 100644 src/test/subscription/t/039_concurrent_dml_retry.pl diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index b2ca5cbf117..5a8fed5536a 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -37,6 +37,8 @@ #include "utils/syscache.h" #include "utils/typcache.h" +#include "utils/injection_point.h" + static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2, TypeCacheEntry **eq, Bitmapset *columns); @@ -229,8 +231,6 @@ retry: continue; } - ExecMaterializeSlot(outslot); - xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax; @@ -255,6 +255,8 @@ retry: TM_FailureData tmfd; TM_Result res; + INJECTION_POINT("find-repl-tuple-by-index-before-lock", NULL); + PushActiveSnapshot(GetLatestSnapshot()); res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), @@ -269,6 +271,9 @@ retry: if (should_refetch_tuple(res, &tmfd)) goto retry; + + /* Materialize the slot so it preserves pass-by-ref values. 
*/ + ExecMaterializeSlot(outslot); } index_endscan(scan); @@ -370,7 +375,6 @@ bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, TupleTableSlot *searchslot, TupleTableSlot *outslot) { - TupleTableSlot *scanslot; TableScanDesc scan; SnapshotData snap; TypeCacheEntry **eq; @@ -386,7 +390,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode, InitDirtySnapshot(snap); scan = table_beginscan(rel, &snap, 0, NULL, SO_NONE); - scanslot = table_slot_create(rel, NULL); retry: found = false; @@ -394,14 +397,11 @@ retry: table_rescan(scan, NULL); /* Try to find the tuple */ - while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot)) + while (table_scan_getnextslot(scan, ForwardScanDirection, outslot)) { - if (!tuples_equal(scanslot, searchslot, eq, NULL)) + if (!tuples_equal(outslot, searchslot, eq, NULL)) continue; - found = true; - ExecCopySlot(outslot, scanslot); - xwait = TransactionIdIsValid(snap.xmin) ? snap.xmin : snap.xmax; @@ -416,6 +416,7 @@ retry: } /* Found our tuple and it's not locked */ + found = true; break; } @@ -425,6 +426,8 @@ retry: TM_FailureData tmfd; TM_Result res; + INJECTION_POINT("find-repl-tuple-seq-before-lock", NULL); + PushActiveSnapshot(GetLatestSnapshot()); res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(), @@ -439,10 +442,12 @@ retry: if (should_refetch_tuple(res, &tmfd)) goto retry; + + /* Materialize the slot so it preserves pass-by-ref values. 
*/ + ExecMaterializeSlot(outslot); } table_endscan(scan); - ExecDropSingleTupleTableSlot(scanslot); return found; } diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..5df7f143721 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/039_concurrent_dml_retry.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/039_concurrent_dml_retry.pl b/src/test/subscription/t/039_concurrent_dml_retry.pl new file mode 100644 index 00000000000..570d6c2f836 --- /dev/null +++ b/src/test/subscription/t/039_concurrent_dml_retry.pl @@ -0,0 +1,225 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Test concurrent DML retry paths in logical replication apply. +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Create publisher. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +# Create subscriber. +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->start; + +# Create tables on both sides and set up replication. +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_tab (a int PRIMARY KEY, b int, c text); + CREATE TABLE test_tab_full (a int, b int, c text); + ALTER TABLE test_tab_full REPLICA IDENTITY FULL; +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_tab (a int PRIMARY KEY, b int, c text); + CREATE TABLE test_tab_full (a int, b int, c text); + ALTER TABLE test_tab_full REPLICA IDENTITY FULL; +)); + +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION test_pub FOR TABLE test_tab, test_tab_full;"); + +my $appname = 'test_sub'; +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION test_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION test_pub;"); + +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname); + +# Insert test data. Rows 1-2 are used by XactLockTableWait tests, +# rows 3-4 by injection-point tests. +$node_publisher->safe_psql('postgres', qq( + INSERT INTO test_tab VALUES (1, 10, 'foo'), (2, 20, 'bar'), + (3, 30, 'baz'), (4, 40, 'qux'); + INSERT INTO test_tab_full VALUES (1, 10, 'foo'), (2, 20, 'bar'), + (3, 30, 'baz'), (4, 40, 'qux'); +)); +$node_publisher->wait_for_catchup($appname); + +# Test the apply worker retry when the dirty snapshot finds an in-progress +# transaction on the target tuple, causing XactLockTableWait. + +my $sub_session = $node_subscriber->background_psql('postgres'); + +sub test_retry_with_in_progress_xact +{ + my (%args) = @_; + my $test_name = $args{name}; + my $dml = $args{dml}; + my $verify_query = $args{verify_query}; + my $verify_result = $args{verify_result}; + + # Modify the target tuple in an in-progress transaction on the subscriber. + $sub_session->query_safe("BEGIN;"); + $sub_session->query_safe($dml); + + # Run DML on the publisher. + $node_publisher->safe_psql('postgres', $dml); + + # Wait for the apply worker to block on XactLockTableWait. + $node_subscriber->poll_query_until('postgres', qq( + SELECT count(*) > 0 FROM pg_stat_activity + WHERE backend_type = 'logical replication apply worker' + AND wait_event_type = 'Lock' + AND wait_event = 'transactionid'; + )) or die "Timed out waiting for apply worker to block on XactLockTableWait"; + + # Abort so the apply worker wakes up, retries, and applies successfully. + $sub_session->query_safe("ROLLBACK;"); + $node_publisher->wait_for_catchup($appname); + + # Verify the results. 
+ my $result = $node_subscriber->safe_psql('postgres', $verify_query); + is($result, $verify_result, $test_name); +} + +# Index scan (PK): publisher UPDATE with in-progress subscriber transaction. +test_retry_with_in_progress_xact( + name => 'XactLockTableWait index scan UPDATE', + dml => "UPDATE test_tab SET c = 'foo_u' WHERE a = 1;", + verify_query => "SELECT c FROM test_tab WHERE a = 1;", + verify_result => 'foo_u', +); + +# Index scan (PK): publisher DELETE with in-progress subscriber transaction. +test_retry_with_in_progress_xact( + name => 'XactLockTableWait index scan DELETE', + dml => "DELETE FROM test_tab WHERE a = 2;", + verify_query => "SELECT count(*) FROM test_tab WHERE a = 2;", + verify_result => '0', +); + +# Seq scan (REPLICA IDENTITY FULL): publisher UPDATE with in-progress +# subscriber transaction. +test_retry_with_in_progress_xact( + name => 'XactLockTableWait seq scan UPDATE', + dml => "UPDATE test_tab_full SET c = 'foo_u' WHERE a = 1;", + verify_query => "SELECT c FROM test_tab_full WHERE a = 1;", + verify_result => 'foo_u', +); + +# Seq scan (REPLICA IDENTITY FULL): publisher DELETE with in-progress +# subscriber transaction. +test_retry_with_in_progress_xact( + name => 'XactLockTableWait seq scan DELETE', + dml => "DELETE FROM test_tab_full WHERE a = 2;", + verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 2;", + verify_result => '0', +); + +$sub_session->quit; + +# Test the apply worker retry when table_tuple_lock detects a concurrently +# updated or deleted tuple (TM_Updated / TM_Deleted). An injection point pauses +# the worker between finding the tuple and locking it, allowing concurrent DML +# to intervene. 
+ +sub test_retry_with_concurrent_dml_before_tuple_lock +{ + my (%args) = @_; + my $test_name = $args{name}; + my $inj_point = $args{inj_point}; + my $dml = $args{dml}; + my $expected_log = $args{expected_log}; + my $verify_query = $args{verify_query}; + my $verify_result = $args{verify_result}; + + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_attach('$inj_point', 'wait');"); + + # Run DML on the publisher. + $node_publisher->safe_psql('postgres', $dml); + + $node_subscriber->wait_for_event('logical replication apply worker', + $inj_point); + + # Run DML on the subscriber. + $node_subscriber->safe_psql('postgres', $dml); + + my $log_offset = -s $node_subscriber->logfile; + + # Detach before wakeup so the retry doesn't hit the same injection point. + $node_subscriber->safe_psql('postgres', + "SELECT injection_points_detach('$inj_point'); + SELECT injection_points_wakeup('$inj_point');"); + + $node_subscriber->wait_for_log($expected_log, $log_offset); + pass("$test_name: concurrent modification detected and retried"); + + $node_publisher->wait_for_catchup($appname); + + my $result = $node_subscriber->safe_psql('postgres', $verify_query); + is($result, $verify_result, "$test_name: data correct after retry"); +} + +# Check whether injection_points extension is available on the subscriber. +my $injection_points_supported = + $node_subscriber->check_extension('injection_points'); + +if ($injection_points_supported != 0) +{ + $node_subscriber->append_conf('postgresql.conf', + "shared_preload_libraries = 'injection_points'"); + $node_subscriber->restart; + + $node_subscriber->safe_psql('postgres', + "CREATE EXTENSION injection_points;"); + + # TM_Updated via index scan (PK). 
+ test_retry_with_concurrent_dml_before_tuple_lock( + name => 'index scan TM_Updated', + inj_point => 'find-repl-tuple-by-index-before-lock', + dml => "UPDATE test_tab SET c = 'baz_u' WHERE a = 3;", + expected_log => qr/concurrent update, retrying/, + verify_query => "SELECT c FROM test_tab WHERE a = 3;", + verify_result => 'baz_u', + ); + + # TM_Deleted via index scan (PK). + test_retry_with_concurrent_dml_before_tuple_lock( + name => 'index scan TM_Deleted', + inj_point => 'find-repl-tuple-by-index-before-lock', + dml => "DELETE FROM test_tab WHERE a = 4;", + expected_log => qr/concurrent delete, retrying/, + verify_query => "SELECT count(*) FROM test_tab WHERE a = 4;", + verify_result => '0', + ); + + # TM_Updated via seq scan (REPLICA IDENTITY FULL). + test_retry_with_concurrent_dml_before_tuple_lock( + name => 'seq scan TM_Updated', + inj_point => 'find-repl-tuple-seq-before-lock', + dml => "UPDATE test_tab_full SET c = 'baz_u' WHERE a = 3;", + expected_log => qr/concurrent update, retrying/, + verify_query => "SELECT c FROM test_tab_full WHERE a = 3;", + verify_result => 'baz_u', + ); + + # TM_Deleted via seq scan (REPLICA IDENTITY FULL). + test_retry_with_concurrent_dml_before_tuple_lock( + name => 'seq scan TM_Deleted', + inj_point => 'find-repl-tuple-seq-before-lock', + dml => "DELETE FROM test_tab_full WHERE a = 4;", + expected_log => qr/concurrent delete, retrying/, + verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 4;", + verify_result => '0', + ); +} + +done_testing(); -- 2.47.3 ^ permalink raw reply [nested|flat] 3+ messages in thread
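Editorial sketch (not part of the patch): the commit message's point about materializing the slot only after the retry check can be illustrated with a toy model. The names below (ToySlot, toy_materialize, toy_locate) are hypothetical and the "slot" is just a string pointer; the sketch assumes, as the message describes, that every retry reloads the slot from the scan, so any copy made before a failed lock attempt is thrown away.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Toy slot; illustrative only, not a PostgreSQL TupleTableSlot. */
typedef struct
{
    const char *data;   /* borrowed from the scan until materialized */
    char       *owned;  /* private copy, NULL until materialized */
} ToySlot;

/* Give the slot its own copy of the data and count the copy. */
void
toy_materialize(ToySlot *slot, int *copies)
{
    size_t len = strlen(slot->data) + 1;
    char  *copy = malloc(len);

    assert(copy != NULL);
    memcpy(copy, slot->data, len);
    free(slot->owned);          /* drop any copy from an earlier attempt */
    slot->owned = copy;
    slot->data = copy;
    (*copies)++;
}

/*
 * Load the slot from the scan and "lock" the row. The first failing_locks
 * attempts report a concurrent change and force a rescan. Returns how many
 * copies were made in total; the caller frees slot->owned.
 */
int
toy_locate(ToySlot *slot, const char *scan_buffer, int failing_locks,
           bool materialize_early)
{
    int copies = 0;

    for (;;)
    {
        slot->data = scan_buffer;       /* scan stores a borrowed pointer */

        if (materialize_early)
            toy_materialize(slot, &copies);

        if (failing_locks > 0)
        {
            failing_locks--;            /* lock failed: retry the scan */
            continue;
        }

        if (!materialize_early)
            toy_materialize(slot, &copies);
        return copies;
    }
}
```

In this model, two failed lock attempts cost three copies when materializing early and one copy when materializing after the lock succeeds. The real cost difference depends on the slot implementation and is, as the author notes, small in practice.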