public inbox for [email protected]
From: Zhijie Hou (Fujitsu) <[email protected]>
To: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Cc: [email protected] <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Tue, 23 Dec 2025 08:54:03 +0000
Message-ID: <TY4PR01MB169077EE72ABE9E55BAF162D494B5A@TY4PR01MB16907.jpnprd01.prod.outlook.com>
In-Reply-To: <OSCPR01MB14966B68C2148C1BC462AA906F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
References: <CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com>
<[email protected]>
<CAMPB6we7+97L72Ru0=WxMDi24xMbZgr2B8Nwoo5i=r=UNuG_gQ@mail.gmail.com>
<[email protected]>
<CAA4eK1JfPPFTmz7mUk26zPH8+qH9UBpkquxw75x7Ngx_D_6XXQ@mail.gmail.com>
<CAMPB6wfgvWjSvKNPoJkRqaL46geRDoL++Pt_3Czc2QNAdpVQHw@mail.gmail.com>
<CAA4eK1JC6yB6q52qEZ5dLNWRUEZoO-aa_XKBZ3_mcb=V2z7zug@mail.gmail.com>
<CAMPB6weUqU6P2w5VUGVSLKWcvU1AQHmW+7O9qc9yD4CB5kEYVA@mail.gmail.com>
<CAA4eK1Lm_W5j3DPj6PDSTyodGu87QgxpNwwsi-wVR0+B1FSOoA@mail.gmail.com>
<CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw@mail.gmail.com>
<CAA4eK1+NDjprcKvr0p2GDMTCs9yxFCY41bOd+6avqAm2n+TXdQ@mail.gmail.com>
<CAMPB6wdc10tc7gpVXG75r51M41zVSabip9Lz7hssWEtyhecWww@mail.gmail.com>
<OSCPR01MB14966201F1DCB853145912FF1F53DA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAMPB6wcOWBURHB1igRgCjD3geAemdoATfkKByMwrMM1TgMN64w@mail.gmail.com>
<OSCPR01MB14966BF4CA9B767C259DBD9CDF53EA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAMPB6wdtKZLEeZ7UW=DYmXWv8Y=uVGrDqXTMhT19Z4VTzo3cfg@mail.gmail.com>
<CAA4eK1LHVd8wQzauWgeEV436L7btrCfujPH1sR196sY_Mp8zYA@mail.gmail.com>
<CAMPB6wdPtjbR93oB1XJtYkRtTR64BJG4o5a+0DSSez=puuyuGA@mail.gmail.com>
<OSCPR01MB14966FC456D053AB00A2EB278F514A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAA4eK1++mHd-SsHyJd2ZB26F7kCz--LbtjQLuQ0h3z9mcYK-AQ@mail.gmail.com>
<OS7PR01MB149681B14B2432A9CEA7A3586F517A@OS7PR01MB14968.jpnprd01.prod.outlook.com>
<OSCPR01MB14966F65DE462D0A479B8ADD6F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAA4eK1LeyzuiRPZB+o7mO0pB6_=tpkjoum5Hj+t1SYydS4K2kQ@mail.gmail.com>
<OSCPR01MB14966B68C2148C1BC462AA906F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
Hi,
When testing the new parameter in pg_replication_origin_session_setup(), I
noticed a bug that allows an origin that is still in use to be dropped. The
issue arises when two backends set up the same origin: if the second backend
resets the origin first, it clears the acquired_by field even though the first
backend is still using the origin. The origin can then be dropped and its slot
in shared memory reused, which is unintended.
As for the fix, simply adding a check on the acquired_by field does not work:
if the first backend resets the origin first, the origin can still be dropped
while the second backend is using it.
To fully resolve this, I added a reference count (refcount) to the origin.
The count is incremented when a backend sets up the origin and decremented
when it resets the origin. As a result, the replication origin can be dropped
only once the reference count reaches zero.
Thanks to Kuroda-San for discussing and reviewing this patch off-list.
Best Regards,
Hou zj
Attachments:
[application/octet-stream] v1-0001-Fix-unintended-drop-of-active-replication-origins.patch (8.2K, 2-v1-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 8b39ed57f03d0a2f90f9b89572db2e1242a11dd0 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v1] Fix unintended drop of active replication origins
Currently, if two backends set up the same replication origin and one of them
resets it, the acquired_by field is cleared even though the other backend is
still using the origin. This allows the origin to be dropped while in use. If
the shared memory of the dropped origin is then reused for a newly created
origin, the remaining backend, which still points at that memory, could
unexpectedly advance the progress of the new, unrelated origin.
This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented when it resets the origin. As a result, a replication origin can be
dropped only once its reference count reaches zero.
---
.../expected/parallel_session_origin.out | 44 +++++++++++
.../specs/parallel_session_origin.spec | 4 +
src/backend/replication/logical/origin.c | 79 ++++++++++++-------
3 files changed, 97 insertions(+), 30 deletions(-)
diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..546d8933954 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -77,3 +77,47 @@ pg_replication_origin_session_reset
(1 row)
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_drop s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_drop: SELECT pg_replication_origin_drop('origin');
+ERROR: could not drop replication origin with ID 1, in use by another process
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..8e9c81e4419 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -49,8 +49,12 @@ step "s1_store_lsn" {
SELECT 1, local_lsn FROM pg_replication_origin_status;
}
step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+step "s1_drop" { SELECT pg_replication_origin_drop('origin'); }
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
# commits a transaction and store the local_lsn of the replication origin.
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+
+# Test that the origin cannot be dropped if any session is actively using it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_drop" "s1_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 2380f369578..536e524f4d5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
*/
int acquired_by;
+ /* Number of backends currently using this origin. */
+ int refcount;
+
/*
* Condition variable that's signaled when acquired_by changes.
*/
@@ -383,16 +386,19 @@ restart:
if (state->roident == roident)
{
/* found our slot, is it busy? */
- if (state->acquired_by != 0)
+ if (state->refcount > 0)
{
ConditionVariable *cv;
if (nowait)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop replication origin with ID %d, in use by PID %d",
- state->roident,
- state->acquired_by)));
+ (state->acquired_by != 0)
+ ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+ state->roident,
+ state->acquired_by)
+ : errmsg("could not drop replication origin with ID %d, in use by another process",
+ state->roident)));
/*
* We must wait and then retry. Since we don't know which CV
@@ -1069,32 +1075,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
return remote_lsn;
}
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
{
- ConditionVariable *cv = NULL;
+ ConditionVariable *cv;
- if (session_replication_state == NULL)
- return;
+ Assert(session_replication_state != NULL);
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
- if (session_replication_state->acquired_by == MyProcPid)
- {
- cv = &session_replication_state->origin_cv;
+ Assert(session_replication_state->refcount > 0);
+ /*
+ * Reset the PID only if the current backend is the first to set up this
+ * origin. This prevents resetting the PID when other backends are still
+ * using this origin.
+ */
+ if (session_replication_state->acquired_by == MyProcPid)
session_replication_state->acquired_by = 0;
- session_replication_state = NULL;
- }
+
+ session_replication_state->refcount--;
+
+ cv = &session_replication_state->origin_cv;
+ session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
- if (cv)
- ConditionVariableBroadcast(cv);
+ ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+ if (session_replication_state == NULL)
+ return;
+
+ replorigin_session_reset_internal();
}
/*
@@ -1205,9 +1226,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
Assert(session_replication_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
+ {
session_replication_state->acquired_by = MyProcPid;
+ Assert(session_replication_state->refcount == 0);
+ }
else
+ {
Assert(session_replication_state->acquired_by == acquired_by);
+ Assert(session_replication_state->refcount > 0);
+ }
+
+ session_replication_state->refcount++;
LWLockRelease(ReplicationOriginLock);
@@ -1224,8 +1253,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
void
replorigin_session_reset(void)
{
- ConditionVariable *cv;
-
Assert(max_active_replication_origins != 0);
if (session_replication_state == NULL)
@@ -1233,15 +1260,7 @@ replorigin_session_reset(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
- LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
- session_replication_state->acquired_by = 0;
- cv = &session_replication_state->origin_cv;
- session_replication_state = NULL;
-
- LWLockRelease(ReplicationOriginLock);
-
- ConditionVariableBroadcast(cv);
+ replorigin_session_reset_internal();
}
/*
--
2.51.1.windows.1