From: Bharath Rupireddy <[email protected]>
To: Hayato Kuroda (Fujitsu) <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Subject: Re: Add tests for concurrent DML retry paths in logical replication apply
Date: Tue, 28 Apr 2026 18:45:00 -0700
Message-ID: <CALj2ACWLpxQ=z-oO6QNKW6PiT9A-dS6mYKga+m8=+sko=ggr+w@mail.gmail.com>
In-Reply-To: <OS9PR01MB121493EB8A34905A6155EC918F5372@OS9PR01MB12149.jpnprd01.prod.outlook.com>
References: <CALj2ACV6ESpggn2Az=OdZBzVx7jiBG9O_EdKdgbk2chAJusC2w@mail.gmail.com>
<OS9PR01MB121493EB8A34905A6155EC918F5372@OS9PR01MB12149.jpnprd01.prod.outlook.com>
Hi,
On Tue, Apr 28, 2026 at 12:47 AM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> > While reading the logical replication apply code in execReplication.c, I noticed
> > that the retry paths in RelationFindReplTupleByIndex and RelationFindReplTupleSeq
> > for concurrent updates and deletes have no test coverage [1]. Specifically,
> > when the same tuple is being updated/deleted on the publisher and subscriber at
> > the same time, the dirty snapshot finds the tuple being modified by another
> > transaction, the apply worker waits and retries the index/sequential scan.
>
> Good catch.
>
> > The attached patch adds an injection point before table_tuple_lock and a TAP test
> > exercising these retry paths, hitting both TM_Updated and TM_Deleted.
>
> I read the code briefly. Here are questions:
Thank you for reviewing!
> 1.
> The test looks like to add the test for retry acquiring the lock. But there is
> another retry path, which waits till xwait finishes. Do you have a reason to
> skip testing?
There are two retry paths, and I added TAP tests covering both. The
XactLockTableWait path is tested by holding an in-progress transaction
on the subscriber, which blocks the apply worker until the transaction
finishes; the worker then retries the scan. The table_tuple_lock
retry path (TM_Updated / TM_Deleted) is tested using an injection
point that pauses the worker between finding and locking the tuple,
allowing concurrent DML to intervene.
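For readers unfamiliar with the find-then-lock pattern, the second path
can be illustrated with a minimal standalone C sketch. This is not
PostgreSQL code: ToyTuple, ToyResult, toy_find, toy_lock,
toy_find_and_lock and the two hooks are hypothetical names invented for
this illustration, and the hook only plays the role that the injection
point plays in the test. As a simplification, an update bumps a version
counter in place instead of creating a new tuple version.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for TM_Ok / TM_Updated / TM_Deleted. */
typedef enum { TOY_OK, TOY_UPDATED, TOY_DELETED } ToyResult;

typedef struct
{
    int  key;
    int  version;   /* bumped by every (simulated) update */
    bool live;      /* false once (simulated) deleted */
} ToyTuple;

/* Called between "find" and "lock", like a paused injection point. */
typedef void (*ToyHook) (ToyTuple *table, size_t n, int attempt);

/* "Scan": index of the live tuple with this key, or -1 if none. */
int
toy_find(const ToyTuple *table, size_t n, int key)
{
    for (size_t i = 0; i < n; i++)
        if (table[i].live && table[i].key == key)
            return (int) i;
    return -1;
}

/* "Lock": succeeds only if the tuple is unchanged since it was seen. */
ToyResult
toy_lock(const ToyTuple *t, int seen_version)
{
    if (!t->live)
        return TOY_DELETED;
    if (t->version != seen_version)
        return TOY_UPDATED;
    return TOY_OK;
}

/*
 * Find the tuple, then try to lock it; rescan whenever the lock step
 * reports a concurrent change. Returns true if a tuple was locked and
 * stores the number of rescans in *retries.
 */
bool
toy_find_and_lock(ToyTuple *table, size_t n, int key,
                  ToyHook hook, int *retries)
{
    int attempt = 0;

    assert(retries != NULL);
    *retries = 0;

    for (;;)
    {
        int idx = toy_find(table, n, key);
        int seen;

        if (idx < 0)
            return false;       /* tuple is gone, nothing to lock */

        seen = table[idx].version;
        if (hook)
            hook(table, n, attempt);    /* window for concurrent DML */
        attempt++;

        if (toy_lock(&table[idx], seen) == TOY_OK)
            return true;
        (*retries)++;           /* concurrent update or delete: rescan */
    }
}

/* Hooks that modify the first tuple during the first attempt only. */
void
toy_update_once(ToyTuple *table, size_t n, int attempt)
{
    (void) n;
    if (attempt == 0)
        table[0].version++;
}

void
toy_delete_once(ToyTuple *table, size_t n, int attempt)
{
    (void) n;
    if (attempt == 0)
        table[0].live = false;
}
```

Calling toy_find_and_lock with toy_update_once corresponds to the
TM_Updated case: the first lock attempt fails and the rescan succeeds.
With toy_delete_once the rescan finds nothing, so the function returns
false after one retry.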
> 2.
> Is it OK to use the same injection point name twice? I cannot find the existing
> case.
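Good catch. I now use a separate injection point in each function:
find-repl-tuple-by-index-before-lock in RelationFindReplTupleByIndex
and find-repl-tuple-seq-before-lock in RelationFindReplTupleSeq.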
Please find the attached v2 patch for further review. Thank you!
--
Bharath Rupireddy
Amazon Web Services: https://aws.amazon.com
Attachments:
[application/x-patch] v2-0001-Add-tests-for-concurrent-DML-retry-paths-in-logic.patch (13.5K, 2-v2-0001-Add-tests-for-concurrent-DML-retry-paths-in-logic.patch)
From 886da74509f5f885e76767b994ed1074d464d071 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <[email protected]>
Date: Tue, 28 Apr 2026 18:20:34 +0000
Subject: [PATCH v2] Add tests for concurrent DML retry paths in logical
replication apply.
The retry paths in RelationFindReplTupleByIndex and
RelationFindReplTupleSeq for concurrent updates and deletes had
no test coverage. The apply worker uses a dirty snapshot to find
the target tuple on the subscriber (index scan when available,
sequential scan for REPLICA IDENTITY FULL). When it detects an
in-progress transaction on the target tuple, it waits via
XactLockTableWait, then retries the scan. Once found, it locks
the tuple with table_tuple_lock and applies the update or delete
from the publisher.
This commit adds TAP tests covering both retry paths. The
XactLockTableWait path is tested by holding an in-progress transaction
on the subscriber, which naturally blocks the apply worker. The
table_tuple_lock retry path (TM_Updated / TM_Deleted) is tested
using an injection point that pauses the worker between finding
and locking the tuple, allowing concurrent DML to intervene.
While here, fix minor inefficiencies in the retry handling. In
RelationFindReplTupleByIndex, avoid calling ExecMaterializeSlot
before the retry check. In RelationFindReplTupleSeq, remove the
unnecessary separate TupleTableSlot allocation and ExecCopySlot
call by using outslot directly for scanning, keeping it
consistent with RelationFindReplTupleByIndex.
Author: Bharath Rupireddy <[email protected]>
Reviewed-by: Hayato Kuroda <[email protected]>
Discussion: https://www.postgresql.org/message-id/CALj2ACV6ESpggn2Az%3DOdZBzVx7jiBG9O_EdKdgbk2chAJusC2w%40mail.gmail.com
---
src/backend/executor/execReplication.c | 25 +-
src/test/subscription/meson.build | 1 +
.../t/039_concurrent_dml_retry.pl | 225 ++++++++++++++++++
3 files changed, 241 insertions(+), 10 deletions(-)
create mode 100644 src/test/subscription/t/039_concurrent_dml_retry.pl
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index b2ca5cbf117..5a8fed5536a 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -37,6 +37,8 @@
#include "utils/syscache.h"
#include "utils/typcache.h"
+#include "utils/injection_point.h"
+
static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
TypeCacheEntry **eq, Bitmapset *columns);
@@ -229,8 +231,6 @@ retry:
continue;
}
- ExecMaterializeSlot(outslot);
-
xwait = TransactionIdIsValid(snap.xmin) ?
snap.xmin : snap.xmax;
@@ -255,6 +255,8 @@ retry:
TM_FailureData tmfd;
TM_Result res;
+ INJECTION_POINT("find-repl-tuple-by-index-before-lock", NULL);
+
PushActiveSnapshot(GetLatestSnapshot());
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
@@ -269,6 +271,9 @@ retry:
if (should_refetch_tuple(res, &tmfd))
goto retry;
+
+ /* Materialize the slot so it preserves pass-by-ref values. */
+ ExecMaterializeSlot(outslot);
}
index_endscan(scan);
@@ -370,7 +375,6 @@ bool
RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
TupleTableSlot *searchslot, TupleTableSlot *outslot)
{
- TupleTableSlot *scanslot;
TableScanDesc scan;
SnapshotData snap;
TypeCacheEntry **eq;
@@ -386,7 +390,6 @@ RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
InitDirtySnapshot(snap);
scan = table_beginscan(rel, &snap, 0, NULL,
SO_NONE);
- scanslot = table_slot_create(rel, NULL);
retry:
found = false;
@@ -394,14 +397,11 @@ retry:
table_rescan(scan, NULL);
/* Try to find the tuple */
- while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+ while (table_scan_getnextslot(scan, ForwardScanDirection, outslot))
{
- if (!tuples_equal(scanslot, searchslot, eq, NULL))
+ if (!tuples_equal(outslot, searchslot, eq, NULL))
continue;
- found = true;
- ExecCopySlot(outslot, scanslot);
-
xwait = TransactionIdIsValid(snap.xmin) ?
snap.xmin : snap.xmax;
@@ -416,6 +416,7 @@ retry:
}
/* Found our tuple and it's not locked */
+ found = true;
break;
}
@@ -425,6 +426,8 @@ retry:
TM_FailureData tmfd;
TM_Result res;
+ INJECTION_POINT("find-repl-tuple-seq-before-lock", NULL);
+
PushActiveSnapshot(GetLatestSnapshot());
res = table_tuple_lock(rel, &(outslot->tts_tid), GetActiveSnapshot(),
@@ -439,10 +442,12 @@ retry:
if (should_refetch_tuple(res, &tmfd))
goto retry;
+
+ /* Materialize the slot so it preserves pass-by-ref values. */
+ ExecMaterializeSlot(outslot);
}
table_endscan(scan);
- ExecDropSingleTupleTableSlot(scanslot);
return found;
}
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index e71e95c6297..5df7f143721 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -48,6 +48,7 @@ tests += {
't/036_sequences.pl',
't/037_except.pl',
't/038_walsnd_shutdown_timeout.pl',
+ 't/039_concurrent_dml_retry.pl',
't/100_bugs.pl',
],
},
diff --git a/src/test/subscription/t/039_concurrent_dml_retry.pl b/src/test/subscription/t/039_concurrent_dml_retry.pl
new file mode 100644
index 00000000000..570d6c2f836
--- /dev/null
+++ b/src/test/subscription/t/039_concurrent_dml_retry.pl
@@ -0,0 +1,225 @@
+# Copyright (c) 2026, PostgreSQL Global Development Group
+
+# Test concurrent DML retry paths in logical replication apply.
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Create publisher.
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber.
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create tables on both sides and set up replication.
+$node_publisher->safe_psql('postgres', qq(
+ CREATE TABLE test_tab (a int PRIMARY KEY, b int, c text);
+ CREATE TABLE test_tab_full (a int, b int, c text);
+ ALTER TABLE test_tab_full REPLICA IDENTITY FULL;
+));
+
+$node_subscriber->safe_psql('postgres', qq(
+ CREATE TABLE test_tab (a int PRIMARY KEY, b int, c text);
+ CREATE TABLE test_tab_full (a int, b int, c text);
+ ALTER TABLE test_tab_full REPLICA IDENTITY FULL;
+));
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+ "CREATE PUBLICATION test_pub FOR TABLE test_tab, test_tab_full;");
+
+my $appname = 'test_sub';
+$node_subscriber->safe_psql('postgres',
+ "CREATE SUBSCRIPTION test_sub
+ CONNECTION '$publisher_connstr application_name=$appname'
+ PUBLICATION test_pub;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# Insert test data. Rows 1-2 are used by XactLockTableWait tests,
+# rows 3-4 by injection-point tests.
+$node_publisher->safe_psql('postgres', qq(
+ INSERT INTO test_tab VALUES (1, 10, 'foo'), (2, 20, 'bar'),
+ (3, 30, 'baz'), (4, 40, 'qux');
+ INSERT INTO test_tab_full VALUES (1, 10, 'foo'), (2, 20, 'bar'),
+ (3, 30, 'baz'), (4, 40, 'qux');
+));
+$node_publisher->wait_for_catchup($appname);
+
+# Test the apply worker retry when the dirty snapshot finds an in-progress
+# transaction on the target tuple, causing XactLockTableWait.
+
+my $sub_session = $node_subscriber->background_psql('postgres');
+
+sub test_retry_with_in_progress_xact
+{
+ my (%args) = @_;
+ my $test_name = $args{name};
+ my $dml = $args{dml};
+ my $verify_query = $args{verify_query};
+ my $verify_result = $args{verify_result};
+
+ # Modify the target tuple in an in-progress transaction on the subscriber.
+ $sub_session->query_safe("BEGIN;");
+ $sub_session->query_safe($dml);
+
+ # Run DML on the publisher.
+ $node_publisher->safe_psql('postgres', $dml);
+
+ # Wait for the apply worker to block on XactLockTableWait.
+ $node_subscriber->poll_query_until('postgres', qq(
+ SELECT count(*) > 0 FROM pg_stat_activity
+ WHERE backend_type = 'logical replication apply worker'
+ AND wait_event_type = 'Lock'
+ AND wait_event = 'transactionid';
+ )) or die "Timed out waiting for apply worker to block on XactLockTableWait";
+
+ # Abort so the apply worker wakes up, retries, and applies successfully.
+ $sub_session->query_safe("ROLLBACK;");
+ $node_publisher->wait_for_catchup($appname);
+
+ # Verify the results.
+ my $result = $node_subscriber->safe_psql('postgres', $verify_query);
+ is($result, $verify_result, $test_name);
+}
+
+# Index scan (PK): publisher UPDATE with in-progress subscriber transaction.
+test_retry_with_in_progress_xact(
+ name => 'XactLockTableWait index scan UPDATE',
+ dml => "UPDATE test_tab SET c = 'foo_u' WHERE a = 1;",
+ verify_query => "SELECT c FROM test_tab WHERE a = 1;",
+ verify_result => 'foo_u',
+);
+
+# Index scan (PK): publisher DELETE with in-progress subscriber transaction.
+test_retry_with_in_progress_xact(
+ name => 'XactLockTableWait index scan DELETE',
+ dml => "DELETE FROM test_tab WHERE a = 2;",
+ verify_query => "SELECT count(*) FROM test_tab WHERE a = 2;",
+ verify_result => '0',
+);
+
+# Seq scan (REPLICA IDENTITY FULL): publisher UPDATE with in-progress
+# subscriber transaction.
+test_retry_with_in_progress_xact(
+ name => 'XactLockTableWait seq scan UPDATE',
+ dml => "UPDATE test_tab_full SET c = 'foo_u' WHERE a = 1;",
+ verify_query => "SELECT c FROM test_tab_full WHERE a = 1;",
+ verify_result => 'foo_u',
+);
+
+# Seq scan (REPLICA IDENTITY FULL): publisher DELETE with in-progress
+# subscriber transaction.
+test_retry_with_in_progress_xact(
+ name => 'XactLockTableWait seq scan DELETE',
+ dml => "DELETE FROM test_tab_full WHERE a = 2;",
+ verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 2;",
+ verify_result => '0',
+);
+
+$sub_session->quit;
+
+# Test the apply worker retry when table_tuple_lock detects a concurrently
+# updated or deleted tuple (TM_Updated / TM_Deleted). An injection point pauses
+# the worker between finding the tuple and locking it, allowing concurrent DML
+# to intervene.
+
+sub test_retry_with_concurrent_dml_before_tuple_lock
+{
+ my (%args) = @_;
+ my $test_name = $args{name};
+ my $inj_point = $args{inj_point};
+ my $dml = $args{dml};
+ my $expected_log = $args{expected_log};
+ my $verify_query = $args{verify_query};
+ my $verify_result = $args{verify_result};
+
+ $node_subscriber->safe_psql('postgres',
+ "SELECT injection_points_attach('$inj_point', 'wait');");
+
+ # Run DML on the publisher.
+ $node_publisher->safe_psql('postgres', $dml);
+
+ $node_subscriber->wait_for_event('logical replication apply worker',
+ $inj_point);
+
+ # Run DML on the subscriber.
+ $node_subscriber->safe_psql('postgres', $dml);
+
+ my $log_offset = -s $node_subscriber->logfile;
+
+ # Detach before wakeup so the retry doesn't hit the same injection point.
+ $node_subscriber->safe_psql('postgres',
+ "SELECT injection_points_detach('$inj_point');
+ SELECT injection_points_wakeup('$inj_point');");
+
+ $node_subscriber->wait_for_log($expected_log, $log_offset);
+ pass("$test_name: concurrent modification detected and retried");
+
+ $node_publisher->wait_for_catchup($appname);
+
+ my $result = $node_subscriber->safe_psql('postgres', $verify_query);
+ is($result, $verify_result, "$test_name: data correct after retry");
+}
+
+# Check whether injection_points extension is available on the subscriber.
+my $injection_points_supported =
+ $node_subscriber->check_extension('injection_points');
+
+if ($injection_points_supported != 0)
+{
+ $node_subscriber->append_conf('postgresql.conf',
+ "shared_preload_libraries = 'injection_points'");
+ $node_subscriber->restart;
+
+ $node_subscriber->safe_psql('postgres',
+ "CREATE EXTENSION injection_points;");
+
+ # TM_Updated via index scan (PK).
+ test_retry_with_concurrent_dml_before_tuple_lock(
+ name => 'index scan TM_Updated',
+ inj_point => 'find-repl-tuple-by-index-before-lock',
+ dml => "UPDATE test_tab SET c = 'baz_u' WHERE a = 3;",
+ expected_log => qr/concurrent update, retrying/,
+ verify_query => "SELECT c FROM test_tab WHERE a = 3;",
+ verify_result => 'baz_u',
+ );
+
+ # TM_Deleted via index scan (PK).
+ test_retry_with_concurrent_dml_before_tuple_lock(
+ name => 'index scan TM_Deleted',
+ inj_point => 'find-repl-tuple-by-index-before-lock',
+ dml => "DELETE FROM test_tab WHERE a = 4;",
+ expected_log => qr/concurrent delete, retrying/,
+ verify_query => "SELECT count(*) FROM test_tab WHERE a = 4;",
+ verify_result => '0',
+ );
+
+ # TM_Updated via seq scan (REPLICA IDENTITY FULL).
+ test_retry_with_concurrent_dml_before_tuple_lock(
+ name => 'seq scan TM_Updated',
+ inj_point => 'find-repl-tuple-seq-before-lock',
+ dml => "UPDATE test_tab_full SET c = 'baz_u' WHERE a = 3;",
+ expected_log => qr/concurrent update, retrying/,
+ verify_query => "SELECT c FROM test_tab_full WHERE a = 3;",
+ verify_result => 'baz_u',
+ );
+
+ # TM_Deleted via seq scan (REPLICA IDENTITY FULL).
+ test_retry_with_concurrent_dml_before_tuple_lock(
+ name => 'seq scan TM_Deleted',
+ inj_point => 'find-repl-tuple-seq-before-lock',
+ dml => "DELETE FROM test_tab_full WHERE a = 4;",
+ expected_log => qr/concurrent delete, retrying/,
+ verify_query => "SELECT count(*) FROM test_tab_full WHERE a = 4;",
+ verify_result => '0',
+ );
+}
+
+done_testing();
--
2.47.3