From bae4f16a7a6a80d2d27ba26a7eac70b9dc089785 Mon Sep 17 00:00:00 2001 From: Nisha Moond Date: Mon, 22 Jun 2026 11:50:12 +0530 Subject: [PATCH v53] Add TAP test for parallel-apply deferred CLT insert race Test the race where, on an ERROR-level conflict, a parallel apply (PA) worker logs the deferred conflict row in a fresh transaction in its PG_CATCH path. AbortOutOfAnyTransaction() in the PG_CATCH releases the worker's transaction lock before the deferred insert runs, so the leader's pa_wait_for_xact_finish() unblocks, sees a non-finished state, raises the "lost connection to the logical replication parallel apply worker" error, and SIGTERMs the PA worker mid-insert -- and the deferred conflict row is lost. With the fix applied, the test also verifies that the PA worker inserts the conflict row into the conflict log table (CLT) and reports the conflict in the server log. --- src/backend/replication/logical/conflict.c | 8 + src/test/subscription/meson.build | 1 + .../t/039_pa_conflict_log_lock_wait.pl | 204 ++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 src/test/subscription/t/039_pa_conflict_log_lock_wait.pl diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index c2c15f055e6..6ec57180813 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -31,6 +31,7 @@ #include "storage/lmgr.h" #include "utils/array.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" @@ -484,6 +485,13 @@ ProcessPendingConflictLogTuple(void) StartTransactionCommand(); PushActiveSnapshot(GetTransactionSnapshot()); + /* + * Test hook: pause here so a TAP test can take a conflicting lock on + * the conflict log table before this transaction tries to open it. + * See src/test/subscription/t/039_pa_conflict_log_lock_wait.pl.
+ */ + INJECTION_POINT("clt-pending-flush-before-open", NULL); + /* Open conflict log table and insert the tuple */ conflictlogrel = GetConflictLogDestAndTable(&dest); Assert(conflictlogrel); diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index e71e95c6297..225f90a37b3 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -48,6 +48,7 @@ tests += { 't/036_sequences.pl', 't/037_except.pl', 't/038_walsnd_shutdown_timeout.pl', + 't/039_pa_conflict_log_lock_wait.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/039_pa_conflict_log_lock_wait.pl b/src/test/subscription/t/039_pa_conflict_log_lock_wait.pl new file mode 100644 index 00000000000..d69f5b43656 --- /dev/null +++ b/src/test/subscription/t/039_pa_conflict_log_lock_wait.pl @@ -0,0 +1,204 @@ +# Copyright (c) 2026, PostgreSQL Global Development Group + +# Test that a parallel apply (PA) worker correctly inserts a deferred +# conflict-log tuple even when, by the time it reaches +# ProcessPendingConflictLogTuple(), the conflict log table is held under +# ACCESS EXCLUSIVE by another session. +# +# The window we want to exercise is narrow: PA must already be past +# ReportApplyConflict() (so the error has fired and PA is in PG_CATCH), +# and the locker must take the CLT lock *before* PA reaches the second +# CLT open inside ProcessPendingConflictLogTuple(). An injection point +# pauses PA at exactly that point so the locker can grab the lock first; +# without it, the locker either takes the lock too early (PA blocks in +# the inline CLT open inside ReportApplyConflict, before the error has +# fired) or too late (PA inserts before the lock is taken).
+ +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# --------------------------------------------------------------------- +# Set up publisher and subscriber. Force every transaction to stream so +# the conflict is handled by a parallel apply worker rather than the +# leader. +# --------------------------------------------------------------------- +my $node_pub = PostgreSQL::Test::Cluster->new('publisher'); +$node_pub->init(allows_streaming => 'logical'); +$node_pub->append_conf('postgresql.conf', q{ +debug_logical_replication_streaming = immediate +logical_decoding_work_mem = 64kB +}); +$node_pub->start; + +my $node_sub = PostgreSQL::Test::Cluster->new('subscriber'); +$node_sub->init; +$node_sub->append_conf('postgresql.conf', q{ +shared_preload_libraries = 'injection_points' +max_logical_replication_workers = 4 +max_parallel_apply_workers_per_subscription = 2 +}); +$node_sub->start; + +# Replicated table; the pre-existing row on the subscriber is what makes +# the publisher's INSERT (id=1) trigger an INSERT_EXISTS conflict. +$node_pub->safe_psql('postgres', q{ + CREATE TABLE t (id int PRIMARY KEY, val text); + ALTER TABLE t REPLICA IDENTITY FULL; + CREATE PUBLICATION p FOR TABLE t; +}); + +$node_sub->safe_psql('postgres', q{ + CREATE TABLE t (id int PRIMARY KEY, val text); + INSERT INTO t VALUES (1, 'pre-existing'); + CREATE EXTENSION injection_points; +}); + +my $pub_connstr = $node_pub->connstr . 
' dbname=postgres'; +$node_sub->safe_psql('postgres', qq{ + CREATE SUBSCRIPTION s + CONNECTION '$pub_connstr' + PUBLICATION p + WITH (streaming = parallel, + conflict_log_destination = 'all', + disable_on_error = true); +}); + +$node_sub->wait_for_subscription_sync($node_pub, 's'); + +# --------------------------------------------------------------------- +# Send a non-conflicting INSERT and then wait until pg_subscription_rel +# reaches 'r' (ready) on every relation. pa_can_start() requires +# AllTablesyncsReady(), which returns true only when every +# pg_subscription_rel row is 'r'. The 's' (syncdone) -> 'r' transition +# fires inside ProcessSyncingTablesForApply, which only flips the state +# when the apply worker's last_received LSN has advanced past the +# tablesync end LSN -- so we need a triggering commit on the publisher +# to drive last_received forward. Without this step, the conflict txn +# below would arrive while the table is still 's', pa_can_start() would +# return false, the leader would spool to file and apply serially, and +# no parallel apply worker would ever spawn. +# --------------------------------------------------------------------- +$node_pub->safe_psql('postgres', "INSERT INTO t VALUES (1000, 'warmup');"); +$node_sub->poll_query_until('postgres', + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r');" +) or die "subscription tables did not reach READY state"; + +# --------------------------------------------------------------------- +# Look up the name of this subscription's conflict log table (CLT). +# --------------------------------------------------------------------- +my $sub_oid = $node_sub->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 's'"); +my $clt = "pg_conflict.pg_conflict_log_$sub_oid"; +note "conflict log table for subscription s: $clt"; + +# --------------------------------------------------------------------- +# Arm the injection point.
This pauses the PA worker inside +# ProcessPendingConflictLogTuple() -- i.e. *after* the error has fired +# and the PG_CATCH has run, *before* the second open of the CLT. This +# is the exact window the deferred-insert path needs to be tested in. +# --------------------------------------------------------------------- +$node_sub->safe_psql('postgres', + "SELECT injection_points_attach('clt-pending-flush-before-open', 'wait');"); + +# --------------------------------------------------------------------- +# Drive the conflict. PA receives the streamed txn, hits INSERT_EXISTS +# inside ReportApplyConflict (which opens/closes the CLT cleanly while +# preparing the deferred tuple), then ereport(ERROR) fires, PG_CATCH +# runs, and PA enters ProcessPendingConflictLogTuple -- where it pauses +# at the injection point. +# --------------------------------------------------------------------- +my $log_offset = -s $node_sub->logfile; + +$node_pub->safe_psql('postgres', q{ + BEGIN; + INSERT INTO t SELECT g, repeat('x', 1000) FROM generate_series(2, 200) g; + INSERT INTO t VALUES (1, 'conflict'); + COMMIT; +}); + +# Wait until PA is parked at the injection point. +$node_sub->wait_for_event('logical replication parallel worker', + 'clt-pending-flush-before-open'); + +# --------------------------------------------------------------------- +# Now take ACCESS EXCLUSIVE on the CLT. TRUNCATE is permitted on CLTs, +# and since the CLT is still empty it only serves to acquire the lock. +# The ready marker is echoed *after* TRUNCATE completes, so the lock is +# held once query_until() returns; PA is still parked at the injection +# point, so it cannot have opened the CLT first. +# --------------------------------------------------------------------- +my $locker = $node_sub->background_psql('postgres'); +$locker->query_until(qr/locker_ready/, qq{ + BEGIN; + TRUNCATE $clt; + \\echo locker_ready +}); + +# --------------------------------------------------------------------- +# Wake the PA from the injection point.
It will now try to open the +# CLT inside ProcessPendingConflictLogTuple and block on the lock the +# locker session holds. +# --------------------------------------------------------------------- +$node_sub->safe_psql('postgres', + "SELECT injection_points_wakeup('clt-pending-flush-before-open'); + SELECT injection_points_detach('clt-pending-flush-before-open');"); + +# Confirm the PA worker is actually parked waiting on the CLT lock -- +# this verifies we are exercising the deferred-insert lock-wait path, +# not racing past it. +my $clt_oid = $node_sub->safe_psql('postgres', + "SELECT '$clt'::regclass::oid"); +ok( $node_sub->poll_query_until( + 'postgres', qq{ + SELECT EXISTS ( + SELECT 1 + FROM pg_locks l + JOIN pg_stat_activity a ON l.pid = a.pid + WHERE NOT l.granted + AND l.relation = $clt_oid + AND a.backend_type = 'logical replication parallel worker' + ); + }, 't'), + 'PA worker is blocked on the CLT lock inside ProcessPendingConflictLogTuple'); + +# --------------------------------------------------------------------- +# Release the lock. PA wakes, inserts the deferred row, commits its +# CLT txn, re-throws the original error to the leader, and the leader +# disables the subscription (disable_on_error = true). +# --------------------------------------------------------------------- +$locker->query_safe('COMMIT;'); +ok($locker->quit, 'locker session closed cleanly'); + +ok( $node_sub->poll_query_until( + 'postgres', + "SELECT subenabled = false FROM pg_subscription WHERE subname = 's'", + 't'), + 'subscription disabled after the conflict'); + +# --------------------------------------------------------------------- +# Verify the deferred conflict log tuple survived the lock wait.
+# --------------------------------------------------------------------- +my $rows = $node_sub->safe_psql('postgres', + "SELECT count(*) FROM $clt WHERE conflict_type = 'insert_exists'"); +is($rows, '1', + 'deferred CLT insert by PA worker succeeded after lock release'); + +# --------------------------------------------------------------------- +# Also verify the conflict was reported in the server log +# (conflict_log_destination = 'all' writes to both the CLT and the server log). +# --------------------------------------------------------------------- +my $log_contents = slurp_file($node_sub->logfile, $log_offset); +like( + $log_contents, + qr/ERROR:\s+conflict detected on relation "public\.t": conflict=insert_exists/, + 'conflict reported in server log'); + +done_testing(); -- 2.50.1 (Apple Git-155)