From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Cc: [email protected] <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Wed, 17 Sep 2025 09:22:45 +0000
Message-ID: <OS7PR01MB149681B14B2432A9CEA7A3586F517A@OS7PR01MB14968.jpnprd01.prod.outlook.com> (raw)
In-Reply-To: <CAA4eK1++mHd-SsHyJd2ZB26F7kCz--LbtjQLuQ0h3z9mcYK-AQ@mail.gmail.com>
References: <CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com>
	<[email protected]>
	<CAMPB6we7+97L72Ru0=WxMDi24xMbZgr2B8Nwoo5i=r=UNuG_gQ@mail.gmail.com>
	<[email protected]>
	<CAA4eK1JfPPFTmz7mUk26zPH8+qH9UBpkquxw75x7Ngx_D_6XXQ@mail.gmail.com>
	<CAMPB6wfgvWjSvKNPoJkRqaL46geRDoL++Pt_3Czc2QNAdpVQHw@mail.gmail.com>
	<CAA4eK1JC6yB6q52qEZ5dLNWRUEZoO-aa_XKBZ3_mcb=V2z7zug@mail.gmail.com>
	<CAMPB6weUqU6P2w5VUGVSLKWcvU1AQHmW+7O9qc9yD4CB5kEYVA@mail.gmail.com>
	<CAA4eK1Lm_W5j3DPj6PDSTyodGu87QgxpNwwsi-wVR0+B1FSOoA@mail.gmail.com>
	<CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw@mail.gmail.com>
	<CAA4eK1+NDjprcKvr0p2GDMTCs9yxFCY41bOd+6avqAm2n+TXdQ@mail.gmail.com>
	<CAMPB6wdc10tc7gpVXG75r51M41zVSabip9Lz7hssWEtyhecWww@mail.gmail.com>
	<OSCPR01MB14966201F1DCB853145912FF1F53DA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAMPB6wcOWBURHB1igRgCjD3geAemdoATfkKByMwrMM1TgMN64w@mail.gmail.com>
	<OSCPR01MB14966BF4CA9B767C259DBD9CDF53EA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAMPB6wdtKZLEeZ7UW=DYmXWv8Y=uVGrDqXTMhT19Z4VTzo3cfg@mail.gmail.com>
	<CAA4eK1LHVd8wQzauWgeEV436L7btrCfujPH1sR196sY_Mp8zYA@mail.gmail.com>
	<CAMPB6wdPtjbR93oB1XJtYkRtTR64BJG4o5a+0DSSez=puuyuGA@mail.gmail.com>
	<OSCPR01MB14966FC456D053AB00A2EB278F514A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CAA4eK1++mHd-SsHyJd2ZB26F7kCz--LbtjQLuQ0h3z9mcYK-AQ@mail.gmail.com>

Dear Amit,

> Can we think of writing a few tests for this newly exposed functionality?

I wrote a test; please see the attached files. 0001 is unchanged from v6, and
0002 contains the tests. The test opens two sessions and confirms that both of
them can set up the same origin.

BTW, while testing I found an existing issue in this function. Since
session_replication_state is set before the pid check, the session origin can
remain set even when the function fails. Here is a quick reproducer:

```
postgres=# SELECT pg_replication_origin_create('origin');
 pg_replication_origin_create 
------------------------------
                            1
(1 row)

postgres=# -- run origin_session_setup with incorrect parameter
postgres=# SELECT pg_replication_origin_session_setup('origin', -1);
ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by -1
postgres=# -- run origin_session_setup again with correct parameter
postgres=# SELECT pg_replication_origin_session_setup('origin');
ERROR:  cannot setup replication origin when one is already setup
```

The issue has existed since parallel apply was introduced, but it has not been
found until now because parallel apply workers never specify an invalid pid.
With the pid parameter exposed at the SQL level it becomes more likely to
happen, so I think it should be fixed at the same time.

The idea of the fix is to use a local variable for the replication state and
assign it to the process-level variable only at the end. 0003 implements that.
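
As a rough illustration of that idea (this is not the code in 0003), here is a
minimal standalone sketch. All names in it (DemoState, demo_states,
demo_session_state, demo_session_setup) are hypothetical stand-ins, locking is
omitted, and errors are modelled as "return false" instead of ereport(ERROR):

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the shared state in origin.c. */
#define DEMO_MAX_STATES 4
#define DEMO_INVALID_ORIGIN 0

typedef struct DemoState
{
	int			roident;		/* origin ID, DEMO_INVALID_ORIGIN if free */
	int			acquired_by;	/* pid of the first process using the slot */
} DemoState;

static DemoState demo_states[DEMO_MAX_STATES];
static DemoState *demo_session_state = NULL;	/* process-level pointer */

/*
 * Returns true on success. On failure, the process-level pointer is left
 * untouched and a slot claimed during this call is released again.
 */
bool
demo_session_setup(int node, int acquired_by, int my_pid)
{
	DemoState  *candidate = NULL;	/* local until all checks have passed */
	bool		initialized = false;
	int			free_slot = -1;

	assert(node != DEMO_INVALID_ORIGIN);

	if (demo_session_state != NULL)
		return false;			/* an origin is already set up */

	for (int i = 0; i < DEMO_MAX_STATES; i++)
	{
		DemoState  *cur = &demo_states[i];

		if (cur->roident == DEMO_INVALID_ORIGIN)
		{
			if (free_slot == -1)
				free_slot = i;	/* remember the first free slot */
			continue;
		}
		if (cur->roident == node)
		{
			candidate = cur;	/* found an existing slot for this origin */
			break;
		}
	}

	if (candidate == NULL && free_slot == -1)
		return false;			/* no free slot */

	if (candidate == NULL)
	{
		/* initialize a new slot, remembering that we did so */
		candidate = &demo_states[free_slot];
		candidate->roident = node;
		initialized = true;
	}
	else if (acquired_by == 0 && candidate->acquired_by != 0)
		return false;			/* sketch assumption: slot is already in use */

	if (acquired_by == 0)
		candidate->acquired_by = my_pid;
	else if (candidate->acquired_by != acquired_by)
	{
		if (initialized)
			candidate->roident = DEMO_INVALID_ORIGIN;	/* undo the claim */
		return false;
	}

	/* Candidate slot looks ok; only now publish it at process level. */
	demo_session_state = candidate;
	return true;
}
```

In this sketch, a call such as demo_session_setup(1, -1, 100) fails without
touching demo_session_state, so a later demo_session_setup(1, 0, 100) from the
same process can still succeed, which is the behaviour the reproducer above is
missing.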

What do you think?

Best regards,
Hayato Kuroda
FUJITSU LIMITED 



Attachments:

  [application/octet-stream] v7-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.9K, 2-v7-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 566b20a80856c1fe49efb63930cbb695903a8235 Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v7 1/3] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() accepts an extra parameter: pid. This process
ID indicates that multiple processes are sharing the same replication
origin to apply changes in parallel. replorigin_session_setup() has a
SQL-level interface: pg_replication_origin_session_setup. This commit adds
an optional parameter that passes the process ID through to
replorigin_session_setup(). It allows multiple processes to use the same
replication origin when they use the replication functions.
---
 doc/src/sgml/func/func-admin.sgml        | 23 +++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equal to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
       </row>
 
       <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..98d47e1beb8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 
 { oid => '6007', descr => 'teardown configured replication progress tracking',
-- 
2.47.3



  [application/octet-stream] v7-0002-add-test.patch (3.3K, 3-v7-0002-add-test.patch)
From 3a5018870916f0be07797127c628e152b3c920bd Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 15:20:45 +0900
Subject: [PATCH v7 2/3] add test

---
 contrib/test_decoding/meson.build          |  1 +
 contrib/test_decoding/t/002_repl_origin.pl | 73 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)
 create mode 100644 contrib/test_decoding/t/002_repl_origin.pl

diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..e00e03ba08d 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -74,6 +74,7 @@ tests += {
   'tap': {
     'tests': [
       't/001_repl_stats.pl',
+      't/002_repl_origin.pl',
     ],
   },
 }
diff --git a/contrib/test_decoding/t/002_repl_origin.pl b/contrib/test_decoding/t/002_repl_origin.pl
new file mode 100644
index 00000000000..e1aa57b0995
--- /dev/null
+++ b/contrib/test_decoding/t/002_repl_origin.pl
@@ -0,0 +1,73 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+
+# Test session replication origin setup, especially by multiple sessions
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Test set-up
+my $node = PostgreSQL::Test::Cluster->new('test');
+$node->init(allows_streaming => 'logical');
+$node->append_conf('postgresql.conf', 'autovacuum = off');
+$node->start;
+
+# Create a replication origin
+$node->safe_psql('postgres',
+	"SELECT pg_replication_origin_create('origin');");
+
+# Bump the query timeout to avoid false negatives on slow test systems.
+my $psql_timeout_secs = 4 * $PostgreSQL::Test::Utils::timeout_default;
+
+# Start a background session
+my $session1 = $node->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Set up the replication origin for the session
+my $pid = $session1->query_safe(
+	qq(SELECT pg_replication_origin_session_setup('origin');
+    SELECT pg_backend_pid();));
+
+is( $session1->query_safe(
+		qq(SELECT pg_replication_origin_session_is_setup();)),
+	't',
+	"A replication origin is assigned to the session");
+
+# Start another session
+my $session2 = $node->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $psql_timeout_secs);
+
+# Attach to the same replication origin
+$session2->query_safe(
+	qq(SELECT pg_replication_origin_session_setup('origin', $pid);));
+
+is( $session2->query_safe(
+		qq(SELECT pg_replication_origin_session_is_setup();)),
+	't',
+	"Replication origin can accept multiple assignment");
+
+# Emit a transactional message to update the local_lsn and store the current
+# value.
+$session1->query_safe(
+	qq(SELECT pg_logical_emit_message(true, 'prefix', 'message on session1');)
+);
+my $old_lsn = $session1->query_safe(
+	qq(SELECT local_lsn FROM pg_replication_origin_status;));
+
+# Emit a transactional message from another session
+$session2->query_safe(
+	qq(SELECT pg_logical_emit_message(true, 'prefix', 'message on session2');)
+);
+
+# Confirm local_lsn can be updated by concurrent processes
+my $new_lsn = $session1->query_safe(
+	qq(SELECT local_lsn FROM pg_replication_origin_status;));
+is($session1->query_safe(qq(SELECT '$old_lsn' < '$new_lsn')),
+	't', "Replication origin can be advanced by both sessions");
+
+done_testing();
-- 
2.47.3



  [application/octet-stream] v7-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch (4.5K, 4-v7-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch)
From aef02a788bacc87400872a6a8d42d9aeb0301001 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v7 3/3] Avoid setting ReplicationState in case of ERROR

---
 contrib/test_decoding/expected/replorigin.out |  3 ++
 contrib/test_decoding/sql/replorigin.sql      |  3 ++
 src/backend/replication/logical/origin.c      | 31 +++++++++++++------
 3 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR:  could not find replication state slot for replication origin with OID 1 which was acquired by -1
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
  pg_replication_origin_create 
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
 
+-- ensure session setup with invalid pid fail
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('regress_test_decoding: temp');
 SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 	static bool registered_cleanup;
 	int			i;
 	int			free_slot = -1;
+	ReplicationState *candidate_state = NULL;
+	bool		initialized = false;
 
 	if (!registered_cleanup)
 	{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
 		}
 
 		/* ok, found slot */
-		session_replication_state = curstate;
+		candidate_state = curstate;
 		break;
 	}
 
 
-	if (session_replication_state == NULL && free_slot == -1)
+	if (candidate_state == NULL && free_slot == -1)
 		ereport(ERROR,
 				(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
 				 errmsg("could not find free replication state slot for replication origin with ID %d",
 						node),
 				 errhint("Increase \"max_active_replication_origins\" and try again.")));
-	else if (session_replication_state == NULL)
+	else if (candidate_state == NULL)
 	{
 		/* initialize new slot */
-		session_replication_state = &replication_states[free_slot];
-		Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
-		Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
-		session_replication_state->roident = node;
+		candidate_state = &replication_states[free_slot];
+		Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+		Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+		candidate_state->roident = node;
+		initialized = true;
 	}
 
 
-	Assert(session_replication_state->roident != InvalidRepOriginId);
+	Assert(candidate_state->roident != InvalidRepOriginId);
 
 	if (acquired_by == 0)
-		session_replication_state->acquired_by = MyProcPid;
-	else if (session_replication_state->acquired_by != acquired_by)
+		candidate_state->acquired_by = MyProcPid;
+	else if (candidate_state->acquired_by != acquired_by)
+	{
+		if (initialized)
+			candidate_state->roident = InvalidRepOriginId;
+
 		elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
 			 node, acquired_by);
+	}
+
+	/* Candidate slot looks ok, use it */
+	session_replication_state = candidate_state;
 
 	LWLockRelease(ReplicationOriginLock);
 
-- 
2.47.3


