From 8806a07656c7a90668132dc17f051174f2a62ae6 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 19 Jun 2026 15:40:07 +0530 Subject: [PATCH v4] Don't tear down a parallel apply worker before it logs its conflict When a parallel apply worker hits an ERROR-level conflict, it aborts the failed apply transaction and then, still in its error path, logs the conflict to the conflict log table in a new transaction. However, the leader detects worker failure in pa_wait_for_xact_finish() by waiting on the worker's transaction lock, and AbortOutOfAnyTransaction() releases that lock. The leader therefore unblocks, sees a non-finished state, raises "lost connection to the logical replication parallel apply worker", and tears the worker down. That message is misleading, and the teardown can SIGTERM the worker mid-insert and lose the conflict log row. Fix this by adding a PARALLEL_TRANS_ERROR state that the worker sets before aborting, i.e. while the leader is still blocked on the lock, so that the new state is visible as soon as the leader unblocks. On seeing it, the leader waits for the worker to report its actual error via the error queue. This keeps the worker alive long enough to finish writing the conflict log tuple and lets the leader report the real error instead of "lost connection". The original message remains as the fallback for a worker that died without signalling. As a side effect, non-conflict worker failures now also surface the real error. 
--- .../replication/logical/applyparallelworker.c | 35 +++++++++++++++++++ src/include/replication/worker_internal.h | 2 ++ 2 files changed, 37 insertions(+) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 48cb5558367..a3f5b9b122d 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -1017,6 +1017,17 @@ ParallelApplyWorkerMain(Datum main_arg) FlushErrorState(); error_context_stack = NULL; + + /* + * Tell the leader we failed and are about to report the error and log + * the conflict. This must be set before AbortOutOfAnyTransaction() + * below releases the transaction lock that the leader waits on in + * pa_wait_for_xact_finish(); otherwise the leader would see a + * non-finished state, assume the connection was lost, and tear this + * worker down while it is still writing the conflict log tuple. + */ + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_ERROR); + AbortOutOfAnyTransaction(); /* @@ -1361,9 +1372,33 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) * released. */ if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) + { + /* + * If the worker signalled that it errored (PARALLEL_TRANS_ERROR), it is + * logging the conflict and will report the actual error via the error + * queue before exiting. Wait for that rather than reporting a generic + * lost connection: CHECK_FOR_INTERRUPTS() drives + * ProcessParallelApplyMessages(), which raises the real error on the + * worker's ErrorResponse (or "lost connection" if the worker died + * without reporting). Waiting here also keeps the worker alive long + * enough to finish writing the conflict log tuple. 
+ */ + while (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_ERROR) + { + CHECK_FOR_INTERRUPTS(); + + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + + ResetLatch(MyLatch); + } + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("lost connection to the logical replication parallel apply worker"))); + } } /* diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index c0059b1b810..79c90dddd89 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -131,6 +131,8 @@ typedef enum ParallelTransState PARALLEL_TRANS_UNKNOWN, PARALLEL_TRANS_STARTED, PARALLEL_TRANS_FINISHED, + PARALLEL_TRANS_ERROR, /* worker failed; it will report the error (and + * log the conflict, if any) before exiting */ } ParallelTransState; /* -- 2.54.0