public inbox for [email protected]  
From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
To: shveta malik <[email protected]>
Cc: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Cc: [email protected] <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Fri, 9 Jan 2026 11:28:35 +0000
Message-ID: <TY7PR01MB14554CAF66B62811DA167CF0BF582A@TY7PR01MB14554.jpnprd01.prod.outlook.com>
In-Reply-To: <CAA4eK1KkqQD88Td53v6Z=adGSMLR7wN543WhqccOW17ykt-QDg@mail.gmail.com>
References: <CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com>
	<[email protected]>
	<CAMPB6we7+97L72Ru0=WxMDi24xMbZgr2B8Nwoo5i=r=UNuG_gQ@mail.gmail.com>
	<[email protected]>
	<CAA4eK1JfPPFTmz7mUk26zPH8+qH9UBpkquxw75x7Ngx_D_6XXQ@mail.gmail.com>
	<CAMPB6wfgvWjSvKNPoJkRqaL46geRDoL++Pt_3Czc2QNAdpVQHw@mail.gmail.com>
	<CAA4eK1JC6yB6q52qEZ5dLNWRUEZoO-aa_XKBZ3_mcb=V2z7zug@mail.gmail.com>
	<CAMPB6weUqU6P2w5VUGVSLKWcvU1AQHmW+7O9qc9yD4CB5kEYVA@mail.gmail.com>
	<CAA4eK1Lm_W5j3DPj6PDSTyodGu87QgxpNwwsi-wVR0+B1FSOoA@mail.gmail.com>
	<CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw@mail.gmail.com>
	<CAA4eK1+NDjprcKvr0p2GDMTCs9yxFCY41bOd+6avqAm2n+TXdQ@mail.gmail.com>
	<CAMPB6wdc10tc7gpVXG75r51M41zVSabip9Lz7hssWEtyhecWww@mail.gmail.com>
	<OSCPR01MB14966201F1DCB853145912FF1F53DA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAMPB6wcOWBURHB1igRgCjD3geAemdoATfkKByMwrMM1TgMN64w@mail.gmail.com>
	<OSCPR01MB14966BF4CA9B767C259DBD9CDF53EA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAMPB6wdtKZLEeZ7UW=DYmXWv8Y=uVGrDqXTMhT19Z4VTzo3cfg@mail.gmail.com>
	<CAA4eK1LHVd8wQzauWgeEV436L7btrCfujPH1sR196sY_Mp8zYA@mail.gmail.com>
	<CAMPB6wdPtjbR93oB1XJtYkRtTR64BJG4o5a+0DSSez=puuyuGA@mail.gmail.com>
	<OSCPR01MB14966FC456D053AB00A2EB278F514A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAA4eK1++mHd-SsHyJd2ZB26F7kCz--LbtjQLuQ0h3z9mcYK-AQ@mail.gmail.com>
	<OS7PR01MB149681B14B2432A9CEA7A3586F517A@OS7PR01MB14968.jpnprd01.prod.outlook.com>
	<OSCPR01MB14966F65DE462D0A479B8ADD6F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAA4eK1LeyzuiRPZB+o7mO0pB6_=tpkjoum5Hj+t1SYydS4K2kQ@mail.gmail.com>
	<OSCPR01MB14966B68C2148C1BC462AA906F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<TY4PR01MB169077EE72ABE9E55BAF162D494B5A@TY4PR01MB16907.jpnprd01.prod.outlook.com>
	<CAJpy0uAcd4BuRGJT0_Xojjtn7w6uiWhAY3Xv=C2GqXoUxeOrYw@mail.gmail.com>
	<CAJpy0uCH=z+x8Rb9U+DCE8=4=7UiG6P_yXoPxGLfp_Nuz5jPfw@mail.gmail.com>
	<CAA4eK1KkqQD88Td53v6Z=adGSMLR7wN543WhqccOW17ykt-QDg@mail.gmail.com>

Dear Amit, Shveta,

> > >
> > > Thanks Hou-San and Kuroda-San.
> > >
> > > What should be the expected behavior when Session1 resets the origin
> > > (changing acquired_pid from its own PID to 0), while Session2 is
> > > already connected to the origin and Session3 also attempts to reuse
> > > the same origin?
> > >
> > > Currently it asserts:
> > >
> > > Session1:
> > > select pg_replication_origin_create('origin');
> > > SELECT pg_replication_origin_session_setup('origin');
> > >
> > > Session2:
> > > SELECT pg_replication_origin_session_setup('origin',48028);
> > >
> > > Session1:
> > > SELECT pg_replication_origin_session_reset();
> > >
> > > Session3:
> > > SELECT pg_replication_origin_session_setup('origin');
> > > This asserts at:
> > > TRAP: failed Assert("session_replication_state->refcount == 0"), File:
> > > "origin.c", Line: 1231, PID: 48037
> > >

FYI, this happened because v1 assumed that refcount was 0 whenever acquired_by
was 0, but your scenario breaks that assumption.
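To make the failure concrete, here is a simplified, single-process model of the two fields involved. It is a sketch, not PostgreSQL code: OriginModel, model_setup(), model_reset_unconditional() and v1_assumption_holds() are hypothetical names, and shared memory, locking and error reporting are omitted. The reset clears acquired_by unconditionally, as HEAD's replorigin_session_reset() does.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the ReplicationState fields discussed here. */
typedef struct OriginModel
{
	int			acquired_by;	/* PID of the first acquirer, 0 if none */
	int			refcount;		/* number of sessions using the origin */
} OriginModel;

/* Set up the origin; acquired_by == 0 means "acquire it for myself". */
void
model_setup(OriginModel *o, int my_pid, int acquired_by)
{
	if (acquired_by == 0)
		o->acquired_by = my_pid;
	o->refcount++;
}

/* Reset that clears acquired_by no matter who else still uses the origin. */
void
model_reset_unconditional(OriginModel *o)
{
	assert(o->refcount > 0);
	o->acquired_by = 0;
	o->refcount--;
}

/* The invariant v1 relied on: no recorded PID implies no users. */
bool
v1_assumption_holds(const OriginModel *o)
{
	return o->acquired_by != 0 || o->refcount == 0;
}
```

Replaying Session1 setup, Session2 setup with Session1's PID, and a Session1 reset leaves acquired_by at 0 while refcount is still 1, which is the state the v1 Assert in the Session3 setup rejected.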

> > I checked the behavior on HEAD. Session3 is able to set up the origin
> > and sets its own PID in acquired_pid. But it is unclear to me which
> > PID should be recorded in acquired_pid - Session2’s PID, since it set
> > up the origin earlier, or Session3’s PID. Or does this even make any
> > difference?

To clarify, I think the behavior on HEAD is not correct. A backend should be
able to acquire an active origin only if it explicitly specifies the PID.
Otherwise, users may acquire the origin unintentionally and advance it.

> Can we address these problems by prohibiting leader worker to reset
> when pa workers are still associated with the origin? The way for
> leader to know if pa workers are associated with origin is by checking
> following condition: acquired_by == MyProcpid AND refcount > 1.

I think it's okay. IIUC, the idea is to ensure that an active origin never has
an invalid acquired_by value. The replication origin was extended to support
parallel apply in logical replication, so it is reasonable to enforce the same
approach. The attached 0001 implements that.
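The reset rule in 0001 can be sketched as a small single-process model. Assumptions: OriginModel, model_setup() and model_reset() are hypothetical names, there is no shared memory or locking, and returning false stands in for the ERROR that replorigin_session_reset() raises in the patch. The asserts mirror the ones the patch adds to replorigin_session_setup().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the ReplicationState fields touched by 0001. */
typedef struct OriginModel
{
	int			acquired_by;	/* PID of the first acquirer, 0 if none */
	int			refcount;		/* number of sessions using the origin */
} OriginModel;

/* Setup: acquire for myself (acquired_by == 0) or join the given PID. */
void
model_setup(OriginModel *o, int my_pid, int acquired_by)
{
	if (acquired_by == 0)
	{
		assert(o->refcount == 0);
		o->acquired_by = my_pid;
	}
	else
	{
		assert(o->acquired_by == acquired_by);
		assert(o->refcount > 0);
	}
	o->refcount++;
}

/*
 * Reset: refuse (return false) if this session acquired the origin first
 * while other sessions still use it; otherwise drop this session's
 * reference and clear acquired_by only if it belongs to this session.
 */
bool
model_reset(OriginModel *o, int my_pid)
{
	assert(o->refcount > 0);
	if (o->acquired_by == my_pid && o->refcount > 1)
		return false;
	if (o->acquired_by == my_pid)
		o->acquired_by = 0;
	o->refcount--;
	return true;
}
```

Replaying the new isolation permutation (s0_setup, s1_setup, s0_reset, s1_reset, s0_reset) against this model, the first s0 reset is refused, and acquired_by returns to 0 only once refcount has dropped to 0.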

One concern with the implementation is that acquired_by can still become zero
if a process exits without releasing the origin; this can happen if the process
that acquired the origin first exits while a second one is still using it.
This is not a problem for logical replication itself, because the leader
worker ensures that all parallel apply workers finish before it exits.

To address the issue, I propose that no other process be allowed to acquire an
active origin whose acquired_by is 0. The problem should be resolved within the
SQL interface, since that is where it occurs.
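The exit case and the proposed check can be sketched in the same kind of single-process model. Assumptions: OriginModel, model_can_setup() and model_exit_cleanup() are hypothetical names; model_can_setup() folds the existing PID checks of replorigin_session_setup() into simplified boolean tests, and false stands in for an ERROR; there is no shared memory or locking.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the ReplicationState fields involved. */
typedef struct OriginModel
{
	int			acquired_by;	/* PID of the first acquirer, 0 if none */
	int			refcount;		/* number of sessions using the origin */
} OriginModel;

/* Would a setup with this acquired_by argument be accepted? */
bool
model_can_setup(const OriginModel *o, int acquired_by)
{
	/* Active origin: the caller must name the acquiring PID explicitly. */
	if (acquired_by == 0 && o->acquired_by != 0)
		return false;
	/* A named PID must match the one recorded in the origin. */
	if (acquired_by != 0 && o->acquired_by != acquired_by)
		return false;
	/* Proposed check: in use, but no PID recorded. */
	if (o->acquired_by == 0 && o->refcount > 0)
		return false;
	return true;
}

/* Exit cleanup releases unconditionally; no refcount > 1 check here. */
void
model_exit_cleanup(OriginModel *o, int my_pid)
{
	assert(o->refcount > 0);
	if (o->acquired_by == my_pid)
		o->acquired_by = 0;
	o->refcount--;
}
```

If the first acquirer exits while a second session still uses the origin, the model is left with acquired_by = 0 and refcount = 1, and a new setup is refused until the remaining session releases the origin.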

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v3-0001-Fix-unintended-drop-of-active-replication-origins.patch (8.9K, 2-v3-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 5383f9d8c801e48192d59d0b1f07c88c2b595de2 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v3 1/2] Fix unintended drop of active replication origins

Currently, if two backends set up the same replication origin and one backend
resets it, acquired_by is cleared even though the other backend is still using
the origin. This can allow the origin to be dropped while it is in use. If the
shared memory of the dropped origin is then reused for a newly created origin,
the remaining backend, which still holds the memory of the dropped origin,
could unpredictably advance the progress of the newly created origin.

This commit addresses the issue by introducing a reference count for
replication origins. The count is incremented when a backend sets up the
origin and decremented when it resets it. In addition, the backend that first
acquired the origin cannot reset it until all other backends using it have
reset it. This ensures that acquired_by cannot be zero while processes are
actively using the origin.
---
 .../expected/parallel_session_origin.out      | 46 ++++++++++-
 .../specs/parallel_session_origin.spec        |  6 +-
 src/backend/replication/logical/origin.c      | 80 +++++++++++++------
 3 files changed, 105 insertions(+), 27 deletions(-)

diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..07c1e3622ce 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
 Parsed test spec with 2 sessions
 
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
 step s0_setup: SELECT pg_replication_origin_session_setup('origin');
 pg_replication_origin_session_setup
 -----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
 t       
 (1 row)
 
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
 step s0_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR:  another process is acquiring the replication origin
 step s1_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..dff2c073def 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
 	 */
 	int			acquired_by;
 
+	/* Number of backends currently using this origin. */
+	int			refcount;
+
 	/*
 	 * Condition variable that's signaled when acquired_by changes.
 	 */
@@ -1069,32 +1072,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-	ConditionVariable *cv = NULL;
+	ConditionVariable *cv;
 
-	if (session_replication_state == NULL)
-		return;
+	Assert(session_replication_state != NULL);
 
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	if (session_replication_state->acquired_by == MyProcPid)
-	{
-		cv = &session_replication_state->origin_cv;
+	Assert(session_replication_state->refcount > 0);
 
+	/*
+	 * Reset the PID only if the current backend is the first to set up this
+	 * origin. This prevents resetting the PID when other backends are still
+	 * using this origin.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid)
 		session_replication_state->acquired_by = 0;
-		session_replication_state = NULL;
-	}
+
+	session_replication_state->refcount--;
+
+	cv = &session_replication_state->origin_cv;
+	session_replication_state = NULL;
 
 	LWLockRelease(ReplicationOriginLock);
 
-	if (cv)
-		ConditionVariableBroadcast(cv);
+	ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+	if (session_replication_state == NULL)
+		return;
+
+	replorigin_session_reset_internal();
 }
 
 /*
@@ -1205,9 +1223,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	Assert(session_replication_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
+	{
 		session_replication_state->acquired_by = MyProcPid;
+		Assert(session_replication_state->refcount == 0);
+	}
 	else
+	{
 		Assert(session_replication_state->acquired_by == acquired_by);
+		Assert(session_replication_state->refcount > 0);
+	}
+
+	session_replication_state->refcount++;
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1250,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-	ConditionVariable *cv;
-
 	Assert(max_active_replication_origins != 0);
 
 	if (session_replication_state == NULL)
@@ -1233,15 +1257,21 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-	session_replication_state->acquired_by = 0;
-	cv = &session_replication_state->origin_cv;
-	session_replication_state = NULL;
-
-	LWLockRelease(ReplicationOriginLock);
+	/*
+	 * The replication origin cannot be reset if this backend acquired it
+	 * first and other processes are still actively using it. Otherwise,
+	 * acquired_by would become zero and the still-active replication origin
+	 * could be dropped.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid &&
+		session_replication_state->refcount > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg_plural("another process is acquiring the replication origin",
+							   "other processes are acquiring the replication origin",
+							   session_replication_state->refcount - 1)));
 
-	ConditionVariableBroadcast(cv);
+	replorigin_session_reset_internal();
 }
 
 /*
-- 
2.47.3



  [application/octet-stream] v3-0002-Disallow-setting-the-replication-origin-if-it-is-.patch (1.2K, 3-v3-0002-Disallow-setting-the-replication-origin-if-it-is-.patch)
From be56cec12647052135a35a78c0d5a87dd5eebf68 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Fri, 9 Jan 2026 19:31:57 +0900
Subject: [PATCH v3 2/2] Disallow setting up a replication origin that is in
 use but has no PID stored

---
 src/backend/replication/logical/origin.c | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index dff2c073def..f373e3df2f9 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1192,6 +1192,19 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							node, acquired_by)));
 		}
 
+		/*
+		 * The PID is not recorded in the origin. This can happen if the
+		 * process that originally acquired the origin exits without
+		 * releasing it. To restore a clean state, other processes cannot
+		 * acquire the origin until all current users have released it.
+		 */
+		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+		{
+			elog(ERROR,
+				 "replication origin with ID %d is already active for another process",
+				 node);
+		}
+
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
-- 
2.47.3


