public inbox for [email protected]
From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Cc: [email protected] <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Thu, 18 Sep 2025 14:19:32 +0000
Message-ID: <OSCPR01MB14966B68C2148C1BC462AA906F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
In-Reply-To: <CAA4eK1LeyzuiRPZB+o7mO0pB6_=tpkjoum5Hj+t1SYydS4K2kQ@mail.gmail.com>
References: <CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com>
<[email protected]>
<CAMPB6we7+97L72Ru0=WxMDi24xMbZgr2B8Nwoo5i=r=UNuG_gQ@mail.gmail.com>
<[email protected]>
<CAA4eK1JfPPFTmz7mUk26zPH8+qH9UBpkquxw75x7Ngx_D_6XXQ@mail.gmail.com>
<CAMPB6wfgvWjSvKNPoJkRqaL46geRDoL++Pt_3Czc2QNAdpVQHw@mail.gmail.com>
<CAA4eK1JC6yB6q52qEZ5dLNWRUEZoO-aa_XKBZ3_mcb=V2z7zug@mail.gmail.com>
<CAMPB6weUqU6P2w5VUGVSLKWcvU1AQHmW+7O9qc9yD4CB5kEYVA@mail.gmail.com>
<CAA4eK1Lm_W5j3DPj6PDSTyodGu87QgxpNwwsi-wVR0+B1FSOoA@mail.gmail.com>
<CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw@mail.gmail.com>
<CAA4eK1+NDjprcKvr0p2GDMTCs9yxFCY41bOd+6avqAm2n+TXdQ@mail.gmail.com>
<CAMPB6wdc10tc7gpVXG75r51M41zVSabip9Lz7hssWEtyhecWww@mail.gmail.com>
<OSCPR01MB14966201F1DCB853145912FF1F53DA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAMPB6wcOWBURHB1igRgCjD3geAemdoATfkKByMwrMM1TgMN64w@mail.gmail.com>
<OSCPR01MB14966BF4CA9B767C259DBD9CDF53EA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAMPB6wdtKZLEeZ7UW=DYmXWv8Y=uVGrDqXTMhT19Z4VTzo3cfg@mail.gmail.com>
<CAA4eK1LHVd8wQzauWgeEV436L7btrCfujPH1sR196sY_Mp8zYA@mail.gmail.com>
<CAMPB6wdPtjbR93oB1XJtYkRtTR64BJG4o5a+0DSSez=puuyuGA@mail.gmail.com>
<OSCPR01MB14966FC456D053AB00A2EB278F514A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAA4eK1++mHd-SsHyJd2ZB26F7kCz--LbtjQLuQ0h3z9mcYK-AQ@mail.gmail.com>
<OS7PR01MB149681B14B2432A9CEA7A3586F517A@OS7PR01MB14968.jpnprd01.prod.outlook.com>
<OSCPR01MB14966F65DE462D0A479B8ADD6F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAA4eK1LeyzuiRPZB+o7mO0pB6_=tpkjoum5Hj+t1SYydS4K2kQ@mail.gmail.com>
Dear Amit,
>
> Few comments:
> 1. +step "s0_compare" {
> + SELECT s0.lsn < s1.lsn
> + FROM local_lsn_store as s0, local_lsn_store as s1
> + WHERE s0.session = 0 AND s1.session = 1;
> +}
>
> This appears to be a bit tricky to compare the values. Doing a
> sequential scan won't guarantee the order of rows' appearance. Can't
> we somehow get the two rows ordered by session_id and compare their
> values?
I considered another approach that uses a CTE for session 0. What do you think?
> 2.
> + else if (candidate_state->acquired_by != acquired_by)
> + {
> + if (initialized)
> + candidate_state->roident = InvalidRepOriginId;
> +
> elog(ERROR, "could not find replication state slot for replication
> origin with OID %u which was acquired by %d",
> node, acquired_by);
> + }
>
> This doesn't appear neat. Instead, how about checking this case before
> setting current_state as shown in attached. If we do that, we
> shouldn't even need new variables like current_state and initialized.
Your approach does not work when the specified origin has not been used since
the instance started. In that case the origin does not exist in replication_states
yet, and a new slot is initialized.
As far as I understand, two ERRORs are needed to avoid adding new variables: the
first one inside the loop, and the second one in the session_replication_state == NULL
case. The latter covers the case where the origin is inactive but a PID is
specified, so it can report a different error message.
> Additionally, as shown in attached, it is better to make this a
> user-facing error by using ereport.
Agreed, elog() was replaced with ereport().
> 3. Merge all patches as I don't see the need to do any backpatch here.
Sure.
The attached patch includes all the changes. Thoughts?
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
[application/octet-stream] v9-0001-pg_replication_origin_session_setup-pid-parameter.patch (15.4K, 2-v9-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 7a41bc12c7464cc9dca9f3bfd6d0548538f64ed9 Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v9] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter, pid. This
process ID indicates that multiple processes share the same replication
origin to apply changes in parallel. replorigin_session_setup() has a
SQL-callable counterpart, pg_replication_origin_session_setup(). This
commit adds an optional pid parameter to the SQL function and passes it
through to replorigin_session_setup(), allowing multiple processes that
use the SQL-level replication origin functions to share the same
replication origin.
---
contrib/test_decoding/Makefile | 2 +-
.../test_decoding/expected/repl_origin.out | 79 +++++++++++++++++++
contrib/test_decoding/expected/replorigin.out | 3 +
contrib/test_decoding/meson.build | 1 +
contrib/test_decoding/specs/repl_origin.spec | 56 +++++++++++++
contrib/test_decoding/sql/replorigin.sql | 3 +
doc/src/sgml/func/func-admin.sgml | 23 +++++-
src/backend/catalog/system_functions.sql | 9 ++-
src/backend/replication/logical/origin.c | 24 +++++-
src/include/catalog/pg_proc.dat | 2 +-
10 files changed, 193 insertions(+), 9 deletions(-)
create mode 100644 contrib/test_decoding/expected/repl_origin.out
create mode 100644 contrib/test_decoding/specs/repl_origin.spec
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..8aa80054944 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
- skip_snapshot_restore invalidation_distribution
+ skip_snapshot_restore invalidation_distribution repl_origin
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/repl_origin.out b/contrib/test_decoding/expected/repl_origin.out
new file mode 100644
index 00000000000..9ef80217b9d
--- /dev/null
+++ b/contrib/test_decoding/expected/repl_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s0_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s1_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare:
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..cb9d63e20c1 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR: replication origin with ID 1 is inactive but PID -1 was specified
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..6d687eeb2d7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
'slot_creation_error',
'skip_snapshot_restore',
'invalidation_distribution',
+ 'repl_origin',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/repl_origin.spec b/contrib/test_decoding/specs/repl_origin.spec
new file mode 100644
index 00000000000..266ce553444
--- /dev/null
+++ b/contrib/test_decoding/specs/repl_origin.spec
@@ -0,0 +1,56 @@
+# Test multi-session replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+ SELECT pg_replication_origin_create('origin');
+ CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+ SELECT pg_replication_origin_drop('origin');
+ DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# First, s0 attaches to an origin and s1 attaches to the same one. Both
+# sessions commit a transaction and store the origin's local_lsn. Compare
+# the LSNs and expect the later transaction (done by s1) to have the larger one.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..17f2b888238 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure inactive origin cannot be set as session one if pid is specified
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equal to
+ <literal>0</literal> and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..a88b0e6d1ea 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1167,6 +1167,14 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
curstate->roident, curstate->acquired_by)));
}
+ else if (curstate->acquired_by != acquired_by)
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
+ node, acquired_by)));
+ }
+
/* ok, found slot */
session_replication_state = curstate;
break;
@@ -1181,6 +1189,13 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
errhint("Increase \"max_active_replication_origins\" and try again.")));
else if (session_replication_state == NULL)
{
+ /* The origin is not used but PID is specified */
+ if (acquired_by)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("replication origin with ID %d is inactive but PID %d was specified",
+ node, acquired_by)));
+
/* initialize new slot */
session_replication_state = &replication_states[free_slot];
Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
@@ -1193,9 +1208,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
if (acquired_by == 0)
session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
- elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
- node, acquired_by);
+ else
+ Assert(session_replication_state->acquired_by == acquired_by);
LWLockRelease(ReplicationOriginLock);
@@ -1374,12 +1388,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.47.3