public inbox for [email protected]
Re: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
5+ messages / 4 participants
* Re: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
  @ 2026-01-09 04:46 Dilip Kumar <[email protected]>

From: Dilip Kumar @ 2026-01-09 04:46 UTC
To: Masahiko Sawada <[email protected]>
Cc: Amit Kapila <[email protected]>; vignesh C <[email protected]>; [email protected]; [email protected]

On Fri, Jan 9, 2026 at 4:17 AM Masahiko Sawada <[email protected]> wrote:
>
> On Mon, Dec 29, 2025 at 10:55 PM Amit Kapila <[email protected]> wrote:
> >
> > On Mon, Dec 29, 2025 at 4:26 PM vignesh C <[email protected]> wrote:
> > >
> > > On Mon, 22 Dec 2025 at 19:00, PG Bug reporting form
> > > <[email protected]> wrote:
> > > >
> > >
> > > This can occur in the following scenario: commit timestamp tracking is
> > > enabled on the subscriber; the same table exists on both publisher and
> > > subscriber; a publication is created on the publisher with initial
> > > data; and a subscription is created on the subscriber with origin =
> > > none. During the initial table synchronization, the row is inserted
> > > using a tablesync replication origin, which is dropped once
> > > synchronization completes. If the row is updated on the publisher
> > > after the initial sync, the apply worker attempts to update a row that
> > > was inserted using a different replication origin (tablesync origin),
> > > resulting in an origin mismatch.
> > >
> > > The conflict is logged and logical replication continues normally. No
> > > crash occurs, and the log entry is informational rather than
> > > indicative of a failure.
> > >
> >
> > I agree with this analysis.
> >
> > > These messages can be safely ignored for now.
> > >
> > > We are currently evaluating possible improvements to handle this
> > > scenario more gracefully and to avoid reporting these conflicts in the
> > > future.
> > >
> >
> > One idea to safely ignore these LOGs is we could modify the state
> > management in the catalog pg_subscription_rel to store originID. When
> > a tablesync worker completes, instead of just deleting the origin and
> > setting the relation state to ready, it could record the origin_id it
> > used into pg_subscription_rel. When the apply worker encounters an
> > origin mismatch, it checks pg_subscription_rel for that specific
> > table. If the "old" origin ID matches the one recorded during the sync
> > phase, the worker knows the row is "ours" and suppresses the log. Now,
> > as the origin ID could be reused, we could additionally store local
> > timestamp along with originId in pg_subscription_rel. Then, we can
> > suppress the log if: row_origin_id == srsuboriginid AND
> > row_commit_time <= srsubsynctime.
>
> It sounds very costly. IIUC we would need these checks for every first
> update to tuples loaded via initial table sync. Can we somehow share
> the apply worker's origin with tablesync workers so that they can
> refer to the same origin ID? Or can we invent special origin IDs
> (e.g., > 0x00FF) that are the same as the normal origin ID except for
> being ignored by the conflict detection system?

How will this distinguish between the initial sync being done from the
publisher node we are getting the update from vs. the initial sync being
done from some other node? Can we always ignore conflict checking for
initially synced data, or do we just want to ignore it if the initial
sync is done from the same node?

-- 
Regards,
Dilip Kumar
Google
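Amit's suppression rule quoted above (the row's origin matches the recorded tablesync origin, and the row was committed no later than the sync finished) can be written as a small predicate. This is a minimal sketch, not PostgreSQL code: `RepOriginIdSketch`, `TablesyncRecord`, and `should_suppress_origin_differs()` are hypothetical names, the record stands in for the proposed `srsuboriginid`/`srsubsynctime` columns (which do not exist in the catalog), and origin 0 is assumed to mean "no origin".

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for the origin ID and timestamp types. */
typedef uint16_t RepOriginIdSketch;
typedef int64_t TimestampSketch;      /* e.g. microseconds since an epoch */

#define INVALID_ORIGIN_SKETCH ((RepOriginIdSketch) 0)   /* assumed "no origin" */

/*
 * Hypothetical per-relation record mirroring the proposed
 * srsuboriginid / srsubsynctime columns of pg_subscription_rel.
 */
typedef struct TablesyncRecord
{
    RepOriginIdSketch sync_origin;    /* origin used by the tablesync worker */
    TimestampSketch   sync_time;      /* local time at which the sync finished */
} TablesyncRecord;

/*
 * Return true when an origin mismatch on a local row should be silenced:
 * the row was last written under this relation's tablesync origin and was
 * committed no later than the sync finished. The time bound guards against
 * the origin ID having been reused by a different origin afterwards.
 */
static bool
should_suppress_origin_differs(RepOriginIdSketch row_origin,
                               TimestampSketch row_commit_time,
                               const TablesyncRecord *rec)
{
    if (rec == NULL || rec->sync_origin == INVALID_ORIGIN_SKETCH)
        return false;                 /* nothing recorded: keep reporting */

    return row_origin == rec->sync_origin &&
           row_commit_time <= rec->sync_time;
}
```

As Sawada points out in reply, a check like this would run on the first update of every row loaded by the initial sync, which is where the cost concern comes from.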
* Re: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
  @ 2026-01-10 00:56 Masahiko Sawada <[email protected]>
  parent: Dilip Kumar <[email protected]>

From: Masahiko Sawada @ 2026-01-10 00:56 UTC
To: Dilip Kumar <[email protected]>
Cc: Amit Kapila <[email protected]>; vignesh C <[email protected]>; [email protected]; [email protected]

On Thu, Jan 8, 2026 at 8:46 PM Dilip Kumar <[email protected]> wrote:
>
> On Fri, Jan 9, 2026 at 4:17 AM Masahiko Sawada <[email protected]> wrote:
> >
> > On Mon, Dec 29, 2025 at 10:55 PM Amit Kapila <[email protected]> wrote:
> > >
> > > On Mon, Dec 29, 2025 at 4:26 PM vignesh C <[email protected]> wrote:
> > > >
> > > > On Mon, 22 Dec 2025 at 19:00, PG Bug reporting form
> > > > <[email protected]> wrote:
> > > > >
> > > >
> > > > This can occur in the following scenario: commit timestamp tracking is
> > > > enabled on the subscriber; the same table exists on both publisher and
> > > > subscriber; a publication is created on the publisher with initial
> > > > data; and a subscription is created on the subscriber with origin =
> > > > none. During the initial table synchronization, the row is inserted
> > > > using a tablesync replication origin, which is dropped once
> > > > synchronization completes. If the row is updated on the publisher
> > > > after the initial sync, the apply worker attempts to update a row that
> > > > was inserted using a different replication origin (tablesync origin),
> > > > resulting in an origin mismatch.
> > > >
> > > > The conflict is logged and logical replication continues normally. No
> > > > crash occurs, and the log entry is informational rather than
> > > > indicative of a failure.
> > > >
> > >
> > > I agree with this analysis.
> > >
> > > > These messages can be safely ignored for now.
> > > >
> > > > We are currently evaluating possible improvements to handle this
> > > > scenario more gracefully and to avoid reporting these conflicts in the
> > > > future.
> > > >
> > >
> > > One idea to safely ignore these LOGs is we could modify the state
> > > management in the catalog pg_subscription_rel to store originID. When
> > > a tablesync worker completes, instead of just deleting the origin and
> > > setting the relation state to ready, it could record the origin_id it
> > > used into pg_subscription_rel. When the apply worker encounters an
> > > origin mismatch, it checks pg_subscription_rel for that specific
> > > table. If the "old" origin ID matches the one recorded during the sync
> > > phase, the worker knows the row is "ours" and suppresses the log. Now,
> > > as the origin ID could be reused, we could additionally store local
> > > timestamp along with originId in pg_subscription_rel. Then, we can
> > > suppress the log if: row_origin_id == srsuboriginid AND
> > > row_commit_time <= srsubsynctime.
> >
> > It sounds very costly. IIUC we would need these checks for every first
> > update to tuples loaded via initial table sync. Can we somehow share
> > the apply worker's origin with tablesync workers so that they can
> > refer to the same origin ID? Or can we invent special origin IDs
> > (e.g., > 0x00FF) that are the same as the normal origin ID except for
> > being ignored by the conflict detection system?
>
> How will this distinguish between the initial sync being done from the
> publisher node we are getting the update from vs. the initial sync being
> done from some other node? Can we always ignore conflict checking for
> initially synced data, or do we just want to ignore it if the initial
> sync is done from the same node?

I imagined the former idea: always ignore conflict checking, so we don't
need to distinguish them. IOW, we treat the changes made via the initial
tablesync as if they were changes made by a normal backend process (which
doesn't use a replication origin), while still using the replication
tracking ability of the replication origin.

Regards,

-- 
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
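Sawada's special-origin-ID suggestion, combined with his answer that initial-sync data would always be exempt from conflict checking, amounts to a rule like the one below. The encoding is hypothetical: `OriginId`, `SYNC_ALIAS_FLOOR`, and `report_origin_differs()` are illustration names, and the 0x0100 floor only follows the "> 0x00FF" example. Origin 0 (a write with no origin, such as one made by a local backend) still reports a conflict.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint16_t OriginId;                     /* hypothetical stand-in */

#define NO_ORIGIN        ((OriginId) 0)        /* write made without an origin */
#define SYNC_ALIAS_FLOOR ((OriginId) 0x0100)   /* hypothetical: IDs >= this are sync aliases */

/* True if the ID falls in the (hypothetical) range reserved for initial-sync writes. */
static bool
is_sync_alias(OriginId id)
{
    return id >= SYNC_ALIAS_FLOOR;
}

/*
 * Decide whether an update/delete should report *_origin_differs, given the
 * origin recorded on the local row and the origin of the change being applied.
 */
static bool
report_origin_differs(OriginId local_row_origin, OriginId applying_origin)
{
    if (local_row_origin == applying_origin)
        return false;          /* same origin: not a conflict */
    if (is_sync_alias(local_row_origin))
        return false;          /* written by an initial sync: always ignored */
    return true;               /* any other origin, including NO_ORIGIN, conflicts */
}
```

The sketch only covers conflict detection; it says nothing about how such an alias would map back to the real origin for progress tracking, which is the part later found to be problematic during recovery.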
* RE: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
  @ 2026-04-03 07:24 Zhijie Hou (Fujitsu) <[email protected]>
  parent: Masahiko Sawada <[email protected]>

From: Zhijie Hou (Fujitsu) @ 2026-04-03 07:24 UTC
To: Masahiko Sawada <[email protected]>; Dilip Kumar <[email protected]>
Cc: Amit Kapila <[email protected]>; vignesh C <[email protected]>; [email protected] <[email protected]>; [email protected] <[email protected]>

On Saturday, January 10, 2026 8:57 AM Masahiko Sawada <[email protected]> wrote:
>
> On Thu, Jan 8, 2026 at 8:46 PM Dilip Kumar <[email protected]> wrote:
> >
> > On Fri, Jan 9, 2026 at 4:17 AM Masahiko Sawada
> <[email protected]> wrote:
> > > Can we somehow
> > > share the apply worker's origin with tablesync workers so that they
> > > can refer to the same origin ID? Or can we invent special origin IDs
> > > (e.g., > 0x00FF) that are the same as the normal origin ID except
> > > for being ignored by the conflict detection system?
> >
> > How will this distinguish between the initial sync being done from the
> > publisher node we are getting the update from vs. the initial sync being
> > done from some other node? Can we always ignore conflict checking for
> > initially synced data, or do we just want to ignore it if the initial
> > sync is done from the same node?
>
> I imagined the former idea: always ignore conflict checking, so we don't
> need to distinguish them. IOW, we treat the changes made via the initial
> tablesync as if they were changes made by a normal backend process (which
> doesn't use a replication origin), while still using the replication
> tracking ability of the replication origin.

I think that, for changes made by a backend process without setting up an
origin, the apply worker still treats them as conflicting changes when
applying remote changes, as that's necessary to detect conflicts between
local and remote updates.

I personally prefer to let the tablesync worker share the apply worker's origin
ID while keeping a separate origin for progress tracking. Currently, the worker
first calls replorigin_session_setup() and then stores the origin ID in
replorigin_xact_state. The natural implementation is for the tablesync worker
to still set up its own origin for tracking, but assign the apply worker's origin ID
to the global state. This gives us per‑tablesync progress tracking while
ensuring that changes from both workers appear to come from the same
origin.

A small patch demonstrating this approach is attached.

Best Regards,
Hou zj

Attachments:
  [application/octet-stream] v1-0001-write-tablesync-changes-with-the-subscription-ori.patch (4.7K)

From 3047c7473df8f3be43859c1bec74c99fba80ecf0 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Fri, 3 Apr 2026 13:44:40 +0800
Subject: [PATCH v1] write tablesync changes with the subscription origin ID

During initial table synchronization, tablesync workers were writing
tuples with the per-table tablesync origin identity. Later, when the
leader apply worker processed UPDATE/DELETE for those tuples, conflict
checks could see an origin mismatch and report benign
update_origin_differs or delete_origin_differs noise for changes from the
same subscription.

Fix this by keeping the tablesync origin for per-table sync progress
tracking and resume but stamp tuple writes with the subscription-level
apply origin identity. This ensures conflict detection sees tablesync and
apply changes as coming from the same subscription.
---
 src/backend/replication/logical/tablesync.c | 27 ++++++++++++++++++---
 src/test/subscription/t/029_on_error.pl     |  9 +++++++++
 2 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f49a4852ecb..4aa39341e8e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1226,6 +1226,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     AclResult   aclresult;
     WalRcvExecResult *res;
     char        originname[NAMEDATALEN];
+    char        applyoriginname[NAMEDATALEN];
     ReplOriginId originid;
     UserContext ucxt;
     bool        must_use_password;
@@ -1285,12 +1286,17 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
            MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC ||
            MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
 
-    /* Assign the origin tracking record name. */
+    /* Assign the origin tracking record names. */
     ReplicationOriginNameForLogicalRep(MySubscription->oid,
                                        MyLogicalRepWorker->relid,
                                        originname,
                                        sizeof(originname));
 
+    ReplicationOriginNameForLogicalRep(MySubscription->oid,
+                                       InvalidOid,
+                                       applyoriginname,
+                                       sizeof(applyoriginname));
+
     if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
     {
         /*
@@ -1320,7 +1326,15 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
          */
         originid = replorigin_by_name(originname, false);
         replorigin_session_setup(originid, 0);
-        replorigin_xact_state.origin = originid;
+
+        /*
+         * Tablesync keeps its own origin for replication progress, but writes
+         * must be tagged with the subscription-level apply origin so conflict
+         * detection sees tablesync and apply changes as coming from the same
+         * subscription.
+         */
+        replorigin_xact_state.origin = replorigin_by_name(applyoriginname, false);
+
         *origin_startpos = replorigin_session_get_progress(false);
 
         CommitTransactionCommand();
@@ -1407,7 +1421,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
 
     replorigin_session_setup(originid, 0);
-    replorigin_xact_state.origin = originid;
+
+    /*
+     * Tablesync keeps its own origin for replication progress, but writes
+     * must be tagged with the subscription-level apply origin so conflict
+     * detection sees tablesync and apply changes as coming from the same
+     * subscription.
+     */
+    replorigin_xact_state.origin = replorigin_by_name(applyoriginname, false);
 
     /*
      * If the user did not opt to run as the owner of the subscription
diff --git a/src/test/subscription/t/029_on_error.pl b/src/test/subscription/t/029_on_error.pl
index 7d68759b6cd..85d3478f44f 100644
--- a/src/test/subscription/t/029_on_error.pl
+++ b/src/test/subscription/t/029_on_error.pl
@@ -146,6 +146,8 @@ COMMIT;
 test_skip_lsn($node_publisher, $node_subscriber,
     "(2, NULL)", "2", "test skipping transaction");
 
+my $log_location = -s $node_subscriber->logfile;
+
 # Test for PREPARE and COMMIT PREPARED. Update the data and PREPARE the
 # transaction, raising an error on the subscriber due to violation of the
 # unique constraint on tbl. Then skip the transaction.
@@ -160,6 +162,13 @@ COMMIT PREPARED 'gtx';
 test_skip_lsn($node_publisher, $node_subscriber,
     "(3, NULL)", "3", "test skipping prepare and commit prepared ");
 
+# Check that no update_origin_differs conflicts are raised
+my $logfile = slurp_file($node_subscriber->logfile(), $log_location);
+unlike(
+    $logfile,
+    qr/conflict detected on relation "public.tbl": conflict=update_origin_differs.*/,
+    'modifying the row copied by tablesync should not cause update_origin_differs conflict');
+
 # Test for STREAM COMMIT. Insert enough rows to tbl to exceed the 64kB
 # limit, also raising an error on the subscriber during applying spooled
 # changes for the same reason. Then skip the transaction.
-- 
2.53.0.windows.2
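Hou's proposal separates two roles that the tablesync worker currently gives to one origin: the origin whose replay progress it advances (set up via replorigin_session_setup()) and the origin stamped on the tuples it writes (replorigin_xact_state.origin). The toy model below illustrates that split under stated assumptions: `Worker`, `Row`, `worker_write()`, and the in-memory `origin_progress` array are hypothetical and stand in for PostgreSQL's shared-memory origin state.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint16_t OriginId;   /* hypothetical stand-in for the origin ID type */
typedef uint64_t Lsn;        /* hypothetical stand-in for an LSN */

#define MAX_ORIGINS 8        /* origin IDs used below must be < MAX_ORIGINS */

/* Per-origin replay progress, indexed by origin ID (toy model). */
static Lsn origin_progress[MAX_ORIGINS];

typedef struct Worker
{
    OriginId session_origin; /* origin whose progress this worker advances */
    OriginId xact_origin;    /* origin stamped on the tuples it writes */
} Worker;

typedef struct Row
{
    int      value;
    OriginId origin;         /* origin recorded for the row's last write */
} Row;

/* Write a row and "commit": stamp xact_origin, advance session_origin. */
static void
worker_write(const Worker *w, Row *row, int value, Lsn remote_lsn)
{
    row->value = value;
    row->origin = w->xact_origin;
    if (w->session_origin < MAX_ORIGINS)
        origin_progress[w->session_origin] = remote_lsn;
}

/* Apply-side check: a mismatch is what *_origin_differs reports. */
static bool
origin_differs(const Row *row, OriginId applying_origin)
{
    return row->origin != applying_origin;
}
```

With the current behavior the sync worker uses its own origin for both roles, so a later apply of an update sees a mismatch; with the split, the row carries the apply origin while progress still lands on the per-table origin. Hou's follow-up explains why this breaks during WAL replay, which an in-memory model like this one does not capture.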
* RE: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
  @ 2026-04-07 08:13 Zhijie Hou (Fujitsu) <[email protected]>
  parent: Zhijie Hou (Fujitsu) <[email protected]>

From: Zhijie Hou (Fujitsu) @ 2026-04-07 08:13 UTC
To: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Amit Kapila <[email protected]>; vignesh C <[email protected]>; [email protected] <[email protected]>; [email protected] <[email protected]>; Masahiko Sawada <[email protected]>; Dilip Kumar <[email protected]>

On Friday, April 3, 2026 3:24 PM Zhijie Hou (Fujitsu) <[email protected]> wrote:
> On Saturday, January 10, 2026 8:57 AM Masahiko Sawada
> <[email protected]> wrote:
> >
> > On Thu, Jan 8, 2026 at 8:46 PM Dilip Kumar <[email protected]>
> wrote:
> > >
> > > On Fri, Jan 9, 2026 at 4:17 AM Masahiko Sawada
> > <[email protected]> wrote:
> > > > Can we somehow
> > > > share the apply worker's origin with tablesync workers so that
> > > > they can refer to the same origin ID? Or can we invent special
> > > > origin IDs (e.g., > 0x00FF) that are the same as the normal origin
> > > > ID except for being ignored by the conflict detection system?
> > >
> > > How will this distinguish between the initial sync being done from the
> > > publisher node we are getting the update from vs. the initial sync being
> > > done from some other node? Can we always ignore conflict checking for
> > > initially synced data, or do we just want to ignore it if the initial
> > > sync is done from the same node?
> >
> > I imagined the former idea: always ignore conflict checking, so we don't
> > need to distinguish them. IOW, we treat the changes made via the initial
> > tablesync as if they were changes made by a normal backend process (which
> > doesn't use a replication origin), while still using the replication
> > tracking ability of the replication origin.
>
> I think that, for changes made by a backend process without setting up an
> origin, the apply worker still treats them as conflicting changes when
> applying remote changes, as that's necessary to detect conflicts between
> local and remote updates.
>
> I personally prefer to let the tablesync worker share the apply worker's origin
> ID while keeping a separate origin for progress tracking. Currently, the worker
> first calls replorigin_session_setup() and then stores the origin ID in
> replorigin_xact_state. The natural implementation is for the tablesync worker
> to still set up its own origin for tracking, but assign the apply worker's origin ID
> to the global state. This gives us per‑tablesync progress tracking while
> ensuring that changes from both workers appear to come from the same
> origin.
>

After further analysis, I think the approach I mentioned earlier is unsafe.
When replaying the commit record during recovery, if only the main apply
origin ID is present, we cannot recover the progress status for each
tablesync origin. The idea of using a special origin ID for all tablesync
origins suffers from the same problem, i.e., progress cannot be recovered
when replaying commit WAL records.

I have been trying to find a way to fix this issue within the proposed
approaches, but I haven't been able to come up with a better solution for
now. One attempt was to continue WAL‑logging the tablesync's own origin ID,
but only store the main origin ID in the commit timestamp module. However,
this also has a problem during recovery: recovery cannot identify which main
origin corresponds to a given tablesync origin recorded in the commit WAL
record. (One might think we could store this top‑level relationship in the
catalog, but since catalogs are not accessible during recovery, that
approach would not work.) Consequently, during recovery we cannot restore
the same origin ID in the commit timestamp module that was present during
the normal commit.

The remaining idea, storing the origin ID in pg_subscription_rel and teaching
the apply worker to skip reporting origin_differs if the origin of the update
matches the one stored in pg_subscription_rel, seems worth considering if we
cannot find an easier solution. There was a concern about performance, but
since we could cache those tablesync origins in a local hash table and consult
it during conflict detection, the performance impact might not be significant.

That said, I may have missed some points. I will continue to think about this
and try to update the patch later.

Best Regards,
Hou zj
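Hou's recovery argument rests on each commit WAL record naming a single origin, with redo of that record being what restores that origin's progress. The simulation below is a hypothetical model (`CommitRecord`, `replay_commits()`), not PostgreSQL's redo code. It shows that if tablesync commits are stamped with the apply origin, replay advances the apply origin and leaves the tablesync origin's progress at whatever value it had before replay (0 here).

```c
#include <stdint.h>

typedef uint16_t OriginId;   /* hypothetical stand-in */
typedef uint64_t Lsn;        /* hypothetical stand-in */

#define MAX_ORIGINS 8

/* Hypothetical commit record: exactly one origin ID and its remote LSN. */
typedef struct CommitRecord
{
    OriginId origin;
    Lsn      origin_lsn;
} CommitRecord;

/*
 * Redo loop: each record can only advance the progress of the origin it
 * names; no other origin's progress is touched.
 */
static void
replay_commits(const CommitRecord *recs, int n, Lsn progress[MAX_ORIGINS])
{
    for (int i = 0; i < n; i++)
    {
        OriginId o = recs[i].origin;

        if (o < MAX_ORIGINS && recs[i].origin_lsn > progress[o])
            progress[o] = recs[i].origin_lsn;
    }
}
```

The same limitation applies to the special-ID variant: the record holds one ID, so redo has no way to recover the per-table origin that should have advanced.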
* Re: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18
  @ 2026-06-04 11:37 Ajin Cherian <[email protected]>
  parent: Zhijie Hou (Fujitsu) <[email protected]>

From: Ajin Cherian @ 2026-06-04 11:37 UTC
To: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Amit Kapila <[email protected]>; vignesh C <[email protected]>; [email protected] <[email protected]>; [email protected] <[email protected]>; Masahiko Sawada <[email protected]>; Dilip Kumar <[email protected]>

On Thu, Jun 4, 2026 at 6:04 PM Zhijie Hou (Fujitsu) <[email protected]> wrote:

> The remaining idea, storing the origin ID in pg_subscription_rel and teaching
> the apply worker to skip reporting origin_differs if the origin of the update
> matches the one stored in pg_subscription_rel, seems worth considering if we
> cannot find an easier solution. There was a concern about performance, but
> since we could cache those tablesync origins in a local hash table and consult
> it during conflict detection, the performance impact might not be significant.
>
> That said, I may have missed some points. I will continue to think about this
> and try to update the patch later.
>
> Best Regards,
> Hou zj
>

I have implemented this solution in the attached patch.

The patch adds a new field to pg_subscription_rel, srtablesyncoriginid,
which stores the tablesync worker's origin ID for each relation. These
origin IDs are loaded into a per-subscription hash table after the
tablesync COPY completes, and also when the apply worker starts, thereby
avoiding repeated catalog lookups.

During conflict detection, the apply worker checks this hash table to
determine whether the conflict origin ID matches one of the saved tablesync
origin IDs for the subscription. If a match is found, the conflict is
treated as originating from the initial table synchronization and logging
is suppressed.
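The lookup described above (rebuild a set of tablesync origin IDs from the catalog, then consult it before logging a conflict) can be sketched without PostgreSQL's dynahash. Everything here is hypothetical: `OriginSet`, `origin_set_rebuild()`, and `should_report_origin_differs()` are illustration names, a fixed-size linear-probing table stands in for the patch's `HTAB` created with hash_create(), and an array of per-relation IDs (0 meaning none recorded) stands in for the srtablesyncoriginid values read from pg_subscription_rel.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint16_t OriginId;          /* hypothetical stand-in */
#define NO_ORIGIN ((OriginId) 0)    /* "no origin"; also marks an empty slot */
#define SET_SLOTS 64                /* power of two */

/* Open-addressing set of origin IDs with linear probing. */
typedef struct OriginSet
{
    OriginId slots[SET_SLOTS];
} OriginSet;

static size_t
slot_for(OriginId id)
{
    /* Multiplicative hash, masked to the table size. */
    return (size_t) (id * 2654435761u) & (SET_SLOTS - 1);
}

/* Insert id; returns false for NO_ORIGIN or when the table is full. */
static bool
origin_set_add(OriginSet *set, OriginId id)
{
    size_t i = slot_for(id);

    if (id == NO_ORIGIN)
        return false;
    for (size_t probes = 0; probes < SET_SLOTS; probes++)
    {
        if (set->slots[i] == NO_ORIGIN || set->slots[i] == id)
        {
            set->slots[i] = id;
            return true;
        }
        i = (i + 1) & (SET_SLOTS - 1);
    }
    return false;
}

static bool
origin_set_contains(const OriginSet *set, OriginId id)
{
    size_t i = slot_for(id);

    if (id == NO_ORIGIN)
        return false;
    for (size_t probes = 0; probes < SET_SLOTS; probes++)
    {
        if (set->slots[i] == id)
            return true;
        if (set->slots[i] == NO_ORIGIN)
            return false;
        i = (i + 1) & (SET_SLOTS - 1);
    }
    return false;
}

/* Rebuild from scratch, like re-reading every relation's recorded origin. */
static void
origin_set_rebuild(OriginSet *set, const OriginId *rel_origins, size_t n)
{
    memset(set, 0, sizeof(*set));
    for (size_t i = 0; i < n; i++)
        (void) origin_set_add(set, rel_origins[i]);
}

/* Report only a real mismatch whose origin is not a known tablesync origin. */
static bool
should_report_origin_differs(const OriginSet *set, OriginId row_origin,
                             OriginId applying_origin)
{
    return row_origin != applying_origin &&
           !origin_set_contains(set, row_origin);
}
```

A hash lookup keeps the per-conflict cost small on average, which is the point of Hou's caching suggestion; the remaining cost is the rebuild. Like the patch's is_tablesync_origin(), the sketch keys only on the origin ID and has no commit-time bound of the kind Amit proposed for reused IDs.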
I have not included documentation updates in this patch. If this approach
is considered acceptable, I can prepare the corresponding documentation
changes as well.

regards,
Ajin Cherian
Fujitsu Australia

Attachments:
  [application/octet-stream] v1-0001-Avoid-spurious-update_origin_differs-conflicts.patch (16.0K)

From 79efb925bd931358811689b23ec8aac631418420 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <[email protected]>
Date: Thu, 4 Jun 2026 18:05:47 +1000
Subject: [PATCH v1] Avoid spurious update_origin_differs conflicts

Conflicts such as conflict=update_origin_differs can occur on the
subscriber when rows are modified after being initially copied during
table synchronisation.

Rows inserted by the tablesync worker carry its origin ID. However, this
origin is dropped once the tablesync worker exits. As a result, when the
apply worker later processes an update or delete for such rows, the
missing origin can lead to spurious update_origin_differs or
delete_origin_differs conflicts.

To address this, retain tablesync origin IDs for the subscription and use
them during conflict detection.

The pg_subscription_rel catalog is extended with a new field,
srtablesyncoriginid, which stores the tablesync worker origin ID for each
relation.

These origin IDs are loaded into a per-subscription hash table after the
tablesync COPY completes, avoiding repeated catalog lookups.

When the apply worker encounters an update_origin_differs or
delete_origin_differs conflict, it now suppresses logging if the original
origin ID matches one of the preserved tablesync origins.
---
 src/backend/catalog/pg_subscription.c       |  40 ++++++-
 src/backend/commands/subscriptioncmds.c     |   2 +-
 .../replication/logical/sequencesync.c      |   3 +-
 src/backend/replication/logical/tablesync.c |  22 +++-
 src/backend/replication/logical/worker.c    | 111 +++++++++++++++++-
 src/include/catalog/pg_subscription_rel.h   |   7 +-
 src/include/replication/worker_internal.h   |   2 +
 7 files changed, 172 insertions(+), 15 deletions(-)
 mode change 100644 => 100755 src/backend/replication/logical/worker.c

diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 1f1fdc75af6..ba057968a84 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -25,6 +25,7 @@
 #include "catalog/pg_type.h"
 #include "foreign/foreign.h"
 #include "miscadmin.h"
+#include "replication/origin.h"
 #include "storage/lmgr.h"
 #include "storage/lock.h"
 #include "utils/acl.h"
@@ -327,6 +328,16 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
     values[Anum_pg_subscription_rel_srsubid - 1] = ObjectIdGetDatum(subid);
     values[Anum_pg_subscription_rel_srrelid - 1] = ObjectIdGetDatum(relid);
     values[Anum_pg_subscription_rel_srsubstate - 1] = CharGetDatum(state);
+
+    /*
+     * No tablesync origin is known at start - the origin id is written
+     * later by UpdateSubscriptionRelState() when the tablesync worker
+     * transitions the relation to SUBREL_STATE_FINISHEDCOPY.
+     */
+    values[Anum_pg_subscription_rel_srtablesyncoriginid - 1] =
+        Int16GetDatum((int16) InvalidReplOriginId);
+    nulls[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false;
+
     if (XLogRecPtrIsValid(sublsn))
         values[Anum_pg_subscription_rel_srsublsn - 1] = LSNGetDatum(sublsn);
     else
@@ -356,7 +367,8 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
  */
 void
 UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-                           XLogRecPtr sublsn, bool already_locked)
+                           XLogRecPtr sublsn, bool already_locked,
+                           ReplOriginId originid)
 {
     Relation    rel;
     HeapTuple   tup;
@@ -405,6 +417,30 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
     else
         nulls[Anum_pg_subscription_rel_srsublsn - 1] = true;
 
+
+
+    /*
+     * Store the tablesync origin ID used during the initial COPY phase so
+     * that the apply worker can suppress false update_origin_differs conflicts
+     * on rows stamped with this origin after crash recovery. If the caller
+     * passes InvalidReplOriginId, preserve the existing value; all state
+     * transitions after FINISHEDCOPY have no origin to contribute and pass
+     * in InvalidReplOriginId and that should not overwrite the one recorded
+     * during COPY.
+     */
+
+    if (originid == InvalidReplOriginId)
+    {
+        replaces[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false;
+    }
+    else
+    {
+        values[Anum_pg_subscription_rel_srtablesyncoriginid - 1] =
+            Int16GetDatum((int16) originid);
+        replaces[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = true;
+        nulls[Anum_pg_subscription_rel_srtablesyncoriginid - 1] = false;
+    }
+
     tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
                             replaces);
 
@@ -655,6 +691,8 @@ GetSubscriptionRelations(Oid subid, bool tables, bool sequences,
         relstate = palloc_object(SubscriptionRelState);
         relstate->relid = subrel->srrelid;
         relstate->state = subrel->srsubstate;
+        relstate->originid = subrel->srtablesyncoriginid;
+
         d = SysCacheGetAttr(SUBSCRIPTIONRELMAP, tup,
                             Anum_pg_subscription_rel_srsublsn, &isnull);
         if (isnull)
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 523959ba0ce..8e5dbd7defc 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1300,7 +1300,7 @@ AlterSubscription_refresh_seq(Subscription *sub)
         Oid         relid = subrel->relid;
 
         UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
-                                   InvalidXLogRecPtr, false);
+                                   InvalidXLogRecPtr, false, InvalidReplOriginId);
         ereport(DEBUG1,
                 errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
                                 get_namespace_name(get_rel_namespace(relid)),
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index e2ff8d77b16..d83b106a835 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -59,6 +59,7 @@
 #include "pgstat.h"
 #include "postmaster/interrupt.h"
 #include "replication/logicalworker.h"
+#include "replication/origin.h"
 #include "replication/worker_internal.h"
 #include "storage/lwlock.h"
 #include "utils/acl.h"
@@ -372,7 +373,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
      * sequence as READY.
      */
     UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
-                               seqinfo->page_lsn, false);
+                               seqinfo->page_lsn, false, InvalidReplOriginId);
 
     return COPYSEQ_SUCCESS;
 }
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index a04b84ebc1d..ab2d54d5825 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -269,7 +269,8 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
                                    MyLogicalRepWorker->relid,
                                    MyLogicalRepWorker->relstate,
                                    MyLogicalRepWorker->relstate_lsn,
-                                   false);
+                                   false,
+                                   InvalidReplOriginId);
 
         /*
          * End streaming so that LogRepWorkerWalRcvConn can be used to drop
@@ -469,7 +470,17 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
                      */
                     UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                                rstate->relid, rstate->state,
-                                               rstate->lsn, true);
+                                               rstate->lsn, true,
+                                               InvalidReplOriginId);
+
+                    /*
+                     * Rebuild the tablesync origins cache now that this relation
+                     * has transitioned to READY. The srtablesyncoriginid written
+                     * at FINISHEDCOPY is now stable and needs to be loaded into
+                     * the cache before the apply worker starts processing WAL
+                     * changes for this relation.
+                     */
+                    rebuild_tablesync_origins_cache();
                 }
             }
             else
@@ -1375,7 +1386,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                MyLogicalRepWorker->relid,
                                MyLogicalRepWorker->relstate,
                                MyLogicalRepWorker->relstate_lsn,
-                               false);
+                               false,
+                               InvalidReplOriginId);
 
     /*
      * Create the replication origin in a separate transaction from the one
@@ -1504,8 +1516,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                                MyLogicalRepWorker->relid,
                                SUBREL_STATE_FINISHEDCOPY,
                                MyLogicalRepWorker->relstate_lsn,
-                               false);
-
+                               false,
+                               originid);
     CommitTransactionCommand();
 
 copy_table_done:
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
old mode 100644
new mode 100755
index a3f2406ed83..7062d89a5b1
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -290,6 +290,7 @@
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/guc.h"
+#include "utils/hsearch.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -484,6 +485,21 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 Subscription *MySubscription = NULL;
 static bool MySubscriptionValid = false;
 
+/*
+ * Hash table mapping ReplOriginId -> bool for origins used by tablesync
+ * workers during initial COPY. Built from pg_subscription_rel at apply
+ * worker startup and refreshed whenever a relation transitions to READY.
+ * Lets the apply worker suppress false update/delete_origin_differs
+ * conflicts on rows that were re-stamped with the tablesync origin ID
+ * during WAL replay after a crash.
+ */
+typedef struct TablesyncOriginEntry
+{
+    ReplOriginId originid;      /* hash key — must be first */
+} TablesyncOriginEntry;
+
+static HTAB *tablesync_origins = NULL;
+
 static List *on_commit_wakeup_workers_subids = NIL;
 
 bool        in_remote_transaction = false;
@@ -722,6 +738,75 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
     return false;               /* dummy for compiler */
 }
 
+/*
+ * Rebuild the in-memory hash table of tablesync origin IDs from
+ * pg_subscription_rel. Called at apply worker startup and whenever a
+ * relation transitions to SUBREL_STATE_READY, so newly finished tablesync
+ * workers are always reflected in the cache.
+ */
+void
+rebuild_tablesync_origins_cache(void)
+{
+    List       *subrels;
+    ListCell   *lc;
+    HASHCTL     ctl;
+
+    /* Destroy the old table if it exists */
+    if (tablesync_origins != NULL)
+    {
+        hash_destroy(tablesync_origins);
+        tablesync_origins = NULL;
+    }
+
+    /*
+     * Call GetSubscriptionRelations to get all tables for this subscription from
+     * pg_subscription_rel.
+     *
+     */
+    subrels = GetSubscriptionRelations(MySubscription->oid, true, false, false);
+
+    foreach(lc, subrels)
+    {
+        SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc);
+
+        if (relstate->originid == InvalidReplOriginId)
+            continue;
+
+        if (tablesync_origins == NULL)
+        {
+            memset(&ctl, 0, sizeof(ctl));
+            ctl.keysize = sizeof(ReplOriginId);
+            ctl.entrysize = sizeof(TablesyncOriginEntry);
+            ctl.hcxt = ApplyContext;
+            tablesync_origins = hash_create("tablesync origins",
+                                            16,
+                                            &ctl,
+                                            HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+        }
+
+        hash_search(tablesync_origins, &relstate->originid, HASH_ENTER, NULL);
+    }
+
+    list_free_deep(subrels);
+}
+
+/*
+ * is_tablesync_origin
+ *
+ * Returns true if the given origin ID is recorded in pg_subscription_rel
+ * as the tablesync origin for any relation in this subscription. Used to
+ * suppress false update/delete_origin_differs conflicts on rows that were
+ * stamped with the tablesync origin ID during WAL replay after a crash.
+ */
+static inline bool
+is_tablesync_origin(ReplOriginId originid)
+{
+	if (tablesync_origins == NULL || originid == InvalidReplOriginId)
+		return false;
+
+	return hash_search(tablesync_origins, &originid, HASH_FIND, NULL) != NULL;
+}
+
 /*
  * Begin one step (one INSERT, UPDATE, etc) of a replication transaction.
  *
@@ -2958,11 +3043,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 		{
 			/*
 			 * Report the conflict if the tuple was modified by a different
-			 * origin.
+			 * origin. Skip if the origin is recorded in pg_subscription_rel
+			 * as a known tablesync origin for this subscription.
 			 */
 			if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 										&conflicttuple.origin, &conflicttuple.ts) &&
-				conflicttuple.origin != replorigin_xact_state.origin)
+				conflicttuple.origin != replorigin_xact_state.origin &&
+				!is_tablesync_origin(conflicttuple.origin))
 			{
 				TupleTableSlot *newslot;
 
@@ -2971,7 +3058,6 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 				slot_store_data(newslot, relmapentry, newtup);
 
 				conflicttuple.slot = localslot;
-
 				ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_ORIGIN_DIFFERS,
 									remoteslot, newslot,
 									list_make1(&conflicttuple));
@@ -3153,11 +3239,13 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 		{
 			/*
 			 * Report the conflict if the tuple was modified by a different
-			 * origin.
+			 * origin. Skip if the origin is recorded in pg_subscription_rel
+			 * as a known tablesync origin.
 			 */
 			if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 										&conflicttuple.origin, &conflicttuple.ts) &&
-				conflicttuple.origin != replorigin_xact_state.origin)
+				conflicttuple.origin != replorigin_xact_state.origin &&
+				!is_tablesync_origin(conflicttuple.origin))
 			{
 				conflicttuple.slot = localslot;
 				ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS,
@@ -3525,7 +3613,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin,
 											&conflicttuple.origin,
 											&conflicttuple.ts) &&
-					conflicttuple.origin != replorigin_xact_state.origin)
+					conflicttuple.origin != replorigin_xact_state.origin &&
+					!is_tablesync_origin(conflicttuple.origin))
 				{
 					TupleTableSlot *newslot;
 
@@ -5714,6 +5803,16 @@ run_apply_worker(void)
 	origin_startpos = replorigin_session_get_progress(false);
 	CommitTransactionCommand();
 
+	/*
+	 * Build the tablesync origins cache from pg_subscription_rel. This
+	 * lets the apply worker recognise rows that were stamped with a (now
+	 * dropped) tablesync origin ID while applying updates and deletes, and
+	 * suppress false update/delete_origin_differs conflicts for them.
+	 */
+	StartTransactionCommand();
+	rebuild_tablesync_origins_cache();
+	CommitTransactionCommand();
+
 	/* Is the use of a password mandatory? */
 	must_use_password = MySubscription->passwordrequired &&
 		!MySubscription->ownersuperuser;
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 502640d3018..81621ebe464 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -35,6 +35,8 @@ CATALOG(pg_subscription_rel,6102,SubscriptionRelRelationId)
 	Oid			srsubid BKI_LOOKUP(pg_subscription);	/* Oid of subscription */
 	Oid			srrelid BKI_LOOKUP(pg_class);	/* Oid of relation */
 	char		srsubstate;		/* state of the relation in subscription */
+	int16		srtablesyncoriginid;	/* tablesync origin used during COPY,
+										 * InvalidReplOriginId if not applicable */
 
 	/*
 	 * Although srsublsn is a fixed-width type, it is allowed to be NULL, so
@@ -84,6 +86,8 @@ typedef struct SubscriptionRelState
 	Oid			relid;
 	XLogRecPtr	lsn;
 	char		state;
+	ReplOriginId originid;		/* tablesync origin from srtablesyncoriginid,
+								 * InvalidReplOriginId if not set */
 } SubscriptionRelState;
 
 /*
@@ -112,7 +116,8 @@ typedef struct LogicalRepSequenceInfo
 extern void AddSubscriptionRelState(Oid subid, Oid relid, char state,
 									XLogRecPtr sublsn, bool retain_lock);
 extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state,
-									   XLogRecPtr sublsn, bool already_locked);
+									   XLogRecPtr sublsn, bool already_locked,
+									   ReplOriginId originid);
 extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn);
 extern void RemoveSubscriptionRel(Oid subid, Oid relid);
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 745b7d9e969..6b9233c7e09 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -305,6 +305,8 @@ extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 
 extern void apply_dispatch(StringInfo s);
 
+extern void rebuild_tablesync_origins_cache(void);
+
 extern void maybe_reread_subscription(void);
 
 extern void stream_cleanup_files(Oid subid, TransactionId xid);
-- 
2.47.3
Thread overview: 5+ messages
2026-01-09 04:46 Re: BUG #19360: Bug Report: Logical Replication initial sync fails with "conflict=update_origin_differs" PG12 toPG18 Dilip Kumar <[email protected]>
2026-01-10 00:56 ` Masahiko Sawada <[email protected]>
2026-04-03 07:24 ` Zhijie Hou (Fujitsu) <[email protected]>
2026-04-07 08:13 ` Zhijie Hou (Fujitsu) <[email protected]>
2026-06-04 11:37 ` Ajin Cherian <[email protected]>