From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
To: shveta malik <[email protected]>
To: [email protected] <[email protected]>
Cc: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Tue, 13 Jan 2026 04:09:31 +0000
Message-ID: <TY7PR01MB145543A74547443E49E4993E7F58EA@TY7PR01MB14554.jpnprd01.prod.outlook.com>
In-Reply-To: <CAA4eK1+h4mOvRqRaGfUtSgZuBhzWWmrBcY3jQ4DDV=cEJ4dwnQ@mail.gmail.com>
References: <CAJpy0uB19aKEgVgh8gwzj87NUyDgOf01boa-6xJZK+nhb=3W4g@mail.gmail.com>
<CAA4eK1+h4mOvRqRaGfUtSgZuBhzWWmrBcY3jQ4DDV=cEJ4dwnQ@mail.gmail.com>
Dear Amit, Shveta,
Thanks for the suggestions. Please see the attached new patches.
> > Since user is not aware of internal acquired_by logic, the error might
> > not make much sense to him as to why one session is able to reset
> > while another is not. Shall we make it:
> >
> > ERROR: cannot reset replication origin "origin_name" while it is
> > still shared by other processes
> > DETAIL: the current session is the first process for this replication
> > origin, and other processes are sharing it.
> > HINT: ensure this replication origin is reset in all other processes first.
> >
>
> How about a slightly tweaked version of these messages:
> ERROR: cannot reset replication origin "origin_name" because it is
> still in use by other processes
> DETAIL: This session is the first process for this replication origin,
> and other processes are currently sharing it.
> HINT: Reset the replication origin in all other processes before retrying.
I followed Amit's wording, but used the origin ID instead of the origin name.
Looking at other functions, the name is printed directly only when 1) the
specified origin does not exist or 2) the name is reserved. Otherwise we seem
to use the ID wherever possible.
>
> > 2)
> > When the first session leaves, while the second session is still using
> > origin, the third session is able to drop the origin which is not
> > right.
> > I think replorigin_state_clear() needs a change.
> > 'if (state->acquired_by != 0)' check should be replaced by 'if
> > (state->refcount > 0)'
> >
> > Patch 001 had correct changes in replorigin_state_clear(), IMO we
> > still need those
Good catch. I put this change in 0002 because that patch handles the cases
where acquired_by = 0.
> >
> > 3)
> > When first session leaves, while second session is still using origin,
> > now correctly third session is not able to join it. It gives error:
> > postgres=# SELECT pg_replication_origin_session_setup('origin');
> > ERROR: replication origin with ID 1 is already active for another process
> >
> > Error is not very informative provided the fact that now sharing is
> > allowed. Shall it be:
> >
>
> Yeah, sharing is allowed but only when used in parallel context by
> passing PID. I think a slightly modified version of the above message
> such as: "replication origin with ID 1 is already active in another
> process" should be sufficient.
Fixed, but I used ereport() because I think this message is user-facing. Feel
free to change it back to elog() if you prefer.
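
As a rough illustration of the rules the two attached patches aim for, here is
a minimal standalone sketch. It is not code from the patches: OriginModel and
the model_* functions are hypothetical names, locking and condition variables
are left out, and it does not track which PIDs actually joined. It only models
how acquired_by and refcount are expected to interact on setup, reset, process
exit, and drop.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative stand-in for the two ReplicationState fields discussed here. */
typedef struct OriginModel
{
	int		acquired_by;	/* PID of the first acquirer, or 0 */
	int		refcount;		/* sessions currently using the origin */
} OriginModel;

/* Setup; parent_pid == 0 means "acquire as the first process". */
static bool
model_setup(OriginModel *o, int my_pid, int parent_pid)
{
	if (parent_pid == 0)
	{
		/* Rejects a live owner and also the "owner exited" state of 0002. */
		if (o->acquired_by != 0 || o->refcount > 0)
			return false;
		o->acquired_by = my_pid;
	}
	else if (o->acquired_by != parent_pid)
		return false;		/* sharing requires the recorded owner's PID */

	o->refcount++;
	return true;
}

/* Unconditional release, as done on process exit. */
static void
model_exit(OriginModel *o, int my_pid)
{
	assert(o->refcount > 0);
	if (o->acquired_by == my_pid)
		o->acquired_by = 0;
	o->refcount--;
}

/* Explicit reset: the first acquirer must wait for the other users. */
static bool
model_reset(OriginModel *o, int my_pid)
{
	if (o->acquired_by == my_pid && o->refcount > 1)
		return false;
	model_exit(o, my_pid);
	return true;
}

/* Dropping is allowed only when no session uses the origin. */
static bool
model_can_drop(const OriginModel *o)
{
	return o->refcount == 0;
}
```

In this model, the new isolation permutation (s0_reset fails, s1_reset
succeeds, s0_reset succeeds) corresponds to calling model_reset() for the
first acquirer, then the second session, then the first acquirer again.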
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
[application/octet-stream] v4-0001-Fix-unintended-drop-of-active-replication-origins.patch (9.1K, 2-v4-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 405ef344f33761eed6e8351937d7a416f385299e Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v4 1/2] Fix unintended drop of active replication origins
Currently, if two backends configure the same replication origin and one backend
resets it first, the acquired_by flag is cleared without recognizing the active
usage by the first backend. This can result in the unintended dropping of the
origin, potentially leading to issues if the shared memory of the dropped origin
is reused for a newly created origin. Such reuse could cause unpredictable
advancement of a different slot by the remaining backend holding the memory of
the dropped origin.
This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. Also, the backend process which firstly acquired the
origin does not release till other acquiring process releases it. This ensures
that acquired_by flag cannot be zero while processes are actively using it.
---
.../expected/parallel_session_origin.out | 46 ++++++++++-
.../specs/parallel_session_origin.spec | 6 +-
src/backend/replication/logical/origin.c | 81 +++++++++++++------
3 files changed, 106 insertions(+), 27 deletions(-)
diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..8e41831fcbc 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
Parsed test spec with 2 sessions
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
step s0_setup: SELECT pg_replication_origin_session_setup('origin');
pg_replication_origin_session_setup
-----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
t
(1 row)
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
step s0_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR: cannot reset replication origin with ID 1 because it is still in use by other processes
step s1_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
(1 row)
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
# commits a transaction and store the local_lsn of the replication origin.
# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..389d2b38d20 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
*/
int acquired_by;
+ /* Count of backends that are currently using this origin. */
+ int refcount;
+
/*
* Condition variable that's signaled when acquired_by changes.
*/
@@ -1069,32 +1072,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
return remote_lsn;
}
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helpful function to reset the session replication origin */
static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
{
- ConditionVariable *cv = NULL;
+ ConditionVariable *cv;
- if (session_replication_state == NULL)
- return;
+ Assert(session_replication_state != NULL);
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
- if (session_replication_state->acquired_by == MyProcPid)
- {
- cv = &session_replication_state->origin_cv;
+ Assert(session_replication_state->refcount > 0);
+ /*
+ * Reset the PID only if the current backend is the first to set up this
+ * origin. This prevents resetting the PID when other backends are still
+ * using this origin.
+ */
+ if (session_replication_state->acquired_by == MyProcPid)
session_replication_state->acquired_by = 0;
- session_replication_state = NULL;
- }
+
+ session_replication_state->refcount--;
+
+ cv = &session_replication_state->origin_cv;
+ session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
- if (cv)
- ConditionVariableBroadcast(cv);
+ ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+ if (session_replication_state == NULL)
+ return;
+
+ replorigin_session_reset_internal();
}
/*
@@ -1205,9 +1223,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
Assert(session_replication_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
+ {
session_replication_state->acquired_by = MyProcPid;
+ Assert(session_replication_state->refcount == 0);
+ }
else
+ {
Assert(session_replication_state->acquired_by == acquired_by);
+ Assert(session_replication_state->refcount > 0);
+ }
+
+ session_replication_state->refcount++;
LWLockRelease(ReplicationOriginLock);
@@ -1224,8 +1250,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
void
replorigin_session_reset(void)
{
- ConditionVariable *cv;
-
Assert(max_active_replication_origins != 0);
if (session_replication_state == NULL)
@@ -1233,15 +1257,22 @@ replorigin_session_reset(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("no replication origin is configured")));
- LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
- session_replication_state->acquired_by = 0;
- cv = &session_replication_state->origin_cv;
- session_replication_state = NULL;
-
- LWLockRelease(ReplicationOriginLock);
+ /*
+ * The replication origin cannot be reset if the replication origin is
+ * firstly acquired by this backend and other processes are actively using
+ * now. This can cause acquired_by to be zero and active replication
+ * origin might be dropped.
+ */
+ if (session_replication_state->acquired_by == MyProcPid &&
+ session_replication_state->refcount > 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
+ session_replication_state->roident),
+ errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
+ errhint("Reset the replication origin in all other processes before retrying.")));
- ConditionVariableBroadcast(cv);
+ replorigin_session_reset_internal();
}
/*
--
2.47.3
[application/octet-stream] v4-0002-Handle-corner-cases-related-with-origin.patch (2.4K, 3-v4-0002-Handle-corner-cases-related-with-origin.patch)
From 7ba371e30e4b2c13734a2dfeee3cf06d41ab1c1d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Fri, 9 Jan 2026 19:31:57 +0900
Subject: [PATCH v4 2/2] Handle corner cases related with origin
The attribute acquired_by can still be 0 while processes are acquiring the origin.
This can happen if the first process exits while holding the origin. This commit
handles corner cases related to it:
- rejects acquiring origin if it does not have a valid acquired_by
but counter > 0.
- ensures origins cannot be dropped if the counter > 0.
---
src/backend/replication/logical/origin.c | 23 +++++++++++++++++++----
1 file changed, 19 insertions(+), 4 deletions(-)
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 389d2b38d20..b9132b3475d 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -386,16 +386,19 @@ restart:
if (state->roident == roident)
{
/* found our slot, is it busy? */
- if (state->acquired_by != 0)
+ if (state->refcount > 0)
{
ConditionVariable *cv;
if (nowait)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop replication origin with ID %d, in use by PID %d",
- state->roident,
- state->acquired_by)));
+ (state->acquired_by != 0)
+ ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+ state->roident,
+ state->acquired_by)
+ : errmsg("could not drop replication origin with ID %d, in use by another process",
+ state->roident)));
/*
* We must wait and then retry. Since we don't know which CV
@@ -1192,6 +1195,18 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
node, acquired_by)));
}
+ /*
+ * PID was not noted in the origin. This can happen if the process that
+ * originally acquired the origin exits without releasing it. To make the
+ * status clean again, other processes cannot acquire the origin until
+ * all current users release it.
+ */
+ else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication origin with ID %d is already active in another process",
+ curstate->roident)));
+
/* ok, found slot */
session_replication_state = curstate;
break;
--
2.47.3