public inbox for [email protected]
From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
Cc: Doruk Yilmaz <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Hayato Kuroda (Fujitsu) <[email protected]>
Subject: RE: [Patch] add new parameter to pg_replication_origin_session_setup
Date: Thu, 18 Sep 2025 07:37:11 +0000
Message-ID: <OSCPR01MB14966F65DE462D0A479B8ADD6F516A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
In-Reply-To: <OS7PR01MB149681B14B2432A9CEA7A3586F517A@OS7PR01MB14968.jpnprd01.prod.outlook.com>
References: <CAMPB6wfe4zLjJL8jiZV5kjjpwBM2=rTRme0UCL7Ra4L8MTVdOg@mail.gmail.com>
<[email protected]>
<CAMPB6we7+97L72Ru0=WxMDi24xMbZgr2B8Nwoo5i=r=UNuG_gQ@mail.gmail.com>
<[email protected]>
<CAA4eK1JfPPFTmz7mUk26zPH8+qH9UBpkquxw75x7Ngx_D_6XXQ@mail.gmail.com>
<CAMPB6wfgvWjSvKNPoJkRqaL46geRDoL++Pt_3Czc2QNAdpVQHw@mail.gmail.com>
<CAA4eK1JC6yB6q52qEZ5dLNWRUEZoO-aa_XKBZ3_mcb=V2z7zug@mail.gmail.com>
<CAMPB6weUqU6P2w5VUGVSLKWcvU1AQHmW+7O9qc9yD4CB5kEYVA@mail.gmail.com>
<CAA4eK1Lm_W5j3DPj6PDSTyodGu87QgxpNwwsi-wVR0+B1FSOoA@mail.gmail.com>
<CAMPB6wckvkKrXVPH5j8Ske2cVedkb-TRLdnOb5e74zYM1CynGw@mail.gmail.com>
<CAA4eK1+NDjprcKvr0p2GDMTCs9yxFCY41bOd+6avqAm2n+TXdQ@mail.gmail.com>
<CAMPB6wdc10tc7gpVXG75r51M41zVSabip9Lz7hssWEtyhecWww@mail.gmail.com>
<OSCPR01MB14966201F1DCB853145912FF1F53DA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAMPB6wcOWBURHB1igRgCjD3geAemdoATfkKByMwrMM1TgMN64w@mail.gmail.com>
<OSCPR01MB14966BF4CA9B767C259DBD9CDF53EA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAMPB6wdtKZLEeZ7UW=DYmXWv8Y=uVGrDqXTMhT19Z4VTzo3cfg@mail.gmail.com>
<CAA4eK1LHVd8wQzauWgeEV436L7btrCfujPH1sR196sY_Mp8zYA@mail.gmail.com>
<CAMPB6wdPtjbR93oB1XJtYkRtTR64BJG4o5a+0DSSez=puuyuGA@mail.gmail.com>
<OSCPR01MB14966FC456D053AB00A2EB278F514A@OSCPR01MB14966.jpnprd01.prod.outlook.com>
<CAA4eK1++mHd-SsHyJd2ZB26F7kCz--LbtjQLuQ0h3z9mcYK-AQ@mail.gmail.com>
<OS7PR01MB149681B14B2432A9CEA7A3586F517A@OS7PR01MB14968.jpnprd01.prod.outlook.com>
Dear hackers,
> I considered a test, please see attached files. 0001 was not changed from v6 and
> 0002 contained tests. Here, two sessions were opened and confirmed that they
> can set the same origin.
After further consideration and verification, I found that it is more efficient
to test via the isolation tester. The attached patch set does that.
In my environment, the test became about 10x faster because it does not start
a new instance within the test.
In the test file, two sessions, s0 and s1, are launched and both set up the same
replication origin. Each inserts the origin's local_lsn into a table, and the test
confirms that the later insertion has the larger value.
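As a rough illustration of what the test checks, the following C sketch models two sessions attached to one shared origin. It assumes, based on my reading of the commit path rather than anything stated in this thread, that each commit made while an origin is set up advances the origin's local_lsn to the end of the commit record and never moves it backwards. The type lsn_t, the struct origin_state and the function commit_with_origin() are hypothetical and do not exist in PostgreSQL.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr: a 64-bit WAL position. */
typedef uint64_t lsn_t;

/* Hypothetical model of the origin state shared by s0 and s1. */
typedef struct
{
    lsn_t       local_lsn;      /* end of the latest tracked local commit */
} origin_state;

/*
 * Model one commit by a session attached to the shared origin (assumption:
 * this mirrors how local_lsn advances on commit).  *wal_pos is a
 * monotonically increasing WAL insert position; the commit record ends
 * record_size bytes later.  local_lsn only ever moves forward.
 * Returns the origin's local_lsn after the commit.
 */
static lsn_t
commit_with_origin(origin_state *origin, lsn_t *wal_pos, lsn_t record_size)
{
    assert(origin != NULL && wal_pos != NULL && record_size > 0);

    *wal_pos += record_size;
    if (origin->local_lsn < *wal_pos)
        origin->local_lsn = *wal_pos;
    return origin->local_lsn;
}
```

Calling commit_with_origin() once for s0 and then once for s1 on the same origin_state mirrors the s0_store_lsn, s1_store_lsn and s0_compare steps: the second returned value is the larger one.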
One hacky point is how s1 obtains the PID of s0. My analysis of why this works follows.
application_name is controlled by isolation_main.c and isolationtester.c.
When the isolation tests run, isolation_main starts first and launches one
isolationtester process per spec file.
In isolation_main.c, application_name is set to "isolation/${testname}" in the
function that starts each test. Then, after isolationtester parses the spec
file, it appends each session's name to that value. This is done at line 193
of isolationtester.c. As a result, s0's application_name becomes
"isolation/repl_origin/s0", which s1 uses to look up s0's PID in
pg_stat_activity.
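The naming scheme described above can be sketched in C as follows. build_isolation_appname() and appname_matches() are hypothetical helpers that only mimic the resulting string; they are not the actual code in isolation_main.c or isolationtester.c. APPNAME_BUFLEN is an assumed buffer size chosen for this sketch.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define APPNAME_BUFLEN 64       /* assumed buffer size for this sketch */

/*
 * Hypothetical helper: build "isolation/<testname>/<session>" in two steps,
 * mirroring the two programs involved.  Returns 0 on success, -1 if buf is
 * too small.
 */
static int
build_isolation_appname(char *buf, size_t len,
                        const char *testname, const char *session)
{
    int         n;
    int         m;

    assert(buf != NULL && len > 0);

    /* Step 1 (isolation_main.c): "isolation/<testname>" */
    n = snprintf(buf, len, "isolation/%s", testname);
    if (n < 0 || (size_t) n >= len)
        return -1;

    /* Step 2 (isolationtester.c): append "/<session name>" */
    m = snprintf(buf + n, len - (size_t) n, "/%s", session);
    if (m < 0 || (size_t) m >= len - (size_t) n)
        return -1;

    return 0;
}

/* Hypothetical helper mirroring the WHERE clause of step s1_setup. */
static int
appname_matches(const char *appname, const char *testname, const char *session)
{
    char        expected[APPNAME_BUFLEN];

    if (build_isolation_appname(expected, sizeof(expected), testname, session) != 0)
        return 0;
    return strcmp(appname, expected) == 0;
}
```

For example, appname_matches("isolation/repl_origin/s0", "repl_origin", "s0") returns 1, which corresponds to the row s1 selects from pg_stat_activity.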
Best regards,
Hayato Kuroda
FUJITSU LIMITED
Attachments:
[application/octet-stream] v8-0001-pg_replication_origin_session_setup-pid-parameter.patch (5.9K, 2-v8-0001-pg_replication_origin_session_setup-pid-parameter.patch)
From 6315206a177be337669883376218cef362d8412b Mon Sep 17 00:00:00 2001
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v8 1/3] pg_replication_origin_session_setup: pid parameter
Since the introduction of parallel apply workers (commit 216a784829c),
replorigin_session_setup() has accepted an extra parameter: pid. This
process ID indicates that multiple processes are sharing the same
replication origin to apply changes in parallel. replorigin_session_setup()
is exposed at the SQL level as pg_replication_origin_session_setup(). This
commit adds an optional parameter to the SQL function that passes the
process ID through to replorigin_session_setup(), allowing multiple
processes to use the same replication origin when using the SQL-level
replication origin functions.
---
doc/src/sgml/func/func-admin.sgml | 23 +++++++++++++++++++++--
src/backend/catalog/system_functions.sql | 9 ++++++++-
src/backend/replication/logical/origin.c | 4 +++-
src/include/catalog/pg_proc.dat | 2 +-
4 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 57ff333159f..1b465bc8ba7 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
<indexterm>
<primary>pg_replication_origin_session_setup</primary>
</indexterm>
- <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+ <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
<returnvalue>void</returnvalue>
</para>
<para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
origin, allowing replay progress to be tracked.
Can only be used if no origin is currently selected.
Use <function>pg_replication_origin_session_reset</function> to undo.
- </para></entry>
+ If multiple processes can safely use the same replication origin (for
+ example, parallel apply processes), the optional <parameter>pid</parameter>
+ parameter can be used to specify the process ID of the first process.
+ The first process must provide <parameter>pid</parameter> equal to
+ <literal>0</literal>, and the other processes that share the same
+ replication origin should provide the process ID of the first process.
+ </para>
+ <caution>
+ <para>
+ When multiple processes share the same replication origin, it is critical
+ to maintain commit order to prevent data inconsistency. While processes
+ may send operations out of order, they must commit transactions in the
+ correct sequence to ensure proper replication consistency. The recommended workflow
+ for each worker is: set up the replication origin session with the first process's PID,
+ apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+ with the LSN and commit timestamp before committing, then commit the
+ transaction only if everything succeeded.
+ </para>
+ </caution>
+ </entry>
</row>
<row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308e443..2d946d6d9e9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
AS 'pg_stat_reset_slru';
+CREATE OR REPLACE FUNCTION
+ pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
--
-- The default permissions for functions mean that anyone can execute them.
-- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e50dcc..98d47e1beb8 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
{
char *name;
RepOriginId origin;
+ int pid;
replorigin_check_prerequisites(true, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
origin = replorigin_by_name(name, false);
- replorigin_session_setup(origin, 0);
+ pid = PG_GETARG_INT32(1);
+ replorigin_session_setup(origin, pid);
replorigin_session_origin = origin;
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 03e82d28c87..01eba3b5a19 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12235,7 +12235,7 @@
{ oid => '6006',
descr => 'configure session to maintain replication progress tracking for the passed in origin',
proname => 'pg_replication_origin_session_setup', provolatile => 'v',
- proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+ proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
prosrc => 'pg_replication_origin_session_setup' },
{ oid => '6007', descr => 'teardown configured replication progress tracking',
--
2.47.3
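The pid semantics documented in this patch can be summarized by the following simplified C model. origin_slot, setup_result and origin_session_setup() are hypothetical names; the model ignores locking and slot allocation. The SETUP_ALREADY_ACTIVE branch reflects my understanding of an existing check in replorigin_session_setup() that is not shown in the hunk above, so treat it as an assumption.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of one replication state slot. */
typedef struct
{
    int         acquired_by;    /* PID of the first process, 0 if unowned */
} origin_slot;

typedef enum
{
    SETUP_OK,
    SETUP_ALREADY_ACTIVE,       /* pid = 0, but another process owns it */
    SETUP_PID_MISMATCH          /* pid != 0 and does not match the owner */
} setup_result;

/*
 * my_pid is the caller's own PID; pid is the SQL-level argument
 * (DEFAULT 0).  The first process passes 0 and becomes the owner; the
 * others pass the owner's PID to share the slot.
 */
static setup_result
origin_session_setup(origin_slot *slot, int my_pid, int pid)
{
    assert(slot != NULL && my_pid > 0);

    if (pid == 0)
    {
        if (slot->acquired_by != 0)
            return SETUP_ALREADY_ACTIVE;
        slot->acquired_by = my_pid;
        return SETUP_OK;
    }

    if (slot->acquired_by != pid)
        return SETUP_PID_MISMATCH;
    return SETUP_OK;
}
```

In this model a first process with PID 100 passes 0 and becomes the owner, a second process passes 100 to share the slot, and a value such as -1 is rejected, which matches the negative case added to replorigin.sql in 0003.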
[application/octet-stream] v8-0002-add-test.patch (6.3K, 3-v8-0002-add-test.patch)
From ee0f1539ddb8d68cc80af229fa0afb5d58e55e92 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 15:20:45 +0900
Subject: [PATCH v8 2/3] add test
---
contrib/test_decoding/Makefile | 2 +-
.../test_decoding/expected/repl_origin.out | 79 +++++++++++++++++++
contrib/test_decoding/meson.build | 1 +
contrib/test_decoding/specs/repl_origin.spec | 56 +++++++++++++
4 files changed, 137 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/repl_origin.out
create mode 100644 contrib/test_decoding/specs/repl_origin.spec
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 02e961f4d31..8aa80054944 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -9,7 +9,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
twophase_snapshot slot_creation_error catalog_change_snapshot \
- skip_snapshot_restore invalidation_distribution
+ skip_snapshot_restore invalidation_distribution repl_origin
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/repl_origin.out b/contrib/test_decoding/expected/repl_origin.out
new file mode 100644
index 00000000000..9ef80217b9d
--- /dev/null
+++ b/contrib/test_decoding/expected/repl_origin.out
@@ -0,0 +1,79 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset
+step s0_setup: SELECT pg_replication_origin_session_setup('origin');
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s0_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s1_setup:
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+
+pg_replication_origin_session_setup
+-----------------------------------
+
+(1 row)
+
+step s1_is_setup: SELECT pg_replication_origin_session_is_setup();
+pg_replication_origin_session_is_setup
+--------------------------------------
+t
+(1 row)
+
+step s0_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s0_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+
+step s1_add_message:
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+
+?column?
+--------
+ 1
+(1 row)
+
+step s1_store_lsn:
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+
+step s0_compare:
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+
+?column?
+--------
+t
+(1 row)
+
+step s0_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
+step s1_reset: SELECT pg_replication_origin_session_reset();
+pg_replication_origin_session_reset
+-----------------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/meson.build b/contrib/test_decoding/meson.build
index 25f6b8a9082..6d687eeb2d7 100644
--- a/contrib/test_decoding/meson.build
+++ b/contrib/test_decoding/meson.build
@@ -64,6 +64,7 @@ tests += {
'slot_creation_error',
'skip_snapshot_restore',
'invalidation_distribution',
+ 'repl_origin',
],
'regress_args': [
'--temp-config', files('logical.conf'),
diff --git a/contrib/test_decoding/specs/repl_origin.spec b/contrib/test_decoding/specs/repl_origin.spec
new file mode 100644
index 00000000000..266ce553444
--- /dev/null
+++ b/contrib/test_decoding/specs/repl_origin.spec
@@ -0,0 +1,56 @@
+# Test multi-session replication origin manipulations; ensure local_lsn can be
+# updated by all attached sessions.
+
+setup
+{
+ SELECT pg_replication_origin_create('origin');
+ CREATE UNLOGGED TABLE local_lsn_store (session int, lsn pg_lsn);
+}
+
+teardown
+{
+ SELECT pg_replication_origin_drop('origin');
+ DROP TABLE local_lsn_store;
+}
+
+session "s0"
+setup { SET synchronous_commit = on; }
+step "s0_setup" { SELECT pg_replication_origin_session_setup('origin'); }
+step "s0_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s0_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s0');
+}
+step "s0_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 0, local_lsn FROM pg_replication_origin_status;
+}
+step "s0_compare" {
+ SELECT s0.lsn < s1.lsn
+ FROM local_lsn_store as s0, local_lsn_store as s1
+ WHERE s0.session = 0 AND s1.session = 1;
+}
+step "s0_reset" { SELECT pg_replication_origin_session_reset(); }
+
+session "s1"
+setup { SET synchronous_commit = on; }
+step "s1_setup" {
+ SELECT pg_replication_origin_session_setup('origin', pid)
+ FROM pg_stat_activity
+ WHERE application_name = 'isolation/repl_origin/s0';
+}
+step "s1_is_setup" { SELECT pg_replication_origin_session_is_setup(); }
+step "s1_add_message" {
+ SELECT 1
+ FROM pg_logical_emit_message(true, 'prefix', 'message on s1');
+}
+step "s1_store_lsn" {
+ INSERT INTO local_lsn_store
+ SELECT 1, local_lsn FROM pg_replication_origin_status;
+}
+step "s1_reset" { SELECT pg_replication_origin_session_reset(); }
+
+# s0 first attaches to an origin, and s1 then attaches to the same one. Both
+# sessions commit a transaction and store the origin's local_lsn. Compare the
+# LSNs and expect the later transaction (done by s1) to have the larger local_lsn.
+permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset"
--
2.47.3
[application/octet-stream] v8-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch (4.5K, 4-v8-0003-Avoid-setting-ReplicationState-in-case-of-ERROR.patch)
From 601cbb02265a5373d298e6803715d30c2370a111 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 17 Sep 2025 18:15:33 +0900
Subject: [PATCH v8 3/3] Avoid setting ReplicationState in case of ERROR
---
contrib/test_decoding/expected/replorigin.out | 3 ++
contrib/test_decoding/sql/replorigin.sql | 3 ++
src/backend/replication/logical/origin.c | 31 +++++++++++++------
3 files changed, 27 insertions(+), 10 deletions(-)
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c85e1a01b23..4f64ea8942f 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists.
+-- ensure session setup with invalid pid fails
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+ERROR: could not find replication state slot for replication origin with OID 1 which was acquired by -1
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
pg_replication_origin_create
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e71ee02d050..d899d5cdc18 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
-- ensure duplicate creations fail
SELECT pg_replication_origin_create('regress_test_decoding: regression_slot');
+-- ensure session setup with invalid pid fails
+SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1);
+
--ensure deletions work (once)
SELECT pg_replication_origin_create('regress_test_decoding: temp');
SELECT pg_replication_origin_drop('regress_test_decoding: temp');
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 98d47e1beb8..0bbc96bcee5 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
static bool registered_cleanup;
int i;
int free_slot = -1;
+ ReplicationState *candidate_state = NULL;
+ bool initialized = false;
if (!registered_cleanup)
{
@@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by)
}
/* ok, found slot */
- session_replication_state = curstate;
+ candidate_state = curstate;
break;
}
- if (session_replication_state == NULL && free_slot == -1)
+ if (candidate_state == NULL && free_slot == -1)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
errmsg("could not find free replication state slot for replication origin with ID %d",
node),
errhint("Increase \"max_active_replication_origins\" and try again.")));
- else if (session_replication_state == NULL)
+ else if (candidate_state == NULL)
{
/* initialize new slot */
- session_replication_state = &replication_states[free_slot];
- Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
- Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
- session_replication_state->roident = node;
+ candidate_state = &replication_states[free_slot];
+ Assert(candidate_state->remote_lsn == InvalidXLogRecPtr);
+ Assert(candidate_state->local_lsn == InvalidXLogRecPtr);
+ candidate_state->roident = node;
+ initialized = true;
}
- Assert(session_replication_state->roident != InvalidRepOriginId);
+ Assert(candidate_state->roident != InvalidRepOriginId);
if (acquired_by == 0)
- session_replication_state->acquired_by = MyProcPid;
- else if (session_replication_state->acquired_by != acquired_by)
+ candidate_state->acquired_by = MyProcPid;
+ else if (candidate_state->acquired_by != acquired_by)
+ {
+ if (initialized)
+ candidate_state->roident = InvalidRepOriginId;
+
elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d",
node, acquired_by);
+ }
+
+ /* Candidate slot looks ok, use it */
+ session_replication_state = candidate_state;
LWLockRelease(ReplicationOriginLock);
--
2.47.3
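The idea behind 0003, validating a candidate slot and publishing it only once every check has passed, can be illustrated with a small self-contained C sketch. All names here (slots, session_state, setup_with_candidate(), NSLOTS, INVALID_ORIGIN) are hypothetical stand-ins for replication_states, session_replication_state, replorigin_session_setup(), max_active_replication_origins and InvalidRepOriginId. Locking, the "already active" check and ereport() are left out, and errors are modelled as a -1 return.

```c
#include <assert.h>
#include <stddef.h>

#define NSLOTS 4                /* stands in for max_active_replication_origins */
#define INVALID_ORIGIN 0        /* stands in for InvalidRepOriginId */

typedef struct
{
    int         roident;        /* origin ID, INVALID_ORIGIN if the slot is free */
    int         acquired_by;    /* PID of the first process, 0 if unowned */
} origin_slot;

static origin_slot slots[NSLOTS];
static origin_slot *session_state = NULL;  /* like session_replication_state */

/*
 * Returns 0 on success and -1 on error.  On error nothing stays modified:
 * a slot claimed during this call is released again and session_state is
 * left untouched.
 */
static int
setup_with_candidate(int node, int my_pid, int acquired_by)
{
    origin_slot *candidate = NULL;
    int         free_slot = -1;
    int         initialized = 0;

    assert(node != INVALID_ORIGIN && my_pid > 0);

    for (int i = 0; i < NSLOTS; i++)
    {
        if (slots[i].roident == INVALID_ORIGIN)
        {
            if (free_slot == -1)
                free_slot = i;
            continue;
        }
        if (slots[i].roident == node)
        {
            candidate = &slots[i];
            break;
        }
    }

    if (candidate == NULL && free_slot == -1)
        return -1;              /* no free slot available */

    if (candidate == NULL)
    {
        /* claim a free slot, remembering that we did so */
        candidate = &slots[free_slot];
        candidate->roident = node;
        initialized = 1;
    }

    if (acquired_by == 0)
        candidate->acquired_by = my_pid;
    else if (candidate->acquired_by != acquired_by)
    {
        if (initialized)
            candidate->roident = INVALID_ORIGIN;   /* undo the claim */
        return -1;
    }

    /* every check passed: only now publish the slot for this session */
    session_state = candidate;
    return 0;
}
```

After a failed call such as setup_with_candidate(1, 100, -1), session_state is still NULL and the freshly claimed slot is free again, so a later setup_with_candidate(1, 100, 0) can reuse it.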