public inbox for [email protected]  
Re: [Patch] add new parameter to pg_replication_origin_session_setup
46+ messages / 8 participants

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2024-08-12 21:48  Euler Taveira <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Euler Taveira @ 2024-08-12 21:48 UTC
  To: [email protected]

On Mon, Aug 12, 2024, at 3:43 PM, Doruk Yilmaz wrote:
> Hello all,

Hi!

> While working on our internal tools that utilise replication, we
> realised that a new parameter was added to the internal C function
> corresponding to pg_replication_origin_session_setup.
> However this parameter wasn't included in the user-facing API [1].

I'm curious about your use case. Is it just because the internal function has a
different signature, or is your tool capable of applying logical replication changes
in parallel using the SQL API?

> I made this patch to the master which adds a way to control this
> parameter by adding a new version of the
> pg_replication_origin_session_setup function with user facing
> parameters 'text int4' in place of the current 'text' while keeping
> the existing variant
> (ensuring backwards compatibility). Could someone take a look at it?

I did a quick look at your patch and have a few suggestions.

* no documentation changes. Since the function you are changing has a new
signature, this change should be reflected in the documentation.
* no need for a new internal function. The second parameter (PID) can be
optional, defaulting to 0 when omitted. See how we changed
pg_create_logical_replication_slot over the years to add IN parameters such as
twophase and failover in recent versions.
* add a CF entry [1] for this patch so we don't forget it. Another advantage is
that this patch is covered by CI [2][3].


[1] https://commitfest.postgresql.org/49/
[2] https://wiki.postgresql.org/wiki/Cfbot
[3] http://cfbot.cputube.org/index.html


--
Euler Taveira
EDB   https://www.enterprisedb.com/



* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2024-08-15 20:53  Doruk Yilmaz <[email protected]>
  parent: Euler Taveira <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2024-08-15 20:53 UTC
  To: Euler Taveira <[email protected]>; +Cc: [email protected]

Hello again,

On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <[email protected]> wrote:
> I'm curious about your use case. Is it just because the internal function has a
> different signature, or is your tool capable of applying logical replication changes
> in parallel using the SQL API?

The latter is correct, it applies logical replication changes in parallel.
Since multiple connections may commit, we need all of them to be able
to advance the replication origin.

> * no documentation changes. Since the function you are changing has a new
> signature, this change should be reflected in the documentation.
> * no need for a new internal function. The second parameter (PID) can be
> optional, defaulting to 0 when omitted. See how we changed
> pg_create_logical_replication_slot over the years to add IN parameters such as
> twophase and failover in recent versions.

I updated/rewrote the patch to reflect these suggestions.
It now has the same DEFAULT 0 style used in pg_create_logical_replication_slot.
I also updated the documentation.

> * add a CF entry [1] for this patch so we don't forget it. Another advantage is
> that this patch is covered by CI [2][3].
Sadly I still can't log in to the Commitfest due to the cool-off
period. I will create an entry as soon as this period ends.

Thanks for all the feedback,
Doruk Yılmaz


Attachments:

  [text/x-patch] v2-0001-pg_replication_origin_session_setup-new-parameter.patch (3.7K, 2-v2-0001-pg_replication_origin_session_setup-new-parameter.patch)
From b9c54f3d217f67c24ce74ffa7c1f2812d784333e Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH] add new parameter to pg_replication_origin_session_setup

---
 doc/src/sgml/func.sgml                   | 2 +-
 src/backend/catalog/system_functions.sql | 9 ++++++++-
 src/backend/replication/logical/origin.c | 8 +++++---
 src/include/catalog/pg_proc.dat          | 2 +-
 4 files changed, 15 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 5dd95d73a1..7db5a8ed52 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29486,7 +29486,7 @@ DETAIL:  Make sure pg_wal_replay_wait() isn't called within a transaction with a
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type>, <parameter>acquired_by</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 623b9539b1..4aae06e06d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -639,6 +639,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, acquired_by integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -736,7 +743,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text,integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 419e4814f0..e50bcc8466 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1351,13 +1351,15 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
-
+	int         pid;
+	
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
-
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
+	
 	replorigin_session_origin = origin;
 
 	pfree(name);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 4abc6d9526..a490d4fc6e 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11948,7 +11948,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.39.2




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-01-08 21:55  Euler Taveira <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 2 replies; 46+ messages in thread

From: Euler Taveira @ 2025-01-08 21:55 UTC
  To: Doruk Yilmaz <[email protected]>; +Cc: [email protected]

On Thu, Aug 15, 2024, at 5:53 PM, Doruk Yilmaz wrote:
> Hello again,
> 
> On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <[email protected]> wrote:
> > I'm curious about your use case. Is it just because the internal function has a
> > different signature, or is your tool capable of applying logical replication changes
> > in parallel using the SQL API?
> 
> The latter is correct, it applies logical replication changes in parallel.
> Since multiple connections may commit, we need all of them to be able
> to advance the replication origin.
> 
> > * no documentation changes. Since the function you are changing has a new
> > signature, this change should be reflected in the documentation.
> > * no need for a new internal function. The second parameter (PID) can be
> > optional, defaulting to 0 when omitted. See how we changed
> > pg_create_logical_replication_slot over the years to add IN parameters such as
> > twophase and failover in recent versions.
> 
> I updated/rewrote the patch to reflect these suggestions.
> It now has the same DEFAULT 0 style used in pg_create_logical_replication_slot.
> I also updated the documentation.

[after a long hiatus...]

I tested your patch again and it does what is advertised. I changed your patch
a bit. The main change was the documentation: you didn't explain what this new
parameter is for. I tried to explain it without adding lots of details.
(There is a section that explains how parallel apply processes work behind the
scenes.) I also renamed it from acquired_by to pid to be more descriptive, and
fixed some white space issues too. I noticed that there are no tests. This
doesn't appear to be a shortcoming of this patch, but we need to cover some of
these replication functions with an additional test file in another patch.
Finally, I wrote a commit message; it is now Ready for Committer (RfC).

session 1:

postgres=# select * from pg_replication_origin;
 roident | roname 
---------+--------
(0 rows)

postgres=# SELECT pg_backend_pid();
 pg_backend_pid 
----------------
         260732
(1 row)

postgres=# SELECT pg_replication_origin_create('test');
 pg_replication_origin_create 
------------------------------
                            1
(1 row)

postgres=# SELECT pg_replication_origin_session_setup('test', 0);
 pg_replication_origin_session_setup 
-------------------------------------
 
(1 row)

postgres=# select * from pg_replication_origin;
 roident | roname 
---------+--------
       1 | test
(1 row)

session 2:

postgres=# SELECT pg_replication_origin_session_setup('test', 260732);
 pg_replication_origin_session_setup 
-------------------------------------
 
(1 row)

session 3:

postgres=# SELECT pg_replication_origin_session_setup('test', 12345);
ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by 12345


--
Euler Taveira
EDB   https://www.enterprisedb.com/


Attachments:

  [text/x-patch] v3-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.1K, 3-v3-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH v3] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
 doc/src/sgml/func.sgml                   | 8 +++++++-
 src/backend/catalog/system_functions.sql | 9 ++++++++-
 src/backend/replication/logical/origin.c | 4 +++-
 src/include/catalog/pg_proc.dat          | 2 +-
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 47370e581a..e50e689fb6 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29475,7 +29475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -29483,6 +29483,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equals to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
        </para></entry>
       </row>
 
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 591157b1d1..26151e0f1c 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -668,6 +668,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE
 AS 'pg_set_attribute_stats';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -769,7 +776,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1b586cb1cf..9cbe1eec45 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1355,12 +1355,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b37e8a6f88..ea118a0563 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12063,7 +12063,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.39.5




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-03-01 17:52  Doruk Yilmaz <[email protected]>
  parent: Euler Taveira <[email protected]>
  1 sibling, 0 replies; 46+ messages in thread

From: Doruk Yilmaz @ 2025-03-01 17:52 UTC
  To: [email protected]

I noticed that the patch needs rebasing, so here is the rebased version.
Hopefully it makes it into the commitfest.

Doruk Yılmaz


Attachments:

  [application/x-patch] v4-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.1K, 2-v4-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Thu, 15 Aug 2024 23:34:26 +0300
Subject: [PATCH v4] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup. This commit adds an
optional parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
 doc/src/sgml/func.sgml                   | 8 +++++++-
 src/backend/catalog/system_functions.sql | 9 ++++++++-
 src/backend/replication/logical/origin.c | 4 +++-
 src/include/catalog/pg_proc.dat          | 2 +-
 4 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 47370e581a..e50e689fb6 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -29475,7 +29475,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -29483,6 +29483,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equals to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
        </para></entry>
       </row>
 
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 86888cd..ebc4005 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -636,6 +636,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -737,7 +744,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1b586cb1cf..9cbe1eec45 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1355,12 +1355,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index b37e8a6f88..ea118a0563 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12063,7 +12063,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.39.5




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-03-03 03:38  Amit Kapila <[email protected]>
  parent: Euler Taveira <[email protected]>
  1 sibling, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-03-03 03:38 UTC
  To: Euler Taveira <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected]

On Thu, Jan 9, 2025 at 3:26 AM Euler Taveira <[email protected]> wrote:
>
> On Thu, Aug 15, 2024, at 5:53 PM, Doruk Yilmaz wrote:
>
> Hello again,
>
> On Tue, Aug 13, 2024 at 12:48 AM Euler Taveira <[email protected]> wrote:
> > I'm curious about your use case. Is it just because the internal function has a
> > different signature, or is your tool capable of applying logical replication changes
> > in parallel using the SQL API?
>
> The latter is correct, it applies logical replication changes in parallel.
> Since multiple connections may commit, we need all of them to be able
> to advance the replication origin.
>

To use a replication origin from multiple processes, one must maintain
commit order, as we do internally by having the leader process wait for
the parallel worker to finish its commit. See the comments atop
replorigin_session_setup(). We could expose the pid parameter as
proposed by the patch after documenting the additional requirements,
but I am afraid that users may start using the API directly without
following the commit-order principle, which can lead to incorrect
replication. So, isn't it better to do something to prevent misuse
of this feature before exposing it?
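To illustrate why commit order matters when several processes share one origin, here is a toy C model. It assumes (an assumption of this sketch) that origin progress is advanced at each local commit and never moves backwards, and it simulates a crash after a given number of local commits. All names are hypothetical; this is not PostgreSQL's parallel apply code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint64_t ToyLsn;

#define TOY_MAX_XACTS 64

/*
 * Toy model (hypothetical names, not PostgreSQL code).  Remote
 * transactions 0..n-1 have increasing commit LSNs.  Workers finish
 * applying them in finish_order; we "crash" after crash_after local
 * commits.  Progress only moves forward.  Returns true if, on restart,
 * some transaction at or below the recorded progress was never
 * committed locally, i.e. it would be skipped.
 */
static bool
toy_loses_xact(const ToyLsn *commit_lsns, const int *finish_order, int n,
               bool enforce_commit_order, int crash_after)
{
    bool        committed[TOY_MAX_XACTS] = {false};
    bool        finished[TOY_MAX_XACTS] = {false};
    ToyLsn      progress = 0;
    int         ncommits = 0;
    int         next = 0;       /* next xact allowed to commit in order */

    assert(n <= TOY_MAX_XACTS);
    for (int i = 0; i < n && ncommits < crash_after; i++)
    {
        int         t = finish_order[i];

        finished[t] = true;
        while (ncommits < crash_after)
        {
            int         c;

            if (enforce_commit_order)
            {
                /* Leader-style gate: only the oldest pending xact commits. */
                if (next >= n || !finished[next])
                    break;
                c = next++;
            }
            else
            {
                /* No gate: the xact that just finished commits at once. */
                if (committed[t])
                    break;
                c = t;
            }
            committed[c] = true;
            ncommits++;
            if (commit_lsns[c] > progress)
                progress = commit_lsns[c];
        }
    }

    /* After restart, replay resumes strictly after 'progress'. */
    for (int t = 0; t < n; t++)
        if (commit_lsns[t] <= progress && !committed[t])
            return true;
    return false;
}
```

With transactions at remote LSNs 100, 200 and 300 finishing in the order 300, 100, 200 and a crash after the first local commit, committing without the gate records progress 300 while 100 and 200 were never committed, so a restart would skip them; with the gate, the first commit is 100 and nothing is skipped.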

-- 
With Regards,
Amit Kapila.







* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-07-28 21:13  Doruk Yilmaz <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-07-28 21:13 UTC
  To: Amit Kapila <[email protected]>; Euler Taveira <[email protected]>; +Cc: [email protected]

On Mon, Mar 3, 2025 at 6:39 AM Amit Kapila <[email protected]> wrote:
>
> To use a replication origin from multiple processes, one must maintain
> commit order, as we do internally by having the leader process wait for
> the parallel worker to finish its commit. See the comments atop
> replorigin_session_setup(). We could expose the pid parameter as
> proposed by the patch after documenting the additional requirements,
> but I am afraid that users may start using the API directly without
> following the commit-order principle, which can lead to incorrect
> replication. So, isn't it better to do something to prevent misuse
> of this feature before exposing it?

Wouldn't describing the need to follow the commit-order principle in
the documentation be enough for this?
It is quite an advanced feature, and I believe anyone intending to use
it will start by reading the documentation first.


Are there any updates on the commit? I see that the intended commitfest window has ended.

Thanks,
Doruk Yılmaz






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-07-29 05:13  Amit Kapila <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-07-29 05:13 UTC
  To: Doruk Yilmaz <[email protected]>; +Cc: Euler Taveira <[email protected]>; [email protected]

On Tue, Jul 29, 2025 at 2:43 AM Doruk Yilmaz <[email protected]> wrote:
>
> On Mon, Mar 3, 2025 at 6:39 AM Amit Kapila <[email protected]> wrote:
> >
> > To use a replication origin from multiple processes, one must maintain
> > commit order, as we do internally by having the leader process wait for
> > the parallel worker to finish its commit. See the comments atop
> > replorigin_session_setup(). We could expose the pid parameter as
> > proposed by the patch after documenting the additional requirements,
> > but I am afraid that users may start using the API directly without
> > following the commit-order principle, which can lead to incorrect
> > replication. So, isn't it better to do something to prevent misuse
> > of this feature before exposing it?
>
> Wouldn't describing the need to follow the commit-order principle in
> the documentation be enough for this?
> It is quite an advanced feature, and I believe anyone intending to use
> it will start by reading the documentation first.
>

That is true, but I still feel there has to be some mechanism that can
catch a violation and give an ERROR to the user if they don't follow
it. For example, pg_replication_origin_advance() always allows going
backwards in terms of LSN, which means that if one doesn't follow commit
order, replication can break, as after a restart the client can ask to
start replication from some earlier point.
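The hazard described here can be shown with a toy C model in which an advance simply stores whatever LSN it receives, modelling an advance that is allowed to go backwards. The function names are hypothetical and this is not the real pg_replication_origin_advance() implementation.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t ToyLsn;

/*
 * Toy model, hypothetical names: an advance that stores whatever LSN it
 * is given, i.e. it is allowed to go backwards.
 */
static void
toy_advance_any_direction(ToyLsn *progress, ToyLsn lsn)
{
    *progress = lsn;
}

/*
 * After a restart the client asks for changes after 'progress'.  Count
 * how many already-committed transactions would be sent (and applied)
 * a second time.
 */
static size_t
toy_resent_after_restart(const ToyLsn *committed, size_t n, ToyLsn progress)
{
    size_t      resent = 0;

    for (size_t i = 0; i < n; i++)
        if (committed[i] > progress)
            resent++;
    return resent;
}
```

If three transactions commit at 100, 200 and 300 but the client advances to 300 and then to 100, the stored progress ends at 100 and a restart would resend two transactions that were already applied.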

>
> Are there any updates on the commit?
>

I think we are still discussing the requirements and design for
this API. Can you tell us the use case? Do you also intend to use it
for parallel apply? If so, can you also tell us, at a high level, how
you are planning to manage the origin? It will help us extend the
API(s) in a meaningful way.

-- 
With Regards,
Amit Kapila.






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-07-29 18:30  Doruk Yilmaz <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-07-29 18:30 UTC
  To: Amit Kapila <[email protected]>; +Cc: Euler Taveira <[email protected]>; [email protected]

On Tue, Jul 29, 2025 at 8:13 AM Amit Kapila <[email protected]> wrote:
> That is true, but I still feel there has to be some mechanism that can
> catch a violation and give an ERROR to the user if they don't follow
> it. For example, pg_replication_origin_advance() always allows going
> backwards in terms of LSN, which means that if one doesn't follow commit
> order, replication can break, as after a restart the client can ask to
> start replication from some earlier point.

If you have any ideas for safeguards or API changes, I'd be happy to
help implement or discuss them.

> Can you tell us the use case? Do you also intend to use it
> for parallel apply? If so, can you also tell us, at a high level, how
> you are planning to manage the origin?

Yes, we use it for parallel apply. We have a custom logical
replication system that applies changes using multiple worker
processes, each with its own database connection.
Our use case requires multiple connections to be able to advance the
same replication origin. We handle this by having a master process
coordinate the workers, where each worker process calls
pg_replication_origin_session_setup with the master's PID as the
second parameter.
We may send operations out of order, but we always commit in order, so
there's no chance of creating inconsistencies. Deadlocks are possible,
but they can be detected. It's really similar to the
existing parallel apply implementation; the main difference is that
we're applying from jsonl files instead of directly from another
database.
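The coordination described above (workers apply in any order but commit strictly in remote commit order) can be sketched as a commit sequencer. This is a hypothetical, single-threaded C toy, not the poster's implementation. It also shows one situation where a deadlock is unavoidable: if the oldest uncommitted transaction waits for a lock held by a later one, the later one cannot commit (and release the lock) until the oldest one does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Toy sketch (hypothetical, not the poster's code): the coordinator hands
 * each remote transaction a ticket in remote commit order.  Workers may
 * apply in any order but may only COMMIT when their ticket is next.
 */
typedef struct ToyCommitSequencer
{
    size_t      next_ticket;    /* oldest transaction not yet committed */
} ToyCommitSequencer;

static bool
toy_may_commit(const ToyCommitSequencer *s, size_t ticket)
{
    return ticket == s->next_ticket;
}

static void
toy_mark_committed(ToyCommitSequencer *s, size_t ticket)
{
    assert(toy_may_commit(s, ticket));
    s->next_ticket++;
}

/*
 * blocked_on[t] is the ticket of the transaction holding a lock that
 * transaction t waits for, or -1 if t is not waiting on a lock.  Any
 * later transaction can commit only after the oldest pending one, so it
 * keeps its locks until then.  If the oldest pending transaction waits
 * on such a lock, neither side can proceed: a deadlock.
 */
static bool
toy_commit_order_deadlock(const ToyCommitSequencer *s,
                          const long *blocked_on, size_t n)
{
    if (s->next_ticket >= n)
        return false;           /* everything has committed */
    return blocked_on[s->next_ticket] > (long) s->next_ticket;
}
```

In a real system the waiting would use locks or condition variables, and such a deadlock would presumably be resolved by aborting and retrying the later transaction.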
Currently we use a local patch to expose the PID parameter, but having
this upstream would be great. It causes a lot of headaches for us to
use a patched PostgreSQL.
Thanks,
Doruk Yılmaz






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-11 06:44  Amit Kapila <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-08-11 06:44 UTC
  To: Doruk Yilmaz <[email protected]>; +Cc: Euler Taveira <[email protected]>; [email protected]; Sawada Masahiko <[email protected]>; Hou, Zhijie/侯 志杰 <[email protected]>

On Wed, Jul 30, 2025 at 12:00 AM Doruk Yilmaz <[email protected]> wrote:
>
> On Tue, Jul 29, 2025 at 8:13 AM Amit Kapila <[email protected]> wrote:
> > That is true, but I still feel there has to be some mechanism that can
> > catch a violation and give an ERROR to the user if they don't follow
> > it. For example, pg_replication_origin_advance() always allows going
> > backwards in terms of LSN, which means that if one doesn't follow commit
> > order, replication can break, as after a restart the client can ask to
> > start replication from some earlier point.
> If you have any ideas for safeguards or API changes, I'd be happy to
> help implement them or discuss them.
> > Can you tell us the use case? Do you also intend to use it
> > for parallel apply? If so, can you also tell us, at a high level, how
> > you are planning to manage the origin?
> Yes, we use it for parallel apply. We have a custom logical
> replication system that applies changes using multiple worker
> processes, each with their own database connection.
> Our use case requires multiple connections to be able to advance the
> same replication origin.
>

How do you advance the origin? Did you use
pg_replication_origin_advance()? If so, be aware that it is intended
only for initial setup; see the comment in that API's code: "Can't
sensibly pass a local commit to be flushed at checkpoint - this xact
hasn't committed yet. This is why this function should be used to set
up the initial replication state, but not for replay." If you are
using pg_replication_origin_advance(), I wonder whether its current
implementation could cause a problem for your use case.
I think the problem is that a transaction may be missed after a
restart, because remote_lsn can be recorded without the corresponding
transaction (local_lsn) having been flushed on the subscriber. That is
a problem because we want a transaction that was not successfully
flushed to be replayed after restart.

In general, I was thinking of adding a restriction to
pg_replication_origin_advance() so that it raises an ERROR when a
user tries to move remote_lsn backward unless explicitly requested.
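A toy model may make the proposed restriction concrete. It is only a sketch under assumptions: `OriginProgress`, `advance()`, and the `allow_backward` flag are hypothetical names, not an existing PostgreSQL API, and LSNs are modelled as integers parsed from the usual `X/Y` text form (upper and lower 32 bits in hex).

```python
class BackwardAdvanceError(Exception):
    """Raised when an origin would move to an older remote LSN."""


def parse_lsn(text: str) -> int:
    """Convert an LSN in 'X/Y' hex form (e.g. '0/16B3748') to an integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


class OriginProgress:
    """Hypothetical stand-in for one replication origin's remote_lsn."""

    def __init__(self) -> None:
        self.remote_lsn = 0

    def advance(self, lsn: int, allow_backward: bool = False) -> None:
        # The proposed guard: moving backwards must be requested explicitly.
        if lsn < self.remote_lsn and not allow_backward:
            raise BackwardAdvanceError(
                f"refusing to move remote_lsn from {self.remote_lsn:#x} to {lsn:#x}"
            )
        self.remote_lsn = lsn


progress = OriginProgress()
progress.advance(parse_lsn("0/3000"))
try:
    # Per the mail, today's function accepts this; the guard rejects it.
    progress.advance(parse_lsn("0/2000"))
except BackwardAdvanceError as err:
    print(err)
progress.advance(parse_lsn("0/2000"), allow_backward=True)  # explicit opt-in
```

The explicit flag keeps legitimate rewinds (for example, a deliberate resync) possible while turning an accidental out-of-order call into an error.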

It would be good to know the opinion of others involved in the
original change of maintaining commit order for parallel apply of
large transactions.

-- 
With Regards,
Amit Kapila.






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-11 17:11  Doruk Yilmaz <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-08-11 17:11 UTC (permalink / raw)
  To: Amit Kapila <[email protected]>; +Cc: [email protected]

On Mon, Aug 11, 2025 at 9:44 AM Amit Kapila <[email protected]> wrote:
> How do you advance the origin? Did you use
> pg_replication_origin_advance()? If so, you should be aware that it
> can be used for initial setup; see comment in that API code...

No, we don't use pg_replication_origin_advance(). We use
pg_replication_origin_xact_setup() instead as I mentioned before.

Each worker does the following:
1. Sets up its own replication-origin session with
pg_replication_origin_session_setup() (using the master process PID).
2. Applies changes inside transactions.
3. Right before commit, calls pg_replication_origin_xact_setup(lsn,
commit_timestamp).
4. Commits only if everything succeeded, so the origin only advances
on a real commit.

That way, the origin LSN moves forward only when the transaction is
actually committed. If something fails or the process crashes, the
origin stays at the last successful commit, and on restart we replay
from the correct spot. It's safer than advancing the origin without
knowing the transaction made it to disk.

So the issue you described is not relevant for our implementation.
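The per-worker sequence above can be sketched as the SQL each connection would send. This is an illustration under assumptions: `RemoteTxn` and the helper functions are hypothetical, the two-argument `pg_replication_origin_session_setup()` call only exists with the patch under discussion, the client is expected to send `ROLLBACK` instead of the final `COMMIT` if any statement fails, and the sketch does not by itself enforce commit order across workers.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RemoteTxn:
    """One remote transaction as read from the change stream (hypothetical shape)."""
    commit_lsn: str                      # e.g. '0/16B3748'
    commit_ts: str                       # e.g. '2025-08-11 17:11:00+00'
    statements: List[str] = field(default_factory=list)


def quote_literal(value: str) -> str:
    """Minimal literal quoting (assumes standard_conforming_strings = on).

    A real client would use its driver's parameter binding instead.
    """
    return "'" + value.replace("'", "''") + "'"


def session_setup_sql(origin_name: str, first_pid: int = 0) -> str:
    """Run once per connection: the first process passes 0, the others its PID."""
    return (
        "SELECT pg_replication_origin_session_setup("
        f"{quote_literal(origin_name)}, {int(first_pid)})"
    )


def apply_txn_sql(txn: RemoteTxn) -> List[str]:
    """Statements for one remote transaction; the origin advances only at COMMIT."""
    xact_setup = (
        "SELECT pg_replication_origin_xact_setup("
        f"{quote_literal(txn.commit_lsn)}::pg_lsn, "
        f"{quote_literal(txn.commit_ts)}::timestamptz)"
    )
    return ["BEGIN", *txn.statements, xact_setup, "COMMIT"]


txn = RemoteTxn("0/16B3748", "2025-08-11 17:11:00+00",
                ["INSERT INTO t VALUES (1)"])
for stmt in [session_setup_sql("my_origin", 12345), *apply_txn_sql(txn)]:
    print(stmt)
```

Because pg_replication_origin_xact_setup() only records the origin position for the current transaction, a crash or ROLLBACK before COMMIT leaves the origin at the last committed transaction.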






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-12 09:27  Amit Kapila <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-08-12 09:27 UTC (permalink / raw)
  To: Doruk Yilmaz <[email protected]>; +Cc: [email protected]

On Mon, Aug 11, 2025 at 10:41 PM Doruk Yilmaz <[email protected]> wrote:
>
> On Mon, Aug 11, 2025 at 9:44 AM Amit Kapila <[email protected]> wrote:
> > How do you advance the origin? Did you use
> > pg_replication_origin_advance()? If so, you should be aware that it
> > can be used for initial setup; see comment in that API code...
>
> No, we don't use pg_replication_origin_advance(). We use
> pg_replication_origin_xact_setup() instead as I mentioned before.
>
> Each worker does the following:
> 1. Sets up its own replication-origin session with
> pg_replication_origin_session_setup() (using the master process PID).
> 2. Applies changes inside transactions.
> 3. Right before commit, calls pg_replication_origin_xact_setup(lsn,
> commit_timestamp).
> 4. Commits only if everything succeeded, so the origin only advances
> on a real commit.
>
> That way, the origin LSN moves forward only when the transaction is
> actually committed. If something fails or the process crashes, the
> origin stays at the last successful commit, and on restart we replay
> from the correct spot. It's safer than advancing the origin without
> knowing the transaction made it to disk.
>

Your use looks good to me. So maybe it would be sufficient to update the
docs to describe the danger: if users of the API don't follow commit
order, it may lead to data inconsistency. Additionally, we may want to
give an example of how to use this API for parallel apply.
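As an illustration of the kind of parallel-apply example suggested here, one way a client could apply concurrently but commit in order is sketched below. It is a hypothetical Python model (`CommitSequencer` and `worker` are invented names), it assumes every remote transaction has been numbered 0, 1, 2, ... in remote commit order, and it does not talk to a server.

```python
import threading
from typing import List


class CommitSequencer:
    """Lets workers apply changes concurrently but commit in sequence order."""

    def __init__(self) -> None:
        self._next = 0
        self._cond = threading.Condition()

    def wait_turn(self, seq: int) -> None:
        """Block until every transaction with a smaller seq has committed."""
        with self._cond:
            self._cond.wait_for(lambda: self._next == seq)

    def done(self, seq: int) -> None:
        """Mark seq as committed and wake the worker holding seq + 1."""
        with self._cond:
            if seq != self._next:
                raise RuntimeError(f"commit {seq} finished out of order")
            self._next += 1
            self._cond.notify_all()


def worker(seq: int, seqr: CommitSequencer, log: List[int]) -> None:
    # ... apply the transaction's changes here (any interleaving is fine) ...
    seqr.wait_turn(seq)
    # ... pg_replication_origin_xact_setup(...) and COMMIT would go here ...
    log.append(seq)
    seqr.done(seq)


seqr = CommitSequencer()
committed: List[int] = []
threads = [threading.Thread(target=worker, args=(s, seqr, committed))
           for s in reversed(range(4))]           # start them in reverse order
for t in threads:
    t.start()
for t in threads:
    t.join()
print(committed)
```

If a worker fails, later workers wait forever in this sketch; a real system would need to stop all workers and restart from the origin's last committed position.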

Thoughts?

-- 
With Regards,
Amit Kapila.






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-18 18:55  Doruk Yilmaz <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-08-18 18:55 UTC (permalink / raw)
  To: [email protected]; +Cc: Amit Kapila <[email protected]>

> Your use looks good to me. So, maybe we can update the docs with the
> dangers if the users of API doesn't follow commit order then it may
> lead to data inconsistency should be sufficient. Additionally, we may
> want to give an example as to how to use this API for parallel apply.

That sounds reasonable. I’ve updated the patch and added more
information to the documentation covering the topics you mentioned.
I also added a Caution block so potential users won’t miss it. I hope
this patch meets your expectations.


Attachments:

  [text/x-patch] v5-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.9K, 2-v5-0001-pg_replication_origin_session_setup-pid-parameter.patch)
  download | inline diff:
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Fri, 15 Aug 2025 21:37:18 +0300
Subject: [PATCH v5] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. replorigin_session_setup()
has a SQL user interface: pg_replication_origin_session_setup. This
commit adds an optional parameter that passes the process ID to the
internal function replorigin_session_setup. It allows multiple processes
that use the SQL replication origin functions to share the same
replication origin.
---
 doc/src/sgml/func/func-admin.sgml        | 22 ++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>

       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';

+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM

 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;

-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;

 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;

 	replorigin_check_prerequisites(true, false);

 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);

 	replorigin_session_origin = origin;

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },

 { oid => '6007', descr => 'teardown configured replication progress tracking',



* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-22 05:50  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2025-08-22 05:50 UTC (permalink / raw)
  To: 'Doruk Yilmaz' <[email protected]>; +Cc: Amit Kapila <[email protected]>; [email protected] <[email protected]>

Dear Doruk,

> That sounds reasonable. I’ve updated the patch and added more
> information to the documentation covering the topics you mentioned.
> I also added a Caution block so potential users won’t miss it. I hope
> this patch meets your expectations.

Can you explain in more detail why we must extend the SQL interface? I read your use
case [1], and it looks like a new type of background worker is introduced in
your system. If so, why doesn't the worker directly call the C-language interface
replorigin_session_setup()?
Personally, I think SQL functions can be used by users unfamiliar with these internals,
so this change may be dangerous. It would be better if developers used the C APIs instead.

[1]: https://www.postgresql.org/message-id/CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw%40mail.gma...

Best regards,
Hayato Kuroda
FUJITSU LIMITED



* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-22 12:51  Doruk Yilmaz <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-08-22 12:51 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: [email protected]; Amit Kapila <[email protected]>

Dear Hayato,

> Can you explain more why we must extend the SQL interface?

In our system the workers aren't background workers and we don't ship
a server-side extension; they're plain external processes (Python in
our case) talking over standard database connections. In many
deployments (especially managed Postgres) we can't load custom C code
even if we wanted to. That's why we want to expose the existing pid
knob via SQL: it lets ordinary client sessions participate in the
same, already-implemented origin coordination without maintaining a
fork or an extension.
This patch doesn't invent a new capability; it just makes the internal
behavior reachable from SQL. The new argument is optional and defaults
to the current behavior, so nothing changes for existing users. It
also keeps the feature usable from any language/runtime that
coordinates parallel apply at the application layer. And I don't
believe it is that dangerous or risky. The actual code we use in
Python is simple enough that anyone already working with replication
should be able to set it up. I don't understand why parallel
replication isn't already accessible via SQL.

I am happy to make changes to the patch if you think there should be
more guardrails.

Thanks,
Doruk Yılmaz






* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-08-25 10:43  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2025-08-25 10:43 UTC (permalink / raw)
  To: 'Doruk Yilmaz' <[email protected]>; +Cc: [email protected] <[email protected]>; Amit Kapila <[email protected]>

Dear Doruk,

> In our system the workers aren't background workers and we don't ship
> a server-side extension; they're plain external processes (Python in
> our case) talking over standard database connections. In many
> deployments -especially managed Postgres- we can't load custom C code
> even if we wanted to. That's why we want to expose the existing pid
> knob via SQL: it lets ordinary client sessions participate in the
> same, already-implemented origin coordination without maintaining a
> fork or an extension.

So, your Python process establishes two connections: one to the publisher (replication
connection) and one to the subscriber (normal connection). It receives changes from the
publisher, constructs SQL statements from the received results, and sends them to the
subscriber's backend. Is that right?
I'm not sure it is a common approach, but I see your point that you cannot
install your extensions on managed Postgres.

Anyway, I still feel it is a bit dangerous, but I'm OK with it if others accept it.

Regarding the patch, I have one question.
```
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE
+AS 'pg_replication_origin_session_setup';
...
{ oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
```

Is there a rule for which attributes are specified explicitly and which are not?
For example, VOLATILE is specified in both places, STRICT is written only in
system_functions.sql, and PARALLEL UNSAFE is set only in pg_proc.dat.

Best regards,
Hayato Kuroda
FUJITSU LIMITED




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-03 12:43  Doruk Yilmaz <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-09-03 12:43 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: [email protected]; Amit Kapila <[email protected]>

Dear Hayato,

> So, your python process establishes two connections, for publisher (replication connection)
> and subscriber (normal connection). It receives changes from the publisher,
> constructs SQL statements from the received results, and sends to subscriber's
> backend, is it right?

Actually, it's a bit simpler than that: there aren't two connections.
Our program reads changes from JSONL files rather than directly from a
publisher connection. We have multiple Python processes, each with a
single database connection to the subscriber, reading from these files
and applying changes in parallel.

> Is there a rule which attribute is clarified and others are not?
> For example, VOLATILE is specified on both side, STRICT is written only in the
> system_functions.sql, and PARALLEL UNSAFE is set on pg_proc.dat.

In pg_proc.dat, I believe STRICT, IMMUTABLE, and PARALLEL SAFE are
the defaults (see pg_proc.h). So in pg_proc.dat, only the attributes
that differ from the defaults are specified: there is provolatile => 'v'
(for VOLATILE) and proparallel => 'u' (for PARALLEL UNSAFE), but no
proisstrict since it's already true by default.
In system_functions.sql, I went with being explicit about all the
attributes for clarity, as it is the code declaration.
If you want, I can also make the pg_proc.dat entry explicit.

Thanks,
Doruk Yılmaz






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-06 05:09  Amit Kapila <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-09-06 05:09 UTC (permalink / raw)
  To: Doruk Yilmaz <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; [email protected]

On Wed, Sep 3, 2025 at 6:13 PM Doruk Yilmaz <[email protected]> wrote:
>
> Dear Hayato,
>
> > So, your python process establishes two connections, for publisher (replication connection)
> > and subscriber (normal connection). It receives changes from the publisher,
> > constructs SQL statements from the received results, and sends to subscriber's
> > backend, is it right?
>
> Actually, it's a bit simpler than that - there are no two connections.
> Our program reads changes from JSONL files rather than directly from a
> publisher connection.
> We have multiple Python processes, each with a single database
> connection to the subscriber,
> reading from these files and applying changes in parallel.
>
> > Is there a rule which attribute is clarified and others are not?
> > For example, VOLATILE is specified on both side, STRICT is written only in the
> > system_functions.sql, and PARALLEL UNSAFE is set on pg_proc.dat.
>
> In pg_proc.dat, I believe the STRICT, IMMUTABLE, and PARALLEL SAFE are
> the defaults (check out pg_proc.h).
> So in pg_proc.dat, the ones that are specified are the ones that
> aren't defaults,
> there is provolatile => 'v' (for VOLATILE) and proparallel => 'u' (for
> UNSAFE), but no prostrict since it's already true by default.
> In system_functions.sql, I went with being explicit about all the
> attributes for clarity as it is the code declaration.
>

Then why didn't you specify PARALLEL UNSAFE as well?

BTW, yesterday a new thread started with the same requirement [1]. It
uses a slightly different way to define the new function. Do you have
any opinion on it?

[1] - https://www.postgresql.org/message-id/CAE2gYzyTSNvHY1%2BiWUwykaLETSuAZsCWyryokjP6rG46ZvRgQA%40mail.g...
-- 
With Regards,
Amit Kapila.






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-08 17:22  Doruk Yilmaz <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Doruk Yilmaz @ 2025-09-08 17:22 UTC (permalink / raw)
  To: Amit Kapila <[email protected]>; +Cc: [email protected]; Hayato Kuroda (Fujitsu) <[email protected]>

> Then why didn't you specified PARALLEL UNSAFE as well?

You are correct, I missed marking the function as PARALLEL UNSAFE.
I’ve attached a revised patch with the correct annotation.

> BTW, yesterday a new thread started with the same requirement [1]. It
> uses a slightly different way to define the new function. do you have
> any opinion on it?

I don’t think introducing a separate function is a good idea. It
provides effectively the same behavior while adding technical debt and
maintenance overhead without a clear benefit.
Our patch keeps a single function with a default parameter, so it’s
not a breaking change, and I believe our approach is preferable.
That said, the fact that another patch proposes the same capability
indicates there’s broader demand for this change.


Attachments:

  [text/x-patch] v6-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.9K, 2-v6-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 8 00:00:00 2025
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v6] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. replorigin_session_setup()
has a SQL user interface: pg_replication_origin_session_setup. This
commit adds an optional parameter that passes the process ID to the
internal function replorigin_session_setup. It allows multiple processes
that use the SQL replication origin functions to share the same
replication origin.
---
 doc/src/sgml/func/func-admin.sgml        | 22 ++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>

       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';

+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM

 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;

-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;

 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;

 	replorigin_check_prerequisites(true, false);

 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);

 	replorigin_session_origin = origin;

diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },

 { oid => '6007', descr => 'teardown configured replication progress tracking',



* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-16 04:37  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Doruk Yilmaz <[email protected]>
  0 siblings, 2 replies; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2025-09-16 04:37 UTC (permalink / raw)
  To: 'Doruk Yilmaz' <[email protected]>; +Cc: [email protected] <[email protected]>; Amit Kapila <[email protected]>

Dear Doruk,

Thanks for updating the patch and sorry for being late.
The new patch looks good to me.

Best regards,
Hayato Kuroda
FUJITSU LIMITED




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-16 21:49  Doruk Yilmaz <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  1 sibling, 0 replies; 46+ messages in thread

From: Doruk Yilmaz @ 2025-09-16 21:49 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: [email protected]; Amit Kapila <[email protected]>

Dear Hayato,

Thanks for the feedback on the patch, I'm glad the latest version looks good.
I was wondering if there is anything else I need to do on my end, or
any other process I should be aware of, for this patch to move
forward? I'm happy to make any further adjustments or provide more
information if needed.

Thanks,
Doruk Yılmaz






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-17 06:19  Amit Kapila <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  1 sibling, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-09-17 06:19 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

On Tue, Sep 16, 2025 at 10:07 AM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Dear Doruk,
>
> Thanks for updating the patch and sorry for being late.
> The new patch looks good to me.
>

Can we think of writing a few tests for this newly exposed functionality?

-- 
With Regards,
Amit Kapila.






* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-17 09:22  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2025-09-17 09:22 UTC (permalink / raw)
  To: 'Amit Kapila' <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

Dear Amit,

> Can we think of writing a few tests for this newly exposed functionality?

I wrote a test; please see the attached files. 0001 is unchanged from v6,
and 0002 contains the tests, which open two sessions and confirm that both
can set up the same origin.

BTW, while testing I found an existing issue in this function. Since
session_replication_state is set before the pid check, the session origin
can remain set after a failure. Here is a quick reproducer:

```
postgres=# SELECT pg_replication_origin_create('origin');
 pg_replication_origin_create 
------------------------------
                            1
(1 row)

postgres=# -- run origin_session_setup with incorrect parameter
postgres=# SELECT pg_replication_origin_session_setup('origin', -1);
ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by -1
postgres=# -- run origin_session_setup again with correct parameter
postgres=# SELECT pg_replication_origin_session_setup('origin');
ERROR:  cannot setup replication origin when one is already setup
```

The issue has existed since parallel apply was introduced, but it went unnoticed
because parallel apply workers never pass an invalid pid. Now that the pid is
exposed to users through the SQL function, it is more likely to be hit, so I think
it's time to fix it as part of this work.

The idea for the fix is to work with a local replication state and assign it to
the process-level variable only at the end, once all checks have passed. 0003
implements that.
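To make the ordering problem concrete, here is a small, self-contained C model of both variants. It is a sketch, not the origin.c code: the fixed-size `slots` array stands in for the shared `replication_states`, `session_slot` for `session_replication_state`, `my_pid` for `MyProcPid`, and negative return codes for `ereport(ERROR)`. All of these names are hypothetical, and the real function's other checks are omitted. The buggy variant publishes the slot before validating `acquired_by`, so a failed call leaves it set; the fixed variant validates a local candidate first and hands a freshly claimed slot back on failure, as 0003 does.

```c
#include <assert.h>
#include <string.h>

#define NSLOTS 4
#define INVALID_ORIGIN 0u

typedef struct Slot
{
    unsigned    roident;        /* origin owning the slot, 0 = free */
    int         acquired_by;    /* pid of the first process, 0 = none */
} Slot;

static Slot slots[NSLOTS];              /* models replication_states */
static Slot *session_slot = NULL;       /* models session_replication_state */
static const int my_pid = 4242;         /* models MyProcPid */

/* Forget all state (test helper). */
static void
reset_all(void)
{
    memset(slots, 0, sizeof(slots));
    session_slot = NULL;
}

/* Return the slot used by 'node', or NULL plus the first free index. */
static Slot *
find_slot(unsigned node, int *free_idx)
{
    *free_idx = -1;
    for (int i = 0; i < NSLOTS; i++)
    {
        if (slots[i].roident == INVALID_ORIGIN)
        {
            if (*free_idx == -1)
                *free_idx = i;
            continue;
        }
        if (slots[i].roident == node)
            return &slots[i];
    }
    return NULL;
}

/* Buggy order: publish first, validate afterwards. */
static int
setup_buggy(unsigned node, int acquired_by)
{
    int         free_idx;

    if (session_slot != NULL)
        return -2;              /* "one is already setup" */
    session_slot = find_slot(node, &free_idx);
    if (session_slot == NULL)
    {
        if (free_idx == -1)
            return -3;          /* no free slot */
        session_slot = &slots[free_idx];
        session_slot->roident = node;
    }
    if (acquired_by == 0)
        session_slot->acquired_by = my_pid;
    else if (session_slot->acquired_by != acquired_by)
        return -1;              /* fails, but session_slot stays set */
    return 0;
}

/* Fixed order: validate a local candidate, publish only on success. */
static int
setup_fixed(unsigned node, int acquired_by)
{
    int         free_idx;
    int         initialized = 0;
    Slot       *candidate;

    if (session_slot != NULL)
        return -2;
    candidate = find_slot(node, &free_idx);
    if (candidate == NULL)
    {
        if (free_idx == -1)
            return -3;
        candidate = &slots[free_idx];
        candidate->roident = node;
        initialized = 1;
    }
    assert(candidate->roident != INVALID_ORIGIN);
    if (acquired_by == 0)
        candidate->acquired_by = my_pid;
    else if (candidate->acquired_by != acquired_by)
    {
        if (initialized)
            candidate->roident = INVALID_ORIGIN;    /* hand the slot back */
        return -1;
    }
    session_slot = candidate;
    return 0;
}
```

In this model, `setup_buggy(1, -1)` fails but leaves `session_slot` set, so a following `setup_buggy(1, 0)` takes the "already setup" path, mirroring the reproducer above; `setup_fixed(1, -1)` fails cleanly and a following `setup_fixed(1, 0)` succeeds.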

How do you feel?

Best regards,
Hayato Kuroda
FUJITSU LIMITED 



Attachments:

  [application/octet-stream] v7-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.9K, 2-v7-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 566b20a80856c1fe49efb63930cbb695903a8235 Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v7 1/3] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. The
replorigin_session_setup function has a SQL user interface:
pg_replication_origin_session_setup. This commit adds an optional
parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
 doc/src/sgml/func/func-admin.sgml        | 23 +++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>
 
       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..98d47e1beb8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.47.3



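The pid convention that 0001 documents (the first process passes 0, later processes pass the first process's PID) can be modelled as follows. This is a hedged sketch with hypothetical names (`Backend`, `attach`, `shared_slots`); it reduces each process to its PID and its own session pointer, and returns -1 wherever the real function would raise an error.

```c
#include <assert.h>
#include <stddef.h>

#define NSLOTS 2

/* Hypothetical stand-in for one shared replication-origin slot. */
typedef struct Slot
{
    unsigned    roident;        /* origin id, 0 = unused */
    int         acquired_by;    /* pid of the first attached process, 0 = none */
} Slot;

/* Hypothetical stand-in for a backend: its pid and its own session pointer. */
typedef struct Backend
{
    int         pid;            /* models MyProcPid */
    Slot       *session;        /* models session_replication_state */
} Backend;

static Slot shared_slots[NSLOTS];

/*
 * Attach backend 'b' to origin 'node'.  pid_arg == 0 means "I am the first
 * process"; otherwise it must be the pid of the process that attached with 0.
 */
static int
attach(Backend *b, unsigned node, int pid_arg)
{
    Slot       *slot = NULL;
    Slot       *free_slot = NULL;

    assert(b->pid != 0);
    if (b->session != NULL)
        return -1;              /* this process already has an origin */

    for (int i = 0; i < NSLOTS; i++)
    {
        if (shared_slots[i].roident == node)
            slot = &shared_slots[i];
        else if (shared_slots[i].roident == 0 && free_slot == NULL)
            free_slot = &shared_slots[i];
    }

    /* A second process claiming to be "first" is rejected. */
    if (slot != NULL && slot->acquired_by != 0 && pid_arg == 0)
        return -1;

    if (slot == NULL)
    {
        /* No slot for this origin yet: only a first process may claim one. */
        if (free_slot == NULL || pid_arg != 0)
            return -1;
        slot = free_slot;
        slot->roident = node;
    }

    if (pid_arg == 0)
        slot->acquired_by = b->pid;
    else if (slot->acquired_by != pid_arg)
        return -1;              /* pid does not name the first process */

    b->session = slot;
    return 0;
}
```

In this model, once a leader with pid 1000 has attached with 0, a worker attaching with 1000 shares the leader's slot, while a third process passing 0 or an unrelated pid is rejected.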
  [application/octet-stream] v7-0002-add-test.patch (3.3K, 3-v7-0002-add-test.patch)
From 3a5018870916f0be07797127c628e152b3c920bd Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 15:20:45 +0900
Subject: [PATCH v7 2/3] add test

---
 contrib/test_decoding/meson.build          |  1 +
 contrib/test_decoding/t/002_repl_origin.pl | 73 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)
 create mode 100644 contrib/test_decoding/t/002_repl_origin.pl

diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..e00e03ba08d 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -74,6 +74,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_repl_stats.pl',
+      't/002_repl_origin.pl',
     ],
   },
 }
diff --git a/contrib/test_decoding/t/002_repl_origin.pl b/contrib/test_decoding/t/002_repl_origin.pl
new file mode 100644
index 00000000000..e1aa57b0995
--- /dev/null
+++ b/contrib/test_decoding/t/002_repl_origin.pl
@@ -0,0 +1,73 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test session replication origin setup, especially with multiple sessions
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Test set-up
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'autovacuum = off');
+$node->start;
+
+# Create a replication origin
+$node->safe_psql('postgres',
+	"SELECT pg_replication_origin_create('origin');");
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Start a background session
+my $session1 = $node->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Setup the replication origin to the session
+my $pid = $session1->query_safe(
+	qq(SELECT pg_replication_origin_session_setup('origin');
+    SELECT pg_backend_pid();));
+
+is( $session1->query_safe(
+		qq(SELECT pg_replication_origin_session_is_setup();)),
+	't',
+	"A replication origin is assigned to the session");
+
+# Start another session
+my $session2 = $node->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Attach to the same replication origin
+$session2->query_safe(
+	qq(SELECT pg_replication_origin_session_setup('origin', $pid);));
+
+is( $session2->query_safe(
+		qq(SELECT pg_replication_origin_session_is_setup();)),
+	't',
+	"Replication origin can accept multiple assignment");
+
+# Emit a transactional message to update the local_lsn and store the current
+# value.
+$session1->query_safe(
+	qq(SELECT pg_logical_emit_message(true, 'prefix', 'message on session1');)
+);
+my $old_lsn = $session1->query_safe(
+	qq(SELECT local_lsn FROM pg_replication_origin_status;));
+
+# Emit a transactional message from another session
+$session2->query_safe(
+	qq(SELECT pg_logical_emit_message(true, 'prefix', 'message on session2');)
+);
+
+# Confirm local_lsn can be updated by concurrent processes
+my $new_lsn = $session1->query_safe(
+	qq(SELECT local_lsn FROM pg_replication_origin_status;));
+is($session1->query_safe(qq(SELECT '$old_lsn' < '$new_lsn')),
+	't', "Replication origin can be advanced by both sessions");
+
+done_testing();
-- 
2.47.3



  [application/octet-stream] v7-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch (4.5K, 4-v7-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch)
From aef02a788bacc87400872a6a8d42d9aeb0301001 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v7 3/3] Avoid setting ReplicationState in case of ERROR

---
 contrib/test_decoding/expected/replorigin.out |  3 ++
 contrib/test_decoding/sql/replorigin.sql      |  3 ++
 src/backend/replication/logical/origin.c      | 31 +++++++++++++------
 3 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by -1
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
  pg_replication_origin_create 
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	static bool registered_cleanup;
 	int			i;
 	int			free_slot = -1;
+	ReplicationState *candidate_state = NULL;
+	bool		initialized = false;
 
 	if (!registered_cleanup)
 	{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 		}
 
 		/* ok, found slot */
-		session_replication_state = curstate;
+		candidate_state = curstate;
 		break;
 	}
 
 
-	if (session_replication_state == NULL && free_slot == -1)
+	if (candidate_state == NULL && free_slot == -1)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("could not find free replication state slot for replication origin with ID %d",
 						node),
 				 errhint("Increase \"max_active_replication_origins\" and try again.")));
-	else if (session_replication_state == NULL)
+	else if (candidate_state == NULL)
 	{
 		/* initialize new slot */
-		session_replication_state = &replication_states[free_slot];
-		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
-		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
-		session_replication_state->roident = node;
+		candidate_state = &replication_states[free_slot];
+		Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+		candidate_state->roident = node;
+		initialized = true;
 	}
 
 
-	Assert(session_replication_state->roident != InvalidRepOriginId);
+	Assert(candidate_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
-		session_replication_state->acquired_by = MyProcPid;
-	else if (session_replication_state->acquired_by != acquired_by)
+		candidate_state->acquired_by = MyProcPid;
+	else if (candidate_state->acquired_by != acquired_by)
+	{
+		if (initialized)
+			candidate_state->roident = InvalidRepOriginId;
+
 		elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
 			 node, acquired_by);
+	}
+
+	/* Candidate slot looks ok, use it */
+	session_replication_state = candidate_state;
 
 	LWLockRelease(ReplicationOriginLock);
 
-- 
2.47.3




* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-18 07:37  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2025-09-18 07:37 UTC (permalink / raw)
  To: 'Amit Kapila' <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>

Dear hackers,

> I considered a test, please see attached files. 0001 was not changed from v6 and
> 0002 contained tests. Here, two sessions were opened and confirmed that they
> can set the same origin.

After further consideration and verification, I found that testing via the
isolation tester is more efficient, so the attached patchset does that. In my
environment the test runs about 10x faster, because it does not need to start a
new instance within the test.

In the test file, two sessions, s0 and s1, are launched and both set up the same
session origin. Each inserts the origin's local_lsn into a table, and the test
confirms that the later insertion has the larger value.
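Why the later insertion is expected to carry the larger value: every attached session advances the same shared origin state, and that state only moves forward. A minimal model, assuming a single-threaded setting (concurrent callers would need a lock) and using hypothetical names (`SharedOriginState`, `origin_advance`):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef uint64_t XLogRecPtr;    /* an LSN; mirrors PostgreSQL's typedef */

/* Hypothetical model of one shared replication-origin slot. */
typedef struct SharedOriginState
{
    XLogRecPtr  remote_lsn;     /* latest remote commit recorded */
    XLogRecPtr  local_lsn;      /* latest local commit recorded */
} SharedOriginState;

/*
 * Called at commit by whichever attached session commits.  Values only
 * move forward, so a later commit from any session raises local_lsn.
 */
static void
origin_advance(SharedOriginState *st, XLogRecPtr remote_commit,
               XLogRecPtr local_commit)
{
    assert(st != NULL);
    if (st->local_lsn < local_commit)
        st->local_lsn = local_commit;
    if (st->remote_lsn < remote_commit)
        st->remote_lsn = remote_commit;
}
```

For example, advancing with local commits at 0x1000 (s0) and then 0x2000 (s1) leaves local_lsn at 0x2000, and a stale 0x1800 does not move it back.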

One hacky point is how s1 obtains the pid of s0. My analysis follows.

application_name is controlled by isolation_main.c and isolationtester.c. When
the isolation tests run, isolation_main starts first and launches one
isolationtester process per spec file. In isolation_main.c, application_name is
set to "isolation/${testname}" in the starter function. Then, after
isolationtester parses the spec file, it appends each session's name to it. This
is done at line 193.
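The name matched by s1_setup can be reproduced as below. The helpers are hypothetical and only rebuild the resulting string format, "isolation/<testname>" from the driver plus "/<session>" from isolationtester; they are not the code in isolation_main.c or isolationtester.c.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Step 1 (hypothetical helper): the prefix the test driver sets per spec file. */
static void
driver_appname(char *buf, size_t len, const char *testname)
{
    snprintf(buf, len, "isolation/%s", testname);
}

/* Step 2 (hypothetical helper): the session name appended by isolationtester. */
static void
session_appname(char *buf, size_t len, const char *base, const char *session)
{
    snprintf(buf, len, "%s/%s", base, session);
}

/* Models the spec's WHERE application_name = '...' filter. */
static int
is_session(const char *appname, const char *testname, const char *session)
{
    char        base[64];
    char        full[128];

    assert(testname != NULL && session != NULL);
    driver_appname(base, sizeof(base), testname);
    session_appname(full, sizeof(full), base, session);
    return strcmp(appname, full) == 0;
}
```

With testname "repl_origin" and session "s0", the composed name is "isolation/repl_origin/s0", the value the spec filters `pg_stat_activity` on.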

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v8-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.9K, 2-v8-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 6315206a177be337669883376218cef362d8412b Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v8 1/3] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. The
replorigin_session_setup function has a SQL user interface:
pg_replication_origin_session_setup. This commit adds an optional
parameter that passes the process ID to the internal function
replorigin_session_setup. It allows multiple processes to use the same
replication origin if you are using the replication functions.
---
 doc/src/sgml/func/func-admin.sgml        | 23 +++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>
 
       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..98d47e1beb8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.47.3



  [application/octet-stream] v8-0002-add-test.patch (6.3K, 3-v8-0002-add-test.patch)
From ee0f1539ddb8d68cc80af229fa0afb5d58e55e92 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 15:20:45 +0900
Subject: [PATCH v8 2/3] add test

---
 contrib/test_decoding/Makefile                |  2 +-
 .../test_decoding/expected/repl_origin.out    | 79 +++++++++++++++++++
 contrib/test_decoding/meson.build             |  1 +
 contrib/test_decoding/specs/repl_origin.spec  | 56 +++++++++++++
 4 files changed, 137 insertions(+), 1 deletion(-)
 create mode 100644 contrib/test_decoding/expected/repl_origin.out
 create mode 100644 contrib/test_decoding/specs/repl_origin.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..8aa80054944 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore invalidation_distribution
+	skip_snapshot_restore invalidation_distribution repl_origin
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/repl_origin.out b/contrib/test_decoding/expected/repl_origin.out
new file mode 100644
index 00000000000..9ef80217b9d
--- /dev/null
+++ b/contrib/test_decoding/expected/repl_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/repl_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_add_message: 
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+       1
+(1 row)
+
+step s0_store_lsn: 
+    INSERT INTO local_lsn_store
+    SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message: 
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+       1
+(1 row)
+
+step s1_store_lsn: 
+    INSERT INTO local_lsn_store
+    SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare: 
+    SELECT s0.lsn < s1.lsn
+    FROM local_lsn_store as s0, local_lsn_store as s1
+    WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t       
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..6d687eeb2d7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
       'slot_creation_error',
       'skip_snapshot_restore',
       'invalidation_distribution',
+      'repl_origin',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/repl_origin.spec b/contrib/test_decoding/specs/repl_origin.spec
new file mode 100644
index 00000000000..266ce553444
--- /dev/null
+++ b/contrib/test_decoding/specs/repl_origin.spec
@@ -0,0 +1,56 @@
+# Test multi-session replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+    SELECT pg_replication_origin_create('origin');
+    CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+    SELECT pg_replication_origin_drop('origin');
+    DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+    INSERT INTO local_lsn_store
+    SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+    SELECT s0.lsn < s1.lsn
+    FROM local_lsn_store as s0, local_lsn_store as s1
+    WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/repl_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+    INSERT INTO local_lsn_store
+    SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# First, s0 attaches to an origin and s1 attaches to the same one. Both
+# sessions commit a transaction and store the local_lsn of the origin, then
+# the LSNs are compared; the later transaction (by s1) should be larger.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
-- 
2.47.3



  [application/octet-stream] v8-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch (4.5K, 4-v8-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch)
From 601cbb02265a5373d298e6803715d30c2370a111 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v8 3/3] Avoid setting ReplicationState in case of ERROR

---
 contrib/test_decoding/expected/replorigin.out |  3 ++
 contrib/test_decoding/sql/replorigin.sql      |  3 ++
 src/backend/replication/logical/origin.c      | 31 +++++++++++++------
 3 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by -1
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
  pg_replication_origin_create 
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	static bool registered_cleanup;
 	int			i;
 	int			free_slot = -1;
+	ReplicationState *candidate_state = NULL;
+	bool		initialized = false;
 
 	if (!registered_cleanup)
 	{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 		}
 
 		/* ok, found slot */
-		session_replication_state = curstate;
+		candidate_state = curstate;
 		break;
 	}
 
 
-	if (session_replication_state == NULL && free_slot == -1)
+	if (candidate_state == NULL && free_slot == -1)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("could not find free replication state slot for replication origin with ID %d",
 						node),
 				 errhint("Increase \"max_active_replication_origins\" and try again.")));
-	else if (session_replication_state == NULL)
+	else if (candidate_state == NULL)
 	{
 		/* initialize new slot */
-		session_replication_state = &replication_states[free_slot];
-		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
-		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
-		session_replication_state->roident = node;
+		candidate_state = &replication_states[free_slot];
+		Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+		candidate_state->roident = node;
+		initialized = true;
 	}
 
 
-	Assert(session_replication_state->roident != InvalidRepOriginId);
+	Assert(candidate_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
-		session_replication_state->acquired_by = MyProcPid;
-	else if (session_replication_state->acquired_by != acquired_by)
+		candidate_state->acquired_by = MyProcPid;
+	else if (candidate_state->acquired_by != acquired_by)
+	{
+		if (initialized)
+			candidate_state->roident = InvalidRepOriginId;
+
 		elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
 			 node, acquired_by);
+	}
+
+	/* Candidate slot looks ok, use it */
+	session_replication_state = candidate_state;
 
 	LWLockRelease(ReplicationOriginLock);
 
-- 
2.47.3




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-18 11:02  Amit Kapila <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2025-09-18 11:02 UTC
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

On Thu, Sep 18, 2025 at 1:07 PM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Dear hackers,
>
> > I considered a test, please see attached files.
>

Few comments:
1. +step "s0_compare" {
+    SELECT s0.lsn < s1.lsn
+    FROM local_lsn_store as s0, local_lsn_store as s1
+    WHERE s0.session = 0 AND s1.session = 1;
+}

Comparing the values this way seems a bit tricky: a sequential scan
won't guarantee the order in which rows appear. Can't we somehow get
the two rows ordered by session and compare their values?

2.
+ else if (candidate_state->acquired_by != acquired_by)
+ {
+ if (initialized)
+ candidate_state->roident = InvalidRepOriginId;
+
  elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
  node, acquired_by);
+ }

This doesn't look clean. Instead, how about checking this case before
setting candidate_state, as shown in the attached? If we do that, we
shouldn't even need new variables like candidate_state and initialized.
Additionally, as shown in the attached, it is better to make this a
user-facing error by using ereport.

3. Merge all patches as I don't see the need to do any backpatch here.

-- 
With Regards,
Amit Kapila.

Attachments:

  [text/plain] v8_amit_1.txt (1.3K, 2-v8_amit_1.txt)
  download | inline diff:
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 0bbc96bcee5..c93b6eb1798 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1169,6 +1169,15 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							curstate->roident, curstate->acquired_by)));
 		}
 
+		else if (curstate->acquired_by != 0 &&
+				 curstate->acquired_by != acquired_by)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+							node, acquired_by)));
+		}
+
 		/* ok, found slot */
 		candidate_state = curstate;
 		break;
@@ -1196,14 +1205,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 
 	if (acquired_by == 0)
 		candidate_state->acquired_by = MyProcPid;
-	else if (candidate_state->acquired_by != acquired_by)
-	{
-		if (initialized)
-			candidate_state->roident = InvalidRepOriginId;
-
-		elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
-			 node, acquired_by);
-	}
+	else
+		Assert(candidate_state->acquired_by == acquired_by);
 
 	/* Candidate slot looks ok, use it */
 	session_replication_state = candidate_state;



* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-09-18 14:19  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2025-09-18 14:19 UTC
  To: 'Amit Kapila' <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

Dear Amit,

> 
> Few comments:
> 1. +step "s0_compare" {
> +    SELECT s0.lsn < s1.lsn
> +    FROM local_lsn_store as s0, local_lsn_store as s1
> +    WHERE s0.session = 0 AND s1.session = 1;
> +}
> 
> Comparing the values this way seems a bit tricky: a sequential scan
> won't guarantee the order in which rows appear. Can't we somehow get
> the two rows ordered by session and compare their values?

I considered another approach that uses a CTE for session 0. What do you think?

> 2.
> + else if (candidate_state->acquired_by != acquired_by)
> + {
> + if (initialized)
> + candidate_state->roident = InvalidRepOriginId;
> +
>   elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
>   node, acquired_by);
> + }
> 
> This doesn't look clean. Instead, how about checking this case before
> setting candidate_state, as shown in the attached? If we do that, we
> shouldn't even need new variables like candidate_state and initialized.

Your approach does not work when the specified origin has not been used
since the instance started. In that case the origin does not yet exist in
replication_states, so the check inside the loop is never reached and a
new slot is initialized.

As far as I understand, two ERRORs are needed to avoid adding new
variables: the first inside the loop, and the second in the
session_replication_state == NULL case. The latter covers an origin that
is inactive while a PID is specified, so it can use a different error
message.

> Additionally, as shown in the attached, it is better to make this a
> user-facing error by using ereport.

Agreed; the elog() calls have been replaced with ereport().

> 3. Merge all patches as I don't see the need to do any backpatch here.

Sure.

The attached patch includes all the changes. Thoughts?

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v9-0001-pg_replication_origin_session_setup-pid-parameter.patch (15.4K, 2-v9-0001-pg_replication_origin_session_setup-pid-parameter.patch)
  download | inline diff:
From 7a41bc12c7464cc9dca9f3bfd6d0548538f64ed9 Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v9] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. The
replorigin_session_setup function has a SQL user interface,
pg_replication_origin_session_setup. This commit adds an optional
parameter that passes the process ID to the internal function
replorigin_session_setup, allowing multiple processes to use the same
replication origin through the SQL-level replication origin functions.
---
 contrib/test_decoding/Makefile                |  2 +-
 .../test_decoding/expected/repl_origin.out    | 79 +++++++++++++++++++
 contrib/test_decoding/expected/replorigin.out |  3 +
 contrib/test_decoding/meson.build             |  1 +
 contrib/test_decoding/specs/repl_origin.spec  | 56 +++++++++++++
 contrib/test_decoding/sql/replorigin.sql      |  3 +
 doc/src/sgml/func/func-admin.sgml             | 23 +++++-
 src/backend/catalog/system_functions.sql      |  9 ++-
 src/backend/replication/logical/origin.c      | 24 +++++-
 src/include/catalog/pg_proc.dat               |  2 +-
 10 files changed, 193 insertions(+), 9 deletions(-)
 create mode 100644 contrib/test_decoding/expected/repl_origin.out
 create mode 100644 contrib/test_decoding/specs/repl_origin.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..8aa80054944 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
-	skip_snapshot_restore invalidation_distribution
+	skip_snapshot_restore invalidation_distribution repl_origin
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/repl_origin.out b/contrib/test_decoding/expected/repl_origin.out
new file mode 100644
index 00000000000..9ef80217b9d
--- /dev/null
+++ b/contrib/test_decoding/expected/repl_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/repl_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_add_message: 
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+       1
+(1 row)
+
+step s0_store_lsn: 
+    INSERT INTO local_lsn_store
+    SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message: 
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+       1
+(1 row)
+
+step s1_store_lsn: 
+    INSERT INTO local_lsn_store
+    SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare: 
+    SELECT s0.lsn < s1.lsn
+    FROM local_lsn_store as s0, local_lsn_store as s1
+    WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t       
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..cb9d63e20c1 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR:  replication origin with ID 1 is inactive but PID -1 was specified
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
  pg_replication_origin_create 
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..6d687eeb2d7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
       'slot_creation_error',
       'skip_snapshot_restore',
       'invalidation_distribution',
+      'repl_origin',
     ],
     'regress_args': [
       '--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/repl_origin.spec b/contrib/test_decoding/specs/repl_origin.spec
new file mode 100644
index 00000000000..266ce553444
--- /dev/null
+++ b/contrib/test_decoding/specs/repl_origin.spec
@@ -0,0 +1,56 @@
+# Test multi-session replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+    SELECT pg_replication_origin_create('origin');
+    CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+    SELECT pg_replication_origin_drop('origin');
+    DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+    INSERT INTO local_lsn_store
+    SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+    SELECT s0.lsn < s1.lsn
+    FROM local_lsn_store as s0, local_lsn_store as s1
+    WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/repl_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+    SELECT 1
+    FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+    INSERT INTO local_lsn_store
+    SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
+# commits a transaction and store the local_lsn of the replication origin.
+# Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..17f2b888238 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equals to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>
 
       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..a88b0e6d1ea 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							curstate->roident, curstate->acquired_by)));
 		}
 
+		else if (curstate->acquired_by != acquired_by)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+							node, acquired_by)));
+		}
+
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
@@ -1181,6 +1189,13 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 				 errhint("Increase \"max_active_replication_origins\" and try again.")));
 	else if (session_replication_state == NULL)
 	{
+		/* The origin is not used but PID is specified */
+		if(acquired_by)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("replication origin with ID %d is inactive but PID %d was specified",
+							node, acquired_by)));
+
 		/* initialize new slot */
 		session_replication_state = &replication_states[free_slot];
 		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@@ -1193,9 +1208,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 
 	if (acquired_by == 0)
 		session_replication_state->acquired_by = MyProcPid;
-	else if (session_replication_state->acquired_by != acquired_by)
-		elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
-			 node, acquired_by);
+	else
+		Assert(session_replication_state->acquired_by == acquired_by);
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1374,12 +1388,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.47.3




* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-12-23 08:54  Zhijie Hou (Fujitsu) <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 2 replies; 46+ messages in thread

From: Zhijie Hou (Fujitsu) @ 2025-12-23 08:54 UTC
  To: Hayato Kuroda (Fujitsu) <[email protected]>; 'Amit Kapila' <[email protected]>; +Cc: Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

Hi,

When testing the new parameter in pg_replication_origin_session_setup(), I
noticed a bug that allows an origin that is in use to be dropped. The issue
arises when two backends set up the same origin: if the second backend resets
the origin first, it clears the acquired_by field regardless of whether the
first backend is still using it. This allows the origin to be dropped, so its
slot in shared memory can be reused, which is unintended.

As for the fix, simply adding a check on the acquired_by field does not work:
if the first backend resets the origin first, the origin can still be dropped
while the second backend is using it.
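
A minimal model of the situation described above, assuming a slot that tracks
ownership only through acquired_by and a reset that clears it only for the
owning PID (the "simple check"). OwnerOnlySlot and the owner_only_* functions
are hypothetical names for illustration, not PostgreSQL code.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model: ownership of an origin tracked only through acquired_by. */
typedef struct
{
    int acquired_by;   /* PID of the first backend; 0 means "not in use" */
} OwnerOnlySlot;

/* The first backend passes 0 and becomes the owner; others pass the owner's PID. */
static void
owner_only_setup(OwnerOnlySlot *s, int acquired_by, int my_pid)
{
    if (acquired_by == 0)
        s->acquired_by = my_pid;
    else
        assert(s->acquired_by == acquired_by);
}

/* The "simple check": only the owning backend clears acquired_by on reset. */
static void
owner_only_reset(OwnerOnlySlot *s, int my_pid)
{
    if (s->acquired_by == my_pid)
        s->acquired_by = 0;
}

/* A drop is allowed whenever nobody appears to hold the origin. */
static bool
owner_only_can_drop(const OwnerOnlySlot *s)
{
    return s->acquired_by == 0;
}
```

If backend 100 sets up the origin, backend 200 attaches with PID 100, and
backend 100 then resets, owner_only_can_drop() returns true even though
backend 200 is still attached, which is the hole described above.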

To resolve this fully, I added a reference count (refcount) to the
origin. The count is incremented when a backend sets up the origin and
decremented upon a reset, so the replication origin can only be dropped
once the reference count reaches zero.
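
The refcount scheme can be sketched the same way. RefcountedSlot and the
refcounted_* functions are hypothetical, single-process stand-ins with no
locking or condition variables; the assertions are meant to mirror the ones in
the attached patch.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of an origin slot with a reference count. */
typedef struct
{
    int acquired_by;  /* PID of the first backend, 0 once it has reset */
    int refcount;     /* number of backends currently attached */
} RefcountedSlot;

static void
refcounted_setup(RefcountedSlot *s, int acquired_by, int my_pid)
{
    if (acquired_by == 0)
    {
        /* the first backend must find the origin unused */
        assert(s->refcount == 0);
        s->acquired_by = my_pid;
    }
    else
        assert(s->acquired_by == acquired_by);
    s->refcount++;
}

static void
refcounted_reset(RefcountedSlot *s, int my_pid)
{
    assert(s->refcount > 0);
    /* only the first backend clears the PID; everyone drops its reference */
    if (s->acquired_by == my_pid)
        s->acquired_by = 0;
    s->refcount--;
}

/* Dropping is allowed only when no backend holds a reference. */
static bool
refcounted_can_drop(const RefcountedSlot *s)
{
    return s->refcount == 0;
}
```

Unlike the acquired_by-only check, the drop condition here depends on how
many backends are attached rather than on which one attached first, so the
order of the resets no longer matters.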

Thanks to Kuroda-San for discussing and reviewing this patch off-list.

Best Regards,
Hou zj


Attachments:

  [application/octet-stream] v1-0001-Fix-unintended-drop-of-active-replication-origins.patch (8.2K, 2-v1-0001-Fix-unintended-drop-of-active-replication-origins.patch)
  download | inline diff:
From 8b39ed57f03d0a2f90f9b89572db2e1242a11dd0 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v1] Fix unintended drop of active replication origins

Currently, if two backends configure the same replication origin and the second
backend resets it first, the acquired_by field is cleared even though the first
backend is still using the origin. This can result in the origin being dropped
unintentionally, potentially leading to issues if the shared memory of the
dropped origin is reused for a newly created origin. Such reuse could cause the
remaining backend, which still holds the memory of the dropped origin, to
advance a different slot unpredictably.

This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. As a result, the replication origin is only dropped
when the reference count reaches zero.
---
 .../expected/parallel_session_origin.out      | 44 +++++++++++
 .../specs/parallel_session_origin.spec        |  4 +
 src/backend/replication/logical/origin.c      | 79 ++++++++++++-------
 3 files changed, 97 insertions(+), 30 deletions(-)

diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..546d8933954 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -77,3 +77,47 @@ pg_replication_origin_session_reset
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_drop s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
+step s1_drop: SELECT pg_replication_origin_drop('origin');
+ERROR:  could not drop replication origin with ID 1, in use by another process
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..8e9c81e4419 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -49,8 +49,12 @@ step "s1_store_lsn" {
     SELECT 1, local_lsn FROM pg_replication_origin_status;
 }
 step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+step "s1_drop" { SELECT pg_replication_origin_drop('origin'); }
 
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
 permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+
+# Test that the origin cannot be dropped if any session is actively using it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_drop" "s1_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 2380f369578..536e524f4d5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
 	 */
 	int			acquired_by;
 
+	/* Number of backend that is currently using this origin. */
+	int			refcount;
+
 	/*
 	 * Condition variable that's signaled when acquired_by changes.
 	 */
@@ -383,16 +386,19 @@ restart:
 		if (state->roident == roident)
 		{
 			/* found our slot, is it busy? */
-			if (state->acquired_by != 0)
+			if (state->refcount > 0)
 			{
 				ConditionVariable *cv;
 
 				if (nowait)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_IN_USE),
-							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
-									state->roident,
-									state->acquired_by)));
+							 (state->acquired_by != 0)
+							 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+									  state->roident,
+									  state->acquired_by)
+							 : errmsg("could not drop replication origin with ID %d, in use by another process",
+									  state->roident)));
 
 				/*
 				 * We must wait and then retry.  Since we don't know which CV
@@ -1069,32 +1075,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helpful function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-	ConditionVariable *cv = NULL;
+	ConditionVariable *cv;
 
-	if (session_replication_state == NULL)
-		return;
+	Assert(session_replication_state != NULL);
 
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	if (session_replication_state->acquired_by == MyProcPid)
-	{
-		cv = &session_replication_state->origin_cv;
+	Assert(session_replication_state->refcount > 0);
 
+	/*
+	 * Reset the PID only if the current backend is the first to set up this
+	 * origin. This prevents resetting the PID when other backends are still
+	 * using this origin.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid)
 		session_replication_state->acquired_by = 0;
-		session_replication_state = NULL;
-	}
+
+	session_replication_state->refcount--;
+
+	cv = &session_replication_state->origin_cv;
+	session_replication_state = NULL;
 
 	LWLockRelease(ReplicationOriginLock);
 
-	if (cv)
-		ConditionVariableBroadcast(cv);
+	ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+	if (session_replication_state == NULL)
+		return;
+
+	replorigin_session_reset_internal();
 }
 
 /*
@@ -1205,9 +1226,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	Assert(session_replication_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
+	{
 		session_replication_state->acquired_by = MyProcPid;
+		Assert(session_replication_state->refcount == 0);
+	}
 	else
+	{
 		Assert(session_replication_state->acquired_by == acquired_by);
+		Assert(session_replication_state->refcount > 0);
+	}
+
+	session_replication_state->refcount++;
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1253,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-	ConditionVariable *cv;
-
 	Assert(max_active_replication_origins != 0);
 
 	if (session_replication_state == NULL)
@@ -1233,15 +1260,7 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-	session_replication_state->acquired_by = 0;
-	cv = &session_replication_state->origin_cv;
-	session_replication_state = NULL;
-
-	LWLockRelease(ReplicationOriginLock);
-
-	ConditionVariableBroadcast(cv);
+	replorigin_session_reset_internal();
 }
 
 /*
-- 
2.51.1.windows.1

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2025-12-24 09:12  Amit Kapila <[email protected]>
  parent: Zhijie Hou (Fujitsu) <[email protected]>
  1 sibling, 0 replies; 46+ messages in thread

From: Amit Kapila @ 2025-12-24 09:12 UTC (permalink / raw)
  To: Zhijie Hou (Fujitsu) <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

On Tue, Dec 23, 2025 at 2:24 PM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> When testing the new parameter in pg_replication_origin_session_setup(), I
> noticed a bug allowing the origin in use to be dropped. The issue arises when
> two backends set up the same origin; if the second backend resets the origin
> first, it resets the acquired_by flag regardless of whether the first backend is
> using it. This allows the origin to be dropped, enabling the slot in shared
> memory to be reused, which is unintended.
>
> About the fix, simply adding a check for acquired_by field does not work,
> because if the first backend resets the origin first, it still risks being
> dropped while second backend uses it.
>
> To fully resolve this, I tried to add a reference count (refcount) for the
> origin. The count is incremented when a backend sets up the origin and
> decremented upon a reset. As a result, the replication origin is only dropped
> when the reference count reaches zero.
>
> Thanks to Kuroda-San for discussing and reviewing this patch off-list.
>

Thanks to both of you for the report and patch. I'll look into it
sometime during the next CF.

-- 
With Regards,
Amit Kapila.

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-05 09:45  shveta malik <[email protected]>
  parent: Zhijie Hou (Fujitsu) <[email protected]>
  1 sibling, 1 reply; 46+ messages in thread

From: shveta malik @ 2026-01-05 09:45 UTC (permalink / raw)
  To: Zhijie Hou (Fujitsu) <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; Amit Kapila <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>; shveta malik <[email protected]>

On Tue, Dec 23, 2025 at 2:24 PM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
>
> Hi,
>
> When testing the new parameter in pg_replication_origin_session_setup(), I
> noticed a bug allowing the origin in use to be dropped. The issue arises when
> two backends set up the same origin; if the second backend resets the origin
> first, it resets the acquired_by flag regardless of whether the first backend is
> using it. This allows the origin to be dropped, enabling the slot in shared
> memory to be reused, which is unintended.
>
> About the fix, simply adding a check for acquired_by field does not work,
> because if the first backend resets the origin first, it still risks being
> dropped while second backend uses it.
>
> To fully resolve this, I tried to add a reference count (refcount) for the
> origin. The count is incremented when a backend sets up the origin and
> decremented upon a reset. As a result, the replication origin is only dropped
> when the reference count reaches zero.
>
> Thanks to Kuroda-San for discussing and reviewing this patch off-list.
>

Thanks Hou-San and Kuroda-San.

What should be the expected behavior when Session1 resets the origin
(changing acquired_pid from its own PID to 0), while Session2 is
already connected to the origin and Session3 also attempts to reuse
the same origin?

Currently it asserts:

Session1:
select pg_replication_origin_create('origin');
SELECT pg_replication_origin_session_setup('origin');

Session2:
SELECT pg_replication_origin_session_setup('origin',48028);

Session1:
SELECT pg_replication_origin_session_reset();

Session3:
SELECT pg_replication_origin_session_setup('origin');
This asserts at:
TRAP: failed Assert("session_replication_state->refcount == 0"), File:
"origin.c", Line: 1231, PID: 48037
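
The sequence above can be replayed in a small single-process C model of the v1 bookkeeping. This is only a sketch under stated assumptions: SimOrigin, v1_setup() and v1_reset() are hypothetical names, the model ignores ReplicationOriginLock and the condition variable, and it returns SIM_ASSERT_FAILED where v1 would trip Assert(). PIDs 100, 200 and 300 stand in for Session1, Session2 and Session3.

```c
#include <assert.h>

/* Hypothetical stand-in for one shared ReplicationState slot. */
typedef struct SimOrigin
{
	int			acquired_by;	/* PID of the first acquirer, 0 if none */
	int			refcount;		/* sessions currently attached (new in v1) */
} SimOrigin;

typedef enum SimResult
{
	SIM_OK,
	SIM_IN_USE,					/* "already active for PID %d" */
	SIM_NOT_FOUND,				/* "could not find replication state slot ..." */
	SIM_ASSERT_FAILED			/* where v1 would trip an Assert() */
} SimResult;

/*
 * Follows the order of checks in replorigin_session_setup() with v1 applied.
 * requested_pid is the optional second SQL argument (0 when omitted).
 */
static SimResult
v1_setup(SimOrigin *o, int my_pid, int requested_pid)
{
	if (o->acquired_by != 0 && requested_pid == 0)
		return SIM_IN_USE;
	if (o->acquired_by != requested_pid)
		return SIM_NOT_FOUND;

	if (requested_pid == 0)
	{
		/* v1 assumes nobody is attached whenever acquired_by is 0 */
		if (o->refcount != 0)
			return SIM_ASSERT_FAILED;
		o->acquired_by = my_pid;
	}
	o->refcount++;
	return SIM_OK;
}

/* Follows v1's replorigin_session_reset_internal(). */
static void
v1_reset(SimOrigin *o, int my_pid)
{
	assert(o->refcount > 0);	/* mirrors v1's Assert(refcount > 0) */
	if (o->acquired_by == my_pid)
		o->acquired_by = 0;
	o->refcount--;
}
```

Replaying the steps: v1_setup(&o, 100, 0) and v1_setup(&o, 200, 100) succeed, v1_reset(&o, 100) leaves acquired_by == 0 with refcount == 1, and v1_setup(&o, 300, 0) then returns SIM_ASSERT_FAILED, matching the reported TRAP.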

thanks
Shveta

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-05 10:30  shveta malik <[email protected]>
  parent: shveta malik <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: shveta malik @ 2026-01-05 10:30 UTC (permalink / raw)
  To: Zhijie Hou (Fujitsu) <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; Amit Kapila <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>; shveta malik <[email protected]>

On Mon, Jan 5, 2026 at 3:15 PM shveta malik <[email protected]> wrote:
>
> Thanks Hou-San and Kuroda-San.
>
> What should be the expected behavior when Session1 resets the origin
> (changing acquired_pid from its own PID to 0), while Session2 is
> already connected to the origin and Session3 also attempts to reuse
> the same origin?
>
> Currently it asserts:
>
> Session1:
> select pg_replication_origin_create('origin');
> SELECT pg_replication_origin_session_setup('origin');
>
> Session2:
> SELECT pg_replication_origin_session_setup('origin',48028);
>
> Session1:
> SELECT pg_replication_origin_session_reset();
>
> Session3:
> SELECT pg_replication_origin_session_setup('origin');
> This asserts at:
> TRAP: failed Assert("session_replication_state->refcount == 0"), File:
> "origin.c", Line: 1231, PID: 48037
>

I checked the behavior on HEAD. Session3 is able to set up the origin
and sets its own PID in acquired_pid. But it is unclear to me which
PID should be recorded in acquired_pid - Session2’s PID, since it set
up the origin earlier, or Session3’s PID. Or does this even make any
difference?

I found one more related issue on HEAD, sharing it here:

When the first backend creates and sets up the origin, followed by a
second backend setting it up, and then the first backend resets it
while the second backend attempts to drop it, an assertion is
triggered:
TRAP: failed Assert("session_replication_state->roident !=
InvalidRepOriginId"), File: "origin.c", Line: 1257, PID: 48438
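
A minimal model of this HEAD scenario, under stated assumptions: SimSlot, head_reset(), head_drop() and refcount_drop() are hypothetical names; HEAD has no refcount field, so the model tracks one only to contrast HEAD's acquired_by-based drop check with the refcount-based check from the v1 patch; locking and ereport() are omitted.

```c
#include <assert.h>
#include <stdbool.h>

#define SIM_INVALID_ORIGIN 0	/* stand-in for InvalidRepOriginId */

/* Hypothetical model of one shared slot. */
typedef struct SimSlot
{
	int			roident;		/* origin ID, SIM_INVALID_ORIGIN once dropped */
	int			acquired_by;	/* PID of the first acquirer, 0 if none */
	int			refcount;		/* not on HEAD; tracked here for comparison */
} SimSlot;

/* HEAD's reset clears acquired_by unconditionally. */
static void
head_reset(SimSlot *s)
{
	assert(s->refcount > 0);	/* the caller must be attached */
	s->acquired_by = 0;
	s->refcount--;
}

/* HEAD's replorigin_state_clear() refuses only while acquired_by != 0. */
static bool
head_drop(SimSlot *s)
{
	if (s->acquired_by != 0)
		return false;
	s->roident = SIM_INVALID_ORIGIN;
	return true;
}

/* The refcount-based check proposed in the v1 patch. */
static bool
refcount_drop(SimSlot *s)
{
	if (s->refcount > 0)
		return false;
	s->roident = SIM_INVALID_ORIGIN;
	return true;
}
```

Starting from {roident = 1, acquired_by = 100, refcount = 2} (the first session attached with PID 100, the second joined using that PID) and calling head_reset() for the first session, refcount_drop() refuses, while head_drop() succeeds and invalidates roident although the second session is still attached.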

thanks
Shveta

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-09 09:00  Amit Kapila <[email protected]>
  parent: shveta malik <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2026-01-09 09:00 UTC (permalink / raw)
  To: shveta malik <[email protected]>; +Cc: Zhijie Hou (Fujitsu) <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

On Mon, Jan 5, 2026 at 4:00 PM shveta malik <[email protected]> wrote:
>
> On Mon, Jan 5, 2026 at 3:15 PM shveta malik <[email protected]> wrote:
> >
> > Thanks Hou-San and Kuroda-San.
> >
> > What should be the expected behavior when Session1 resets the origin
> > (changing acquired_pid from its own PID to 0), while Session2 is
> > already connected to the origin and Session3 also attempts to reuse
> > the same origin?
> >
> > Currently it asserts:
> >
> > Session1:
> > select pg_replication_origin_create('origin');
> > SELECT pg_replication_origin_session_setup('origin');
> >
> > Session2:
> > SELECT pg_replication_origin_session_setup('origin',48028);
> >
> > Session1:
> > SELECT pg_replication_origin_session_reset();
> >
> > Session3:
> > SELECT pg_replication_origin_session_setup('origin');
> > This asserts at:
> > TRAP: failed Assert("session_replication_state->refcount == 0"), File:
> > "origin.c", Line: 1231, PID: 48037
> >
>
> I checked the behavior on HEAD. Session3 is able to set up the origin
> and sets its own PID in acquired_pid. But it is unclear to me which
> PID should be recorded in acquired_pid - Session2’s PID, since it set
> up the origin earlier, or Session3’s PID. Or does this even make any
> difference?
>
> I found one more related issue on HEAD, sharing it here:
>
> When the first backend creates and sets up the origin, followed by a
> second backend setting it up, and then the first backend resets it
> while the second backend attempts to drop it, an assertion is
> triggered:
> TRAP: failed Assert("session_replication_state->roident !=
> InvalidRepOriginId"), File: "origin.c", Line: 1257, PID: 48438
>

Can we address these problems by prohibiting the leader worker from
resetting the origin while parallel apply (pa) workers are still
associated with it? The leader can tell whether pa workers are associated
with the origin by checking the condition: acquired_by == MyProcPid AND
refcount > 1.
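
A sketch of the proposed rule, assuming a simplified slot with only acquired_by and refcount; SimOrigin and try_reset() are hypothetical names, and try_reset() returns false where the real code would raise an ERROR.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the shared slot fields involved. */
typedef struct SimOrigin
{
	int			acquired_by;	/* PID of the first acquirer (the leader) */
	int			refcount;		/* attached processes, leader included */
} SimOrigin;

/*
 * Proposed rule: the first acquirer may not reset while others are still
 * attached.  Returns false where the real code would raise an ERROR.
 */
static bool
try_reset(SimOrigin *o, int my_pid)
{
	assert(o->refcount > 0);	/* the caller must be attached */

	if (o->acquired_by == my_pid && o->refcount > 1)
		return false;

	if (o->acquired_by == my_pid)
		o->acquired_by = 0;
	o->refcount--;
	return true;
}
```

With leader PID 100 and one worker (PID 200) attached, try_reset(&o, 100) fails, try_reset(&o, 200) succeeds, and the leader's reset then succeeds and clears acquired_by.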

-- 
With Regards,
Amit Kapila.

* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-09 11:28  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2026-01-09 11:28 UTC (permalink / raw)
  To: 'Amit Kapila' <[email protected]>; shveta malik <[email protected]>; +Cc: Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

Dear Amit, Shveta,

> > >
> > > Thanks Hou-San and Kuroda-San.
> > >
> > > What should be the expected behavior when Session1 resets the origin
> > > (changing acquired_pid from its own PID to 0), while Session2 is
> > > already connected to the origin and Session3 also attempts to reuse
> > > the same origin?
> > >
> > > Currently it asserts:
> > >
> > > Session1:
> > > select pg_replication_origin_create('origin');
> > > SELECT pg_replication_origin_session_setup('origin');
> > >
> > > Session2:
> > > SELECT pg_replication_origin_session_setup('origin',48028);
> > >
> > > Session1:
> > > SELECT pg_replication_origin_session_reset();
> > >
> > > Session3:
> > > SELECT pg_replication_origin_session_setup('origin');
> > > This asserts at:
> > > TRAP: failed Assert("session_replication_state->refcount == 0"), File:
> > > "origin.c", Line: 1231, PID: 48037
> > >

FYI, this happened because v1 assumed refcount was 0 whenever acquired_by
was 0, but your scenario breaks that assumption.

> > I checked the behavior on HEAD. Session3 is able to set up the origin
> > and sets its own PID in acquired_pid. But it is unclear to me which
> > PID should be recorded in acquired_pid - Session2’s PID, since it set
> > up the origin earlier, or Session3’s PID. Or does this even make any
> > difference?

To clarify, I think the behavior on HEAD is not correct. A backend should be
able to acquire an active origin only if it explicitly specifies the PID.
Otherwise, users may acquire the origin unintentionally and advance it.

> Can we address these problems by prohibiting leader worker to reset
> when pa workers are still associated with the origin? The way for
> leader to know if pa workers are associated with origin is by checking
> following condition: acquired_by == MyProcpid AND refcount > 1.

I think it's okay. IIUC, the idea is to prevent an active origin from having
an invalid acquired_by value. The replication origin was extended to support
parallel apply in logical replication, so it is reasonable to enforce the same
approach here. The attached 0001 implements that.

One concern with the implementation is that acquired_by can still become zero
if a process exits without releasing the origin; this can happen if the process
that first acquired it exits while a second process is still using it.
For logical replication itself this is not a problem, because the leader
worker ensures that all parallel workers finish before it exits.

To address this, I propose that no other process be allowed to acquire the
origin while it is active but its acquired_by is 0. The problem should be
resolved within the SQL interface, since that is where it occurs.
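
The exit-path gap and the proposed guard can be sketched together. This is a hypothetical model: SimOrigin, v3_setup() and exit_cleanup() are illustrative names, v3_setup() follows the order of checks in replorigin_session_setup() with 0001 and 0002 applied, and exit_cleanup() models ReplicationOriginExitCleanup(), which in 0001 calls the internal reset directly and therefore skips the reset guard. Locking and ereport() are omitted.

```c
#include <assert.h>

/* Hypothetical model of one shared slot. */
typedef struct SimOrigin
{
	int			acquired_by;	/* PID of the first acquirer, 0 if none */
	int			refcount;		/* processes currently attached */
} SimOrigin;

typedef enum SimResult
{
	SIM_OK,
	SIM_IN_USE,
	SIM_NOT_FOUND
} SimResult;

/* Order of checks in replorigin_session_setup() with 0001 and 0002 applied. */
static SimResult
v3_setup(SimOrigin *o, int my_pid, int requested_pid)
{
	if (o->acquired_by != 0 && requested_pid == 0)
		return SIM_IN_USE;
	if (o->acquired_by != requested_pid)
		return SIM_NOT_FOUND;
	/* 0002: the first acquirer is gone but others are still attached */
	if (o->acquired_by == 0 && o->refcount > 0)
		return SIM_IN_USE;

	if (requested_pid == 0)
		o->acquired_by = my_pid;
	o->refcount++;
	return SIM_OK;
}

/*
 * ReplicationOriginExitCleanup() in 0001 calls the internal reset directly,
 * so the "others still attached" guard does not apply on process exit.
 */
static void
exit_cleanup(SimOrigin *o, int my_pid)
{
	assert(o->refcount > 0);	/* the exiting process must be attached */
	if (o->acquired_by == my_pid)
		o->acquired_by = 0;
	o->refcount--;
}
```

Here v3_setup(&o, 100, 0) and v3_setup(&o, 200, 100) succeed; after exit_cleanup(&o, 100) the slot has acquired_by == 0 and refcount == 1, so v3_setup(&o, 300, 0) returns SIM_IN_USE until exit_cleanup(&o, 200) brings refcount back to 0.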

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v3-0001-Fix-unintended-drop-of-active-replication-origins.patch (8.9K, 2-v3-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 5383f9d8c801e48192d59d0b1f07c88c2b595de2 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v3 1/2] Fix unintended drop of active replication origins

Currently, if two backends configure the same replication origin and one backend
resets it first, the acquired_by flag is cleared without recognizing the active
usage by the first backend. This can result in the unintended dropping of the
origin, potentially leading to issues if the shared memory of the dropped origin
is reused for a newly created origin. Such reuse could cause unpredictable
advancement of a different slot by the remaining backend holding the memory of
the dropped origin.

This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. Also, the backend process that first acquired the
origin cannot release it until all other processes using it have released it.
This ensures that acquired_by cannot be zero while processes are actively using
the origin.
---
 .../expected/parallel_session_origin.out      | 46 ++++++++++-
 .../specs/parallel_session_origin.spec        |  6 +-
 src/backend/replication/logical/origin.c      | 80 +++++++++++++------
 3 files changed, 105 insertions(+), 27 deletions(-)

diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..07c1e3622ce 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
 Parsed test spec with 2 sessions
 
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
 step s0_setup: SELECT pg_replication_origin_session_setup('origin');
 pg_replication_origin_session_setup
 -----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
 t       
 (1 row)
 
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
 step s0_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR:  another process is acquiring the replication origin
 step s1_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..dff2c073def 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
 	 */
 	int			acquired_by;
 
+	/* Number of backend that is currently using this origin. */
+	int			refcount;
+
 	/*
 	 * Condition variable that's signaled when acquired_by changes.
 	 */
@@ -1069,32 +1072,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helpful function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-	ConditionVariable *cv = NULL;
+	ConditionVariable *cv;
 
-	if (session_replication_state == NULL)
-		return;
+	Assert(session_replication_state != NULL);
 
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	if (session_replication_state->acquired_by == MyProcPid)
-	{
-		cv = &session_replication_state->origin_cv;
+	Assert(session_replication_state->refcount > 0);
 
+	/*
+	 * Reset the PID only if the current backend is the first to set up this
+	 * origin. This prevents resetting the PID when other backends are still
+	 * using this origin.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid)
 		session_replication_state->acquired_by = 0;
-		session_replication_state = NULL;
-	}
+
+	session_replication_state->refcount--;
+
+	cv = &session_replication_state->origin_cv;
+	session_replication_state = NULL;
 
 	LWLockRelease(ReplicationOriginLock);
 
-	if (cv)
-		ConditionVariableBroadcast(cv);
+	ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+	if (session_replication_state == NULL)
+		return;
+
+	replorigin_session_reset_internal();
 }
 
 /*
@@ -1205,9 +1223,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	Assert(session_replication_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
+	{
 		session_replication_state->acquired_by = MyProcPid;
+		Assert(session_replication_state->refcount == 0);
+	}
 	else
+	{
 		Assert(session_replication_state->acquired_by == acquired_by);
+		Assert(session_replication_state->refcount > 0);
+	}
+
+	session_replication_state->refcount++;
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1250,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-	ConditionVariable *cv;
-
 	Assert(max_active_replication_origins != 0);
 
 	if (session_replication_state == NULL)
@@ -1233,15 +1257,21 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-	session_replication_state->acquired_by = 0;
-	cv = &session_replication_state->origin_cv;
-	session_replication_state = NULL;
-
-	LWLockRelease(ReplicationOriginLock);
+	/*
+	 * The replication origin cannot be reset if the replication origin is
+	 * firstly acquired by this backend and other processes are actively using
+	 * now. This can cause acquired_by to be zero and active replication origin
+	 * might be dropped.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid &&
+		session_replication_state->refcount > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg_plural("another process is acquiring the replication origin",
+							   "other processes are acquiring the replication origin",
+							   session_replication_state->refcount - 1)));
 
-	ConditionVariableBroadcast(cv);
+	replorigin_session_reset_internal();
 }
 
 /*
-- 
2.47.3



  [application/octet-stream] v3-0002-Disallow-setting-the-replication-origin-if-it-is-.patch (1.2K, 3-v3-0002-Disallow-setting-the-replication-origin-if-it-is-.patch)
From be56cec12647052135a35a78c0d5a87dd5eebf68 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Fri, 9 Jan 2026 19:31:57 +0900
Subject: [PATCH v3 2/2] Disallow setting the replication origin if it is being
 used PID is not stored

---
 src/backend/replication/logical/origin.c | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index dff2c073def..f373e3df2f9 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1192,6 +1192,19 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							node, acquired_by)));
 		}
 
+		/*
+		 * PID was not recorded in the origin. This can happen if the process
+		 * that originally acquired the origin exits without releasing it. To
+		 * make the status clean again, other processes cannot acquire the
+		 * origin until all processes using it have released it.
+		 */
+		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+		{
+			elog(ERROR,
+				 "replication origin with ID %d is already active for another process",
+				 node);
+		}
+
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
-- 
2.47.3

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-12 06:01  shveta malik <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: shveta malik @ 2026-01-12 06:01 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: Amit Kapila <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>; shveta malik <[email protected]>

On Fri, Jan 9, 2026 at 4:58 PM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Dear Amit, Shveta,
>
> > > >
> > > > Thanks Hou-San and Kuroda-San.
> > > >
> > > > What should be the expected behavior when Session1 resets the origin
> > > > (changing acquired_pid from its own PID to 0), while Session2 is
> > > > already connected to the origin and Session3 also attempts to reuse
> > > > the same origin?
> > > >
> > > > Currently it asserts:
> > > >
> > > > Session1:
> > > > select pg_replication_origin_create('origin');
> > > > SELECT pg_replication_origin_session_setup('origin');
> > > >
> > > > Session2:
> > > > SELECT pg_replication_origin_session_setup('origin',48028);
> > > >
> > > > Session1:
> > > > SELECT pg_replication_origin_session_reset();
> > > >
> > > > Session3:
> > > > SELECT pg_replication_origin_session_setup('origin');
> > > > This asserts at:
> > > > TRAP: failed Assert("session_replication_state->refcount == 0"), File:
> > > > "origin.c", Line: 1231, PID: 48037
> > > >
>
> FYI, this happened because v1 assumed refcount was 0 if acquired_by was 0.
> But your proposed scenario met it.
>
> > > I checked the behavior on HEAD. Session3 is able to set up the origin
> > > and sets its own PID in acquired_pid. But it is unclear to me which
> > > PID should be recorded in acquired_pid - Session2’s PID, since it set
> > > up the origin earlier, or Session3’s PID. Or does this even make any
> > > difference?
>
> To clarify, I think the behavior on HEAD is not correct. The backend should
> acquire the active origin if it expressly specifies the PID. Otherwise, users
> may acquire unintentionally and advance it.

I agree.

>
> > Can we address these problems by prohibiting leader worker to reset
> > when pa workers are still associated with the origin? The way for
> > leader to know if pa workers are associated with origin is by checking
> > following condition: acquired_by == MyProcpid AND refcount > 1.
>
> I think it's okay. IIUC, the idea is to avoid that active origin has invalid
> acquired_by attribute. The replication origin was extended to support parallel
> apply of logical replication, and it is reasonable to force the same approach.
> Attached 0001 implemented that.
>
> One concern with the implementation is that acquired_by can be zero if the process
> exits without releasing the origin; this can happen if the first acquired process
> exits while the second is still using it.
> Regarding our logical replication, it won't be problematic because the leader
> worker ensures all parallel workers finish before it exits.
>
> To address the issue, I propose that another process should not be able to
> acquire the origin if the acquired_by of the active origin is 0. The problem
> should be resolved within the SQL interface, since it occurs there.
>

+1.

Please find a few comments:

1)
+ /*
+ * The replication origin cannot be reset if the replication origin is
+ * firstly acquired by this backend and other processes are actively using
+ * now. This can cause acquired_by to be zero and active replication origin
+ * might be dropped.
+ */
+ if (session_replication_state->acquired_by == MyProcPid &&
+ session_replication_state->refcount > 1)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg_plural("another process is acquiring the replication origin",
+    "other processes are acquiring the replication origin",
+

Since the user is not aware of the internal acquired_by logic, the
error may not explain why one session is able to reset the origin
while another is not. Shall we make it:

ERROR:  cannot reset replication origin "origin_name" while it is
still shared by other processes
DETAIL:  the current session is the first process for this replication
origin, and other processes are sharing it.
HINT:  ensure this replication origin is reset in all other processes first.

2)
When the first session leaves while the second session is still using the
origin, a third session is able to drop the origin, which is not right.
I think replorigin_state_clear() needs a change: the
'if (state->acquired_by != 0)' check should be replaced by
'if (state->refcount > 0)'.

Patch 001 had the correct changes in replorigin_state_clear(); IMO we
still need those.
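A minimal C model of point 2, for illustration only: `OriginModel`, `release_on_exit()`, `busy_by_pid()` and `busy_by_refcount()` are hypothetical names, not origin.c code. It assumes that exit cleanup clears `acquired_by` only when the exiting process is the recorded owner and always decrements `refcount`, which is how the refcount patch in this thread appears to behave.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the two ReplicationState fields discussed here. */
typedef struct OriginModel
{
    int         acquired_by;    /* PID of the first user, or 0 */
    int         refcount;       /* number of processes using the origin */
} OriginModel;

/* Exit cleanup: clear the PID only if we are the recorded owner. */
static void
release_on_exit(OriginModel *s, int my_pid)
{
    assert(s->refcount > 0);
    if (s->acquired_by == my_pid)
        s->acquired_by = 0;
    s->refcount--;
}

/* Busy test as currently written in replorigin_state_clear(). */
static bool
busy_by_pid(const OriginModel *s)
{
    return s->acquired_by != 0;
}

/* Suggested busy test: busy while anyone still holds a reference. */
static bool
busy_by_refcount(const OriginModel *s)
{
    return s->refcount > 0;
}
```

Starting from `{100, 2}` (owner PID 100 plus one process sharing it), `release_on_exit(&s, 100)` leaves `{0, 1}`. In that state `busy_by_pid()` reports the origin as free even though one process still uses it, while `busy_by_refcount()` keeps it busy.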

3)
When the first session leaves while the second session is still using the
origin, a third session is now correctly prevented from joining it. It gives this error:
postgres=# SELECT pg_replication_origin_session_setup('origin');
ERROR: replication origin with ID 1 is already active for another process

The error is not very informative, given that sharing is now
allowed. Shall it be:

ERROR:  replication origin "origin_name" cannot be acquired while it
is still in use
DETAIL:  the process that first acquired this origin exited without
releasing it.
HINT:  wait until all processes sharing this origin have released it.
Or
HINT:  ensure this replication origin is reset in all other processes first.
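The rejection in point 3 can be sketched as an extra branch in the setup checks. This is a hypothetical, simplified model (the enum, struct and function names are invented for illustration). It assumes the existing checks compare the caller's `acquired_by` argument with the slot's `acquired_by`, and it adds the proposed refusal when the slot has no recorded owner but a positive `refcount`.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical outcomes of setting up an origin slot that already exists. */
typedef enum SetupResult
{
    SETUP_OK,
    SETUP_ACTIVE_FOR_PID,       /* owned, and caller passed no PID */
    SETUP_PID_MISMATCH,         /* caller's PID does not own the origin */
    SETUP_ACTIVE_NO_OWNER       /* in use, but owner PID no longer recorded */
} SetupResult;

typedef struct OriginModel
{
    int         acquired_by;    /* PID of the first user, or 0 */
    int         refcount;       /* number of processes using the origin */
} OriginModel;

/* acquired_by == 0 means "acquire exclusively"; otherwise share with it. */
static SetupResult
check_setup(const OriginModel *s, int acquired_by)
{
    assert(s->refcount >= 0);

    if (s->acquired_by != 0 && acquired_by == 0)
        return SETUP_ACTIVE_FOR_PID;
    if (s->acquired_by != acquired_by)
        return SETUP_PID_MISMATCH;
    /* Proposed extra check: owner exited, but others still use it. */
    if (s->acquired_by == 0 && s->refcount > 0)
        return SETUP_ACTIVE_NO_OWNER;
    return SETUP_OK;
}
```

With `{0, 1}` (owner exited, one sharer left), a plain `pg_replication_origin_session_setup('origin')` call, modelled as `check_setup(&s, 0)`, yields `SETUP_ACTIVE_NO_OWNER` instead of silently attaching.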

4)
+ /* Number of backend that is currently using this origin. */

Number of backend that is --> Count of backends that are

thanks
Shveta

^ permalink  raw  reply  [nested|flat] 46+ messages in thread

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-13 02:56  Amit Kapila <[email protected]>
  parent: shveta malik <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2026-01-13 02:56 UTC (permalink / raw)
  To: shveta malik <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; [email protected] <[email protected]>

On Mon, Jan 12, 2026 at 11:31 AM shveta malik <[email protected]> wrote:
>
> Please find a few comments:
>
> 1)
> + /*
> + * The replication origin cannot be reset if the replication origin is
> + * firstly acquired by this backend and other processes are actively using
> + * now. This can cause acquired_by to be zero and active replication origin
> + * might be dropped.
> + */
> + if (session_replication_state->acquired_by == MyProcPid &&
> + session_replication_state->refcount > 1)
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg_plural("another process is acquiring the replication origin",
> +    "other processes are acquiring the replication origin",
> +
>
> Since user is not aware of internal acquired_by logic, the error might
> not make much sense to him as to why one session is able to reset
> while another is not. Shall we make it:
>
> ERROR:  cannot reset replication origin "origin_name" while it is
> still shared by other processes
> DETAIL:  the current session is the first process for this replication
> origin, and other processes are sharing it.
> HINT:  ensure this replication origin is reset in all other processes first.
>

How about a slightly tweaked version of these messages:
ERROR: cannot reset replication origin "origin_name" because it is
still in use by other processes
DETAIL: This session is the first process for this replication origin,
and other processes are currently sharing it.
HINT: Reset the replication origin in all other processes before retrying.

> 2)
> When the first session leaves, while the second session is still using
> origin, the third session is able to drop the origin which is not
> right.
> I think replorigin_state_clear() needs a change.
> 'if (state->acquired_by != 0)' check should be replaced by 'if
> (state->refcount > 0)'
>
> Patch 001 had correct changes in replorigin_state_clear(), IMO we
> still need those
>
> 3)
> When first session leaves, while second session is still using origin,
> now correctly third session is not able to join it. It gives error:
> postgres=# SELECT pg_replication_origin_session_setup('origin');
> ERROR: replication origin with ID 1 is already active for another process
>
> Error is not very informative provided the fact that now sharing is
> allowed. Shall it be:
>

Yeah, sharing is allowed, but only when used in a parallel context by
passing the PID. I think a slightly modified version of the above message,
such as "replication origin with ID 1 is already active in another
process", should be sufficient.

-- 
With Regards,
Amit Kapila.

* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-13 04:09  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2026-01-13 04:09 UTC (permalink / raw)
  To: 'Amit Kapila' <[email protected]>; shveta malik <[email protected]>; [email protected] <[email protected]>; +Cc: Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

Dear Amit, Shveta,

Thanks for the suggestions. PSA new patches.

> > Since user is not aware of internal acquired_by logic, the error might
> > not make much sense to him as to why one session is able to reset
> > while another is not. Shall we make it:
> >
> > ERROR:  cannot reset replication origin "origin_name" while it is
> > still shared by other processes
> > DETAIL:  the current session is the first process for this replication
> > origin, and other processes are sharing it.
> > HINT:  ensure this replication origin is reset in all other processes first.
> >
> 
> How about a slightly tweaked version of these messages:
> ERROR: cannot reset replication origin "origin_name" because it is
> still in use by other processes
> DETAIL: This session is the first process for this replication origin,
> and other processes are currently sharing it.
> HINT: Reset the replication origin in all other processes before retrying.

I followed Amit's idea, but used the origin ID instead of the origin name.
Looking at other functions, the name is output directly only when 1) the
specified origin does not exist or 2) the name is reserved; we seem to use
the ID wherever possible.

> 
> > 2)
> > When the first session leaves, while the second session is still using
> > origin, the third session is able to drop the origin which is not
> > right.
> > I think replorigin_state_clear() needs a change.
> > 'if (state->acquired_by != 0)' check should be replaced by 'if
> > (state->refcount > 0)'
> >
> > Patch 001 had correct changes in replorigin_state_clear(), IMO we
> > still need those

Good catch. I put it in 0002 because that patch handles cases related to
acquired_by = 0.

> >
> > 3)
> > When first session leaves, while second session is still using origin,
> > now correctly third session is not able to join it. It gives error:
> > postgres=# SELECT pg_replication_origin_session_setup('origin');
> > ERROR: replication origin with ID 1 is already active for another process
> >
> > Error is not very informative provided the fact that now sharing is
> > allowed. Shall it be:
> >
> 
> Yeah, sharing is allowed but only when used in parallel context by
> passing PID. I think a slightly modified version of the above message
> such as: "replication origin with ID 1 is already active in another
> process" should be sufficient.

Fixed, but I used ereport() because I think this message is user-facing. Feel
free to change it back to elog() if you prefer.

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v4-0001-Fix-unintended-drop-of-active-replication-origins.patch (9.1K, 2-v4-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 405ef344f33761eed6e8351937d7a416f385299e Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v4 1/2] Fix unintended drop of active replication origins

Currently, if two backends configure the same replication origin and one backend
resets it first, the acquired_by flag is cleared without recognizing the active
usage by the first backend. This can result in the unintended dropping of the
origin, potentially leading to issues if the shared memory of the dropped origin
is reused for a newly created origin. Such reuse could cause unpredictable
advancement of a different slot by the remaining backend holding the memory of
the dropped origin.

This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. Also, the backend process that first acquired the
origin cannot release it until all other processes using it have released it.
This ensures that the acquired_by flag cannot be zero while processes are
actively using it.
---
 .../expected/parallel_session_origin.out      | 46 ++++++++++-
 .../specs/parallel_session_origin.spec        |  6 +-
 src/backend/replication/logical/origin.c      | 81 +++++++++++++------
 3 files changed, 106 insertions(+), 27 deletions(-)

diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..8e41831fcbc 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
 Parsed test spec with 2 sessions
 
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
 step s0_setup: SELECT pg_replication_origin_session_setup('origin');
 pg_replication_origin_session_setup
 -----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
 t       
 (1 row)
 
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
 step s0_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
 step s1_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..389d2b38d20 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
 	 */
 	int			acquired_by;
 
+	/* Count of backends that are currently using this origin. */
+	int			refcount;
+
 	/*
 	 * Condition variable that's signaled when acquired_by changes.
 	 */
@@ -1069,32 +1072,47 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helpful function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-	ConditionVariable *cv = NULL;
+	ConditionVariable *cv;
 
-	if (session_replication_state == NULL)
-		return;
+	Assert(session_replication_state != NULL);
 
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	if (session_replication_state->acquired_by == MyProcPid)
-	{
-		cv = &session_replication_state->origin_cv;
+	Assert(session_replication_state->refcount > 0);
 
+	/*
+	 * Reset the PID only if the current backend is the first to set up this
+	 * origin. This prevents resetting the PID when other backends are still
+	 * using this origin.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid)
 		session_replication_state->acquired_by = 0;
-		session_replication_state = NULL;
-	}
+
+	session_replication_state->refcount--;
+
+	cv = &session_replication_state->origin_cv;
+	session_replication_state = NULL;
 
 	LWLockRelease(ReplicationOriginLock);
 
-	if (cv)
-		ConditionVariableBroadcast(cv);
+	ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+	if (session_replication_state == NULL)
+		return;
+
+	replorigin_session_reset_internal();
 }
 
 /*
@@ -1205,9 +1223,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	Assert(session_replication_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
+	{
 		session_replication_state->acquired_by = MyProcPid;
+		Assert(session_replication_state->refcount == 0);
+	}
 	else
+	{
 		Assert(session_replication_state->acquired_by == acquired_by);
+		Assert(session_replication_state->refcount > 0);
+	}
+
+	session_replication_state->refcount++;
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1250,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-	ConditionVariable *cv;
-
 	Assert(max_active_replication_origins != 0);
 
 	if (session_replication_state == NULL)
@@ -1233,15 +1257,22 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-	session_replication_state->acquired_by = 0;
-	cv = &session_replication_state->origin_cv;
-	session_replication_state = NULL;
-
-	LWLockRelease(ReplicationOriginLock);
+	/*
+	 * The replication origin cannot be reset if the replication origin is
+	 * firstly acquired by this backend and other processes are actively using
+	 * now. This can cause acquired_by to be zero and active replication
+	 * origin might be dropped.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid &&
+		session_replication_state->refcount > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
+						session_replication_state->roident),
+				 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
+				 errhint("Reset the replication origin in all other processes before retrying.")));
 
-	ConditionVariableBroadcast(cv);
+	replorigin_session_reset_internal();
 }
 
 /*
-- 
2.47.3



  [application/octet-stream] v4-0002-Handle-corner-cases-related-with-origin.patch (2.4K, 3-v4-0002-Handle-corner-cases-related-with-origin.patch)
From 7ba371e30e4b2c13734a2dfeee3cf06d41ab1c1d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Fri, 9 Jan 2026 19:31:57 +0900
Subject: [PATCH v4 2/2] Handle corner cases related with origin

The attribute acquired_by can still be 0 while processes are acquiring the origin.
This can happen if the first process exits while holding the origin. This commit
handles corner cases related to it:

 - rejects acquiring origin if it does not have a valid acquired_by
   but counter > 0.
 - ensures origins cannot be dropped if the counter > 0.
---
 src/backend/replication/logical/origin.c | 23 +++++++++++++++++++----
 1 file changed, 19 insertions(+), 4 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 389d2b38d20..b9132b3475d 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -386,16 +386,19 @@ restart:
 		if (state->roident == roident)
 		{
 			/* found our slot, is it busy? */
-			if (state->acquired_by != 0)
+			if (state->refcount > 0)
 			{
 				ConditionVariable *cv;
 
 				if (nowait)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_IN_USE),
-							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
-									state->roident,
-									state->acquired_by)));
+							 (state->acquired_by != 0)
+							 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+									  state->roident,
+									  state->acquired_by)
+							 : errmsg("could not drop replication origin with ID %d, in use by another process",
+									  state->roident)));
 
 				/*
 				 * We must wait and then retry.  Since we don't know which CV
@@ -1192,6 +1195,18 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							node, acquired_by)));
 		}
 
+		/*
+		 * PID was not noted in the origin. This can happen the process
+		 * originally acquired the origin exits without releasing. To make the
+		 * staus clean again, other processes cannot acquire the origin till
+		 * all using ones release.
+		 */
+		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication origin with ID %d is already active in another process",
+							curstate->roident)));
+
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
-- 
2.47.3

* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-13 05:34  shveta malik <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: shveta malik @ 2026-01-13 05:34 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: Amit Kapila <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; shveta malik <[email protected]>

On Tue, Jan 13, 2026 at 9:39 AM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Dear Amit, Shveta,
>
> Thanks for suggestions. PSA new patches.
>
> > > Since user is not aware of internal acquired_by logic, the error might
> > > not make much sense to him as to why one session is able to reset
> > > while another is not. Shall we make it:
> > >
> > > ERROR:  cannot reset replication origin "origin_name" while it is
> > > still shared by other processes
> > > DETAIL:  the current session is the first process for this replication
> > > origin, and other processes are sharing it.
> > > HINT:  ensure this replication origin is reset in all other processes first.
> > >
> >
> > How about a slightly tweaked version of these messages:
> > ERROR: cannot reset replication origin "origin_name" because it is
> > still in use by other processes
> > DETAIL: This session is the first process for this replication origin,
> > and other processes are currently sharing it.
> > HINT: Reset the replication origin in all other processes before retrying.
>
> I followed the Amit's idea, but the origin ID is used instead of origin name.
> I read other functions, and the name is directly output when 1) the specified
> origin does not exist or 2) the name is reserved. We seem to use ID as much as
> possible.
>
> >
> > > 2)
> > > When the first session leaves, while the second session is still using
> > > origin, the third session is able to drop the origin which is not
> > > right.
> > > I think replorigin_state_clear() needs a change.
> > > 'if (state->acquired_by != 0)' check should be replaced by 'if
> > > (state->refcount > 0)'
> > >
> > > Patch 001 had correct changes in replorigin_state_clear(), IMO we
> > > still need those
>
> Good finding. I put it in 0002 because it handles some cases related with
> acquired_by = 0.
>
> > >
> > > 3)
> > > When first session leaves, while second session is still using origin,
> > > now correctly third session is not able to join it. It gives error:
> > > postgres=# SELECT pg_replication_origin_session_setup('origin');
> > > ERROR: replication origin with ID 1 is already active for another process
> > >
> > > Error is not very informative provided the fact that now sharing is
> > > allowed. Shall it be:
> > >
> >
> > Yeah, sharing is allowed but only when used in parallel context by
> > passing PID. I think a slightly modified version of the above message
> > such as: "replication origin with ID 1 is already active in another
> > process" should be sufficient.
>
> Fixed but ereport() was used because I thought this is usar-facing. Feel free to
> change to elog() again based on your matter.
>

Thanks for the patch. All scenarios known to me seem to work well.
A few trivial comments:

1)
+step s1_reset: SELECT pg_replication_origin_session_reset();

After the above step, please add a step to attempt dropping the
replication origin. The original issue was that once s1 releases the
origin, it becomes eligible for dropping, so the test should
explicitly verify this behavior.

2)
Also, before the above step, please add a step where s0 tries to reset
the origin while s1 is still using it. This is needed to cover the
path where s0 should fail to release the origin.
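One way to reason about the expected outcomes of the requested steps is a small model of the permutation `s0_setup s1_setup s0_reset s1_reset s0_reset`, with a drop-eligibility check after `s1_reset`. This is a hypothetical C sketch, not isolation-tester syntax: the struct and function names are invented, PIDs 100 and 200 stand in for s0 and s1, and the reset rule (the first process refuses to reset while others still share the origin) follows the patch under review.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct OriginModel
{
    int         acquired_by;    /* PID of the first user, or 0 */
    int         refcount;       /* number of processes using the origin */
} OriginModel;

/* Setup; leader_pid == 0 means "acquire exclusively", otherwise share. */
static bool
model_setup(OriginModel *s, int my_pid, int leader_pid)
{
    if (leader_pid == 0)
    {
        if (s->refcount != 0)
            return false;
        s->acquired_by = my_pid;
    }
    else if (s->acquired_by != leader_pid)
        return false;
    s->refcount++;
    return true;
}

/* Reset; returning false models the ERROR raised for the first process. */
static bool
model_reset(OriginModel *s, int my_pid)
{
    assert(s->refcount > 0);
    if (s->acquired_by == my_pid && s->refcount > 1)
        return false;
    if (s->acquired_by == my_pid)
        s->acquired_by = 0;
    s->refcount--;
    return true;
}

/* Dropping is allowed only once nobody holds a reference. */
static bool
model_can_drop(const OriginModel *s)
{
    return s->refcount == 0;
}
```

In this model the origin stays non-droppable between `s1_reset` and the final `s0_reset`, which is the window a drop step placed after `s1_reset` would exercise.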

3)
+ /*
+ * Reset the PID only if the current backend is the first to set up this
+ * origin. This prevents resetting the PID when other backends are still
+ * using this origin.
+ */

The second sentence seems contradictory to the logic, as we are
resetting the PID here regardless of whether other backends are using
it.

Suggestion:
/*
 * Reset the PID only if the current backend is the first to set up this
 * origin. This avoids clearing the first process's PID when any other
 * session releases the origin.
 */
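The behaviour point 3 describes can be seen in a tiny model comparing an unconditional clear of `acquired_by` with the conditional one. The names below are hypothetical and the struct only mirrors the two fields discussed; PID 100 stands for the first process and PID 200 for a second one that attached by passing PID 100.

```c
#include <assert.h>

typedef struct OriginModel
{
    int         acquired_by;    /* PID of the first user, or 0 */
    int         refcount;       /* number of processes using the origin */
} OriginModel;

/* Clears the owner PID no matter which process is releasing. */
static void
release_unconditional(OriginModel *s)
{
    assert(s->refcount > 0);
    s->acquired_by = 0;
    s->refcount--;
}

/* Clears the owner PID only when the owner itself releases. */
static void
release_conditional(OriginModel *s, int my_pid)
{
    assert(s->refcount > 0);
    if (s->acquired_by == my_pid)
        s->acquired_by = 0;
    s->refcount--;
}
```

When PID 200 releases, the unconditional version loses the first process's PID while that process is still attached; the conditional version keeps it, which is what the suggested comment wording describes.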

4)
+ * PID was not noted in the origin. This can happen the process
+ * originally acquired the origin exits without releasing. To make the
+ * staus clean again, other processes cannot acquire the origin till
+ * all using ones release.
+ */

Slight tweak:

/*
 * The origin is in use, but PID is not recorded. This can happen if
 * the process that originally acquired the origin exited without releasing it.
 * To ensure correctness, other processes cannot acquire the origin
 * until all processes currently using it have released it.
 */

thanks
Shveta

* RE: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-14 03:43  Hayato Kuroda (Fujitsu) <[email protected]>
  parent: shveta malik <[email protected]>
  0 siblings, 2 replies; 46+ messages in thread

From: Hayato Kuroda (Fujitsu) @ 2026-01-14 03:43 UTC (permalink / raw)
  To: 'shveta malik' <[email protected]>; +Cc: Amit Kapila <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

Dear Shveta,

> 1)
> +step s1_reset: SELECT pg_replication_origin_session_reset();
> 
> After the above step, please add a step to attempt dropping the
> replication origin. The original issue was that once s1 releases the
> origin, it becomes eligible for dropping, so the test should
> explicitly verify this behavior.

I think that is a bit difficult because the ERROR message of
pg_replication_origin_drop() contains the PID, which would make the expected
output unstable. Also, this patch prevents the first process from resetting
the origin, i.e., the exact same situation can no longer happen. Not fixed.

> 2)
> Also before the above step, please add a step where s0 tries to reset
> the origin while s1 is still acquiring it. It is needed to cover the
> path where s0 should fail to release origin.

That step already exists; see below.

```
step s0_reset: SELECT pg_replication_origin_session_reset();
ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
step s1_reset: SELECT pg_replication_origin_session_reset();
pg_replication_origin_session_reset
-----------------------------------
                                   
(1 row)
```

The other comments have been addressed, with some adjustments of my own; see
the attached. I merged 0001 and 0002 because no one objected to them.

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v5-0001-Fix-unintended-drop-of-active-replication-origins.patch (11.0K, 2-v5-0001-Fix-unintended-drop-of-active-replication-origins.patch)
From 71dccd9a8eddbba8e3e6d136c35435597a7b09ec Mon Sep 17 00:00:00 2001
From: Zhijie Hou <[email protected]>
Date: Mon, 22 Sep 2025 11:22:55 +0800
Subject: [PATCH v5] Fix unintended drop of active replication origins

Currently, if two backends configure the same replication origin and one backend
resets it first, the acquired_by flag is cleared without recognizing the active
usage by the first backend. This can result in the unintended dropping of the
origin, potentially leading to issues if the shared memory of the dropped origin
is reused for a newly created origin. Such reuse could cause unpredictable
advancement of a different slot by the remaining backend holding the memory of
the dropped origin.

This commit addresses the issue by introducing a reference count for replication
origins. The count is incremented when a backend sets up the origin and
decremented upon a reset. Also, the backend process that first acquired the
origin cannot release it until all other processes using it have released it.

This commit also prevents dropping the replication origin when its acquired_by
is zero but its reference count is still positive. This guard handles the case
where the first session exits without releasing the origin.
---
 .../expected/parallel_session_origin.out      |  46 +++++++-
 .../specs/parallel_session_origin.spec        |   6 +-
 src/backend/replication/logical/origin.c      | 109 +++++++++++++-----
 3 files changed, 130 insertions(+), 31 deletions(-)

diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out
index e515b39f7ce..8e41831fcbc 100644
--- a/contrib/test_decoding/expected/parallel_session_origin.out
+++ b/contrib/test_decoding/expected/parallel_session_origin.out
@@ -1,6 +1,6 @@
 Parsed test spec with 2 sessions
 
-starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset
 step s0_setup: SELECT pg_replication_origin_session_setup('origin');
 pg_replication_origin_session_setup
 -----------------------------------
@@ -65,15 +65,59 @@ step s0_compare:
 t       
 (1 row)
 
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
 step s0_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s1_setup: 
+    SELECT pg_replication_origin_session_setup('origin', pid)
+    FROM pg_stat_activity
+    WHERE application_name = 'isolation/parallel_session_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+                                   
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t                                     
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
 step s1_reset: SELECT pg_replication_origin_session_reset();
 pg_replication_origin_session_reset
 -----------------------------------
                                    
 (1 row)
 
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+                                   
+(1 row)
+
diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec
index c0e5fda0723..2253a7a14eb 100644
--- a/contrib/test_decoding/specs/parallel_session_origin.spec
+++ b/contrib/test_decoding/specs/parallel_session_origin.spec
@@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
 # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions
 # commits a transaction and store the local_lsn of the replication origin.
 # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn.
-permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset"
+
+# Test that the origin cannot be released if another session is actively using
+# it.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset"
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 04bc704a332..1662bed4046 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -130,6 +130,9 @@ typedef struct ReplicationState
 	 */
 	int			acquired_by;
 
+	/* Count of processes that are currently using this origin. */
+	int			refcount;
+
 	/*
 	 * Condition variable that's signaled when acquired_by changes.
 	 */
@@ -383,16 +386,19 @@ restart:
 		if (state->roident == roident)
 		{
 			/* found our slot, is it busy? */
-			if (state->acquired_by != 0)
+			if (state->refcount > 0)
 			{
 				ConditionVariable *cv;
 
 				if (nowait)
 					ereport(ERROR,
 							(errcode(ERRCODE_OBJECT_IN_USE),
-							 errmsg("could not drop replication origin with ID %d, in use by PID %d",
-									state->roident,
-									state->acquired_by)));
+							 (state->acquired_by != 0)
+							 ? errmsg("could not drop replication origin with ID %d, in use by PID %d",
+									  state->roident,
+									  state->acquired_by)
+							 : errmsg("could not drop replication origin with ID %d, in use by another process",
+									  state->roident)));
 
 				/*
 				 * We must wait and then retry.  Since we don't know which CV
@@ -1069,32 +1075,48 @@ replorigin_get_progress(RepOriginId node, bool flush)
 	return remote_lsn;
 }
 
-/*
- * Tear down a (possibly) configured session replication origin during process
- * exit.
- */
+/* Helpful function to reset the session replication origin */
 static void
-ReplicationOriginExitCleanup(int code, Datum arg)
+replorigin_session_reset_internal(void)
 {
-	ConditionVariable *cv = NULL;
+	ConditionVariable *cv;
 
-	if (session_replication_state == NULL)
-		return;
+	Assert(session_replication_state != NULL);
 
 	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
-	if (session_replication_state->acquired_by == MyProcPid)
-	{
-		cv = &session_replication_state->origin_cv;
+	/* The origin must be held by at least one process at this point. */
+	Assert(session_replication_state->refcount > 0);
 
+	/*
+	 * Reset the PID only if the current session is the first to set up this
+	 * origin. This avoids clearing the first process's PID when any other
+	 * session releases the origin.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid)
 		session_replication_state->acquired_by = 0;
-		session_replication_state = NULL;
-	}
+
+	session_replication_state->refcount--;
+
+	cv = &session_replication_state->origin_cv;
+	session_replication_state = NULL;
 
 	LWLockRelease(ReplicationOriginLock);
 
-	if (cv)
-		ConditionVariableBroadcast(cv);
+	ConditionVariableBroadcast(cv);
+}
+
+/*
+ * Tear down a (possibly) configured session replication origin during process
+ * exit.
+ */
+static void
+ReplicationOriginExitCleanup(int code, Datum arg)
+{
+	if (session_replication_state == NULL)
+		return;
+
+	replorigin_session_reset_internal();
 }
 
 /*
@@ -1174,6 +1196,18 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 							node, acquired_by)));
 		}
 
+		/*
+		 * The origin is in use, but PID is not recorded. This can happen if
+		 * the process that originally acquired the origin exited without
+		 * releasing it. To ensure correctness, other processes cannot acquire
+		 * the origin until all processes currently using it have released it.
+		 */
+		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_IN_USE),
+					 errmsg("replication origin with ID %d is already active in another process",
+							curstate->roident)));
+
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
@@ -1205,9 +1239,21 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	Assert(session_replication_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
+	{
 		session_replication_state->acquired_by = MyProcPid;
+		Assert(session_replication_state->refcount == 0);
+	}
 	else
+	{
+		/*
+		 * Sanity check: the origin must already be acquired by the process
+		 * passed as input, and at least one process must be using it.
+		 */
 		Assert(session_replication_state->acquired_by == acquired_by);
+		Assert(session_replication_state->refcount > 0);
+	}
+
+	session_replication_state->refcount++;
 
 	LWLockRelease(ReplicationOriginLock);
 
@@ -1224,8 +1270,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 void
 replorigin_session_reset(void)
 {
-	ConditionVariable *cv;
-
 	Assert(max_active_replication_origins != 0);
 
 	if (session_replication_state == NULL)
@@ -1233,15 +1277,22 @@ replorigin_session_reset(void)
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("no replication origin is configured")));
 
-	LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
-
-	session_replication_state->acquired_by = 0;
-	cv = &session_replication_state->origin_cv;
-	session_replication_state = NULL;
-
-	LWLockRelease(ReplicationOriginLock);
+	/*
+	 * Restrict explicit resetting of the replication origin if it was first
+	 * acquired by this process and others are still using it. While the
+	 * system handles this safely (as happens if the first session exits
+	 * without calling reset), it is best to avoid doing so.
+	 */
+	if (session_replication_state->acquired_by == MyProcPid &&
+		session_replication_state->refcount > 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot reset replication origin with ID %d because it is still in use by other processes",
+						session_replication_state->roident),
+				 errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."),
+				 errhint("Reset the replication origin in all other processes before retrying.")));
 
-	ConditionVariableBroadcast(cv);
+	replorigin_session_reset_internal();
 }
 
 /*
-- 
2.47.3




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-14 04:08  shveta malik <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  1 sibling, 0 replies; 46+ messages in thread

From: shveta malik @ 2026-01-14 04:08 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: Amit Kapila <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; shveta malik <[email protected]>

On Wed, Jan 14, 2026 at 9:13 AM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Dear Shveta,
>
> > 1)
> > +step s1_reset: SELECT pg_replication_origin_session_reset();
> >
> > After the above step, please add a step to attempt dropping the
> > replication origin. The original issue was that once s1 releases the
> > origin, it becomes eligible for dropping, so the test should
> > explicitly verify this behavior.
>
> I think it is a bit difficult because pg_replication_origin_drop() includes the PID in its
> ERROR message. Also, this patch prevents the first process from resetting the origin, i.e.,
> the exact same situation won't happen anymore. Not fixed.
>

Okay.

> > 2)
> > Also before the above step, please add a step where s0 tries to reset
> > the origin while s1 is still using it. It is needed to cover the
> > path where s0 should fail to release the origin.
>
> The step has already existed, see below.
>

Okay, sorry I missed it somehow.

> ```
> step s0_reset: SELECT pg_replication_origin_session_reset();
> ERROR:  cannot reset replication origin with ID 1 because it is still in use by other processes
> step s1_reset: SELECT pg_replication_origin_session_reset();
> pg_replication_origin_session_reset
> -----------------------------------
>
> (1 row)
> ```
>
> Others are corrected and adjusted by me, see the attached.
> 0001 and 0002 are combined because no one claimed them.
>

Thanks. The patch LGTM.

thanks
Shveta







* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-01-14 09:58  Amit Kapila <[email protected]>
  parent: Hayato Kuroda (Fujitsu) <[email protected]>
  1 sibling, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2026-01-14 09:58 UTC (permalink / raw)
  To: Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: shveta malik <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Wed, Jan 14, 2026 at 9:13 AM Hayato Kuroda (Fujitsu)
<[email protected]> wrote:
>
> Others are corrected and adjusted by me, see the attached.
> 0001 and 0002 are combined because no one claimed them.
>

A change similar to what you did in replorigin_state_clear() was
required in replorigin_advance() to prevent advancing the origin via the
APIs while it is still in use. I made that change and pushed the patch.
Thanks for working on it.

-- 
With Regards,
Amit Kapila.







* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-02-03 19:08  Heikki Linnakangas <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 2 replies; 46+ messages in thread

From: Heikki Linnakangas @ 2026-02-03 19:08 UTC (permalink / raw)
  To: Amit Kapila <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; +Cc: shveta malik <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

The new error message is not great:

postgres=# select pg_replication_origin_session_setup('myorigin', 12345678);
ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by 12345678

Firstly, replication origin is not an OID. Secondly, it's a little 
confusing because the "replication state slot" is in fact present. 
However, it's currently inactive, i.e. not "acquired" by the given PID.

I propose to change that to:

postgres=# select pg_replication_origin_session_setup('myorigin', 12345678);
ERROR:  replication origin with ID 1 is not active for PID 12345678

That's more in line with this neighboring message:

ERROR:  replication origin with ID 1 is already active for PID 701228


I also wonder if the error code is appropriate. That error uses 
ERRCODE_OBJECT_IN_USE, but if the problem is that the origin is 
currently *not* active, that seems backwards. I didn't change that in 
the attached patch, but it's something to think about.


The second patch rearranges the if-else statements that check those
conditions. I found the current logic hard to follow; this makes it
feel more natural, in my opinion at least. It has one user-visible
effect: If you call the function with acquired_pid != 0 and the origin 
has no state slot, *and* there are no free slots, you previously got 
this error:

postgres=# select pg_replication_origin_session_setup('other', 123);
ERROR:  could not find free replication state slot for replication origin with ID 2
HINT:  Increase "max_active_replication_origins" and try again.

Now you get this:

postgres=# select pg_replication_origin_session_setup('other', 123);
ERROR:  cannot use PID 123 for inactive replication origin with ID 2

Both error messages are more or less appropriate in that situation, but 
I think the new behavior is slightly better. The fact that the origin is 
inactive feels like the bigger problem here.

- Heikki


Attachments:

  [text/x-patch] 0001-Improve-error-message.patch (1.1K, 2-0001-Improve-error-message.patch)
From 233e54b0b4dbdaa84910fb7cc453877391865839 Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Tue, 3 Feb 2026 20:33:23 +0200
Subject: [PATCH 1/2] Improve error message

- Use the same wordings and terms as the neighboring messages
- Use 'curstate->roident' instead of 'node'. We've checked that they're
  equal, but let's be consistent with the neighboring ereports().
---
 src/backend/replication/logical/origin.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index c3271a6fd0e..ec040a039d3 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1197,8 +1197,8 @@ replorigin_session_setup(ReplOriginId node, int acquired_by)
 		{
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
-							node, acquired_by)));
+					 errmsg("replication origin with ID %d is not active for PID %d",
+							curstate->roident, acquired_by)));
 		}
 
 		/*
-- 
2.47.3



  [text/x-patch] 0002-Rearrange-the-checks-in-replorigin_session_setup.patch (4.1K, 3-0002-Rearrange-the-checks-in-replorigin_session_setup.patch)
From 57b2e789b40297ba299af06882bd4076586300da Mon Sep 17 00:00:00 2001
From: Heikki Linnakangas <[email protected]>
Date: Tue, 3 Feb 2026 20:29:54 +0200
Subject: [PATCH 2/2] Rearrange the checks in replorigin_session_setup()

Makes it easier to follow the logic.
---
 src/backend/replication/logical/origin.c | 73 +++++++++++++-----------
 1 file changed, 41 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index ec040a039d3..cb18b4b6686 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1185,55 +1185,64 @@ replorigin_session_setup(ReplOriginId node, int acquired_by)
 		if (curstate->roident != node)
 			continue;
 
-		else if (curstate->acquired_by != 0 && acquired_by == 0)
+		if (acquired_by == 0)
 		{
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("replication origin with ID %d is already active for PID %d",
-							curstate->roident, curstate->acquired_by)));
+			/* With acquired_by == 0, we need the origin to be free */
+			if (curstate->acquired_by != 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_IN_USE),
+						 errmsg("replication origin with ID %d is already active for PID %d",
+								curstate->roident, curstate->acquired_by)));
+
+			if (curstate->refcount > 0)
+			{
+				/*
+				 * The origin is in use, but PID is not recorded. This can
+				 * happen if the process that originally acquired the origin
+				 * exited without releasing it. To ensure correctness, other
+				 * processes cannot acquire the origin until all processes
+				 * currently using it have released it.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_IN_USE),
+						 errmsg("replication origin with ID %d is already active in another process",
+								curstate->roident)));
+			}
 		}
-
-		else if (curstate->acquired_by != acquired_by)
+		else
 		{
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("replication origin with ID %d is not active for PID %d",
-							curstate->roident, acquired_by)));
+			/*
+			 * With acquired_by != 0, we need the origin to be active by the
+			 * given PID
+			 */
+			if (curstate->acquired_by != acquired_by)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_IN_USE),
+						 errmsg("replication origin with ID %d is not active for PID %d",
+								curstate->roident, acquired_by)));
 		}
 
-		/*
-		 * The origin is in use, but PID is not recorded. This can happen if
-		 * the process that originally acquired the origin exited without
-		 * releasing it. To ensure correctness, other processes cannot acquire
-		 * the origin until all processes currently using it have released it.
-		 */
-		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("replication origin with ID %d is already active in another process",
-							curstate->roident)));
-
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
 	}
 
-
-	if (session_replication_state == NULL && free_slot == -1)
-		ereport(ERROR,
-				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
-				 errmsg("could not find free replication state slot for replication origin with ID %d",
-						node),
-				 errhint("Increase \"max_active_replication_origins\" and try again.")));
-	else if (session_replication_state == NULL)
+	if (session_replication_state == NULL)
 	{
-		if (acquired_by)
+		if (acquired_by != 0)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("cannot use PID %d for inactive replication origin with ID %d",
 							acquired_by, node)));
 
 		/* initialize new slot */
+		if (free_slot == -1)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("could not find free replication state slot for replication origin with ID %d",
+							node),
+					 errhint("Increase \"max_active_replication_origins\" and try again.")));
+
 		session_replication_state = &replication_states[free_slot];
 		Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
 		Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
-- 
2.47.3




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-02-11 05:23  Amit Kapila <[email protected]>
  parent: Heikki Linnakangas <[email protected]>
  1 sibling, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2026-02-11 05:23 UTC (permalink / raw)
  To: Heikki Linnakangas <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; shveta malik <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Wed, Feb 4, 2026 at 12:38 AM Heikki Linnakangas <[email protected]> wrote:
>
> The new error message is not great:
>
> postgres=# select pg_replication_origin_session_setup('myorigin', 12345678);
> ERROR:  could not find replication state slot for replication origin
> with OID 1 which was acquired by 12345678
>
> Firstly, replication origin is not an OID. Secondly, it's a little
> confusing because the "replication state slot" is in fact present.
> However, it's currently inactive, i.e. not "acquired" by the given PID.
>
> I propose to change that to:
>
> postgres=# select pg_replication_origin_session_setup('myorigin', 12345678);
> ERROR:  replication origin with ID 1 is not active for PID 12345678
>
> That's more in line with this neighboring message:
>
> ERROR:  replication origin with ID 1 is already active for PID 701228
>

+1 for the new message.

>
> I also wonder if the error code is appropriate. That error uses
> ERRCODE_OBJECT_IN_USE, but if the problem is that the origin is
> currently *not* active, that seems backwards. I didn't change that in
> the attached patch, but it's something to think about.
>

Another way to look at this is that the origin is already active for
some other PID, not the one given by the user in the second parameter,
so OBJECT_IN_USE sounds okay from that angle.

-- 
With Regards,
Amit Kapila.







* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-02-16 08:52  Amit Kapila <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2026-02-16 08:52 UTC (permalink / raw)
  To: Heikki Linnakangas <[email protected]>; +Cc: Hayato Kuroda (Fujitsu) <[email protected]>; shveta malik <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Wed, Feb 11, 2026 at 10:53 AM Amit Kapila <[email protected]> wrote:
>
> On Wed, Feb 4, 2026 at 12:38 AM Heikki Linnakangas <[email protected]> wrote:
> >
> > The new error message is not great:
> >
> > postgres=# select pg_replication_origin_session_setup('myorigin', 12345678);
> > ERROR:  could not find replication state slot for replication origin
> > with OID 1 which was acquired by 12345678
> >
> > Firstly, replication origin is not an OID. Secondly, it's a little
> > confusing because the "replication state slot" is in fact present.
> > However, it's currently inactive, i.e. not "acquired" by the given PID.
> >
> > I propose to change that to:
> >
> > postgres=# select pg_replication_origin_session_setup('myorigin', 12345678);
> > ERROR:  replication origin with ID 1 is not active for PID 12345678
> >
> > That's more in line with this neighboring message:
> >
> > ERROR:  replication origin with ID 1 is already active for PID 701228
> >
>
> +1 for the new message.
>

Heikki, would you like to take care of the improvements you proposed?
Otherwise, I am happy to take care of them.

-- 
With Regards,
Amit Kapila.







* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-03-25 02:45  Michael Paquier <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: Michael Paquier @ 2026-03-25 02:45 UTC (permalink / raw)
  To: Amit Kapila <[email protected]>; +Cc: Heikki Linnakangas <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; shveta malik <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Mon, Feb 16, 2026 at 02:22:57PM +0530, Amit Kapila wrote:
> Heikki, would you like to take care of improvements proposed by you?
> Otherwise, I am happy to take care of them.

This thread has stalled.  Heikki, Amit, are there any actions planned
before the end of the release cycle?
--
Michael


Attachments:

  [application/pgp-signature] signature.asc (833B, 2-signature.asc)


* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-03-25 03:02  Amit Kapila <[email protected]>
  parent: Michael Paquier <[email protected]>
  0 siblings, 0 replies; 46+ messages in thread

From: Amit Kapila @ 2026-03-25 03:02 UTC (permalink / raw)
  To: Michael Paquier <[email protected]>; +Cc: Heikki Linnakangas <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; shveta malik <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Wed, Mar 25, 2026 at 8:15 AM Michael Paquier <[email protected]> wrote:
>
> On Mon, Feb 16, 2026 at 02:22:57PM +0530, Amit Kapila wrote:
> > Heikki, would you like to take care of improvements proposed by you?
> > Otherwise, I am happy to take care of them.
>
> This thread has stalled.  Heikki, Amit, are there any actions planned
> by the end of the release cycle?
>

I'll take care of this in the next few days unless I see any
response/feedback from Heikki or others.

-- 
With Regards,
Amit Kapila.






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-03-25 08:21  Amit Kapila <[email protected]>
  parent: Heikki Linnakangas <[email protected]>
  1 sibling, 1 reply; 46+ messages in thread

From: Amit Kapila @ 2026-03-25 08:21 UTC (permalink / raw)
  To: shveta malik <[email protected]>; +Cc: Heikki Linnakangas <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Wed, Feb 11, 2026 at 5:09 PM shveta malik <[email protected]> wrote:
>
> On Wed, Feb 11, 2026 at 3:41 PM Amit Kapila <[email protected]> wrote:
> >
> >
> > I see your point but one advantage with the proposed code change is
> > that it started to appear that we can extend this part of code easily
> > in the future as it separates most of the handling related to when a
> > user has given acquired_by parameter's value as zero and non-zero.
>
> Okay, yes. So I am okay with it. The slight change I suggested (if to
> else-if) and a comment will make it more clean.
>

I have tried to address both of your suggestions in the attached patch.
Does this look okay to you now?

-- 
With Regards,
Amit Kapila.


Attachments:

  [application/octet-stream] v2-0001-Simplify-replorigin_session_setup.patch (4.3K, 2-v2-0001-Simplify-replorigin_session_setup.patch)
From 494b31e79dce2a08fb56cee2b6af521242be043f Mon Sep 17 00:00:00 2001
From: Amit Kapila <[email protected]>
Date: Wed, 25 Mar 2026 13:42:52 +0530
Subject: [PATCH v3] Simplify replorigin_session_setup()

---
 src/backend/replication/logical/origin.c | 78 ++++++++++++++----------
 1 file changed, 46 insertions(+), 32 deletions(-)

diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 26afd8f0af9..26c3725aa68 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1186,55 +1186,69 @@ replorigin_session_setup(ReplOriginId node, int acquired_by)
 		if (curstate->roident != node)
 			continue;
 
-		else if (curstate->acquired_by != 0 && acquired_by == 0)
+		if (acquired_by == 0)
 		{
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("replication origin with ID %d is already active for PID %d",
-							curstate->roident, curstate->acquired_by)));
+			/* With acquired_by == 0, we need the origin to be free */
+			if (curstate->acquired_by != 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_IN_USE),
+						 errmsg("replication origin with ID %d is already active for PID %d",
+								curstate->roident, curstate->acquired_by)));
+			}
+			else if (curstate->refcount > 0)
+			{
+				/*
+				 * The origin is in use, but PID is not recorded. This can
+				 * happen if the process that originally acquired the origin
+				 * exited without releasing it. To ensure correctness, other
+				 * processes cannot acquire the origin until all processes
+				 * currently using it have released it.
+				 */
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_IN_USE),
+						 errmsg("replication origin with ID %d is already active in another process",
+								curstate->roident)));
+			}
 		}
-
-		else if (curstate->acquired_by != acquired_by)
+		else
 		{
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
-							node, acquired_by)));
+			/*
+			 * With acquired_by != 0, we need the origin to be active by the
+			 * given PID
+			 */
+			if (curstate->acquired_by != acquired_by)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_IN_USE),
+						 errmsg("replication origin with ID %d is not active for PID %d",
+								curstate->roident, acquired_by)));
+			/*
+			 * Here, it is okay to have refcount > 0 as more than one process
+			 * can safely re-use the origin.
+			 */
 		}
 
-		/*
-		 * The origin is in use, but PID is not recorded. This can happen if
-		 * the process that originally acquired the origin exited without
-		 * releasing it. To ensure correctness, other processes cannot acquire
-		 * the origin until all processes currently using it have released it.
-		 */
-		else if (curstate->acquired_by == 0 && curstate->refcount > 0)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_IN_USE),
-					 errmsg("replication origin with ID %d is already active in another process",
-							curstate->roident)));
-
 		/* ok, found slot */
 		session_replication_state = curstate;
 		break;
 	}
 
-
-	if (session_replication_state == NULL && free_slot == -1)
-		ereport(ERROR,
-				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
-				 errmsg("could not find free replication state slot for replication origin with ID %d",
-						node),
-				 errhint("Increase \"max_active_replication_origins\" and try again.")));
-	else if (session_replication_state == NULL)
+	if (session_replication_state == NULL)
 	{
-		if (acquired_by)
+		if (acquired_by != 0)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("cannot use PID %d for inactive replication origin with ID %d",
 							acquired_by, node)));
 
 		/* initialize new slot */
+		if (free_slot == -1)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
+					 errmsg("could not find free replication state slot for replication origin with ID %d",
+							node),
+					 errhint("Increase \"max_active_replication_origins\" and try again.")));
+
 		session_replication_state = &replication_states[free_slot];
 		Assert(!XLogRecPtrIsValid(session_replication_state->remote_lsn));
 		Assert(!XLogRecPtrIsValid(session_replication_state->local_lsn));
-- 
2.52.0.windows.1




* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-03-25 09:33  shveta malik <[email protected]>
  parent: Amit Kapila <[email protected]>
  0 siblings, 1 reply; 46+ messages in thread

From: shveta malik @ 2026-03-25 09:33 UTC (permalink / raw)
  To: Amit Kapila <[email protected]>; +Cc: Heikki Linnakangas <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>; shveta malik <[email protected]>

On Wed, Mar 25, 2026 at 1:51 PM Amit Kapila <[email protected]> wrote:
>
> On Wed, Feb 11, 2026 at 5:09 PM shveta malik <[email protected]> wrote:
> >
> > On Wed, Feb 11, 2026 at 3:41 PM Amit Kapila <[email protected]> wrote:
> > >
> > >
> > > I see your point but one advantage with the proposed code change is
> > > that it started to appear that we can extend this part of code easily
> > > in the future as it separates most of the handling related to when a
> > > user has given acquired_by parameter's value as zero and non-zero.
> >
> > Okay, yes. So I am okay with it. The slight change I suggested (if to
> > else-if) and a comment will make it more clean.
> >
>
> I have tried to address both your suggestions in the attached. See, if
> this looks okay to you now?
>

LGTM now, thanks!

thanks
Shveta






* Re: [Patch] add new parameter to pg_replication_origin_session_setup
@ 2026-03-26 06:40  Amit Kapila <[email protected]>
  parent: shveta malik <[email protected]>
  0 siblings, 0 replies; 46+ messages in thread

From: Amit Kapila @ 2026-03-26 06:40 UTC (permalink / raw)
  To: shveta malik <[email protected]>; +Cc: Heikki Linnakangas <[email protected]>; Hayato Kuroda (Fujitsu) <[email protected]>; [email protected] <[email protected]>; Zhijie Hou (Fujitsu) <[email protected]>; Doruk Yilmaz <[email protected]>

On Wed, Mar 25, 2026 at 3:03 PM shveta malik <[email protected]> wrote:
>
> On Wed, Mar 25, 2026 at 1:51 PM Amit Kapila <[email protected]> wrote:
> >
> > On Wed, Feb 11, 2026 at 5:09 PM shveta malik <[email protected]> wrote:
> > >
> > > On Wed, Feb 11, 2026 at 3:41 PM Amit Kapila <[email protected]> wrote:
> > > >
> > > >
> > > > I see your point but one advantage with the proposed code change is
> > > > that it started to appear that we can extend this part of code easily
> > > > in the future as it separates most of the handling related to when a
> > > > user has given acquired_by parameter's value as zero and non-zero.
> > >
> > > Okay, yes. So I am okay with it. The slight change I suggested (if to
> > > else-if) and a comment will make it more clean.
> > >
> >
> > I have tried to address both your suggestions in the attached. See, if
> > this looks okay to you now?
> >
>
> LGTM now, thanks!
>

Pushed.

-- 
With Regards,
Amit Kapila.


Thread overview: 46+ messages
2024-08-12 21:48 Re: [Patch] add new parameter to pg_replication_origin_session_setup Euler Taveira <[email protected]>
2024-08-15 20:53 ` Doruk Yilmaz <[email protected]>
2025-01-08 21:55   ` Euler Taveira <[email protected]>
2025-03-01 17:52     ` Doruk Yilmaz <[email protected]>
2025-03-03 03:38     ` Amit Kapila <[email protected]>
2025-07-28 21:13       ` Doruk Yilmaz <[email protected]>
2025-07-29 05:13         ` Amit Kapila <[email protected]>
2025-07-29 18:30           ` Doruk Yilmaz <[email protected]>
2025-08-11 06:44             ` Amit Kapila <[email protected]>
2025-08-11 17:11               ` Doruk Yilmaz <[email protected]>
2025-08-12 09:27                 ` Amit Kapila <[email protected]>
2025-08-18 18:55                   ` Doruk Yilmaz <[email protected]>
2025-08-22 05:50                     ` Hayato Kuroda (Fujitsu) <[email protected]>
2025-08-22 12:51                       ` Doruk Yilmaz <[email protected]>
2025-08-25 10:43                         ` Hayato Kuroda (Fujitsu) <[email protected]>
2025-09-03 12:43                           ` Doruk Yilmaz <[email protected]>
2025-09-06 05:09                             ` Amit Kapila <[email protected]>
2025-09-08 17:22                               ` Doruk Yilmaz <[email protected]>
2025-09-16 04:37                                 ` Hayato Kuroda (Fujitsu) <[email protected]>
2025-09-16 21:49                                   ` Doruk Yilmaz <[email protected]>
2025-09-17 06:19                                   ` Amit Kapila <[email protected]>
2025-09-17 09:22                                     ` Hayato Kuroda (Fujitsu) <[email protected]>
2025-09-18 07:37                                       ` Hayato Kuroda (Fujitsu) <[email protected]>
2025-09-18 11:02                                         ` Amit Kapila <[email protected]>
2025-09-18 14:19                                           ` Hayato Kuroda (Fujitsu) <[email protected]>
2025-12-23 08:54                                             ` Zhijie Hou (Fujitsu) <[email protected]>
2025-12-24 09:12                                               ` Amit Kapila <[email protected]>
2026-01-05 09:45                                               ` shveta malik <[email protected]>
2026-01-05 10:30                                                 ` shveta malik <[email protected]>
2026-01-09 09:00                                                   ` Amit Kapila <[email protected]>
2026-01-09 11:28                                                     ` Hayato Kuroda (Fujitsu) <[email protected]>
2026-01-12 06:01                                                       ` shveta malik <[email protected]>
2026-01-13 02:56                                                         ` Amit Kapila <[email protected]>
2026-01-13 04:09                                                           ` Hayato Kuroda (Fujitsu) <[email protected]>
2026-01-13 05:34                                                             ` shveta malik <[email protected]>
2026-01-14 03:43                                                               ` Hayato Kuroda (Fujitsu) <[email protected]>
2026-01-14 04:08                                                                 ` shveta malik <[email protected]>
2026-01-14 09:58                                                                 ` Amit Kapila <[email protected]>
2026-02-03 19:08                                                                   ` Heikki Linnakangas <[email protected]>
2026-02-11 05:23                                                                     ` Amit Kapila <[email protected]>
2026-02-16 08:52                                                                       ` Amit Kapila <[email protected]>
2026-03-25 02:45                                                                         ` Michael Paquier <[email protected]>
2026-03-25 03:02                                                                           ` Amit Kapila <[email protected]>
2026-03-25 08:21                                                                     ` Amit Kapila <[email protected]>
2026-03-25 09:33                                                                       ` shveta malik <[email protected]>
2026-03-26 06:40                                                                         ` Amit Kapila <[email protected]>

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox