public inbox for [email protected]
From: Ajin Cherian <[email protected]>
To: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Amit Kapila <[email protected]>
Cc: vignesh C <[email protected]>
Cc: [email protected] <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Masahiko Sawada <[email protected]>
Cc: Dilip Kumar <[email protected]>
Subject: Re: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
Date: Thu, 4 Jun 2026 21:37:30 +1000
Message-ID: <CAFPTHDbs+P1O46LrDT9AX3ZAQwcKyOEyDT8N9F+q8MzxKRTqOA@mail.gmail.com> (raw)
In-Reply-To: <TYRPR01MB14195A04472A71EB78F35E42B945AA@TYRPR01MB14195.jpnprd01.prod.outlook.com>
References: <[email protected]>
<CALDaNm3Y6Y4Mub6QC8fZKnNy5jZspELQYCoQF_FL2Zwzweu=og@mail.gmail.com>
<CAA4eK1LxGXR7jOAKh0B8N362S-Q3b6GhBxxcV_HxUaicEPq5Cg@mail.gmail.com>
<CAD21AoDUKQHyy07gTrwsxHTwXAURYnzUYAsf6PxHHv2x1UdFog@mail.gmail.com>
<CAFiTN-vcp7mVT7=rvTpf1uqEQ+rxzDoHd+eJu7u541X9ivG9zQ@mail.gmail.com>
<CAD21AoCRReJHoLkQBMZztjO7C3Cste9w-PS_SG7VtBW1c3cR9w@mail.gmail.com>
<TYRPR01MB1419594753654605E54773C16945EA@TYRPR01MB14195.jpnprd01.prod.outlook.com>
<TYRPR01MB14195A04472A71EB78F35E42B945AA@TYRPR01MB14195.jpnprd01.prod.outlook.com>
On Thu, Jun 4, 2026 at 6:04 PM Zhijie Hou (Fujitsu) <[email protected]> wrote:
> The remaining idea: storing the origin ID in pg_subscription_rel and
> teaching the apply worker to skip reporting origin_differs if the origin
> of the update matches the one stored in pg_subscription_rel, seems worth
> considering, if we cannot find an easier solution. There was a concern
> about performance, but since we could cache those tablesync origins in a
> local hash table and consult it during conflict detection, the
> performance impact might not be significant.
>
> That said, I may have missed some points. I will continue to think about
> this and try to update the patch later.
>
> Best Regards,
> Hou zj
>
I have implemented this solution in the attached patch. The patch adds a
new field to pg_subscription_rel, srtablesyncoriginid, which stores the
tablesync worker's origin ID for each relation. The apply worker loads
these origin IDs into a per-subscription hash table when it starts, and
rebuilds the table whenever a relation finishes synchronization and
transitions to READY, thereby avoiding repeated catalog lookups. During
conflict detection, the apply worker checks this hash table to determine
whether the origin ID of the conflicting local tuple matches one of the
saved tablesync origin IDs for the subscription. If a match is found, the
change is treated as originating from the initial table synchronization
and the conflict is not logged. I have not included documentation updates
in this patch. If this approach is considered acceptable, I can prepare
the corresponding documentation changes as well.
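A standalone sketch of the lookup described above follows. It assumes a small fixed-size open-addressing set as a stand-in for PostgreSQL's dynahash HTAB, and a local ReplOriginId typedef modeled on the 16-bit origin ID; OriginSet, origin_set_add(), origin_set_contains() and should_report_origin_differs() are hypothetical names used only for illustration. The patch itself uses hash_create()/hash_search() in rebuild_tablesync_origins_cache() and is_tablesync_origin().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical stand-ins for PostgreSQL's 16-bit replication origin ID. */
typedef uint16_t ReplOriginId;
#define InvalidReplOriginId ((ReplOriginId) 0)

/* Capacity of the hypothetical set; must be a power of two. */
#define ORIGIN_SET_SIZE 64

/*
 * Hypothetical open-addressing set of origin IDs (linear probing).
 * An InvalidReplOriginId slot means "empty", which is safe because
 * invalid origins are never stored.
 */
typedef struct OriginSet
{
	ReplOriginId slots[ORIGIN_SET_SIZE];
	int			count;
} OriginSet;

/* Empty the set; a full rebuild is a reset followed by re-adding. */
static void
origin_set_reset(OriginSet *set)
{
	memset(set, 0, sizeof(*set));
}

/* Add an origin ID; invalid IDs are skipped, as in the patch's rebuild loop. */
static void
origin_set_add(OriginSet *set, ReplOriginId id)
{
	uint32_t	i;

	if (id == InvalidReplOriginId)
		return;
	for (i = id & (ORIGIN_SET_SIZE - 1);; i = (i + 1) & (ORIGIN_SET_SIZE - 1))
	{
		if (set->slots[i] == id)
			return;				/* already present */
		if (set->slots[i] == InvalidReplOriginId)
		{
			/* keep at least one empty slot so lookups always terminate */
			assert(set->count < ORIGIN_SET_SIZE - 1);
			set->slots[i] = id;
			set->count++;
			return;
		}
	}
}

/* Membership test, the counterpart of is_tablesync_origin(). */
static bool
origin_set_contains(const OriginSet *set, ReplOriginId id)
{
	uint32_t	i;

	if (id == InvalidReplOriginId)
		return false;
	for (i = id & (ORIGIN_SET_SIZE - 1);; i = (i + 1) & (ORIGIN_SET_SIZE - 1))
	{
		if (set->slots[i] == id)
			return true;
		if (set->slots[i] == InvalidReplOriginId)
			return false;
	}
}

/*
 * Mirrors the origin comparison the patch adds to the update/delete paths:
 * report only if the local tuple's origin differs from the apply worker's
 * origin and is not a recorded tablesync origin.
 */
static bool
should_report_origin_differs(const OriginSet *tablesync_origins,
							 ReplOriginId tuple_origin,
							 ReplOriginId apply_origin)
{
	return tuple_origin != apply_origin &&
		!origin_set_contains(tablesync_origins, tuple_origin);
}
```

The sketch only models lookup semantics; the patch instead destroys and rebuilds its HTAB from pg_subscription_rel, which here would be origin_set_reset() followed by origin_set_add() for each relation's stored origin.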
regards,
Ajin Cherian
Fujitsu Australia
Attachments:
[application/octet-stream] v1-0001-Avoid-spurious-update_origin_differs-conflicts.patch (16.0K, 3-v1-0001-Avoid-spurious-update_origin_differs-conflicts.patch)
From 79efb925bd931358811689b23ec8aac631418420 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <[email protected]>
Date: Thu, 4 Jun 2026 18:05:47 +1000
Subject: [PATCH v1] Avoid spurious update_origin_differs conflicts
Spurious conflicts such as update_origin_differs can be reported on the
subscriber when rows are modified after being copied during initial
table synchronisation. Rows inserted by the tablesync worker are stamped
with its origin ID, but that origin is dropped once the tablesync worker
exits. When the apply worker later processes an update or delete for
such a row, the row's origin does not match the apply worker's own
origin, so a spurious update_origin_differs or delete_origin_differs
conflict is reported.

To address this, retain tablesync origin IDs for the subscription and
use them during conflict detection. The pg_subscription_rel catalog is
extended with a new field, srtablesyncoriginid, which stores the
tablesync worker origin ID for each relation. The apply worker loads
these origin IDs into a per-subscription hash table at startup and
whenever a relation becomes READY, avoiding repeated catalog lookups.

When the apply worker encounters an update_origin_differs or
delete_origin_differs conflict, it now suppresses logging if the local
tuple's origin ID matches one of the preserved tablesync origins.
---
src/backend/catalog/pg_subscription.c | 40 ++++++-
src/backend/commands/subscriptioncmds.c | 2 +-
.../replication/logical/sequencesync.c | 3 +-
src/backend/replication/logical/tablesync.c | 22 +++-
src/backend/replication/logical/worker.c | 111 +++++++++++++++++-
src/include/catalog/pg_subscription_rel.h | 7 +-
src/include/replication/worker_internal.h | 2 +
7 files changed, 172 insertions(+), 15 deletions(-)
mode change 100644 => 100755 src/backend/replication/logical/worker.c
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1f1fdc75af6..ba057968a84 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
#include "catalog/pg_type.h"
#include "foreign/foreign.h"
#include "miscadmin.h"
+#include "replication/origin.h"
#include "storage/lmgr.h"
#include "storage/lock.h"
#include "utils/acl.h"
@@ -327,6 +328,16 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+ /*
+ * No tablesync origin is known at start - the origin id is written
+ * later by UpdateSubscriptionRelState() when the tablesync worker
+ * transitions the relation to SUBREL_STATE_FINISHEDCOPY.
+ */
+ values[Anum_pg_subscription_rel_srtablesyncoriginid - 1] =
+ Int16GetDatum((int16) InvalidReplOriginId);
+ nulls[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false;
+
if (XLogRecPtrIsValid(sublsn))
values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
else
@@ -356,7 +367,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
*/
void
UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool already_locked)
+ XLogRecPtr sublsn, bool already_locked,
+ ReplOriginId originid)
{
Relation rel;
HeapTuple tup;
@@ -405,6 +417,30 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
else
nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
+
+
+ /*
+ * Store the tablesync origin ID used during the initial COPY phase so
+ * that the apply worker can suppress false update_origin_differs conflicts
+ * on rows stamped with this origin after crash recovery. If the caller
+ * passes InvalidReplOriginId, preserve the existing value: every state
+ * transition after FINISHEDCOPY has no origin to contribute and passes
+ * InvalidReplOriginId, which must not overwrite the origin recorded
+ * during COPY.
+ */
+
+ if (originid == InvalidReplOriginId)
+ {
+ replaces[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false;
+ }
+ else
+ {
+ values[Anum_pg_subscription_rel_srtablesyncoriginid - 1] =
+ Int16GetDatum((int16) originid);
+ replaces[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = true;
+ nulls[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false;
+ }
+
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
@@ -655,6 +691,8 @@ GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
relstate = palloc_object(SubscriptionRelState);
relstate->relid = subrel->srrelid;
relstate->state = subrel->srsubstate;
+ relstate->originid = subrel->srtablesyncoriginid;
+
d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
Anum_pg_subscription_rel_srsublsn, &isnull);
if (isnull)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 523959ba0ce..8e5dbd7defc 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1300,7 +1300,7 @@ AlterSubscription_refresh_seq(Subscription *sub)
Oid relid = subrel->relid;
UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
- InvalidXLogRecPtr, false);
+ InvalidXLogRecPtr, false, InvalidReplOriginId);
ereport(DEBUG1,
errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
get_namespace_name(get_rel_namespace(relid)),
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index e2ff8d77b16..d83b106a835 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -59,6 +59,7 @@
#include "pgstat.h"
#include "postmaster/interrupt.h"
#include "replication/logicalworker.h"
+#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/lwlock.h"
#include "utils/acl.h"
@@ -372,7 +373,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
* sequence as READY.
*/
UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
- seqinfo->page_lsn, false);
+ seqinfo->page_lsn, false, InvalidReplOriginId);
return COPYSEQ_SUCCESS;
}
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index a04b84ebc1d..ab2d54d5825 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -269,7 +269,8 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn,
- false);
+ false,
+ InvalidReplOriginId);
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -469,7 +470,17 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,
- rstate->lsn, true);
+ rstate->lsn, true,
+ InvalidReplOriginId);
+
+ /*
+ * Rebuild the tablesync origins cache now that this relation
+ * has transitioned to READY. The srtablesyncoriginid written
+ * at FINISHEDCOPY is now stable and needs to be loaded into
+ * the cache before the apply worker starts processing WAL
+ * changes for this relation.
+ */
+ rebuild_tablesync_origins_cache();
}
}
else
@@ -1375,7 +1386,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
MyLogicalRepWorker->relid,
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn,
- false);
+ false,
+ InvalidReplOriginId);
/*
* Create the replication origin in a separate transaction from the one
@@ -1504,8 +1516,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
MyLogicalRepWorker->relid,
SUBREL_STATE_FINISHEDCOPY,
MyLogicalRepWorker->relstate_lsn,
- false);
-
+ false,
+ originid);
CommitTransactionCommand();
copy_table_done:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
old mode 100644
new mode 100755
index a3f2406ed83..7062d89a5b1
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -290,6 +290,7 @@
#include "tcop/tcopprot.h"
#include "utils/acl.h"
#include "utils/guc.h"
+#include "utils/hsearch.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -484,6 +485,21 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;
+/*
+ * Hash table, used as a set, of the origin IDs that tablesync workers
+ * used during initial COPY. Built from pg_subscription_rel at apply
+ * worker startup and refreshed whenever a relation transitions to READY.
+ * Lets the apply worker suppress false update/delete_origin_differs
+ * conflicts on rows that were re-stamped with the tablesync origin ID
+ * during WAL replay after a crash.
+ */
+typedef struct TablesyncOriginEntry
+{
+ ReplOriginId originid; /* hash key; must be first */
+} TablesyncOriginEntry;
+
+static HTAB *tablesync_origins = NULL;
+
static List *on_commit_wakeup_workers_subids = NIL;
bool in_remote_transaction = false;
@@ -722,6 +738,75 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
return false; /* dummy for compiler */
}
+/*
+ * Rebuild the in-memory hash table of tablesync origin IDs from
+ * pg_subscription_rel. Called at apply worker startup and whenever a
+ * relation transitions to SUBREL_STATE_READY, so newly finished tablesync
+ * workers are always reflected in the cache.
+ */
+void
+rebuild_tablesync_origins_cache(void)
+{
+ List *subrels;
+ ListCell *lc;
+ HASHCTL ctl;
+
+ /* Destroy the old table if it exists */
+ if (tablesync_origins != NULL)
+ {
+ hash_destroy(tablesync_origins);
+ tablesync_origins = NULL;
+ }
+
+ /*
+ * Fetch all tables for this subscription from pg_subscription_rel. The
+ * returned list is allocated in the current memory context and freed
+ * below; only the hash table itself lives in ApplyContext.
+ */
+ subrels = GetSubscriptionRelations(MySubscription->oid, true, false, false);
+
+ foreach(lc, subrels)
+ {
+ SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+ if (relstate->originid == InvalidReplOriginId)
+ continue;
+
+ if (tablesync_origins == NULL)
+ {
+ memset(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(ReplOriginId);
+ ctl.entrysize = sizeof(TablesyncOriginEntry);
+ ctl.hcxt = ApplyContext;
+ tablesync_origins = hash_create("tablesync origins",
+ 16,
+ &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ }
+
+ hash_search(tablesync_origins, &relstate->originid, HASH_ENTER, NULL);
+ }
+
+ list_free_deep(subrels);
+}
+
+/*
+ * is_tablesync_origin
+ *
+ * Returns true if the given origin ID is recorded in pg_subscription_rel
+ * as the tablesync origin for any relation in this subscription. Used to
+ * suppress false update/delete_origin_differs conflicts on rows that were
+ * stamped with the tablesync origin ID during WAL replay after a crash.
+ */
+static inline bool
+is_tablesync_origin(ReplOriginId originid)
+{
+ if (tablesync_origins == NULL || originid == InvalidReplOriginId)
+ return false;
+
+ return hash_search(tablesync_origins, &originid, HASH_FIND, NULL) != NULL;
+}
+
/*
* Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
*
@@ -2958,11 +3043,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
{
/*
* Report the conflict if the tuple was modified by a different
- * origin.
+ * origin. Skip if the origin is recorded in pg_subscription_rel
+ * as a known tablesync origin for this subscription.
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ conflicttuple.origin != replorigin_xact_state.origin &&
+ !is_tablesync_origin(conflicttuple.origin))
{
TupleTableSlot *newslot;
@@ -2971,7 +3058,6 @@ apply_handle_update_internal(ApplyExecutionData *edata,
slot_store_data(newslot, relmapentry, newtup);
conflicttuple.slot = localslot;
-
ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
remoteslot, newslot,
list_make1(&conflicttuple));
@@ -3153,11 +3239,13 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
{
/*
* Report the conflict if the tuple was modified by a different
- * origin.
+ * origin. Skip if the origin is recorded in pg_subscription_rel
+ * as a known tablesync origin.
*/
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin, &conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ conflicttuple.origin != replorigin_xact_state.origin &&
+ !is_tablesync_origin(conflicttuple.origin))
{
conflicttuple.slot = localslot;
ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3525,7 +3613,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
&conflicttuple.origin,
&conflicttuple.ts) &&
- conflicttuple.origin != replorigin_xact_state.origin)
+ conflicttuple.origin != replorigin_xact_state.origin &&
+ !is_tablesync_origin(conflicttuple.origin))
{
TupleTableSlot *newslot;
@@ -5714,6 +5803,16 @@ run_apply_worker(void)
origin_startpos = replorigin_session_get_progress(false);
CommitTransactionCommand();
+ /*
+ * Build the tablesync origins cache from pg_subscription_rel. This
+ * lets the apply worker recognise rows that were stamped with a (now
+ * dropped) tablesync origin ID while applying updates and deletes, and
+ * suppress false update/delete_origin_differs conflicts for them.
+ */
+ StartTransactionCommand();
+ rebuild_tablesync_origins_cache();
+ CommitTransactionCommand();
+
/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
!MySubscription->ownersuperuser;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 502640d3018..81621ebe464 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -35,6 +35,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
Oid srsubid BKI_LOOKUP(pg_subscription); /* Oid of subscription */
Oid srrelid BKI_LOOKUP(pg_class); /* Oid of relation */
char srsubstate; /* state of the relation in subscription */
+ int16 srtablesyncoriginid; /* tablesync origin used during COPY,
+ * InvalidReplOriginId if not applicable */
/*
* Although srsublsn is a fixed-width type, it is allowed to be NULL, so
@@ -84,6 +86,8 @@ typedef struct SubscriptionRelState
Oid relid;
XLogRecPtr lsn;
char state;
+ ReplOriginId originid; /* tablesync origin from srtablesyncoriginid,
+ * InvalidReplOriginId if not set */
} SubscriptionRelState;
/*
@@ -112,7 +116,8 @@ typedef struct LogicalRepSequenceInfo
extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
XLogRecPtr sublsn, bool retain_lock);
extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn, bool already_locked);
+ XLogRecPtr sublsn, bool already_locked,
+ ReplOriginId originid);
extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
extern void RemoveSubscriptionRel(Oid subid, Oid relid);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 745b7d9e969..6b9233c7e09 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -305,6 +305,8 @@ extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
extern void apply_dispatch(StringInfo s);
+extern void rebuild_tablesync_origins_cache(void);
+
extern void maybe_reread_subscription(void);
extern void stream_cleanup_files(Oid subid, TransactionId xid);
--
2.47.3
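The rule that the patch adds for srtablesyncoriginid in UpdateSubscriptionRelState() can be modeled as a tiny state machine. This is a standalone sketch, not PostgreSQL code: RelRow, add_rel(), update_rel() and the REL_STATE_* constants are hypothetical stand-ins (the letters match the SUBREL_STATE_* codes), and catalog access, locking, and NULL handling are omitted.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL's origin ID type. */
typedef uint16_t ReplOriginId;
#define InvalidReplOriginId ((ReplOriginId) 0)

/* Hypothetical copies of the SUBREL_STATE_* codes used here. */
#define REL_STATE_INIT			'i'
#define REL_STATE_DATASYNC		'd'
#define REL_STATE_FINISHEDCOPY	'f'
#define REL_STATE_SYNCDONE		's'
#define REL_STATE_READY			'r'

/* Hypothetical in-memory stand-in for one pg_subscription_rel row. */
typedef struct RelRow
{
	char		state;				/* srsubstate */
	ReplOriginId tablesync_origin;	/* srtablesyncoriginid */
} RelRow;

/* Like AddSubscriptionRelState(): no tablesync origin is known yet. */
static void
add_rel(RelRow *row, char state)
{
	row->state = state;
	row->tablesync_origin = InvalidReplOriginId;
}

/*
 * Like UpdateSubscriptionRelState(): the state is always replaced, but the
 * stored origin is overwritten only when a valid origin is passed.
 */
static void
update_rel(RelRow *row, char state, ReplOriginId originid)
{
	/* In the patch, only the FINISHEDCOPY transition supplies an origin. */
	assert(originid == InvalidReplOriginId || state == REL_STATE_FINISHEDCOPY);

	row->state = state;
	if (originid != InvalidReplOriginId)
		row->tablesync_origin = originid;
}
```

Treating InvalidReplOriginId as "leave unchanged" keeps the rule in one place, so callers with no origin to report (DATASYNC, SYNCDONE, READY) do not need to read the existing value first.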