public inbox for [email protected]  
From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'shveta malik' <[email protected]>
Cc: Amit Kapila <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Wed, 14 Jan 2026 03:43:47 +0000
Message-ID: <TY7PR01MB14554FB7D02601425DE3752DEF58FA@TY7PR01MB14554.jpnprd01.prod.outlook.com>
In-Reply-To: <CAJpy0uD6D294d=Hq4oROmtKAew5DfKERNxs=DsAwUFBFF2kERg@mail.gmail.com>
References: <CAJpy0uB19aKEgVgh8gwzj87NUyDgOf01boa-6xJZK+nhb=3W4g@mail.gmail.com>
	<CAA4eK1+h4mOvRqRaGfUtSgZuBhzWWmrBcY3jQ4DDV=cEJ4dwnQ@mail.gmail.com>
	<TY7PR01MB145543A74547443E49E4993E7F58EA@TY7PR01MB14554.jpnprd01.prod.outlook.com>
	<CAJpy0uD6D294d=Hq4oROmtKAew5DfKERNxs=DsAwUFBFF2kERg@mail.gmail.com>

Dear Shveta,

> 1)
> +step s1_reset: SELECT pg_replication_origin_session_reset();
> 
> After the above step, please add a step to attempt dropping the
> replication origin. The original issue was that once s1 releases the
> origin, it becomes eligible for dropping, so the test should
> explicitly verify this behavior.

I think that is a bit difficult because the ERROR message raised by
pg_replication_origin_drop() contains the PID, which differs between test runs.
Also, with this patch the first process can no longer reset the origin while
other processes are still using it, i.e., the exact same situation cannot happen
anymore. Not fixed.
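To make the PID point concrete, here is a minimal C sketch of the message selection the patch adds to the drop path. The function name format_drop_error() is hypothetical; it only mirrors the two errmsg() variants in origin.c. If the test tried to drop the origin after s1_reset, s0 would still hold it with acquired_by set to s0's PID, so the error would take the PID-bearing branch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper mirroring the two errmsg() variants the patch uses when
 * an origin cannot be dropped because it is still in use.
 */
void
format_drop_error(char *buf, size_t len, int roident, int acquired_by)
{
	if (acquired_by != 0)
	{
		/* The first acquirer is still attached, so its PID ends up in the text. */
		snprintf(buf, len,
				 "could not drop replication origin with ID %d, in use by PID %d",
				 roident, acquired_by);
	}
	else
	{
		/* acquired_by was cleared (e.g. the first session exited) but refcount > 0. */
		snprintf(buf, len,
				 "could not drop replication origin with ID %d, in use by another process",
				 roident);
	}
}
```

For example, two runs in which s0 happens to have PID 12345 and 67890 produce different strings, so a single isolation-test expected file could not match both.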

> 2)
> Also before the above step, please add a step where s0 tries to reset
> the origin while s1 is still acquiring it. It is needed to cover the
> path where s0 should fail to release origin.

The step already exists; see below.

```
step s0_reset: SELECT pg_replication_origin_session_reset();
ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
step s1_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
                                   
(1 row)
```
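For readers following the reference-counting logic, the following is a single-threaded C model of the v5 setup/reset rules, replaying the permutation whose output is shown above. OriginModel, model_setup(), model_release() and model_reset() are hypothetical names; locking, the condition variable and per-session bookkeeping (session_replication_state) are deliberately left out, and a false return stands in for ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the two ReplicationState fields the patch uses. */
typedef struct OriginModel
{
	int			acquired_by;	/* PID of the first process to set it up, or 0 */
	int			refcount;		/* number of processes currently using it */
} OriginModel;

/*
 * Models replorigin_session_setup(). acquired_by == 0 means "acquire as the
 * first user"; otherwise the caller joins the origin held by that PID.
 * Returns false where the real function would raise an ERROR.
 */
bool
model_setup(OriginModel *o, int pid, int acquired_by)
{
	if (acquired_by == 0)
	{
		/* Already in use, whether or not a first-acquirer PID is recorded. */
		if (o->refcount > 0)
			return false;
		o->acquired_by = pid;
	}
	else if (o->acquired_by != acquired_by)
		return false;			/* not held by the PID the caller named */

	o->refcount++;
	return true;
}

/* Models replorigin_session_reset_internal(), which also runs at process exit. */
void
model_release(OriginModel *o, int pid)
{
	assert(o->refcount > 0);
	/* Only the first acquirer clears acquired_by. */
	if (o->acquired_by == pid)
		o->acquired_by = 0;
	o->refcount--;
}

/* Models replorigin_session_reset(): the first acquirer must reset last. */
bool
model_reset(OriginModel *o, int pid)
{
	if (o->acquired_by == pid && o->refcount > 1)
		return false;
	model_release(o, pid);
	return true;
}
```

Replaying s0_setup, s1_setup, s0_reset, s1_reset, s0_reset with PIDs 100 and 200: the first s0 reset returns false (the ERROR above), s1's reset leaves acquired_by at 100 with refcount 1, and s0's second reset brings both back to 0.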

I have addressed the other comments, with some adjustments; see the attached
patch. 0001 and 0002 have been combined because no one raised any objections to
them.

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v5-0001-Fix-unintended-drop-of-active-replication-origins.patch (11.0K, 2-v5-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 71dccd9a8eddbba8e3e6d136c35435597a7b09ec Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v5] Fix unintended drop of active replication origins

Currently, if two backends set up the same replication origin and one of them
resets it first, the acquired_by field is cleared even though the other backend
is still using the origin. This can result in the unintended dropping of the
origin, which causes problems if the shared memory of the dropped origin is
reused for a newly created origin: the remaining backend, which still points to
that memory, could then unpredictably advance the progress stored in a slot that
now belongs to a different origin.

This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented when it resets the origin, either explicitly or at process exit.
Also, the backend that first acquired the origin cannot explicitly reset it
until all other backends sharing it have reset it.

This commit also prevents dropping a replication origin whose acquired_by is
zero but whose reference count is still nonzero. This guard handles the case
where the first session exits without releasing the origin; in that state, new
sessions also cannot set up the origin until all remaining users release it.
---
 .../expected/parallel_session_origin.out      |  46 +++++++-
 .../specs/parallel_session_origin.spec        |   6 +-
 src/backend/replication/logical/origin.c      | 109 +++++++++++++-----
 3 files changed, 130 insertions(+), 31 deletions(-)
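The scenario behind the new drop guard described in the commit message above (the first session exiting while another still shares the origin) can be sketched with a small single-threaded C model. All names below are hypothetical; only the acquired_by/refcount transitions from the patch are modeled, without locks or condition variables, and the "before" check reproduces only the old acquired_by test in the drop path.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the ReplicationState fields involved. */
typedef struct OriginModel
{
	int			acquired_by;	/* PID of the first process to set it up, or 0 */
	int			refcount;		/* number of processes currently using it */
} OriginModel;

/*
 * Models ReplicationOriginExitCleanup() -> replorigin_session_reset_internal():
 * the "first acquirer resets last" restriction does not apply at exit.
 */
void
model_exit(OriginModel *o, int pid)
{
	assert(o->refcount > 0);
	if (o->acquired_by == pid)
		o->acquired_by = 0;
	o->refcount--;
}

/* Drop check before the patch: only acquired_by was consulted. */
bool
drop_allowed_before(const OriginModel *o)
{
	return o->acquired_by == 0;
}

/* Drop check with the patch: any remaining user blocks the drop. */
bool
drop_allowed_after(const OriginModel *o)
{
	return o->refcount == 0;
}

/*
 * Whether a fresh setup (acquired_by argument 0) succeeds, combining the
 * existing "already active" check with the patch's new refcount check.
 */
bool
fresh_setup_allowed(const OriginModel *o)
{
	return o->acquired_by == 0 && o->refcount == 0;
}
```

Starting from {acquired_by = 100, refcount = 2} (s0 acquired, s1 joined) and letting PID 100 exit, the old check would already allow the drop while the new one refuses it until the last user is gone.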

diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..8e41831fcbc 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
 Parsed test spec with 2 sessions
 
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
 step s0_setup: SELECT pg_replication_origin_session_setup('origin');
 pg_replication_origin_session_setup
 -----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
 t       
 (1 row)
 
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
 step s0_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
 step s1_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..1662bed4046 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
 	 */
 	int			acquired_by;
 
+	/* Count of processes that are currently using this origin. */
+	int			refcount;
+
 	/*
 	 * Condition variable that's signaled when acquired_by changes.
 	 */
@@ -383,16 +386,19 @@ restart:
 		if (state->roident == roident)
 		{
 			/* found our slot, is it busy? */
-			if (state->acquired_by != 0)
+			if (state->refcount > 0)
 			{
 				ConditionVariable *cv;
 
 				if (nowait)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_IN_USE),
-							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
-									state->roident,
-									state->acquired_by)));
+							 (state->acquired_by != 0)
+							 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+									  state->roident,
+									  state->acquired_by)
+							 : errmsg("could not drop replication origin with ID %d, in use by another process",
+									  state->roident)));
 
 				/*
 				 * We must wait and then retry.  Since we don't know which CV
@@ -1069,32 +1075,48 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helper function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-	ConditionVariable *cv = NULL;
+	ConditionVariable *cv;
 
-	if (session_replication_state == NULL)
-		return;
+	Assert(session_replication_state != NULL);
 
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	if (session_replication_state->acquired_by == MyProcPid)
-	{
-		cv = &session_replication_state->origin_cv;
+	/* The origin must be held by at least one process at this point. */
+	Assert(session_replication_state->refcount > 0);
 
+	/*
+	 * Reset the PID only if the current session is the first to set up this
+	 * origin. This avoids clearing the first process's PID when any other
+	 * session releases the origin.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid)
 		session_replication_state->acquired_by = 0;
-		session_replication_state = NULL;
-	}
+
+	session_replication_state->refcount--;
+
+	cv = &session_replication_state->origin_cv;
+	session_replication_state = NULL;
 
 	LWLockRelease(ReplicationOriginLock);
 
-	if (cv)
-		ConditionVariableBroadcast(cv);
+	ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+	if (session_replication_state == NULL)
+		return;
+
+	replorigin_session_reset_internal();
 }
 
 /*
@@ -1174,6 +1196,18 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							node, acquired_by)));
 		}
 
+		/*
+		 * The origin is in use, but PID is not recorded. This can happen if
+		 * the process that originally acquired the origin exited without
+		 * releasing it. To ensure correctness, other processes cannot acquire
+		 * the origin until all processes currently using it have released it.
+		 */
+		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication origin with ID %d is already active in another process",
+							curstate->roident)));
+
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
@@ -1205,9 +1239,21 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	Assert(session_replication_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
+	{
 		session_replication_state->acquired_by = MyProcPid;
+		Assert(session_replication_state->refcount == 0);
+	}
 	else
+	{
+		/*
+		 * Sanity check: the origin must already be acquired by the process
+		 * passed as input, and at least one process must be using it.
+		 */
 		Assert(session_replication_state->acquired_by == acquired_by);
+		Assert(session_replication_state->refcount > 0);
+	}
+
+	session_replication_state->refcount++;
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1270,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-	ConditionVariable *cv;
-
 	Assert(max_active_replication_origins != 0);
 
 	if (session_replication_state == NULL)
@@ -1233,15 +1277,22 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-	session_replication_state->acquired_by = 0;
-	cv = &session_replication_state->origin_cv;
-	session_replication_state = NULL;
-
-	LWLockRelease(ReplicationOriginLock);
+	/*
+	 * Restrict explicit resetting of the replication origin if it was first
+	 * acquired by this process and others are still using it. While the
+	 * system handles this safely (as happens if the first session exits
+	 * without calling reset), it is best to avoid doing so.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid &&
+		session_replication_state->refcount > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
+						session_replication_state->roident),
+				 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
+				 errhint("Reset the replication origin in all other processes before retrying.")));
 
-	ConditionVariableBroadcast(cv);
+	replorigin_session_reset_internal();
 }
 
 /*
-- 
2.47.3


