From 2af76926a9c3eff6b3f4b03be890e5fce55baee3 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Sat, 30 May 2026 13:35:04 +0530 Subject: [PATCH v54 3/6] Implement the conflict insertion infrastructure for the conflict log table This patch introduces the core logic to populate the conflict log table whenever a logical replication conflict is detected. It captures the remote transaction details along with the corresponding local state at the time of the conflict. Handling Multi-row Conflicts: A single remote tuple may conflict with multiple local tuples (e.g., in the case of multiple_unique_conflicts). To handle this, the infrastructure creates a single row in the conflict log table for each remote tuple. The details of all conflicting local rows are aggregated into a single JSON array in the local_conflicts column. The JSON array uses the following structured format: [ { "xid": "1001", "commit_ts": "2025-12-25 10:00:00+05:30", "origin": "node_1", "key": {"id": 1}, "tuple": {"id": 1, "val": "old_data"} }, ... ] Example of querying the structured conflict data: SELECT remote_xid, relname, remote_origin, local_conflicts[1] ->> 'xid' AS local_xid, local_conflicts[1] ->> 'tuple' AS local_tuple FROM pg_conflict.pg_conflict_log_16396; remote_xid | relname | remote_origin | local_xid | local_tuple ------------+----------+---------------+-----------+--------------------- 760 | test | pg_16406 | 771 | {"a":1,"b":10} 765 | conf_tab | pg_16406 | 775 | {"a":2,"b":2,"c":2} --- .../replication/logical/applyparallelworker.c | 84 +- src/backend/replication/logical/conflict.c | 752 +++++++++++++++++- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 65 +- src/include/replication/conflict.h | 3 + src/include/replication/worker_internal.h | 16 + src/test/subscription/t/030_origin.pl | 4 +- src/test/subscription/t/035_conflicts.pl | 47 +- 8 files changed, 919 insertions(+), 53 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 012d55e9d3d..a3f5b9b122d 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -986,7 +986,65 @@ ParallelApplyWorkerMain(Datum main_arg) set_apply_error_context_origin(originname); - LogicalParallelApplyLoop(mqh); + PG_TRY(); + { + LogicalParallelApplyLoop(mqh); + } + PG_CATCH(); + { + MemoryContext oldcontext; + ErrorData *edata; + + /* + * Reset the origin state to prevent the advancement of origin + * progress if we fail to apply. Otherwise, this will result in + * transaction loss as that transaction won't be sent again by the + * server. + */ + replorigin_xact_clear(true); + + /* + * Copy the error and recover to an idle state so we can insert the + * deferred conflict log tuple (if any) before re-throwing. Copy the + * error into a longer-lived context first, as it may have been raised + * under ErrorContext. Also reset the error context stack: the + * callbacks in effect when the error was thrown belong to unwound stack + * frames, and the deferred insert installs its own. + */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + edata = CopyErrorData(); + MemoryContextSwitchTo(oldcontext); + + FlushErrorState(); + error_context_stack = NULL; + + /* + * Tell the leader we failed and are about to report the error and log + * the conflict. This must be set before AbortOutOfAnyTransaction() + * below releases the transaction lock that the leader waits on in + * pa_wait_for_xact_finish(); otherwise the leader would see a + * non-finished state, assume the connection was lost, and tear this + * worker down while it is still writing the conflict log tuple. + */ + pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_ERROR); + + AbortOutOfAnyTransaction(); + + /* + * Insert the deferred conflict log tuple before re-throwing. + * Re-throwing is what reports the error to the leader (via the error + * queue set up above), so the insertion must happen first: otherwise + * the leader could start tearing down this worker while it is still + * writing the conflict log tuple. If the insertion itself fails, that + * error (annotated with the conflict context, see InsertConflictLogTuple) + * propagates to the leader instead of the original. + */ + ProcessPendingConflictLogTuple(); + + /* Re-throw the original error, which reports it to the leader. */ + ReThrowError(edata); + } + PG_END_TRY(); /* * The parallel apply worker must not get here because the parallel apply @@ -1314,9 +1372,33 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) * released. */ if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) + { + /* + * If the worker signalled that it errored (PARALLEL_TRANS_ERROR), it is + * logging the conflict and will report the actual error via the error + * queue before exiting. Wait for that rather than reporting a generic + * lost connection: CHECK_FOR_INTERRUPTS() drives + * ProcessParallelApplyMessages(), which raises the real error on the + * worker's ErrorResponse (or "lost connection" if the worker died + * without reporting). Waiting here also keeps the worker alive long + * enough to finish writing the conflict log tuple. + */ + while (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_ERROR) + { + CHECK_FOR_INTERRUPTS(); + + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, + WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); + + ResetLatch(MyLatch); + } + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("lost connection to the logical replication parallel apply worker"))); + } } /* diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c index 4a647277d1f..ce1c899017c 100644 --- a/src/backend/replication/logical/conflict.c +++ b/src/backend/replication/logical/conflict.c @@ -16,17 +16,25 @@ #include "access/commit_ts.h" #include "access/genam.h" +#include "access/heapam.h" #include "access/tableam.h" +#include "access/xact.h" #include "catalog/heap.h" #include "catalog/pg_am.h" #include "catalog/pg_namespace.h" #include "catalog/toasting.h" #include "executor/executor.h" +#include "funcapi.h" #include "pgstat.h" #include "replication/conflict.h" #include "replication/worker_internal.h" #include "storage/lmgr.h" +#include "utils/array.h" +#include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/pg_lsn.h" /* * String representations for the supported conflict logging destinations. @@ -40,7 +48,6 @@ const char *const ConflictLogDestNames[] = { StaticAssertDecl(lengthof(ConflictLogDestNames) == CONFLICT_LOG_DEST_ALL + 1, "ConflictLogDestNames length mismatch"); - /* Structure to hold metadata for one column of the conflict log table */ typedef struct ConflictLogColumnDef { @@ -72,6 +79,18 @@ static const ConflictLogColumnDef ConflictLogSchema[] = { #define NUM_CONFLICT_ATTRS lengthof(ConflictLogSchema) +/* Schema for the elements within the 'local_conflicts' JSON array */ +static const ConflictLogColumnDef LocalConflictSchema[] = +{ + { .attname = "xid", .atttypid = XIDOID }, + { .attname = "commit_ts", .atttypid = TIMESTAMPTZOID }, + { .attname = "origin", .atttypid = TEXTOID }, + { .attname = "key", .atttypid = JSONOID }, + { .attname = "tuple", .atttypid = JSONOID } +}; + +#define NUM_LOCAL_CONFLICT_ATTRS lengthof(LocalConflictSchema) + static const char *const ConflictTypeNames[] = { [CT_INSERT_EXISTS] = "insert_exists", [CT_UPDATE_ORIGIN_DIFFERS] = "update_origin_differs", @@ -99,8 +118,27 @@ static void get_tuple_desc(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *remoteslot, char **remote_desc, TupleTableSlot *searchslot, char **search_desc, Oid indexoid); +static void build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull); static char *build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Oid indexoid); +static Datum tuple_table_slot_to_json_datum(TupleTableSlot *slot); +static Datum tuple_table_slot_to_indextup_json(EState *estate, + Relation localrel, + Oid replica_index, + TupleTableSlot *slot); +static TupleDesc build_conflict_tupledesc(void); +static Datum build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples); +static void prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot); /* * Builds the TupleDesc for the conflict log table. @@ -281,30 +319,220 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel, ConflictType type, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples) { - Relation localrel = relinfo->ri_RelationDesc; - StringInfoData err_detail; + Relation localrel = relinfo->ri_RelationDesc; + ConflictLogDest dest; + Relation conflictlogrel; + bool log_dest_table; + bool log_dest_logfile; - initStringInfo(&err_detail); + pgstat_report_subscription_conflict(MySubscription->oid, type); - /* Form errdetail message by combining conflicting tuples information. */ - foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) - errdetail_apply_conflict(estate, relinfo, type, searchslot, - conflicttuple->slot, remoteslot, - conflicttuple->indexoid, - conflicttuple->xmin, - conflicttuple->origin, - conflicttuple->ts, - &err_detail); + /* + * Get the conflict log destination. Also, if a table is one of the + * destinations, return the CLT relation already opened and ready for + * insertion -- or NULL if it has been dropped concurrently. + */ + conflictlogrel = GetConflictLogDestAndTable(&dest); - pgstat_report_subscription_conflict(MySubscription->oid, type); + log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest); + log_dest_logfile = CONFLICTS_LOGGED_TO_LOG(dest); + + /* + * If a conflict log table was requested but it has been dropped + * concurrently (e.g. a concurrent ALTER SUBSCRIPTION changed + * conflict_log_destination), GetConflictLogDestAndTable() returned NULL. + * Fall back to logging the full conflict details to the server log so that + * the conflict is not lost. + */ + if (log_dest_table && conflictlogrel == NULL) + { + log_dest_table = false; + log_dest_logfile = true; + } + + /* + * Prepare the conflict log tuple first when the destination includes the + * table. This must happen before the ereport() below, because for an + * ERROR-level conflict that ereport() raises the error and defers the + * actual insertion to ProcessPendingConflictLogTuple(), which relies on the + * tuple having been prepared. + */ + if (log_dest_table) + { + Assert(conflictlogrel != NULL); + prepare_conflict_log_tuple(estate, + relinfo->ri_RelationDesc, + conflictlogrel, + type, + searchslot, + conflicttuples, + remoteslot); + } + + /* + * Report the conflict to the server log before inserting it into the + * conflict log table. Emitting it first guarantees the conflict is + * recorded even if the table insert below fails; it is also what raises the + * error for ERROR-level conflicts. When the server log is one of the + * destinations we emit the full details, otherwise (table-only) we emit a + * shorter message since the details are captured in the table. + */ + if (log_dest_logfile) + { + StringInfoData err_detail; + + initStringInfo(&err_detail); + + /* Form errdetail message by combining conflicting tuples information. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + errdetail_apply_conflict(estate, relinfo, type, searchslot, + conflicttuple->slot, remoteslot, + conflicttuple->indexoid, + conflicttuple->xmin, + conflicttuple->origin, + conflicttuple->ts, + &err_detail); + + /* Standard reporting with full internal details. */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), + ConflictTypeNames[type]), + errdetail_internal("%s", err_detail.data)); + } + else if (log_dest_table) + { + /* + * Not logging conflict details to the server log; report the conflict + * but omit raw tuple data since it is captured in the conflict log + * table. + */ + ereport(elevel, + errcode_apply_conflict(type), + errmsg("conflict detected on relation \"%s\": conflict=%s", + RelationGetQualifiedRelationName(localrel), + ConflictTypeNames[type]), + errdetail("Conflict details are logged to the conflict log table: %s", + RelationGetRelationName(conflictlogrel))); + } + + /* + * Insert into the conflict log table if requested. For conflicts below + * ERROR the apply transaction continues, so insert immediately; for + * ERROR-level conflicts the ereport() above already raised the error and + * the insertion is deferred to a new transaction + * (ProcessPendingConflictLogTuple) so that it is not rolled back. + */ + if (log_dest_table) + { + if (elevel < ERROR) + { + PG_TRY(); + { + InsertConflictLogTuple(conflictlogrel); + } + PG_CATCH(); + { + /* + * The insert failed, so the apply transaction will abort and + * the error will propagate to the worker's error handler. The + * conflict was already reported to the server log above, so it + * is not lost. Discard the prepared tuple so that the deferred + * insertion path (ProcessPendingConflictLogTuple) does not retry + * this same failing insert. + */ + if (MyLogicalRepWorker->conflict_log_tuple != NULL) + { + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; + } + if (MyLogicalRepWorker->conflict_log_errcontext != NULL) + { + pfree(MyLogicalRepWorker->conflict_log_errcontext); + MyLogicalRepWorker->conflict_log_errcontext = NULL; + } + PG_RE_THROW(); + } + PG_END_TRY(); + } + + table_close(conflictlogrel, RowExclusiveLock); + } +} + +/* + * ProcessPendingConflictLogTuple + * Insert any deferred conflict log tuple in a separate transaction. + * + * For conflicts raised at ERROR level, the conflict log tuple cannot be + * inserted immediately because the surrounding transaction will abort. + * To ensure that conflict information is not lost, such tuples are prepared + * during error processing (see prepare_conflict_log_tuple()) but their + * insertion is deferred. + * + * This function is responsible for completing that deferred insertion after + * the failing transaction has been aborted and the system has returned to an + * idle state. It executes the insertion in a new, independent transaction, + * ensuring that the conflict log entry is durable and not rolled back + * together with the failed apply transaction. + */ +void +ProcessPendingConflictLogTuple(void) +{ + Relation conflictlogrel; + ConflictLogDest dest; + + /* Nothing to do */ + if (MyLogicalRepWorker->conflict_log_tuple == NULL) + return; + + /* + * Insert the deferred conflict log tuple in its own transaction. A + * failure here (e.g. an out-of-disk-space error) is treated like any other + * apply error and raises an ERROR; such failures are expected to be rare + * and persistent. Callers must therefore have already reported (and + * cleared) any in-progress apply error before calling this, so that this + * error does not mask the original one. + */ + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* + * Test hook: pause here so a TAP test can take a conflicting lock on + * the conflict log table before this transaction tries to open it. + * See src/test/subscription/t/039_pa_conflict_log_lock_wait.pl. + */ + INJECTION_POINT("clt-pending-flush-before-open", NULL); + + /* Open the conflict log table and insert the tuple. */ + conflictlogrel = GetConflictLogDestAndTable(&dest); + + if (conflictlogrel != NULL) + { + InsertConflictLogTuple(conflictlogrel); + table_close(conflictlogrel, RowExclusiveLock); + } + else + { + /* + * The conflict log table was dropped concurrently (e.g. by an ALTER + * SUBSCRIPTION that changed conflict_log_destination) after the + * conflict was already reported to the server log by + * ReportApplyConflict(). Nothing more to do; just discard the prepared + * tuple and its context string. + */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; + if (MyLogicalRepWorker->conflict_log_errcontext) + { + pfree(MyLogicalRepWorker->conflict_log_errcontext); + MyLogicalRepWorker->conflict_log_errcontext = NULL; + } + } - ereport(elevel, - errcode_apply_conflict(type), - errmsg("conflict detected on relation \"%s.%s\": conflict=%s", - get_namespace_name(RelationGetNamespace(localrel)), - RelationGetRelationName(localrel), - ConflictTypeNames[type]), - errdetail_internal("%s", err_detail.data)); + PopActiveSnapshot(); + CommitTransactionCommand(); } /* @@ -338,6 +566,100 @@ InitConflictIndexes(ResultRelInfo *relInfo) relInfo->ri_onConflictArbiterIndexes = uniqueIndexes; } +/* + * GetConflictLogDestAndTable + * + * Fetches conflict logging metadata from the cached MySubscription pointer. + * Sets the destination enum in *log_dest and, if a table is one of the + * destinations, opens and returns the relation handle for the conflict log + * table. + * + * The table is opened with try_table_open(), so NULL is returned if the + * conflict log table has been dropped concurrently (e.g. by an ALTER + * SUBSCRIPTION that changed conflict_log_destination). Callers must treat a + * NULL result for a table destination as "table unavailable" and fall back to + * server-log reporting rather than failing. + */ +Relation +GetConflictLogDestAndTable(ConflictLogDest *log_dest) +{ + Oid conflictlogrelid; + + /* + * Convert the text log destination to the internal enum. MySubscription + * already contains the data from pg_subscription. + */ + *log_dest = GetConflictLogDest(MySubscription->conflictlogdest); + + /* Quick exit if a conflict log table was not requested. */ + if (!CONFLICTS_LOGGED_TO_TABLE(*log_dest)) + return NULL; + + conflictlogrelid = MySubscription->conflictlogrelid; + + Assert(OidIsValid(conflictlogrelid)); + + /* + * Use try_table_open(): the table may have been dropped concurrently by an + * ALTER SUBSCRIPTION that changed conflict_log_destination. Returning NULL + * lets the caller fall back to the server log instead of failing. + */ + return try_table_open(conflictlogrelid, RowExclusiveLock); +} + +/* + * Error context callback for failures while inserting into the conflict log + * table. Adds a line identifying the conflict that was being logged. + */ +static void +conflict_log_insert_errcontext(void *arg) +{ + char *ctx = (char *) arg; + + if (ctx) + errcontext("%s", ctx); +} + +/* + * InsertConflictLogTuple + * + * Insert conflict log tuple into the conflict log table. It uses + * HEAP_INSERT_NO_LOGICAL to explicitly block logical decoding of the tuple + * inserted into the conflict log table. + */ +void +InsertConflictLogTuple(Relation conflictlogrel) +{ + ErrorContextCallback errcallback; + + /* A valid tuple must be prepared and stored in MyLogicalRepWorker. */ + Assert(MyLogicalRepWorker->conflict_log_tuple != NULL); + + /* + * Set up an error context so that a failure to insert (e.g. the conflict + * log table was dropped, or an out-of-space error) carries information + * identifying the conflict we were trying to log. + */ + errcallback.callback = conflict_log_insert_errcontext; + errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple, + GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL); + + error_context_stack = errcallback.previous; + + /* Free the conflict log tuple and its context string. */ + heap_freetuple(MyLogicalRepWorker->conflict_log_tuple); + MyLogicalRepWorker->conflict_log_tuple = NULL; + if (MyLogicalRepWorker->conflict_log_errcontext) + { + pfree(MyLogicalRepWorker->conflict_log_errcontext); + MyLogicalRepWorker->conflict_log_errcontext = NULL; + } +} + /* * Add SQLSTATE error code to the current conflict report. */ @@ -771,6 +1093,40 @@ get_tuple_desc(EState *estate, ResultRelInfo *relinfo, ConflictType type, } } +/* + * Helper function to extract the "raw" index key Datums and their null flags + * from a TupleTableSlot, given an already open index descriptor. + * This is the reusable core logic. + */ +static void +build_index_datums_from_slot(EState *estate, Relation localrel, + TupleTableSlot *slot, + Relation indexDesc, Datum *values, + bool *isnull) +{ + TupleTableSlot *tableslot = slot; + + /* + * If the slot is a virtual slot, copy it into a heap tuple slot as + * FormIndexDatum only works with heap tuple slots. + */ + if (TTS_IS_VIRTUAL(slot)) + { + /* Slot is created within the EState's tuple table */ + tableslot = table_slot_create(localrel, &estate->es_tupleTable); + tableslot = ExecCopySlot(tableslot, slot); + } + + /* + * Initialize ecxt_scantuple for potential use in FormIndexDatum + */ + GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + + /* Form the index datums */ + FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, + isnull); +} + /* * Helper functions to construct a string describing the contents of an index * entry. See BuildIndexValueDescription for details. @@ -786,41 +1142,349 @@ build_index_value_desc(EState *estate, Relation localrel, TupleTableSlot *slot, Relation indexDesc; Datum values[INDEX_MAX_KEYS]; bool isnull[INDEX_MAX_KEYS]; - TupleTableSlot *tableslot = slot; - if (!tableslot) + if (!slot) return NULL; Assert(CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true)); indexDesc = index_open(indexoid, NoLock); - /* - * If the slot is a virtual slot, copy it into a heap tuple slot as - * FormIndexDatum only works with heap tuple slots. - */ - if (TTS_IS_VIRTUAL(slot)) + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + + index_value = BuildIndexValueDescription(indexDesc, values, isnull); + + index_close(indexDesc, NoLock); + + return index_value; +} + +/* + * tuple_table_slot_to_json_datum + * + * Helper function to convert a TupleTableSlot to JSON. + */ +static Datum +tuple_table_slot_to_json_datum(TupleTableSlot *slot) +{ + HeapTuple tuple; + Datum datum; + Datum json; + + Assert(slot != NULL); + + tuple = ExecCopySlotHeapTuple(slot); + datum = heap_copy_tuple_as_datum(tuple, slot->tts_tupleDescriptor); + + json = DirectFunctionCall1(row_to_json, datum); + heap_freetuple(tuple); + + return json; +} + +/* + * tuple_table_slot_to_indextup_json + * + * Fetch replica identity key from the tuple table slot and convert into a + * JSON datum. + */ +static Datum +tuple_table_slot_to_indextup_json(EState *estate, Relation localrel, + Oid indexid, TupleTableSlot *slot) +{ + Relation indexDesc; + Datum values[INDEX_MAX_KEYS]; + bool isnull[INDEX_MAX_KEYS]; + HeapTuple tuple; + TupleDesc tupdesc; + Datum datum; + + Assert(slot != NULL); + + Assert(CheckRelationOidLockedByMe(indexid, RowExclusiveLock, true)); + + indexDesc = index_open(indexid, NoLock); + + build_index_datums_from_slot(estate, localrel, slot, indexDesc, values, + isnull); + tupdesc = CreateTupleDescCopy(RelationGetDescr(indexDesc)); + + /* Bless the tupdesc so it can be looked up by row_to_json. */ + BlessTupleDesc(tupdesc); + + /* Form the replica identity tuple. */ + tuple = heap_form_tuple(tupdesc, values, isnull); + datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + heap_freetuple(tuple); + FreeTupleDesc(tupdesc); + index_close(indexDesc, NoLock); + + /* Convert to a JSON datum. */ + return DirectFunctionCall1(row_to_json, datum); +} + +/* + * build_conflict_tupledesc + * + * Build and bless a tuple descriptor for the conflict log table based on the + * predefined LocalConflictSchema. + */ +static TupleDesc +build_conflict_tupledesc(void) +{ + static TupleDesc cached_tupdesc = NULL; + + if (cached_tupdesc == NULL) { - tableslot = table_slot_create(localrel, &estate->es_tupleTable); - tableslot = ExecCopySlot(tableslot, slot); + MemoryContext oldcxt; + + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + + cached_tupdesc = CreateTemplateTupleDesc(NUM_LOCAL_CONFLICT_ATTRS); + + for (int i = 0; i < NUM_LOCAL_CONFLICT_ATTRS; i++) + TupleDescInitEntry(cached_tupdesc, + (AttrNumber) (i + 1), + LocalConflictSchema[i].attname, + LocalConflictSchema[i].atttypid, + -1, 0); + + TupleDescFinalize(cached_tupdesc); + + /* + * Bless once so it can be used as a RECORD type (e.g. for + * row_to_json or other record-based operations). + */ + BlessTupleDesc(cached_tupdesc); + + MemoryContextSwitchTo(oldcxt); } - /* - * Initialize ecxt_scantuple for potential use in FormIndexDatum when - * index expressions are present. - */ - GetPerTupleExprContext(estate)->ecxt_scantuple = tableslot; + return cached_tupdesc; +} - /* - * The values/nulls arrays passed to BuildIndexValueDescription should be - * the results of FormIndexDatum, which are the "raw" input to the index - * AM. - */ - FormIndexDatum(BuildIndexInfo(indexDesc), tableslot, estate, values, isnull); +/* + * Builds the local conflicts JSON array column from the list of + * ConflictTupleInfo objects. + * + * Example output structure: + * [ { "xid": "1001", "commit_ts": "...", "origin": "...", "tuple": {...} }, ... ] + */ +static Datum +build_local_conflicts_json_array(EState *estate, Relation rel, + ConflictType conflict_type, + List *conflicttuples) +{ + ListCell *lc; + List *json_datums = NIL; + Datum *json_datum_array; + Datum json_array_datum; + int num_conflicts; + int i; + int16 typlen; + bool typbyval; + char typalign; + TupleDesc tupdesc; - index_value = BuildIndexValueDescription(indexDesc, values, isnull); + /* Build local conflicts tuple descriptor. */ + tupdesc = build_conflict_tupledesc(); - index_close(indexDesc, NoLock); + /* Process local conflict tuple list and prepare an array of JSON. */ + foreach_ptr(ConflictTupleInfo, conflicttuple, conflicttuples) + { + Datum values[NUM_LOCAL_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_LOCAL_CONFLICT_ATTRS] = {0}; + char *origin_name = NULL; + HeapTuple tuple; + Datum json_datum; + int attno; + + attno = 0; + values[attno++] = TransactionIdGetDatum(conflicttuple->xmin); + + if (conflicttuple->ts) + values[attno++] = TimestampTzGetDatum(conflicttuple->ts); + else + nulls[attno++] = true; - return index_value; + if (conflicttuple->origin != InvalidReplOriginId) + replorigin_by_oid(conflicttuple->origin, true, &origin_name); + + /* Store empty string if origin name for the tuple is NULL. */ + if (origin_name != NULL) + values[attno++] = CStringGetTextDatum(origin_name); + else + nulls[attno++] = true; + + /* + * Add the conflicting key values in the case of a unique constraint + * violation. + */ + if (conflict_type == CT_INSERT_EXISTS || + conflict_type == CT_UPDATE_EXISTS || + conflict_type == CT_MULTIPLE_UNIQUE_CONFLICTS) + { + Oid indexoid = conflicttuple->indexoid; + + Assert(OidIsValid(indexoid) && conflicttuple->slot && + CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, + true)); + values[attno++] = + tuple_table_slot_to_indextup_json(estate, rel, + indexoid, + conflicttuple->slot); + } + else + nulls[attno++] = true; + + /* Convert conflicting tuple to JSON datum. */ + if (conflicttuple->slot) + values[attno] = tuple_table_slot_to_json_datum(conflicttuple->slot); + else + nulls[attno] = true; + + Assert(attno + 1 == NUM_LOCAL_CONFLICT_ATTRS); + + tuple = heap_form_tuple(tupdesc, values, nulls); + + json_datum = heap_copy_tuple_as_datum(tuple, tupdesc); + + /* + * Build the higher level JSON datum in format described in function + * header. + */ + json_datum = DirectFunctionCall1(row_to_json, json_datum); + + /* Done with the temporary tuple. */ + heap_freetuple(tuple); + + /* Add to the array element. */ + json_datums = lappend(json_datums, (void *) json_datum); + } + + num_conflicts = list_length(json_datums); + + json_datum_array = palloc_array(Datum, num_conflicts); + + i = 0; + foreach(lc, json_datums) + { + json_datum_array[i] = (Datum) lfirst(lc); + i++; + } + + /* Construct the JSON array Datum. */ + get_typlenbyvalalign(JSONOID, &typlen, &typbyval, &typalign); + json_array_datum = PointerGetDatum(construct_array(json_datum_array, + num_conflicts, + JSONOID, + typlen, + typbyval, + typalign)); + pfree(json_datum_array); + + return json_array_datum; +} + +/* + * prepare_conflict_log_tuple + * + * This routine prepares a tuple detailing a conflict encountered during + * logical replication. The prepared tuple will be stored in + * MyLogicalRepWorker->conflict_log_tuple which should be inserted into the + * conflict log table by calling InsertConflictLogTuple. + */ +static void +prepare_conflict_log_tuple(EState *estate, Relation rel, + Relation conflictlogrel, + ConflictType conflict_type, + TupleTableSlot *searchslot, + List *conflicttuples, + TupleTableSlot *remoteslot) +{ + Datum values[NUM_CONFLICT_ATTRS] = {0}; + bool nulls[NUM_CONFLICT_ATTRS] = {0}; + int attno; + char *remote_origin = NULL; + MemoryContext oldctx; + + Assert(MyLogicalRepWorker->conflict_log_tuple == NULL); + + /* Populate the values and nulls arrays. */ + attno = 0; + values[attno++] = ObjectIdGetDatum(RelationGetRelid(rel)); + + values[attno++] = + CStringGetTextDatum(get_namespace_name(RelationGetNamespace(rel))); + + values[attno++] = CStringGetTextDatum(RelationGetRelationName(rel)); + + values[attno++] = CStringGetTextDatum(ConflictTypeNames[conflict_type]); + + if (TransactionIdIsValid(remote_xid)) + values[attno++] = TransactionIdGetDatum(remote_xid); + else + nulls[attno++] = true; + + values[attno++] = LSNGetDatum(remote_final_lsn); + + if (remote_commit_ts > 0) + values[attno++] = TimestampTzGetDatum(remote_commit_ts); + else + nulls[attno++] = true; + + if (replorigin_xact_state.origin != InvalidReplOriginId) + replorigin_by_oid(replorigin_xact_state.origin, true, &remote_origin); + + if (remote_origin != NULL) + values[attno++] = CStringGetTextDatum(remote_origin); + else + nulls[attno++] = true; + + if (!TupIsNull(remoteslot)) + values[attno++] = tuple_table_slot_to_json_datum(remoteslot); + else + nulls[attno++] = true; + + if (!TupIsNull(searchslot)) + { + Oid replica_index = GetRelationIdentityOrPK(rel); + + /* + * If the table has a valid replica identity index, build the index + * JSON datum from key value. Otherwise, construct it from the complete + * tuple in REPLICA IDENTITY FULL cases. + */ + if (OidIsValid(replica_index)) + values[attno++] = tuple_table_slot_to_indextup_json(estate, rel, + replica_index, + searchslot); + else + values[attno++] = tuple_table_slot_to_json_datum(searchslot); + } + else + nulls[attno++] = true; + + values[attno] = build_local_conflicts_json_array(estate, rel, + conflict_type, + conflicttuples); + + Assert(attno + 1 == NUM_CONFLICT_ATTRS); + + oldctx = MemoryContextSwitchTo(ApplyContext); + MyLogicalRepWorker->conflict_log_tuple = + heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls); + + /* + * Stash a context string describing this conflict, so that if inserting + * the tuple into the conflict log table fails, the resulting error carries + * enough context to identify the conflict (see InsertConflictLogTuple). + */ + MyLogicalRepWorker->conflict_log_errcontext = + psprintf("while logging conflict \"%s\" detected on relation \"%s\"", + ConflictTypeNames[conflict_type], + RelationGetRelationName(rel)); + MemoryContextSwitchTo(oldctx); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 313e31ff2e3..05a30342f69 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -487,6 +487,7 @@ retry: worker->oldest_nonremovable_xid = retain_dead_tuples ? MyReplicationSlot->data.xmin : InvalidTransactionId; + worker->conflict_log_tuple = NULL; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7799266c614..c34a2336c55 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -487,7 +487,9 @@ static bool MySubscriptionValid = false; static List *on_commit_wakeup_workers_subids = NIL; bool in_remote_transaction = false; -static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +XLogRecPtr remote_final_lsn = InvalidXLogRecPtr; +TransactionId remote_xid = InvalidTransactionId; +TimestampTz remote_commit_ts = 0; /* fields valid only when processing streamed transaction */ static bool in_streamed_transaction = false; @@ -1236,6 +1238,8 @@ apply_handle_begin(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn); remote_final_lsn = begin_data.final_lsn; + remote_commit_ts = begin_data.committime; + remote_xid = begin_data.xid; maybe_start_skipping_changes(begin_data.final_lsn); @@ -1296,6 +1300,8 @@ apply_handle_begin_prepare(StringInfo s) set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn); remote_final_lsn = begin_data.prepare_lsn; + remote_xid = begin_data.xid; + remote_commit_ts = 0; maybe_start_skipping_changes(begin_data.prepare_lsn); @@ -1767,6 +1773,10 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); + remote_xid = stream_xid; + remote_final_lsn = InvalidXLogRecPtr; + remote_commit_ts = 0; + set_apply_error_context_xact(stream_xid, InvalidXLogRecPtr); /* Try to allocate a worker for the streaming transaction. */ @@ -2428,6 +2438,9 @@ apply_handle_stream_commit(StringInfo s) { case TRANS_LEADER_APPLY: + /* Set remote_commit_ts for conflict logging. */ + remote_commit_ts = commit_data.committime; + /* * The transaction has been serialized to file, so replay all the * spooled operations. @@ -5647,6 +5660,9 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + MemoryContext oldcontext; + ErrorData *edata; + /* * Reset the origin state to prevent the advancement of origin * progress if we fail to apply. Otherwise, this will result in @@ -5660,14 +5676,33 @@ start_apply(XLogRecPtr origin_startpos) else { /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. + * Save the error and recover to an idle state so we can insert the + * deferred conflict log tuple (if any) before re-throwing. Copy + * the error into a long-lived context first, as it may have been + * raised under ErrorContext. Also reset the error context stack: + * the callbacks in effect when the error was thrown belong to + * unwound stack frames, and the deferred insert installs its own. */ + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + edata = CopyErrorData(); + MemoryContextSwitchTo(oldcontext); + + FlushErrorState(); + error_context_stack = NULL; AbortOutOfAnyTransaction(); pgstat_report_subscription_error(MySubscription->oid); - PG_RE_THROW(); + /* + * Insert the deferred conflict log tuple in its own transaction. + * If this fails, that error (annotated with the conflict context, + * see InsertConflictLogTuple) propagates instead of the original; + * such failures are expected to be rare and persistent (e.g. out of + * disk space). + */ + ProcessPendingConflictLogTuple(); + + /* Re-throw the original error. */ + ReThrowError(edata); } } PG_END_TRY(); @@ -6034,6 +6069,13 @@ DisableSubscriptionAndExit(void) RESUME_INTERRUPTS(); + /* + * The error context callbacks in effect when the error was thrown belong + * to now-unwound stack frames; reset the stack before running further code + * (including the deferred conflict log insertion, which installs its own). + */ + error_context_stack = NULL; + /* * Report the worker failed during sequence synchronization, table * synchronization, or apply. @@ -6062,6 +6104,19 @@ DisableSubscriptionAndExit(void) errmsg("subscription \"%s\" has been disabled because of an error", MySubscription->name)); + /* + * Insert the deferred conflict log tuple (if any) now that the + * subscription has been disabled and committed. Doing it after the + * disable means a failure to log the conflict (treated as a hard error, + * see ProcessPendingConflictLogTuple) cannot prevent the subscription from + * being disabled and so cannot leave the worker restarting and failing + * forever. Do it before the dead-tuple retention check below: that check + * only warns today, but it takes an elevel and could raise an error, which + * must not prevent the conflict from being recorded. The original error + * was already reported above. + */ + ProcessPendingConflictLogTuple(); + /* * Skip the track_commit_timestamp check when disabling the worker due to * an error, as verifying commit timestamps is unnecessary in this diff --git a/src/include/replication/conflict.h b/src/include/replication/conflict.h index a017e1e6cb5..b727cc285e9 100644 --- a/src/include/replication/conflict.h +++ b/src/include/replication/conflict.h @@ -115,5 +115,8 @@ extern void ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, TupleTableSlot *searchslot, TupleTableSlot *remoteslot, List *conflicttuples); +extern void ProcessPendingConflictLogTuple(void); extern void InitConflictIndexes(ResultRelInfo *relInfo); +extern Relation GetConflictLogDestAndTable(ConflictLogDest *log_dest); +extern void InsertConflictLogTuple(Relation conflictlogrel); #endif diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 745b7d9e969..79c90dddd89 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -100,6 +100,16 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* A conflict log tuple that is prepared but not yet inserted. */ + HeapTuple conflict_log_tuple; + + /* + * Error-context string describing the conflict above, used to annotate any + * error raised while inserting conflict_log_tuple into the conflict log + * table. Allocated, like conflict_log_tuple, in ApplyContext. + */ + char *conflict_log_errcontext; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; @@ -121,6 +131,8 @@ typedef enum ParallelTransState PARALLEL_TRANS_UNKNOWN, PARALLEL_TRANS_STARTED, PARALLEL_TRANS_FINISHED, + PARALLEL_TRANS_ERROR, /* worker failed; it will report the error (and + * log the conflict, if any) before exiting */ } ParallelTransState; /* @@ -255,6 +267,10 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern PGDLLIMPORT List *table_states_not_ready; +extern XLogRecPtr remote_final_lsn; +extern TimestampTz remote_commit_ts; +extern TransactionId remote_xid; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index 6bc6b7874c2..5f4d00bdd33 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -166,7 +166,7 @@ is($result, qq(32), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "UPDATE $tab SET a = 33 WHERE a = 32;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ + qr/conflict detected on relation "public.$tab": conflict=update_origin_differs.*\n.*DETAIL:.* Updating the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(32\), remote row \(33\), replica identity \(a\)=\(32\)./ ); $node_B->safe_psql('postgres', "DELETE FROM $tab;"); @@ -182,7 +182,7 @@ is($result, qq(33), 'The node_A data replicated to node_B'); $node_C->safe_psql('postgres', "DELETE FROM $tab WHERE a = 33;"); $node_B->wait_for_log( - qr/conflict detected on relation "public.$tab_unquoted": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ + qr/conflict detected on relation "public.$tab": conflict=delete_origin_differs.*\n.*DETAIL:.* Deleting the row that was modified by a different origin ".*" in transaction [0-9]+ at .*: local row \(33\), replica identity \(a\)=\(33\).*/ ); # The remaining tests no longer test conflict detection. diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl index f23fe6af2a5..de2d37c8754 100644 --- a/src/test/subscription/t/035_conflicts.pl +++ b/src/test/subscription/t/035_conflicts.pl @@ -50,7 +50,7 @@ $node_subscriber->safe_psql( 'postgres', "CREATE SUBSCRIPTION sub_tab CONNECTION '$publisher_connstr application_name=$appname' - PUBLICATION pub_tab;"); + PUBLICATION pub_tab WITH (conflict_log_destination=all)"); # Wait for initial table sync to finish $node_subscriber->wait_for_subscription_sync($node_publisher, $appname); @@ -84,10 +84,35 @@ $node_subscriber->wait_for_log( .*Key already exists in unique index \"conf_tab_c_key\", modified in transaction .*: key \(c\)=\(4\), local row \(4, 4, 4\)./, $log_offset); +# Verify the contents of the Conflict Log Table (CLT) +# This section ensures that the CLT contains the expected +# type and specific key data. +my $subid = $node_subscriber->safe_psql('postgres', + "SELECT oid FROM pg_subscription WHERE subname = 'sub_tab';"); +my $clt = "pg_conflict.pg_conflict_log_$subid"; + +# Wait for the conflict to be logged in the CLT +my $log_check = $node_subscriber->poll_query_until( + 'postgres', + "SELECT count(*) > 0 FROM $clt;" +); + +my $conflict_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) >= 1 FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';"); +is($conflict_check, 't', 'Verified multiple_unique_conflicts logged into conflict log table'); + +my $json_query = "SELECT local_conflicts FROM $clt;"; +my $raw_json = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '2' is present inside the JSON structure using a regex +# This matches the key/value pattern for "a": 2 +like($raw_json, qr/\\"a\\":2/, 'Verified that key 2 exists in the local_conflicts'); + pass('multiple_unique_conflicts detected during insert'); # Truncate table to get rid of the error $node_subscriber->safe_psql('postgres', "TRUNCATE conf_tab;"); +$node_subscriber->safe_psql('postgres', "DELETE FROM $clt"); ################################################## # Test multiple_unique_conflicts due to UPDATE @@ -114,6 +139,26 @@ $node_subscriber->wait_for_log( .*Key already exists in unique index \"conf_tab_c_key\", modified in transaction .*: key \(c\)=\(8\), local row \(8, 8, 8\)./, $log_offset); +# Verify the contents of the Conflict Log Table (CLT) +# This section ensures that the CLT contains the expected +# type and specific key data. + +# Wait for the conflict to be logged in the CLT +$log_check = $node_subscriber->poll_query_until( + 'postgres', + "SELECT count(*) > 0 FROM $clt;" +); + +$conflict_check = $node_subscriber->safe_psql('postgres', + "SELECT count(*) >= 1 FROM $clt WHERE conflict_type = 'multiple_unique_conflicts';"); +is($conflict_check, 't', 'Verified multiple_unique_conflicts logged into conflict log table'); + +$raw_json = $node_subscriber->safe_psql('postgres', $json_query); + +# Verify that '6' is present inside the JSON structure using a regex +# This matches the key/value pattern for "a": 6 +like($raw_json, qr/\\"a\\":6/, 'Verified that key 6 exists in the local_conflicts'); + pass('multiple_unique_conflicts detected during update'); # Truncate table to get rid of the error -- 2.49.0