From 773671f06bf3c2e577d0ad99d40a23b84cae9258 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 24 Jun 2026 16:08:18 +0530 Subject: [PATCH v1_amit] Move pending conflict log state out of shared memory The prepared conflict tuple and its error-context string are purely worker-private data ferried across the apply error boundary, so keep them in a process-local static struct in conflict.c instead of the shared LogicalRepWorker. --- src/backend/replication/logical/conflict.c | 67 ++++++++++++++-------- src/backend/replication/logical/launcher.c | 1 - src/include/replication/worker_internal.h | 10 ---- 3 files changed, 43 insertions(+), 35 deletions(-) diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 983cdf94cf0..e927c152028 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -101,6 +101,25 @@ static const char *const ConflictTypeNames[] = { [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts" }; +/* + * Worker-private state for a conflict that has been prepared for the conflict + * log table but not yet inserted. It carries the prepared tuple, and a + * description of the conflict used for error context, from + * prepare_conflict_log_tuple() across the apply error boundary to + * ProcessPendingConflictLogTuple()/InsertConflictLogTuple(). Both pointers + * reference memory allocated in ApplyContext. + * + * This is purely process-local state, so it lives here rather than in the + * shared LogicalRepWorker struct. 
+ */ +typedef struct PendingConflictLogData +{ + HeapTuple tuple; /* prepared, not-yet-inserted conflict tuple */ + char *errcontext_str; /* conflict description for error context */ +} PendingConflictLog; + +static PendingConflictLog pending_conflict_log = {0}; + static int errcode_apply_conflict(ConflictType type); static void errdetail_apply_conflict(EState *estate, ResultRelInfo *relinfo, @@ -427,15 +446,15 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, * insertion path (ProcessPendingConflictLogTuple) does not retry * this same failing insert. */ - if (MyLogicalRepWorker->conflict_log_tuple != NULL) + if (pending_conflict_log.tuple != NULL) { - heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); - MyLogicalRepWorker->conflict_log_tuple = NULL; + heap_freetuple(pending_conflict_log.tuple); + pending_conflict_log.tuple = NULL; } - if (MyLogicalRepWorker->conflict_log_errcontext != NULL) + if (pending_conflict_log.errcontext_str != NULL) { - pfree(MyLogicalRepWorker->conflict_log_errcontext); - MyLogicalRepWorker->conflict_log_errcontext = NULL; + pfree(pending_conflict_log.errcontext_str); + pending_conflict_log.errcontext_str = NULL; } PG_RE_THROW(); } @@ -468,7 +487,7 @@ ProcessPendingConflictLogTuple(void) Relation conflictlogrel; /* Nothing to do */ - if (MyLogicalRepWorker->conflict_log_tuple == NULL) + if (pending_conflict_log.tuple == NULL) return; /* @@ -499,12 +518,12 @@ ProcessPendingConflictLogTuple(void) * ReportApplyConflict(). Nothing more to do; just discard the prepared * tuple and its context string. 
*/ - heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); - MyLogicalRepWorker->conflict_log_tuple = NULL; - if (MyLogicalRepWorker->conflict_log_errcontext) + heap_freetuple(pending_conflict_log.tuple); + pending_conflict_log.tuple = NULL; + if (pending_conflict_log.errcontext_str) { - pfree(MyLogicalRepWorker->conflict_log_errcontext); - MyLogicalRepWorker->conflict_log_errcontext = NULL; + pfree(pending_conflict_log.errcontext_str); + pending_conflict_log.errcontext_str = NULL; } } @@ -634,7 +653,7 @@ InsertConflictLogTuple(Relation conflictlogrel) ErrorContextCallback errcallback; - /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ - Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + /* A valid tuple must be prepared and stored in pending_conflict_log. */ + Assert(pending_conflict_log.tuple != NULL); /* * Set up an error context so that a failure to insert (e.g. the conflict @@ -642,22 +661,22 @@ InsertConflictLogTuple(Relation conflictlogrel) * identifying the conflict we were trying to log. */ errcallback.callback = conflict_log_insert_errcontext; - errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext; + errcallback.arg = pending_conflict_log.errcontext_str; errcallback.previous = error_context_stack; error_context_stack = &errcallback; - heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + heap_insert(conflictlogrel, pending_conflict_log.tuple, GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL); error_context_stack = errcallback.previous; /* Free the conflict log tuple and its context string. 
*/ - heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); - MyLogicalRepWorker->conflict_log_tuple = NULL; - if (MyLogicalRepWorker->conflict_log_errcontext) + heap_freetuple(pending_conflict_log.tuple); + pending_conflict_log.tuple = NULL; + if (pending_conflict_log.errcontext_str) { - pfree(MyLogicalRepWorker->conflict_log_errcontext); - MyLogicalRepWorker->conflict_log_errcontext = NULL; + pfree(pending_conflict_log.errcontext_str); + pending_conflict_log.errcontext_str = NULL; } } @@ -1394,7 +1413,7 @@ build_local_conflicts_json_array(EState *estate, Relation rel, * * This routine prepares a tuple detailing a conflict encountered during * logical replication. The prepared tuple will be stored in - * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * pending_conflict_log.tuple which should be inserted into the * conflict log table by calling InsertConflictLogTuple. */ static void @@ -1411,7 +1430,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, char *remote_origin = NULL; MemoryContext oldctx; - Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + Assert(pending_conflict_log.tuple == NULL); /* Populate the values and nulls arrays. */ attno = 0; @@ -1475,7 +1494,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, Assert(attno + 1 == NUM_CONFLICT_ATTRS); oldctx = MemoryContextSwitchTo(ApplyContext); - MyLogicalRepWorker->conflict_log_tuple = + pending_conflict_log.tuple = heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); /* @@ -1483,7 +1502,7 @@ prepare_conflict_log_tuple(EState *estate, Relation rel, * the tuple into the conflict log table fails, the resulting error carries * enough context to identify the conflict (see InsertConflictLogTuple). 
*/ - MyLogicalRepWorker->conflict_log_errcontext = + pending_conflict_log.errcontext_str = psprintf("while logging conflict \"%s\" detected on relation \"%s\"", ConflictTypeNames[conflict_type], RelationGetRelationName(rel)); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 05a30342f69..313e31ff2e3 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -487,7 +487,6 @@ retry: worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; - worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 79c90dddd89..00ad0d86a79 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -100,16 +100,6 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; - /* A conflict log tuple that is prepared but not yet inserted. */ - HeapTuple conflict_log_tuple; - - /* - * Error-context string describing the conflict above, used to annotate any - * error raised while inserting conflict_log_tuple into the conflict log - * table. Allocated, like conflict_log_tuple, in ApplyContext. - */ - char *conflict_log_errcontext; - /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; -- 2.54.0