From: Amit Kapila <[email protected]>
To: shveta malik <[email protected]>
Cc: Dilip Kumar <[email protected]>
Cc: Nisha Moond <[email protected]>
Cc: vignesh C <[email protected]>
Cc: Peter Smith <[email protected]>
Cc: Masahiko Sawada <[email protected]>
Cc: Bharath Rupireddy <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Subject: Re: Proposal: Conflict log history table for Logical Replication
Date: Sun, 21 Jun 2026 19:18:54 +0530
Message-ID: <CAA4eK1JhZYRMP_YYa1j3uAK6L4v057JDuM0+YLABOgAOYuwM8Q@mail.gmail.com>
In-Reply-To: <CAJpy0uDGuaEvBym5VnE4mXJ_meo3jR7uG=c6wVbDAiuTjHcwrg@mail.gmail.com>
References: <CAFiTN-u5D5o_AGNbHRZHaOqAMWkxLf+hSk_r9X3gv6HbLOB5+g@mail.gmail.com>
<CALj2ACViThGQDYi-yeqUeHqG2Pozn2AiyvtDtjE6zhhbM0KsEA@mail.gmail.com>
<CAA4eK1+44b3vd_OWfiaVNtjf5Njb5cek09pmKRmttBByeg0NoA@mail.gmail.com>
<CAFiTN-v3L0WacCDx5dkOSonaZQbJfstXL4HrCPD1ahRdUsRnSg@mail.gmail.com>
<CALj2ACW63uuxh0fSoxEAF8OMWhz1dJKSkp268WJDzf5BUqCf5g@mail.gmail.com>
<CAFiTN-s9WWLOhW1TO27NtJwGf0bh2+MWyp3NEkZFeN_S5_p_rA@mail.gmail.com>
<CAA4eK1LxnsEx5sMbQkK5MHAgXKPROMQXQ0n=fKMwz+UsfKQaMQ@mail.gmail.com>
<CAD21AoDj+c4LXf2y4ESR-gVyv9d8V0G4R8R9pn-PcmT5zPzYcg@mail.gmail.com>
<CAA4eK1KokmAwNOL6bS-ip_E3F96PiQTjC4j-M+5vD1T6uUyi3Q@mail.gmail.com>
<CAFiTN-vFKE8E_N6h+peX9DP92mxCeFdm5A9Esn4DkLmNcZ-dOA@mail.gmail.com>
<CAFiTN-shLYf-fOTQ_dBf3Xfx05gxs_8d93MHZXyyz6w2Bg5geQ@mail.gmail.com>
<CAFiTN-tEgkKQHUikn6iBFCYf7XOObR7ncUq=OVh7WEk=6P4ymw@mail.gmail.com>
<CAFiTN-tQiakd8m+-d6WN6RpJXSv_JcropZ2oGzme4d1JudQhYg@mail.gmail.com>
<CAJpy0uDKbYWt+YPADj=4fHEvrGEWgnG1n_YsiGT_EZiZf0VSAw@mail.gmail.com>
<CAFiTN-t82BiXen+HfdR9jZyOpuSO92xonnUK=khXsiZWBfOxMA@mail.gmail.com>
<CAJpy0uAu2paxGAEffD=vaBTW9Jqbtxxawb8K8FgiASfeKPnGog@mail.gmail.com>
<CAJpy0uC0ZWgHOivJ102A1fMkppwK3RuSMafRPKyjwkmJrjhVUw@mail.gmail.com>
<CAFiTN-vFV9-zajrwjYHYyFnyQsooOAXW4CpxB5f-iT3APjOtoQ@mail.gmail.com>
<CAJpy0uBeU1dZgaqsSVKc=P=EVUKxRgVuHR8jDXFL-HLibbE-kQ@mail.gmail.com>
<CAA4eK1+FOkOxhzVLAnDymoNjp4i98H-L1+ZsWDgJEv-ndnTzTA@mail.gmail.com>
<CAFiTN-sVK6Bp+BawCJU_WpAXQSTX4OkKmce5EE4YNBgD-XSjZw@mail.gmail.com>
<CAA4eK1LbjV0bctib9wUnBpEkC+2rZFPnGuRtrKuc5AtUAzum+A@mail.gmail.com>
<CAFiTN-vq50N3QP9p3_SH+tJ8Pn=uRDb0X4qEcQZYcGW9AX88rQ@mail.gmail.com>
<CAFiTN-u3+zRGPESP5kUUfa6NxaWh1HL-gd1225KJ0Uvzi1urow@mail.gmail.com>
<CAA4eK1L4iNk6mNTC83PbYrRfUdtivH4U961PkdFfOO7mvc=USg@mail.gmail.com>
<CAFiTN-v+Mh64UfR5zb5rwgyGm6HS80XRSZ_XeaWkg8=+s9o3Kg@mail.gmail.com>
<CAFiTN-s3ZFHteQsiC3H4=AjTWxuwN-w69XQ3xL5X6YOMTua4pA@mail.gmail.com>
<CAJpy0uDe724nY59j-8hMapZ_Fru1Wo-NucF4Ea1B3Jrw=+J+UQ@mail.gmail.com>
<CAFiTN-uR=86L_5tyiA7n73EXCSCuDfQKfL5O=c8n7zZom8_ONQ@mail.gmail.com>
<CAD21AoDfOS-J0M9WbM3D20eGbSPzbfLQ-9XoYkxO4AZ9twqyvg@mail.gmail.com>
<CAFiTN-vMTg2X7vwfHLr5Gvy8ViV63_iaEcpHmM8V5GpA9-u8cg@mail.gmail.com>
<CAA4eK1+b2Ws0e_ZYJsgZAPn7VWndxAK_YM_QMKcfXst3e7F6Jg@mail.gmail.com>
<CAFiTN-v6hFKMPrSyTBsz=AtEETYMbOxrqvhZJsPQqKgQc4WCLw@mail.gmail.com>
<CAA4eK1KV3rYkaxys5fh-PtE9kq5xrFbiaRpOSPoRgQG494ek+g@mail.gmail.com>
<CAFiTN-utvu=QjY1QQ1a_TvkpkpvesMWo9M8wTFYLaOTPdpOJvw@mail.gmail.com>
<CAA4eK1+HoSOEqNwT3twArPNx4_D7hSUoEg2LnYhX8n9iUwhXgQ@mail.gmail.com>
<CAFiTN-tqmsfW0Sk=1RhzuduxqLrf9KEc8VOvBae+4aYxWTJwuA@mail.gmail.com>
<CAA4eK1JmCQ=DHe3HsqpX+P3mGDUd_Z7E7oAxdstK6822W6tuCw@mail.gmail.com>
<CAFiTN-uE4eAUYewuq3c5deAt3TtVork+H6rkUHRv68cOGr5rmQ@mail.gmail.com>
<CAFiTN-sJbhPX+LbA8YuQeYJpfGA2XA+OKXf8jCm04RoJOyzLvw@mail.gmail.com>
<CAJpy0uBPOyWj9itFjHzGXfrUuYS8KGmAvgdcV_9FPjWZ0EZz_w@mail.gmail.com>
<CAFiTN-s=iLE4qM4qmw9yXKqW09R_c_HqaSGeZXJ2EaTVfXss+g@mail.gmail.com>
<CAA4eK1KYo0vZpPSRc_4gVpa06-J39gxjs3tHFyckgkBfYJSfFA@mail.gmail.com>
<CAFiTN-vrKc6OWzrg6yvpwYcj79k=zkrDp3uwiZzjwrWLJAq6tw@mail.gmail.com>
<CAA4eK1LmvrfEgn1NUZZ=E3yMCjQdNZ5=_SBEry73-EmF6jM_PQ@mail.gmail.com>
<CAFiTN-vjfub5b3PqPQzfOw9BSjm8jt28ott+Hoz9CrRxJHzYkg@mail.gmail.com>
<CAFiTN-v=ANapYvRK+SOy2wJb4CSuD6Vb6_bTGuReM9Dv+3tucA@mail.gmail.com>
<CALDaNm1zEYoSdf2Ns-=UJRw95E5sbfpB0oaNUWtRJN27Q1Knhw@mail.gmail.com>
<CALDaNm3USsXVNBsfdpkp60HVgrTV4taWMk1xZYNBa7QUF=V0jg@mail.gmail.com>
<CAFiTN-sNg9ghLNkB2Kn0SwBGOub9acc99XZZU_d5NAcyW-yrEg@mail.gmail.com>
<CAJpy0uAF3EYcYdpTHdKMeXfvaPbNvnWrZUATrSLL1hqjao=33A@mail.gmail.com>
<CAFiTN-uikggCKp2LscTorKY5d3KF9j93DW0xebDcRX86G+ZsSw@mail.gmail.com>
<CAJpy0uDaOoVK8S3_xxTAcTDpfK1AY7tApw7nPOZG_gUz+DMi=Q@mail.gmail.com>
<CAA4eK1+AdeC5B9xrAXSKWGtTh-0d8xdD=fZttmOBm+c8o8thAQ@mail.gmail.com>
<CAFiTN-skBQAeuzuUd+PDK0Gqc8g+4x9ypBMwJhOrmW8ZCFKGSA@mail.gmail.com>
<CAJpy0uCdrsW5T+okq7xTOVxagje7FW3DOeY5B0CGKYa5VqF_tQ@mail.gmail.com>
<CAFiTN-u+_mFj9caYYFO7=_YHFXk5y=vvOm2H2=5hctYktmAVGA@mail.gmail.com>
<CALDaNm1aivk9KgQ5daeF6YZzuE+0wWc2yb7wb6qikNyvfPN0Sg@mail.gmail.com>
<CAJpy0uD6fTEUYJx3+yDbvB=VW7c5AaGoeSd7iwHdYYO=kYGn3g@mail.gmail.com>
<CALDaNm2YOOdJ25X1sJ+DYz37K6Qi4g0ZNFHb_pQMF9UqancnEA@mail.gmail.com>
<CAHut+PtMS5bENS0DVtBj+s3kUEOq61+hSkqLODjFB78egB0imQ@mail.gmail.com>
<CAFiTN-s_M83sfs+MHHbUrMesjsCPN4JWxY5MChCEiY1U-u7=9g@mail.gmail.com>
<CAFiTN-vj8NTm9w_L2XdhxJCub_RZw__YVUgfXa1B1kJzJctRNw@mail.gmail.com>
<CAJpy0uBDLnfhuSiev8W9ZMFNTzUmqhds2dKayUpLoN-z1dtsLA@mail.gmail.com>
<CAFiTN-uL9f0X+=Ep4BbAPvaTJA7S4XHM--G4BsnPJw4uJW7EGQ@mail.gmail.com>
<CAJpy0uDG=t-y_m8t1zpBzfz9viP3K8dyQgkruaraVT85UtTkrg@mail.gmail.com>
<CAFiTN-tR8Rhs8uhfbck0Ac4dd1MopvvYgjK39nWyNXRp9Z3Qww@mail.gmail.com>
<CAA4eK1Kf15UpNmpTTE2XyX=9PE_oTpOoy5xqg3rFWbxwwP4Rbg@mail.gmail.com>
<CAFiTN-tNqb0vjuadDz-as67ksSXa=aEK+JW=4b54RVmkUK1m2Q@mail.gmail.com>
<CAFiTN-vDCxx6ydUFo59L8qNBbierg4as3TGPPiavR7UZjYurzA@mail.gmail.com>
<CAHut+PsWms218ENALnytLEV4NpxjOrAYhChLDaMaeE65-vNgrQ@mail.gmail.com>
<CAFiTN-v9i9RmDvdUmtMUow4=b+nr0k7LKMyEQ+6ZF=EVdfBhBA@mail.gmail.com>
<CALDaNm2YTKwPDjt9OV49RgM0zbkWhMhNu228bj_7f+zzcPb-ew@mail.gmail.com>
<CAFiTN-t_4XvofM3an-WmykqnPE+9wf9U+o2M7p1CWd9eXkN88Q@mail.gmail.com>
<CAHut+PuaqNDfDu_3xkZR4OYxw-B7ew_WjpLXCBvMcSBJz2K6Xg@mail.gmail.com>
<CAFiTN-uqNN9S_hRuda_th5MEpywa15g+XO00yM6tNJ-spGRRJw@mail.gmail.com>
<CABdArM6QxXatkGefTHy__HgaYHBvbKesffeXzT8Vn-kvcvGK4w@mail.gmail.com>
<CAFiTN-tgMWr=TGPhs9BxaPuSC_jhM7sJJ4fHedE5W6=h40jLfA@mail.gmail.com>
<CABdArM5fgzfyC2mH3YGB8t8cJBHWqAG1BS6rJMk7mX-8=9d=Cg@mail.gmail.com>
<CABdArM568KF4WXdFX_aZkCiDK8R71Wpep0gC2a+cV8BMobwkrg@mail.gmail.com>
<CAFiTN-vQ0tu18BD3UmKPb0rzZyFMQAVgGbdpMA8iYLX7PZOqOA@mail.gmail.com>
<CALDaNm20PDtmG2E3qaTC+YuL5twv+c9k573wL3sb=OwgmZphxQ@mail.gmail.com>
<CAA4eK1LhOHa_TEznw+gFoq+w0vMvvsDG2g9Xq8Mwa8xZMY73og@mail.gmail.com>
<CAFiTN-vPDqrQ2rHykNgd+groFxqwBYFQF97R-Co2EmtUkV6MTg@mail.gmail.com>
<CAFiTN-vsd=wNiEPXPQhZnipAb--+mBUC01M-pcjBjbRockgCUA@mail.gmail.com>
<CAJpy0uCjSq_gUCJBfURhqtB6bLvkKSUL-sVXpaGKjEapv5+t+Q@mail.gmail.com>
<CAFiTN-uZ-LaStAY3NuCY-nb7GCB9joiHX7HtHEMseJ0xfnqVSg@mail.gmail.com>
<CAFiTN-vhJxRW5NQ628oidnk0KtHwKt11dW9-+vxqpXLTgjiYiA@mail.gmail.com>
<CALDaNm1cJURibYKY4+DuNosjM72C9oGheUF-roMyff__+AsKBw@mail.gmail.com>
<CAJpy0uD1_77TDAFc4jE-94X-WUus7Q3gGU0pXfC+Tticq1hFvA@mail.gmail.com>
<CAA4eK1LFcSc4XCj4mU-cv27F_6n6=+ehJ=YAsAnyBbz4Sv_tVg@mail.gmail.com>
<CAFiTN-s5ZtjXKrSbam7TNWJ9Ax-kCancXcestAnx2by7dK0-UA@mail.gmail.com>
<CAFiTN-u=Da32mXyz8jocEGtuLSG4ccXXj_aEzUTPp2zkLb3MVA@mail.gmail.com>
<CALDaNm1qY5e0thfsDB2uWXqZn4hgTWTxiUDwcF1hWA-jodsKYg@mail.gmail.com>
<CALDaNm1nFtv3dtdRdbqWo2Rf_av7XbxDfK1Orqjcqs_Su_cLRQ@mail.gmail.com>
<CABdArM7R498qC5Fr42aU_q-2Sc5QsT4dyKgmO_f6Uy=8oCAFXA@mail.gmail.com>
<CAFiTN-sRZ+Z_9B3ue2L4zkbcfmPjjcAjcR1C+px1PyAs+HGsSg@mail.gmail.com>
<CAFiTN-sdcjf9xJ2M-=ab5e4y662tTmFFiP4gHL44tC9PcQozcw@mail.gmail.com>
<CALDaNm2WNjaNxUijVkvT6y69D62rfCu8OMwU-Pf-84un2r_=ig@mail.gmail.com>
<CAHut+PvEP5uUR13xJ3gbNKGU49=Rg32DXMGZ2wL9jTcKHyN_=Q@mail.gmail.com>
<CALDaNm3Jb5AQTsFJFxYZZJCaheT7qToCZkEALfW-vsMMFxjOyQ@mail.gmail.com>
<CAHut+PtQn5U9i00qvBmjo0KBxyb+ZmBb38NzF91KnX4J86Jg_g@mail.gmail.com>
<CALDaNm1a1gzy0L38U394_4OFwGUS8ALgSONYj++VLimY0g9piQ@mail.gmail.com>
<CABdArM5X63AdtS99QKGjVijUd_Q_dV8QUDSo4nTHKJjn3JwtAg@mail.gmail.com>
<CAA4eK1+h=QV4Zi=PW8Zt2D6be5ki5Mu2HgdXcfUophptx6Mt_A@mail.gmail.com>
<CAFiTN-s-tuxar9Dp5He0CFa1pzfy1fmiwcBj6PtwD0hDodE5ng@mail.gmail.com>
<CAFiTN-sx=k+Th=uYsrLcS6YMZbPVi9Wrggn1w2Nzf9MLEU7YRQ@mail.gmail.com>
<CAFiTN-u5pcgAhXyJgj+p7-xmShtp0i8xA000tzjCLFQp_zMXUA@mail.gmail.com>
<CAFiTN-tRpS7b3qFqckqFtHETj0jyzj-8SxC1arjfwf-hQd47PQ@mail.gmail.com>
<CABdArM5Ka_m_GWhL_zZbeDPKmL-Wezwb4A-NHnO-v-fRDuhA-Q@mail.gmail.com>
<CAFiTN-sqEMAbZ2pTt=zMa=918NV7HVeXF4bCOF+swtzKnTy5yQ@mail.gmail.com>
<CALDaNm0WX0Vqoy2UQZh-2TpWraf4OYn28kWe9aGR=vxKwLA+bw@mail.gmail.com>
<CAJpy0uAA_XszCAcoBuCUM3VobD39DbMDwCPUT+XW7wFfE+_E8w@mail.gmail.com>
<CAFiTN-urKwsdyFvwz1go_C7jJFtLA8TJEhnaECAjuqkRk_1cXA@mail.gmail.com>
<CAJpy0uBhJGqD+OyA9Uk8bHyk61XWHEf3Le1QxkotwOLcQCqaZA@mail.gmail.com>
<CAFiTN-sbAXS6deUarPDunj2U+A6Dvhw_TASy4oMv4Tc63-_T9g@mail.gmail.com>
<CAJpy0uBaW7ziTsHOu_z37-epihxx3qGMjqdV+_-Z-RFjq5EOGg@mail.gmail.com>
<CAFiTN-sxVPbuqHjz99NGTz5UU0xgegsvpRa6=NkbP8_iW+X6-A@mail.gmail.com>
<CAJpy0uBs6D9ojMpz4MWgrxDbvRxqnvN+B+JnMNezBtuvhk_j9A@mail.gmail.com>
<CAFiTN-urmbTtk40RfLU8UMYtzk-_DLXwui3_G+TWp6XYKBphjw@mail.gmail.com>
<CABdArM5dtMEUw14-aDSht2Vh1tsgO67t_ZU8VQp=Ut7MwK5aEQ@mail.gmail.com>
<CAJpy0uBSY7zTH=4TvAOS=kj9vivBUc9NO+Vp6KNw-Na9RiAsMg@mail.gmail.com>
<CAFiTN-vqyo2=pgA_jUyQoWrOwUXxw1s3aYHcZBr_EHsztZN9jg@mail.gmail.com>
<CAJpy0uDGuaEvBym5VnE4mXJ_meo3jR7uG=c6wVbDAiuTjHcwrg@mail.gmail.com>
On Thu, Jun 18, 2026 at 9:33 AM shveta malik <[email protected]> wrote:
>
> On Tue, Jun 16, 2026 at 6:54 PM Dilip Kumar <[email protected]> wrote:
> >
> > IMHO we should just log a WARNING and continue the apply worker on
> > conflict insertion failure; let's see what others think about this.
> >
>
> I have the same opinion. Allowing CLT to block the apply worker would
> be undesirable; CLT is a history/logs collection feature and should
> not interrupt core logical replication work.
>
I think the insert can fail in rare cases, such as the disk getting
full while writing WAL or some internal memory ERROR, and the ERROR
could be persistent, which means the LOG would be filled with the same
WARNING if there are many conflicts. Also, users may not like missing
out on conflict information. So, we can ERROR out and let users fix
the situation. Additionally, the nested try-catch that downgrades
ERROR to WARNING looks ugly and is a source of future bugs and
maintenance burden. The attached patch fixes this by ERRORing out on
insertion failure and attaching the required conflict info as the
context of the ERROR. The patch also improves the non-ERROR paths of
ReportApplyConflict() by writing the conflict information to the
server LOG before inserting the same into the CLT, so that if the
insertion fails, the complete conflict info is still present in the
server LOG. See
v52-1-amit.Improve-error-handling-for-conflict-log-table-ins.
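The reporting order and the error-context annotation described above can be illustrated with a small standalone C sketch. It is not backend code: CtxEntry, ctx_stack, raise_error(), insert_conflict_row(), report_and_log_conflict(), the server_log buffer and the "simulated insert failure" text are all hypothetical stand-ins for ErrorContextCallback, error_context_stack, ereport() and InsertConflictLogTuple(). Errors here are plain return values rather than a longjmp, so the pushed entry is always popped.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical, simplified stand-in for the backend's ErrorContextCallback
 * chain: each entry carries a context string (cf. the patch's
 * conflict_log_errcontext) instead of a callback that calls errcontext().
 */
typedef struct CtxEntry
{
    struct CtxEntry *previous;
    const char *text;
} CtxEntry;

static CtxEntry *ctx_stack = NULL;
static char server_log[1024];   /* everything "written" to the server log */

static void
log_line(const char *line)
{
    size_t      used = strlen(server_log);

    snprintf(server_log + used, sizeof(server_log) - used, "%s\n", line);
}

/* Report an error plus one CONTEXT line per active context entry. */
static void
raise_error(const char *msg)
{
    char        line[256];

    snprintf(line, sizeof(line), "ERROR: %s", msg);
    log_line(line);
    for (CtxEntry *e = ctx_stack; e != NULL; e = e->previous)
    {
        snprintf(line, sizeof(line), "CONTEXT: %s", e->text);
        log_line(line);
    }
}

/* Push a context entry around the (simulated) insert, then pop it. */
static int
insert_conflict_row(const char *ctx_text, int simulate_failure)
{
    CtxEntry    entry = {ctx_stack, ctx_text};
    int         ok = 1;

    assert(ctx_text != NULL);
    ctx_stack = &entry;
    if (simulate_failure)
    {
        raise_error("simulated insert failure");
        ok = 0;
    }
    ctx_stack = entry.previous;
    return ok;
}

/* Write the conflict to the server log first, then attempt the insert. */
static int
report_and_log_conflict(const char *relname, const char *type,
                        int simulate_failure)
{
    char        line[256];
    char        ctx_text[256];

    snprintf(line, sizeof(line),
             "LOG: conflict detected on relation \"%s\": conflict=%s",
             relname, type);
    log_line(line);

    snprintf(ctx_text, sizeof(ctx_text),
             "while logging conflict \"%s\" detected on relation \"%s\"",
             type, relname);
    return insert_conflict_row(ctx_text, simulate_failure);
}
```

Calling report_and_log_conflict("t1", "insert_exists", 1) leaves the LOG line in server_log first, followed by the ERROR line and a CONTEXT line naming the conflict, so the conflict stays identifiable even though the insert failed.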
Additionally, there is another problem with 0003. When a parallel
apply worker hits an ERROR-level conflict, it logs the conflict to the
conflict log table in a new transaction in its error path, after
aborting the failed apply transaction. But the leader detects worker
failure in pa_wait_for_xact_finish() by waiting on the worker's
transaction lock, and AbortOutOfAnyTransaction() releases that lock:
the leader unblocks, sees a non-finished state, raises "lost
connection to the logical replication parallel apply worker", and
tears the worker down. This can SIGTERM the worker mid-insert and lose
the conflict log row, besides producing a misleading message. The
attached top-up patch v52-2-amit.fix_parallel_apply_logging fixes that
by introducing a PARALLEL_TRANS_ERROR state. If you are okay with this
patch, I think you can merge it into your 0003, but
v52-1-amit.Improve-error-handling-for-conflict-log-table-ins should be
reviewed before being merged into 0003.
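The race and the proposed fix can be sketched as a deterministic, single-threaded model of the worst-case interleaving, in which the leader reacts the moment the worker's transaction lock is released. Model, ModelTransState, LeaderReport and all functions below are hypothetical; only the idea of publishing an error state before the lock is released comes from the patch, and real code would use shared memory, the heavyweight transaction lock and the error queue rather than plain struct fields.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model states; MODEL_TRANS_ERROR mirrors PARALLEL_TRANS_ERROR. */
typedef enum
{
    MODEL_TRANS_STARTED,
    MODEL_TRANS_FINISHED,
    MODEL_TRANS_ERROR
} ModelTransState;

typedef enum
{
    REPORT_NONE,                /* leader is still waiting */
    REPORT_FINISHED,
    REPORT_LOST_CONNECTION,     /* the misleading message */
    REPORT_WORKER_ERROR         /* the worker's real error */
} LeaderReport;

typedef struct
{
    ModelTransState state;      /* shared state the leader inspects */
    bool        lock_held;      /* stands in for the worker's xact lock */
    bool        killed;         /* leader tore the worker down */
    bool        conflict_logged;
    LeaderReport report;        /* what the leader ends up reporting */
} Model;

/* Leader, the instant the worker's transaction lock is released. */
static void
leader_on_lock_release(Model *m)
{
    if (m->state == MODEL_TRANS_FINISHED)
        m->report = REPORT_FINISHED;
    else if (m->state == MODEL_TRANS_ERROR)
        m->report = REPORT_NONE;        /* keep waiting on the error queue */
    else
    {
        m->report = REPORT_LOST_CONNECTION;
        m->killed = true;               /* e.g. SIGTERM the worker */
    }
}

/* Leader, on receiving the worker's error through the error queue. */
static void
leader_on_error_message(Model *m)
{
    if (m->report == REPORT_NONE)
        m->report = REPORT_WORKER_ERROR;
}

/* Worker error path; set_error_first selects the patched ordering. */
static void
worker_error_path(Model *m, bool set_error_first)
{
    if (set_error_first)
        m->state = MODEL_TRANS_ERROR;   /* published while the lock is held */
    m->lock_held = false;               /* AbortOutOfAnyTransaction() */
    leader_on_lock_release(m);          /* worst case: leader runs at once */
    if (m->killed)
        return;                         /* conflict log row never written */
    m->conflict_logged = true;          /* ProcessPendingConflictLogTuple() */
    leader_on_error_message(m);         /* re-thrown error reaches the leader */
}
```

With set_error_first false, the model's leader reports a lost connection and tears the worker down before the row is written; with it true, the worker writes the conflict row and the leader reports the worker's real error.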
I used AI assistance to develop these top-up bug-fix patches (atop v52
0003) and have done an initial self-review and testing, but more
review and testing is required for this work.
--
With Regards,
Amit Kapila.
From a148d389d3ec3b87a45de4267aeea6e559a0c556 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Fri, 19 Jun 2026 11:09:08 +0530
Subject: [PATCH v3] Improve error handling for conflict log table insertions.
When a subscription logs conflicts to its conflict log table (CLT), the
apply worker prepares the conflict tuple and inserts it either inline
(for sub-ERROR conflicts, where apply continues) or, for ERROR-level
conflicts, defers the insertion to a fresh transaction after the apply
transaction aborts (ProcessPendingConflictLogTuple), so the log row is
not rolled back with the failed change.
The previous error handling around that insertion had a few problems:
* ProcessPendingConflictLogTuple() wrapped the insert in its own
PG_TRY/PG_CATCH that, on failure, called FlushErrorState() and
downgraded the failure to a WARNING. When called from start_apply()'s
PG_CATCH (which then does PG_RE_THROW()), that FlushErrorState() reset
the error stack and discarded the very error being re-thrown, so a
failure of the deferred insert could corrupt error reporting. It also
silently swallowed genuine insertion failures.
* If the inline insert (sub-ERROR path) failed, the conflict was not
recorded anywhere: the apply transaction aborted before the conflict
was written to the server log.
* In the parallel apply worker, a failed deferred insert likewise lost
the original error.
Rework this so that a failure to write the CLT is treated as a normal
apply error, and so that the conflict is always identifiable:
* Drop the internal PG_TRY/PG_CATCH from ProcessPendingConflictLogTuple().
A failed insert now raises an ERROR like any other apply failure; such
failures (e.g. the CLT was dropped, or out-of-space) are expected to be
rare and persistent.
* Annotate insertion failures with the conflict being logged.
prepare_conflict_log_tuple() stashes a short description ("while logging
conflict <type> detected on relation <rel>") in a new
LogicalRepWorker.conflict_log_errcontext field (allocated, like
conflict_log_tuple, in ApplyContext), and InsertConflictLogTuple()
installs an error context callback around the heap_insert(). Any error
raised during the insert therefore carries enough context to identify
the conflict, even on the deferred path where the original conflict
error is not separately reported.
* Report the conflict to the server log before inserting it into the
table. ReportApplyConflict() now prepares the tuple, emits the
server-log message (full details when the destination includes the log,
otherwise a short message pointing at the table), and only then inserts.
This guarantees the conflict is recorded even if the table insert fails.
The inline (sub-ERROR) insert is wrapped so that, on failure, the
prepared tuple is discarded before re-throwing, ensuring the deferred
path does not retry the same failing insert.
* Rework the worker error handlers to do the deferred insertion cleanly.
start_apply() and ParallelApplyWorkerMain() now copy the error into a
long-lived context, FlushErrorState(), reset error_context_stack (the
callbacks active at throw time belong to unwound frames, and the insert
installs its own), abort, run ProcessPendingConflictLogTuple(), and
finally ReThrowError() the original error. On success the original
error is re-thrown; on a failed insert the insert error (carrying the
conflict context) propagates instead. For the parallel apply worker
this means the leader always receives a real ErrorResponse rather than
a "lost connection to the logical replication parallel apply worker"
message.
* In DisableSubscriptionAndExit(), perform the deferred insertion after
the subscription has been disabled and committed (and before the
dead-tuple retention check), so a failure to log the conflict cannot
prevent the subscription from being disabled or leave the worker
restarting and failing forever.
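The problem with the old nested PG_TRY/PG_CATCH and the reworked handler ordering can be modeled outside the backend with setjmp()/longjmp(), the non-local jump mechanism that PG_TRY() is built on. This is a hypothetical sketch, not backend code: err_stack, throw_error(), flush_error_state(), run_apply() and the message strings stand in for elog.c's error data stack, ereport(ERROR), FlushErrorState(), start_apply()'s PG_TRY block and real server messages.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_DEPTH 4

/* Hypothetical message strings, compared by pointer in the usage below. */
static const char MSG_CONFLICT[] = "conflict detected: insert_exists";
static const char MSG_INSERT_FAILED[] = "could not insert conflict log tuple";
static const char MSG_LOST[] = "(error lost)";

/* Hypothetical mini error system, loosely modeled on elog.c. */
static const char *err_stack[MAX_DEPTH];   /* pending errors, newest last */
static int  err_depth = -1;
static jmp_buf *exception_stack = NULL;    /* current catch target */

static void
throw_error(const char *msg)
{
    assert(err_depth + 1 < MAX_DEPTH);
    err_stack[++err_depth] = msg;
    longjmp(*exception_stack, 1);
}

static void
flush_error_state(void)
{
    err_depth = -1;             /* discards every pending error */
}

/* Old behaviour: a nested catch that flushes and swallows the failure. */
static void
deferred_insert_old(bool insert_fails)
{
    jmp_buf     local;
    jmp_buf    *save = exception_stack;

    exception_stack = &local;
    if (setjmp(local) == 0)
    {
        if (insert_fails)
            throw_error(MSG_INSERT_FAILED);
    }
    else
        flush_error_state();    /* also wipes the caller's pending error */
    exception_stack = save;
}

/* New behaviour: no nested catch, so a failure simply propagates. */
static void
deferred_insert_new(bool insert_fails)
{
    if (insert_fails)
        throw_error(MSG_INSERT_FAILED);
}

/* Returns the error that the top-level handler ends up reporting. */
static const char *
run_apply(bool new_handler, bool insert_fails)
{
    jmp_buf     top;
    jmp_buf     apply;

    err_depth = -1;
    exception_stack = &top;
    if (setjmp(top) != 0)
    {
        const char *reported = err_depth >= 0 ? err_stack[err_depth] : MSG_LOST;

        exception_stack = NULL;
        flush_error_state();
        return reported;
    }

    exception_stack = &apply;
    if (setjmp(apply) == 0)
        throw_error(MSG_CONFLICT);              /* ERROR-level conflict */

    /* The apply loop's catch block. */
    exception_stack = &top;
    if (new_handler)
    {
        const char *saved = err_stack[err_depth];   /* CopyErrorData() */

        flush_error_state();                        /* FlushErrorState() */
        deferred_insert_new(insert_fails);          /* may throw instead */
        err_stack[++err_depth] = saved;             /* ReThrowError(edata) */
    }
    else
        deferred_insert_old(insert_fails);          /* then PG_RE_THROW() */
    longjmp(top, 1);
}
```

run_apply(false, true) models the old bug: the nested catch flushes the whole stack, so the top level has no error left to report. run_apply(true, true) reports the insert failure in place of the original conflict, and with a successful insert both variants report the original conflict.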
---
.../replication/logical/applyparallelworker.c | 38 ++--
src/backend/replication/logical/conflict.c | 214 +++++++++++-------
src/backend/replication/logical/worker.c | 51 ++++-
src/include/replication/worker_internal.h | 7 +
4 files changed, 211 insertions(+), 99 deletions(-)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d24c52d43e6..48cb5558367 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -993,32 +993,44 @@ ParallelApplyWorkerMain(Datum main_arg)
PG_CATCH();
{
MemoryContext oldcontext;
- ErrorData *edata;
+ ErrorData *edata;
/*
- * Copy the ErrorData before doing any further work. The error may
- * have been raised while running under ErrorContext, so switch to
- * a safe context (TopMemoryContext) to avoid assertions and ensure
- * the error data survives subsequent cleanup.
+ * Reset the origin state to prevent the advancement of origin
+ * progress if we fail to apply. Otherwise, this will result in
+ * transaction loss as that transaction won't be sent again by the
+ * server.
+ */
+ replorigin_xact_clear(true);
+
+ /*
+ * Copy the error and recover to an idle state so we can insert the
+ * deferred conflict log tuple (if any) before re-throwing. Copy the
+ * error into a longer-lived context first, as it may have been raised
+ * under ErrorContext. Also reset the error context stack: the
+ * callbacks in effect when the error was thrown belong to unwound stack
+ * frames, and the deferred insert installs its own.
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
edata = CopyErrorData();
MemoryContextSwitchTo(oldcontext);
FlushErrorState();
+ error_context_stack = NULL;
+ AbortOutOfAnyTransaction();
/*
- * Reset the origin state to prevent the advancement of origin
- * progress if we fail to apply. Otherwise, this will result in
- * transaction loss as that transaction won't be sent again by the
- * server.
+ * Insert the deferred conflict log tuple before re-throwing.
+ * Re-throwing is what reports the error to the leader (via the error
+ * queue set up above), so the insertion must happen first: otherwise
+ * the leader could start tearing down this worker while it is still
+ * writing the conflict log tuple. If the insertion itself fails, that
+ * error (annotated with the conflict context, see InsertConflictLogTuple)
+ * propagates to the leader instead of the original.
*/
- replorigin_xact_clear(true);
-
- AbortOutOfAnyTransaction();
ProcessPendingConflictLogTuple();
- /* Re-throw the original error. */
+ /* Re-throw the original error, which reports it to the leader. */
ReThrowError(edata);
}
PG_END_TRY();
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 6bf3d6d5a44..c2c15f055e6 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -335,17 +335,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest);
log_dest_logfile = CONFLICTS_LOGGED_TO_LOG(dest);
- /* Insert to table if requested. */
+ /*
+ * Prepare the conflict log tuple first when the destination includes the
+ * table. This must happen before the ereport() below, because for an
+ * ERROR-level conflict that ereport() raises the error and defers the
+ * actual insertion to ProcessPendingConflictLogTuple(), which relies on the
+ * tuple having been prepared.
+ */
if (log_dest_table)
{
Assert(conflictlogrel != NULL);
-
- /*
- * Prepare the conflict log tuple. If the error level is below ERROR,
- * insert it immediately. Otherwise, defer the insertion to a new
- * transaction after the current one aborts, ensuring the insertion of
- * the log tuple is not rolled back.
- */
prepare_conflict_log_tuple(estate,
relinfo->ri_RelationDesc,
conflictlogrel,
@@ -353,29 +352,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
searchslot,
conflicttuples,
remoteslot);
- if (elevel < ERROR)
- InsertConflictLogTuple(conflictlogrel);
-
- if (!log_dest_logfile)
- {
- /*
- * Not logging conflict details to the server log; Report the error
- * msg but omit raw tuple data from server logs since it's already
- * captured in the conflict log table.
- */
- ereport(elevel,
- errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s\": conflict=%s",
- RelationGetQualifiedRelationName(localrel),
- ConflictTypeNames[type]),
- errdetail("Conflict details are logged to the conflict log table: %s",
- RelationGetRelationName(conflictlogrel)));
- }
-
- table_close(conflictlogrel, RowExclusiveLock);
}
- /* Log into the server log if requested. */
+ /*
+ * Report the conflict to the server log before inserting it into the
+ * conflict log table. Emitting it first guarantees the conflict is
+ * recorded even if the table insert below fails; it is also what raises the
+ * error for ERROR-level conflicts. When the server log is one of the
+ * destinations we emit the full details, otherwise (table-only) we emit a
+ * shorter message since the details are captured in the table.
+ */
if (log_dest_logfile)
{
StringInfoData err_detail;
@@ -400,6 +386,64 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictTypeNames[type]),
errdetail_internal("%s", err_detail.data));
}
+ else if (log_dest_table)
+ {
+ /*
+ * Not logging conflict details to the server log; report the conflict
+ * but omit raw tuple data since it is captured in the conflict log
+ * table.
+ */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s\": conflict=%s",
+ RelationGetQualifiedRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail("Conflict details are logged to the conflict log table: %s",
+ RelationGetRelationName(conflictlogrel)));
+ }
+
+ /*
+ * Insert into the conflict log table if requested. For conflicts below
+ * ERROR the apply transaction continues, so insert immediately; for
+ * ERROR-level conflicts the ereport() above already raised the error and
+ * the insertion is deferred to a new transaction
+ * (ProcessPendingConflictLogTuple) so that it is not rolled back.
+ */
+ if (log_dest_table)
+ {
+ if (elevel < ERROR)
+ {
+ PG_TRY();
+ {
+ InsertConflictLogTuple(conflictlogrel);
+ }
+ PG_CATCH();
+ {
+ /*
+ * The insert failed, so the apply transaction will abort and
+ * the error will propagate to the worker's error handler. The
+ * conflict was already reported to the server log above, so it
+ * is not lost. Discard the prepared tuple so that the deferred
+ * insertion path (ProcessPendingConflictLogTuple) does not retry
+ * this same failing insert.
+ */
+ if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+ {
+ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+ }
+ if (MyLogicalRepWorker->conflict_log_errcontext != NULL)
+ {
+ pfree(MyLogicalRepWorker->conflict_log_errcontext);
+ MyLogicalRepWorker->conflict_log_errcontext = NULL;
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+
+ table_close(conflictlogrel, RowExclusiveLock);
+ }
}
/*
@@ -428,60 +472,28 @@ ProcessPendingConflictLogTuple(void)
if (MyLogicalRepWorker->conflict_log_tuple == NULL)
return;
- PG_TRY();
- {
- StartTransactionCommand();
- PushActiveSnapshot(GetTransactionSnapshot());
-
- /* Open conflict log table and insert the tuple */
- conflictlogrel = GetConflictLogDestAndTable(&dest);
- Assert(conflictlogrel);
-
- InsertConflictLogTuple(conflictlogrel);
-
- table_close(conflictlogrel, RowExclusiveLock);
-
- PopActiveSnapshot();
- CommitTransactionCommand();
- }
- PG_CATCH();
- {
- ErrorData *edata;
- MemoryContext oldctx;
-
- /* Save error info in our memory context */
- oldctx = MemoryContextSwitchTo(TopMemoryContext);
- edata = CopyErrorData();
- MemoryContextSwitchTo(oldctx);
-
- /* Clear the error state so we can continue */
- FlushErrorState();
+ /*
+ * Insert the deferred conflict log tuple in its own transaction. A
+ * failure here (e.g. the conflict log table was dropped, or an
+ * out-of-disk-space error) is treated like any other apply error and
+ * raises an ERROR; such failures are expected to be rare and persistent.
+ * Callers must therefore have already reported (and cleared) any
+ * in-progress apply error before calling this, so that this error does not
+ * mask the original one.
+ */
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
- /* Abort the transaction we started above */
- AbortOutOfAnyTransaction();
+ /* Open conflict log table and insert the tuple */
+ conflictlogrel = GetConflictLogDestAndTable(&dest);
+ Assert(conflictlogrel);
- /*
- * Report the error as a warning. We use WARNING because we don't want
- * this to be a fatal error for the worker, and we want to allow the
- * caller's original error to remain primary.
- */
- ereport(WARNING,
- (errmsg("could not log conflict to table for subscription \"%s\": %s",
- MySubscription->name, edata->message)));
+ InsertConflictLogTuple(conflictlogrel);
- FreeErrorData(edata);
+ table_close(conflictlogrel, RowExclusiveLock);
- /*
- * Free the conflict log tuple and set it to NULL. This ensures we
- * don't try to insert the same problematic tuple again.
- */
- if (MyLogicalRepWorker->conflict_log_tuple != NULL)
- {
- heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
- MyLogicalRepWorker->conflict_log_tuple = NULL;
- }
- }
- PG_END_TRY();
+ PopActiveSnapshot();
+ CommitTransactionCommand();
}
/*
@@ -544,6 +556,19 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
return table_open(conflictlogrelid, RowExclusiveLock);
}
+/*
+ * Error context callback for failures while inserting into the conflict log
+ * table. Adds a line identifying the conflict that was being logged.
+ */
+static void
+conflict_log_insert_errcontext(void *arg)
+{
+ char *ctx = (char *) arg;
+
+ if (ctx)
+ errcontext("%s", ctx);
+}
+
/*
* InsertConflictLogTuple
*
@@ -554,15 +579,34 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
void
InsertConflictLogTuple(Relation conflictlogrel)
{
+ ErrorContextCallback errcallback;
+
/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+ /*
+ * Set up an error context so that a failure to insert (e.g. the conflict
+ * log table was dropped, or an out-of-space error) carries information
+ * identifying the conflict we were trying to log.
+ */
+ errcallback.callback = conflict_log_insert_errcontext;
+ errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
- /* Free conflict log tuple. */
+ error_context_stack = errcallback.previous;
+
+ /* Free the conflict log tuple and its context string. */
heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
MyLogicalRepWorker->conflict_log_tuple = NULL;
+ if (MyLogicalRepWorker->conflict_log_errcontext)
+ {
+ pfree(MyLogicalRepWorker->conflict_log_errcontext);
+ MyLogicalRepWorker->conflict_log_errcontext = NULL;
+ }
}
/*
@@ -1381,5 +1425,15 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
oldctx = MemoryContextSwitchTo(ApplyContext);
MyLogicalRepWorker->conflict_log_tuple =
heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+
+ /*
+ * Stash a context string describing this conflict, so that if inserting
+ * the tuple into the conflict log table fails, the resulting error carries
+ * enough context to identify the conflict (see InsertConflictLogTuple).
+ */
+ MyLogicalRepWorker->conflict_log_errcontext =
+ psprintf("while logging conflict \"%s\" detected on relation \"%s\"",
+ ConflictTypeNames[conflict_type],
+ RelationGetRelationName(rel));
MemoryContextSwitchTo(oldctx);
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 05ba6d8b1ad..69480cbc886 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5658,6 +5658,9 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ MemoryContext oldcontext;
+ ErrorData *edata;
+
/*
* Reset the origin state to prevent the advancement of origin
* progress if we fail to apply. Otherwise, this will result in
@@ -5671,15 +5674,33 @@ start_apply(XLogRecPtr origin_startpos)
else
{
/*
- * Report the worker failed while applying changes. Abort the
- * current transaction so that the stats message is sent in an
- * idle state.
+ * Save the error and recover to an idle state so we can insert the
+ * deferred conflict log tuple (if any) before re-throwing. Copy
+ * the error into a long-lived context first, as it may have been
+ * raised under ErrorContext. Also reset the error context stack:
+ * the callbacks in effect when the error was thrown belong to
+ * unwound stack frames, and the deferred insert installs its own.
*/
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ edata = CopyErrorData();
+ MemoryContextSwitchTo(oldcontext);
+
+ FlushErrorState();
+ error_context_stack = NULL;
AbortOutOfAnyTransaction();
pgstat_report_subscription_error(MySubscription->oid);
+
+ /*
+ * Insert the deferred conflict log tuple in its own transaction.
+ * If this fails, that error (annotated with the conflict context,
+ * see InsertConflictLogTuple) propagates instead of the original;
+ * such failures are expected to be rare and persistent (e.g. out of
+ * disk space).
+ */
ProcessPendingConflictLogTuple();
- PG_RE_THROW();
+ /* Re-throw the original error. */
+ ReThrowError(edata);
}
}
PG_END_TRY();
@@ -6046,14 +6067,19 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
+ /*
+ * The error context callbacks in effect when the error was thrown belong
+ * to now-unwound stack frames; reset the stack before running further code
+ * (including the deferred conflict log insertion, which installs its own).
+ */
+ error_context_stack = NULL;
+
/*
* Report the worker failed during sequence synchronization, table
* synchronization, or apply.
*/
pgstat_report_subscription_error(MyLogicalRepWorker->subid);
- ProcessPendingConflictLogTuple();
-
/* Disable the subscription */
StartTransactionCommand();
@@ -6076,6 +6102,19 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
+ /*
+ * Insert the deferred conflict log tuple (if any) now that the
+ * subscription has been disabled and committed. Doing it after the
+ * disable means a failure to log the conflict (treated as a hard error,
+ * see ProcessPendingConflictLogTuple) cannot prevent the subscription from
+ * being disabled and so cannot leave the worker restarting and failing
+ * forever. Do it before the dead-tuple retention check below: that check
+ * only warns today, but it takes an elevel and could raise an error, which
+ * must not prevent the conflict from being recorded. The original error
+ * was already reported above.
+ */
+ ProcessPendingConflictLogTuple();
+
/*
* Skip the track_commit_timestamp check when disabling the worker due to
* an error, as verifying commit timestamps is unnecessary in this
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 6b6525dc2e2..c0059b1b810 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -103,6 +103,13 @@ typedef struct LogicalRepWorker
/* A conflict log tuple that is prepared but not yet inserted. */
HeapTuple conflict_log_tuple;
+ /*
+ * Error-context string describing the conflict above, used to annotate any
+ * error raised while inserting conflict_log_tuple into the conflict log
+ * table. Allocated, like conflict_log_tuple, in ApplyContext.
+ */
+ char *conflict_log_errcontext;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
--
2.54.0
From 8806a07656c7a90668132dc17f051174f2a62ae6 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Fri, 19 Jun 2026 15:40:07 +0530
Subject: [PATCH v4] Don't tear down a parallel apply worker before it logs its
conflict
When a parallel apply worker hits an ERROR-level conflict, its error path
aborts the failed apply transaction and then logs the conflict to the
conflict log table in a new transaction. But the leader detects worker
failure in pa_wait_for_xact_finish() by waiting on the worker's transaction
lock, and AbortOutOfAnyTransaction() releases that lock: the leader unblocks,
sees a non-finished state, raises "lost connection to the logical replication
parallel apply worker", and tears the worker down. This can SIGTERM the
worker mid-insert and lose the conflict log row, and the message is
misleading in any case.
Fix this by adding a PARALLEL_TRANS_ERROR state that the worker sets before
aborting (while the leader is still blocked on the lock, so it is visible once
the leader unblocks). On seeing it, the leader waits for the worker to report
its actual error via the error queue -- which keeps the worker alive long
enough to finish writing the conflict log tuple and lets the leader report the
real error instead of "lost connection". The original message remains as the
fallback for a worker that died without signalling; as a side effect,
non-conflict worker failures now also surface the real error.
---
.../replication/logical/applyparallelworker.c | 35 +++++++++++++++++++
src/include/replication/worker_internal.h | 2 ++
2 files changed, 37 insertions(+)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 48cb5558367..a3f5b9b122d 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1017,6 +1017,17 @@ ParallelApplyWorkerMain(Datum main_arg)
FlushErrorState();
error_context_stack = NULL;
+
+ /*
+ * Tell the leader we failed and are about to report the error and log
+ * the conflict. This must be set before AbortOutOfAnyTransaction()
+ * below releases the transaction lock that the leader waits on in
+ * pa_wait_for_xact_finish(); otherwise the leader would see a
+ * non-finished state, assume the connection was lost, and tear this
+ * worker down while it is still writing the conflict log tuple.
+ */
+ pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_ERROR);
+
AbortOutOfAnyTransaction();
/*
@@ -1361,9 +1372,33 @@ pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
* released.
*/
if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
+ {
+ /*
+ * If the worker signalled that it errored (PARALLEL_TRANS_ERROR), it is
+ * logging the conflict and will report the actual error via the error
+ * queue before exiting. Wait for that rather than reporting a generic
+ * lost connection: CHECK_FOR_INTERRUPTS() drives
+ * ProcessParallelApplyMessages(), which raises the real error on the
+ * worker's ErrorResponse (or "lost connection" if the worker died
+ * without reporting). Waiting here also keeps the worker alive long
+ * enough to finish writing the conflict log tuple.
+ */
+ while (pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_ERROR)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ 10L,
+ WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+
+ ResetLatch(MyLatch);
+ }
+
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("lost connection to the logical replication parallel apply worker")));
+ }
}
/*
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c0059b1b810..79c90dddd89 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -131,6 +131,8 @@ typedef enum ParallelTransState
PARALLEL_TRANS_UNKNOWN,
PARALLEL_TRANS_STARTED,
PARALLEL_TRANS_FINISHED,
+ PARALLEL_TRANS_ERROR, /* worker failed; it will report the error (and
+ * log the conflict, if any) before exiting */
} ParallelTransState;
/*
--
2.54.0
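The PARALLEL_TRANS_ERROR handshake can be illustrated with a small threaded model. This is a hypothetical sketch, not the patch itself:

- A pthread mutex stands in for the worker's transaction lock.
- Atomic flags stand in for the shared state and the error queue.
- Shared, worker_main(), leader_wait() and run_scenario() are invented names.

It assumes the worker puts its ErrorResponse on the queue only after the conflict log row is written. The worker's re-throw after ProcessPendingConflictLogTuple() is what provides that ordering.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

/* Mirrors ParallelTransState; everything else is a hypothetical model. */
typedef enum { TRANS_UNKNOWN, TRANS_STARTED, TRANS_FINISHED, TRANS_ERROR } TransState;
typedef enum { LEADER_SAW_WORKER_ERROR, LEADER_LOST_CONNECTION } LeaderOutcome;

typedef struct Shared
{
    pthread_mutex_t xact_lock;  /* stands in for the worker's xact lock */
    atomic_int  state;          /* TransState, like MyParallelShared */
    atomic_bool locked;         /* worker holds xact_lock */
    atomic_bool row_written;    /* conflict log row inserted */
    atomic_bool error_sent;     /* ErrorResponse put on the error queue */
    atomic_bool detached;       /* worker exited */
    bool        signal_error;   /* does the worker set TRANS_ERROR? */
} Shared;

static void
sleep_ms(long ms)
{
    struct timespec ts = {0, ms * 1000000L};

    nanosleep(&ts, NULL);
}

static void *
worker_main(void *arg)
{
    Shared     *s = arg;

    pthread_mutex_lock(&s->xact_lock);
    atomic_store(&s->state, TRANS_STARTED);
    atomic_store(&s->locked, true);
    sleep_ms(5);                                /* apply hits a conflict */
    if (s->signal_error)
        atomic_store(&s->state, TRANS_ERROR);   /* before the lock is released */
    pthread_mutex_unlock(&s->xact_lock);        /* AbortOutOfAnyTransaction() */
    sleep_ms(5);                                /* deferred conflict log insert */
    atomic_store(&s->row_written, true);
    atomic_store(&s->error_sent, true);         /* re-throw reports the error */
    atomic_store(&s->detached, true);
    return NULL;
}

/* The decision pa_wait_for_xact_finish() makes, reduced to an outcome. */
static LeaderOutcome
leader_wait(Shared *s, bool *row_written_at_decision)
{
    LeaderOutcome outcome = LEADER_LOST_CONNECTION;

    while (!atomic_load(&s->locked))
        sleep_ms(1);
    pthread_mutex_lock(&s->xact_lock);          /* blocks until the worker aborts */
    pthread_mutex_unlock(&s->xact_lock);
    while (atomic_load(&s->state) == TRANS_ERROR)
    {
        /* Read detached first: error_sent is always stored before it. */
        bool        gone = atomic_load(&s->detached);

        if (atomic_load(&s->error_sent))
        {
            outcome = LEADER_SAW_WORKER_ERROR;
            break;
        }
        if (gone)
            break;                              /* died without reporting */
        sleep_ms(1);                            /* poll, as WaitLatch() does */
    }
    *row_written_at_decision = atomic_load(&s->row_written);
    return outcome;
}

static LeaderOutcome
run_scenario(bool signal_error, bool *row_written_at_decision)
{
    Shared      s;
    pthread_t   worker;
    LeaderOutcome outcome;

    pthread_mutex_init(&s.xact_lock, NULL);
    atomic_init(&s.state, TRANS_UNKNOWN);
    atomic_init(&s.locked, false);
    atomic_init(&s.row_written, false);
    atomic_init(&s.error_sent, false);
    atomic_init(&s.detached, false);
    s.signal_error = signal_error;
    pthread_create(&worker, NULL, worker_main, &s);
    outcome = leader_wait(&s, row_written_at_decision);
    pthread_join(worker, NULL);
    pthread_mutex_destroy(&s.xact_lock);
    return outcome;
}
```

With signal_error set, the leader decides only after the row has been written. Without it, the leader sees a non-finished state as soon as the lock is released and reports a lost connection. This mirrors the pre-patch behaviour.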
Attachments:
[text/plain] v52-1-amit.Improve-error-handling-for-conflict-log-table-ins.txt (20.3K, 2-v52-1-amit.Improve-error-handling-for-conflict-log-table-ins.txt)
From a148d389d3ec3b87a45de4267aeea6e559a0c556 Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Fri, 19 Jun 2026 11:09:08 +0530
Subject: [PATCH v3] Improve error handling for conflict log table insertions.
When a subscription logs conflicts to its conflict log table (CLT), the
apply worker prepares the conflict tuple and then either inserts it inline
(for sub-ERROR conflicts, where apply continues) or, for ERROR-level
conflicts, defers the insertion to a fresh transaction after the apply
transaction aborts (ProcessPendingConflictLogTuple), so that the log row is
not rolled back with the failed change.
The previous error handling around that insertion had a few problems:
* ProcessPendingConflictLogTuple() wrapped the insert in its own
PG_TRY/PG_CATCH that, on failure, called FlushErrorState() and
downgraded the failure to a WARNING. When called from start_apply()'s
PG_CATCH (which then does PG_RE_THROW()), that FlushErrorState() reset
the error stack and discarded the very error being re-thrown, so a
failure of the deferred insert could corrupt error reporting. It also
silently swallowed genuine insertion failures.
* If the inline insert (sub-ERROR path) failed, the conflict was not
recorded anywhere: the insert ran before the server-log message was
emitted, so the apply transaction aborted before the conflict was logged.
* In the parallel apply worker, a failed deferred insert likewise lost
the original error.
Rework this so that a failure to write the CLT is treated as a normal
apply error, and so that the conflict is always identifiable:
* Drop the internal PG_TRY/PG_CATCH from ProcessPendingConflictLogTuple().
A failed insert now raises an ERROR like any other apply failure; such
failures (e.g. the CLT was dropped, or the disk is full) are expected to be
rare and persistent.
* Annotate insertion failures with the conflict being logged.
prepare_conflict_log_tuple() stashes a short description ("while logging
conflict <type> detected on relation <rel>") in a new
LogicalRepWorker.conflict_log_errcontext field (allocated, like
conflict_log_tuple, in ApplyContext), and InsertConflictLogTuple()
installs an error context callback around the heap_insert(). Any error
raised during the insert therefore carries enough context to identify
the conflict, even on the deferred path where the original conflict
error is not separately reported.
* Report the conflict to the server log before inserting it into the
table. ReportApplyConflict() now prepares the tuple, emits the
server-log message (full details when the destination includes the log,
otherwise a short message pointing at the table), and only then inserts.
This guarantees the conflict is recorded even if the table insert fails.
The inline (sub-ERROR) insert is wrapped so that, on failure, the
prepared tuple is discarded before re-throwing, ensuring the deferred
path does not retry the same failing insert.
* Rework the worker error handlers to do the deferred insertion cleanly.
start_apply() and ParallelApplyWorkerMain() now copy the error into a
long-lived context, FlushErrorState(), reset error_context_stack (the
callbacks active at throw time belong to unwound frames, and the insert
installs its own), abort, run ProcessPendingConflictLogTuple(), and
finally ReThrowError() the original error. On success the original
error is re-thrown; on a failed insert the insert error (carrying the
conflict context) propagates instead. For the parallel apply worker
this means the leader always receives a real ErrorResponse rather than
a "lost connection to the logical replication parallel apply worker"
message.
* In DisableSubscriptionAndExit(), perform the deferred insertion after
the subscription has been disabled and committed (and before the
dead-tuple retention check), so a failure to log the conflict cannot
prevent the subscription from being disabled or leave the worker
restarting and failing forever.
---
.../replication/logical/applyparallelworker.c | 38 ++--
src/backend/replication/logical/conflict.c | 214 +++++++++++-------
src/backend/replication/logical/worker.c | 51 ++++-
src/include/replication/worker_internal.h | 7 +
4 files changed, 211 insertions(+), 99 deletions(-)
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index d24c52d43e6..48cb5558367 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -993,32 +993,44 @@ ParallelApplyWorkerMain(Datum main_arg)
PG_CATCH();
{
MemoryContext oldcontext;
- ErrorData *edata;
+ ErrorData *edata;
/*
- * Copy the ErrorData before doing any further work. The error may
- * have been raised while running under ErrorContext, so switch to
- * a safe context (TopMemoryContext) to avoid assertions and ensure
- * the error data survives subsequent cleanup.
+ * Reset the origin state to prevent the advancement of origin
+ * progress if we fail to apply. Otherwise, this will result in
+ * transaction loss as that transaction won't be sent again by the
+ * server.
+ */
+ replorigin_xact_clear(true);
+
+ /*
+ * Copy the error and recover to an idle state so we can insert the
+ * deferred conflict log tuple (if any) before re-throwing. Copy the
+ * error into a longer-lived context first, as it may have been raised
+ * under ErrorContext. Also reset the error context stack: the
+ * callbacks in effect when the error was thrown belong to unwound stack
+ * frames, and the deferred insert installs its own.
*/
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
edata = CopyErrorData();
MemoryContextSwitchTo(oldcontext);
FlushErrorState();
+ error_context_stack = NULL;
+ AbortOutOfAnyTransaction();
/*
- * Reset the origin state to prevent the advancement of origin
- * progress if we fail to apply. Otherwise, this will result in
- * transaction loss as that transaction won't be sent again by the
- * server.
+ * Insert the deferred conflict log tuple before re-throwing.
+ * Re-throwing is what reports the error to the leader (via the error
+ * queue set up above), so the insertion must happen first: otherwise
+ * the leader could start tearing down this worker while it is still
+ * writing the conflict log tuple. If the insertion itself fails, that
+ * error (annotated with the conflict context, see InsertConflictLogTuple)
+ * propagates to the leader instead of the original.
*/
- replorigin_xact_clear(true);
-
- AbortOutOfAnyTransaction();
ProcessPendingConflictLogTuple();
- /* Re-throw the original error. */
+ /* Re-throw the original error, which reports it to the leader. */
ReThrowError(edata);
}
PG_END_TRY();
diff --git a/src/backend/replication/logical/conflict.c b/src/backend/replication/logical/conflict.c
index 6bf3d6d5a44..c2c15f055e6 100644
--- a/src/backend/replication/logical/conflict.c
+++ b/src/backend/replication/logical/conflict.c
@@ -335,17 +335,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
log_dest_table = CONFLICTS_LOGGED_TO_TABLE(dest);
log_dest_logfile = CONFLICTS_LOGGED_TO_LOG(dest);
- /* Insert to table if requested. */
+ /*
+ * Prepare the conflict log tuple first when the destination includes the
+ * table. This must happen before the ereport() below, because for an
+ * ERROR-level conflict that ereport() raises the error and defers the
+ * actual insertion to ProcessPendingConflictLogTuple(), which relies on the
+ * tuple having been prepared.
+ */
if (log_dest_table)
{
Assert(conflictlogrel != NULL);
-
- /*
- * Prepare the conflict log tuple. If the error level is below ERROR,
- * insert it immediately. Otherwise, defer the insertion to a new
- * transaction after the current one aborts, ensuring the insertion of
- * the log tuple is not rolled back.
- */
prepare_conflict_log_tuple(estate,
relinfo->ri_RelationDesc,
conflictlogrel,
@@ -353,29 +352,16 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
searchslot,
conflicttuples,
remoteslot);
- if (elevel < ERROR)
- InsertConflictLogTuple(conflictlogrel);
-
- if (!log_dest_logfile)
- {
- /*
- * Not logging conflict details to the server log; Report the error
- * msg but omit raw tuple data from server logs since it's already
- * captured in the conflict log table.
- */
- ereport(elevel,
- errcode_apply_conflict(type),
- errmsg("conflict detected on relation \"%s\": conflict=%s",
- RelationGetQualifiedRelationName(localrel),
- ConflictTypeNames[type]),
- errdetail("Conflict details are logged to the conflict log table: %s",
- RelationGetRelationName(conflictlogrel)));
- }
-
- table_close(conflictlogrel, RowExclusiveLock);
}
- /* Log into the server log if requested. */
+ /*
+ * Report the conflict to the server log before inserting it into the
+ * conflict log table. Emitting it first guarantees the conflict is
+ * recorded even if the table insert below fails; it is also what raises the
+ * error for ERROR-level conflicts. When the server log is one of the
+ * destinations we emit the full details, otherwise (table-only) we emit a
+ * shorter message since the details are captured in the table.
+ */
if (log_dest_logfile)
{
StringInfoData err_detail;
@@ -400,6 +386,64 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
ConflictTypeNames[type]),
errdetail_internal("%s", err_detail.data));
}
+ else if (log_dest_table)
+ {
+ /*
+ * Not logging conflict details to the server log; report the conflict
+ * but omit raw tuple data since it is captured in the conflict log
+ * table.
+ */
+ ereport(elevel,
+ errcode_apply_conflict(type),
+ errmsg("conflict detected on relation \"%s\": conflict=%s",
+ RelationGetQualifiedRelationName(localrel),
+ ConflictTypeNames[type]),
+ errdetail("Conflict details are logged to the conflict log table: %s",
+ RelationGetRelationName(conflictlogrel)));
+ }
+
+ /*
+ * Insert into the conflict log table if requested. For conflicts below
+ * ERROR the apply transaction continues, so insert immediately; for
+ * ERROR-level conflicts the ereport() above already raised the error and
+ * the insertion is deferred to a new transaction
+ * (ProcessPendingConflictLogTuple) so that it is not rolled back.
+ */
+ if (log_dest_table)
+ {
+ if (elevel < ERROR)
+ {
+ PG_TRY();
+ {
+ InsertConflictLogTuple(conflictlogrel);
+ }
+ PG_CATCH();
+ {
+ /*
+ * The insert failed, so the apply transaction will abort and
+ * the error will propagate to the worker's error handler. The
+ * conflict was already reported to the server log above, so it
+ * is not lost. Discard the prepared tuple so that the deferred
+ * insertion path (ProcessPendingConflictLogTuple) does not retry
+ * this same failing insert.
+ */
+ if (MyLogicalRepWorker->conflict_log_tuple != NULL)
+ {
+ heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
+ MyLogicalRepWorker->conflict_log_tuple = NULL;
+ }
+ if (MyLogicalRepWorker->conflict_log_errcontext != NULL)
+ {
+ pfree(MyLogicalRepWorker->conflict_log_errcontext);
+ MyLogicalRepWorker->conflict_log_errcontext = NULL;
+ }
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+
+ table_close(conflictlogrel, RowExclusiveLock);
+ }
}
/*
@@ -428,60 +472,28 @@ ProcessPendingConflictLogTuple(void)
if (MyLogicalRepWorker->conflict_log_tuple == NULL)
return;
- PG_TRY();
- {
- StartTransactionCommand();
- PushActiveSnapshot(GetTransactionSnapshot());
-
- /* Open conflict log table and insert the tuple */
- conflictlogrel = GetConflictLogDestAndTable(&dest);
- Assert(conflictlogrel);
-
- InsertConflictLogTuple(conflictlogrel);
-
- table_close(conflictlogrel, RowExclusiveLock);
-
- PopActiveSnapshot();
- CommitTransactionCommand();
- }
- PG_CATCH();
- {
- ErrorData *edata;
- MemoryContext oldctx;
-
- /* Save error info in our memory context */
- oldctx = MemoryContextSwitchTo(TopMemoryContext);
- edata = CopyErrorData();
- MemoryContextSwitchTo(oldctx);
-
- /* Clear the error state so we can continue */
- FlushErrorState();
+ /*
+ * Insert the deferred conflict log tuple in its own transaction. A
+ * failure here (e.g. the conflict log table was dropped, or an
+ * out-of-disk-space error) is treated like any other apply error and
+ * raises an ERROR; such failures are expected to be rare and persistent.
+ * Callers must therefore have already reported, or saved and cleared, any
+ * in-progress apply error before calling this, so that raising this error
+ * cannot corrupt the error state used to report the original one.
+ */
+ StartTransactionCommand();
+ PushActiveSnapshot(GetTransactionSnapshot());
- /* Abort the transaction we started above */
- AbortOutOfAnyTransaction();
+ /* Open conflict log table and insert the tuple */
+ conflictlogrel = GetConflictLogDestAndTable(&dest);
+ Assert(conflictlogrel);
- /*
- * Report the error as a warning. We use WARNING because we don't want
- * this to be a fatal error for the worker, and we want to allow the
- * caller's original error to remain primary.
- */
- ereport(WARNING,
- (errmsg("could not log conflict to table for subscription \"%s\": %s",
- MySubscription->name, edata->message)));
+ InsertConflictLogTuple(conflictlogrel);
- FreeErrorData(edata);
+ table_close(conflictlogrel, RowExclusiveLock);
- /*
- * Free the conflict log tuple and set it to NULL. This ensures we
- * don't try to insert the same problematic tuple again.
- */
- if (MyLogicalRepWorker->conflict_log_tuple != NULL)
- {
- heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
- MyLogicalRepWorker->conflict_log_tuple = NULL;
- }
- }
- PG_END_TRY();
+ PopActiveSnapshot();
+ CommitTransactionCommand();
}
/*
@@ -544,6 +556,19 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
return table_open(conflictlogrelid, RowExclusiveLock);
}
+/*
+ * Error context callback for failures while inserting into the conflict log
+ * table. Adds a line identifying the conflict that was being logged.
+ */
+static void
+conflict_log_insert_errcontext(void *arg)
+{
+ char *ctx = (char *) arg;
+
+ if (ctx)
+ errcontext("%s", ctx);
+}
+
/*
* InsertConflictLogTuple
*
@@ -554,15 +579,34 @@ GetConflictLogDestAndTable(ConflictLogDest *log_dest)
void
InsertConflictLogTuple(Relation conflictlogrel)
{
+ ErrorContextCallback errcallback;
+
/* A valid tuple must be prepared and stored in MyLogicalRepWorker. */
Assert(MyLogicalRepWorker->conflict_log_tuple != NULL);
+ /*
+ * Set up an error context so that a failure to insert (e.g. the conflict
+ * log table was dropped, or an out-of-space error) carries information
+ * identifying the conflict we were trying to log.
+ */
+ errcallback.callback = conflict_log_insert_errcontext;
+ errcallback.arg = MyLogicalRepWorker->conflict_log_errcontext;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
heap_insert(conflictlogrel, MyLogicalRepWorker->conflict_log_tuple,
GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, NULL);
- /* Free conflict log tuple. */
+ error_context_stack = errcallback.previous;
+
+ /* Free the conflict log tuple and its context string. */
heap_freetuple(MyLogicalRepWorker->conflict_log_tuple);
MyLogicalRepWorker->conflict_log_tuple = NULL;
+ if (MyLogicalRepWorker->conflict_log_errcontext)
+ {
+ pfree(MyLogicalRepWorker->conflict_log_errcontext);
+ MyLogicalRepWorker->conflict_log_errcontext = NULL;
+ }
}
/*
@@ -1381,5 +1425,15 @@ prepare_conflict_log_tuple(EState *estate, Relation rel,
oldctx = MemoryContextSwitchTo(ApplyContext);
MyLogicalRepWorker->conflict_log_tuple =
heap_form_tuple(RelationGetDescr(conflictlogrel), values, nulls);
+
+ /*
+ * Stash a context string describing this conflict, so that if inserting
+ * the tuple into the conflict log table fails, the resulting error carries
+ * enough context to identify the conflict (see InsertConflictLogTuple).
+ */
+ MyLogicalRepWorker->conflict_log_errcontext =
+ psprintf("while logging conflict \"%s\" detected on relation \"%s\"",
+ ConflictTypeNames[conflict_type],
+ RelationGetRelationName(rel));
MemoryContextSwitchTo(oldctx);
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 05ba6d8b1ad..69480cbc886 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5658,6 +5658,9 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ MemoryContext oldcontext;
+ ErrorData *edata;
+
/*
* Reset the origin state to prevent the advancement of origin
* progress if we fail to apply. Otherwise, this will result in
@@ -5671,15 +5674,33 @@ start_apply(XLogRecPtr origin_startpos)
else
{
/*
- * Report the worker failed while applying changes. Abort the
- * current transaction so that the stats message is sent in an
- * idle state.
+ * Save the error and recover to an idle state so we can insert the
+ * deferred conflict log tuple (if any) before re-throwing. Copy
+ * the error into a long-lived context first, as it may have been
+ * raised under ErrorContext. Also reset the error context stack:
+ * the callbacks in effect when the error was thrown belong to
+ * unwound stack frames, and the deferred insert installs its own.
*/
+ oldcontext = MemoryContextSwitchTo(TopMemoryContext);
+ edata = CopyErrorData();
+ MemoryContextSwitchTo(oldcontext);
+
+ FlushErrorState();
+ error_context_stack = NULL;
AbortOutOfAnyTransaction();
pgstat_report_subscription_error(MySubscription->oid);
+
+ /*
+ * Insert the deferred conflict log tuple in its own transaction.
+ * If this fails, that error (annotated with the conflict context,
+ * see InsertConflictLogTuple) propagates instead of the original;
+ * such failures are expected to be rare and persistent (e.g. out of
+ * disk space).
+ */
ProcessPendingConflictLogTuple();
- PG_RE_THROW();
+ /* Re-throw the original error. */
+ ReThrowError(edata);
}
}
PG_END_TRY();
@@ -6046,14 +6067,19 @@ DisableSubscriptionAndExit(void)
RESUME_INTERRUPTS();
+ /*
+ * The error context callbacks in effect when the error was thrown belong
+ * to now-unwound stack frames; reset the stack before running further code
+ * (including the deferred conflict log insertion, which installs its own).
+ */
+ error_context_stack = NULL;
+
/*
* Report the worker failed during sequence synchronization, table
* synchronization, or apply.
*/
pgstat_report_subscription_error(MyLogicalRepWorker->subid);
- ProcessPendingConflictLogTuple();
-
/* Disable the subscription */
StartTransactionCommand();
@@ -6076,6 +6102,19 @@ DisableSubscriptionAndExit(void)
errmsg("subscription \"%s\" has been disabled because of an error",
MySubscription->name));
+ /*
+ * Insert the deferred conflict log tuple (if any) now that the
+ * subscription has been disabled and committed. Doing it after the
+ * disable means a failure to log the conflict (treated as a hard error,
+ * see ProcessPendingConflictLogTuple) cannot prevent the subscription from
+ * being disabled and so cannot leave the worker restarting and failing
+ * forever. Do it before the dead-tuple retention check below: that check
+ * only warns today, but it takes an elevel and could raise an error, which
+ * must not prevent the conflict from being recorded. The original error
+ * was already reported above.
+ */
+ ProcessPendingConflictLogTuple();
+
/*
* Skip the track_commit_timestamp check when disabling the worker due to
* an error, as verifying commit timestamps is unnecessary in this
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 6b6525dc2e2..c0059b1b810 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -103,6 +103,13 @@ typedef struct LogicalRepWorker
/* A conflict log tuple that is prepared but not yet inserted. */
HeapTuple conflict_log_tuple;
+ /*
+ * Error-context string describing the conflict above, used to annotate any
+ * error raised while inserting conflict_log_tuple into the conflict log
+ * table. Allocated, like conflict_log_tuple, in ApplyContext.
+ */
+ char *conflict_log_errcontext;
+
/* Stats. */
XLogRecPtr last_lsn;
TimestampTz last_send_time;
--
2.54.0
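The reworked catch blocks in start_apply() and ParallelApplyWorkerMain() follow a copy, flush, reset, abort, insert, re-throw sequence. The hypothetical sketch below models that sequence:

- setjmp/longjmp stand in for PG_TRY/PG_CATCH.
- A tiny model-local error-context stack stands in for elog.c's error_context_stack.
- throw_error(), apply_and_fail() and the other names are invented for illustration and are not PostgreSQL's API.

The model shows that the original error escapes when the deferred insert succeeds. When the insert fails, the insert error escapes instead, carrying the "while logging conflict" context line.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical miniature of elog.c's machinery; not PostgreSQL's API. */
typedef struct ErrCtx
{
    void        (*callback) (void *arg);
    void       *arg;
    struct ErrCtx *previous;
} ErrCtx;

static ErrCtx *error_context_stack = NULL;  /* model-local, not elog.c's */
static jmp_buf *exception_stack = NULL;
static char errbuf[512];                    /* the "current" error */
static const char *pending_ctx = NULL;      /* pending tuple's errcontext */
static int  rows_logged = 0;
static int  clt_broken = 0;                 /* dropped CLT, full disk, ... */

static void
append_context(void *arg)
{
    size_t      room = sizeof(errbuf) - strlen(errbuf) - 1;

    strncat(errbuf, "\nCONTEXT: ", room);
    room = sizeof(errbuf) - strlen(errbuf) - 1;
    strncat(errbuf, (const char *) arg, room);
}

/* ereport(ERROR) stand-in: format, run context callbacks, jump. */
static void
throw_error(const char *msg)
{
    snprintf(errbuf, sizeof(errbuf), "ERROR: %s", msg);
    for (ErrCtx *e = error_context_stack; e != NULL; e = e->previous)
        e->callback(e->arg);
    longjmp(*exception_stack, 1);
}

/* InsertConflictLogTuple() stand-in with its error context callback. */
static void
insert_conflict_log_tuple(void)
{
    ErrCtx      cb = {append_context, (void *) pending_ctx, error_context_stack};

    error_context_stack = &cb;
    if (clt_broken)
        throw_error("could not extend conflict log table");
    error_context_stack = cb.previous;
    rows_logged++;
    pending_ctx = NULL;
}

/* start_apply()'s PG_TRY/PG_CATCH, reduced; returns the error that escapes. */
static const char *
apply_and_fail(void)
{
    static char escaped[512];
    jmp_buf     outer, inner;
    char        saved[512];

    exception_stack = &outer;
    if (setjmp(outer) != 0)
    {
        error_context_stack = NULL;         /* callbacks of unwound frames */
        strcpy(escaped, errbuf);
        return escaped;
    }
    exception_stack = &inner;
    if (setjmp(inner) == 0)
    {
        /* ERROR-level conflict: tuple prepared, then the error is raised. */
        pending_ctx = "while logging conflict \"insert_exists\" "
            "detected on relation \"t\"";
        throw_error("conflict detected on relation \"public.t\"");
    }
    exception_stack = &outer;
    strcpy(saved, errbuf);                  /* CopyErrorData() */
    errbuf[0] = '\0';                       /* FlushErrorState() */
    error_context_stack = NULL;
    if (pending_ctx != NULL)                /* ProcessPendingConflictLogTuple() */
        insert_conflict_log_tuple();        /* may jump to "outer" */
    strcpy(errbuf, saved);                  /* ReThrowError(edata) */
    longjmp(outer, 1);
}
```

A failed insert leaves the model's error_context_stack pointing at a callback in an unwound frame. That is why the model's top-level handler resets it, as the patch's handlers do.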
[text/plain] v52-2-amit.fix_parallel_apply_logging.txt (4.2K, 3-v52-2-amit.fix_parallel_apply_logging.txt)
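The first attachment's ReportApplyConflict() change can be modelled the same way for conflicts below ERROR. The conflict is written to the server log before the inline insert. A failed inline insert then discards the prepared tuple before re-throwing, so the deferred path does not retry it. The sketch is hypothetical: setjmp/longjmp stand in for PG_TRY/PG_CATCH, and counters stand in for the server log and the conflict log table. None of its names are PostgreSQL functions.

```c
#include <assert.h>
#include <setjmp.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the sub-ERROR path; none of these are PG functions. */
static jmp_buf *exception_stack = NULL;
static int  server_log_lines = 0;   /* conflict messages emitted */
static int  insert_attempts = 0;    /* heap_insert() calls on the CLT */
static bool clt_broken = false;     /* simulate a dropped CLT or full disk */
static bool tuple_pending = false;  /* MyLogicalRepWorker->conflict_log_tuple */

static void
raise_error(void)
{
    longjmp(*exception_stack, 1);
}

static void
insert_conflict_log_tuple(void)
{
    insert_attempts++;
    if (clt_broken)
        raise_error();
    tuple_pending = false;          /* freed after a successful insert */
}

/* ReportApplyConflict() for a conflict below ERROR, in the patch's order. */
static void
report_conflict_below_error(void)
{
    jmp_buf     local;
    jmp_buf    *save = exception_stack;

    tuple_pending = true;           /* prepare_conflict_log_tuple() */
    server_log_lines++;             /* ereport(LOG) is emitted at once */

    exception_stack = &local;
    if (setjmp(local) == 0)
    {
        insert_conflict_log_tuple();
        exception_stack = save;
        return;
    }
    exception_stack = save;
    tuple_pending = false;          /* discard so the deferred path won't retry */
    raise_error();                  /* PG_RE_THROW() */
}

/* One change applied by the worker; returns true if it ended in an error. */
static bool
apply_one_change(void)
{
    jmp_buf     top;

    exception_stack = &top;
    if (setjmp(top) == 0)
    {
        report_conflict_below_error();
        return false;
    }
    /* Error path: abort, then ProcessPendingConflictLogTuple(). */
    if (tuple_pending)              /* false here thanks to the discard */
        insert_conflict_log_tuple();
    return true;
}
```

Without the discard in the catch branch, the worker's error path would attempt the same failing insert a second time.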