From: Ajin Cherian <[email protected]>
To: Zhijie Hou (Fujitsu) <[email protected]>
Cc: Hayato Kuroda (Fujitsu) <[email protected]>
Cc: shveta malik <[email protected]>
Cc: Amit Kapila <[email protected]>
Cc: Ashutosh Sharma <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Subject: Re: [PATCH] Support automatic sequence replication
Date: Thu, 26 Mar 2026 15:25:03 +1100
Message-ID: <CAFPTHDYRMXVo60zMbbGyOOZn3d_ap+Htca7echfFJEE2dQUDsQ@mail.gmail.com>
In-Reply-To: <TY4PR01MB16907CE099565A5F1A311B2599440A@TY4PR01MB16907.jpnprd01.prod.outlook.com>
References: <CAFPTHDZXX9WQ_X1ZfEvS248T+pKuk6SmCnXcvgPM059N1xPUfA@mail.gmail.com>
<CAJpy0uDLUEjHHME8om1vAf6qkXCeRR6cBvkpK8yWBAC=T0ZFLA@mail.gmail.com>
<CAFPTHDZg1JrunGgOj332hr+gUuH_Jm7skqPpYSvd-QE3yEdRDQ@mail.gmail.com>
<CAJpy0uBz7MCSUkvFJD9ij65vBahNmY+bfCgdGKRqXovYs+K_TA@mail.gmail.com>
<CAJpy0uDsuNqjWd-TmGBxqSS1rnVCJ3B8=SYrtxQ=Vs8kb71QFA@mail.gmail.com>
<CAJpy0uAMWg3KcXtVBS7B0rnchLNrCCVYBByJCzAp=u5LERgtfA@mail.gmail.com>
<CAFPTHDZwEhxhDAeqcPi0GuYN6xBs8gFXHOMUnbg3u2Xigcz4Zg@mail.gmail.com>
<CAE9k0PmTyCU1A9YEf+MRgfeZ8yK1bAYJu=o0bH8DNUTzXejQyQ@mail.gmail.com>
<CAA4eK1L6czEzG4mLNZSyjYC5nX0FMSjjk3csKuxPD3Ph5-7Yvw@mail.gmail.com>
<CAJpy0uAhGQJ=msVsn2GsqWXr+YESJK6x9NBvrUtKvtvp1OVuKQ@mail.gmail.com>
<CAJpy0uAOuu-M6wobH2wHOdTymm-cX9+MqwPyRNoOt=sPKBdCew@mail.gmail.com>
<CAFPTHDZiWYXoKoo4VcBYNH9a=gxDZhfkcBeXt5w6cLw4_ysyKw@mail.gmail.com>
<OS9PR01MB12149D9054CC7F2DC3F0D26A1F577A@OS9PR01MB12149.jpnprd01.prod.outlook.com>
<CAA4eK1KYxQALt46k5uWOO6SUtNjvjOaXwfNjH0AU656YrcGZEw@mail.gmail.com>
<CAFPTHDZYonM+SXG19VVjgWduXQJSuDhcOUWq0NCiiuQubCSt6g@mail.gmail.com>
<CAFPTHDYud1zr0VyizhyhEQXfHMgXVcHrPzE56WUKGCFNskQq2A@mail.gmail.com>
<CAA4eK1JTau3fV7br6xwAV+LXXwM65RuGCuM2J3PQpCONtL1KXA@mail.gmail.com>
<OS9PR01MB1691377CDB1468CDC9820BBEB9470A@OS9PR01MB16913.jpnprd01.prod.outlook.com>
<TY4PR01MB1690715895CDE6FEFA13C2A2C947EA@TY4PR01MB16907.jpnprd01.prod.outlook.com>
<CAJpy0uA1txsV5RhjZjLBDrUjvxVyBDtMXzHr6=DzLHf7ybBrqg@mail.gmail.com>
<TY4PR01MB1690739DE978BCBD12358478E947DA@TY4PR01MB16907.jpnprd01.prod.outlook.com>
<CAJpy0uAfu-VPqCknLLvJ+PUx_cyoR-b70xUNT6Pyv8N-odKizQ@mail.gmail.com>
<CAJpy0uBeAdz6-3P26Eryeq3TyjA-GiKY3z0hFMxzZD=AYGqQ3Q@mail.gmail.com>
<TY4PR01MB169072DEE7CC20E9B06F0164C947DA@TY4PR01MB16907.jpnprd01.prod.outlook.com>
<CAJpy0uC0T_tp62zxJN_2d_A=Ypvf14ebjGFepckeJugW5OHOyA@mail.gmail.com>
<CAJpy0uAmEkjsBS6RxPv9iDcK2kfJ5=bq4Mq1zMCQtaYFoDfbbQ@mail.gmail.com>
<TY4PR01MB169072D89B6A978E60388B87A9445A@TY4PR01MB16907.jpnprd01.prod.outlook.com>
<OS9PR01MB121497F8BFA0114DDF6523DC7F545A@OS9PR01MB12149.jpnprd01.prod.outlook.com>
<TY4PR01MB16907CE099565A5F1A311B2599440A@TY4PR01MB16907.jpnprd01.prod.outlook.com>
On Mon, Mar 16, 2026 at 5:59 PM Zhijie Hou (Fujitsu)
<[email protected]> wrote:
> Overall, we will continue working on this to improve the patch set, but will
> schedule it for PG20.
>
Just rebasing the patch so that it doesn't break cfbot.
regards,
Ajin Cherian
Fujitsu Australia
Attachments:
[application/octet-stream] v12-0003-Synchronize-sequences-directly-in-REFRESH-SEQUEN.patch (15.7K, 2-v12-0003-Synchronize-sequences-directly-in-REFRESH-SEQUEN.patch)
From 832496b6175514eb8a00203f376809f3688a26b5 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <[email protected]>
Date: Thu, 26 Mar 2026 14:41:41 +1100
Subject: [PATCH v12 3/3] Synchronize sequences directly in REFRESH SEQUENCES
command.
The ALTER SUBSCRIPTION ... REFRESH SEQUENCES command currently sets all
sequence states in pg_subscription_rel to INIT and relies on the sequence sync
worker to perform the actual synchronization and update states to READY.
With the recent change making the sequence sync worker long-lived, most
sequences are now synchronized in the background, reducing the need for
REFRESH SEQUENCES. However, the command remains necessary for sequences that
haven't been synchronized.
This commit enhances REFRESH SEQUENCES to synchronize sequences directly within
the command itself, eliminating the overhead of launching a worker and updating
catalog entries unnecessarily.
---
doc/src/sgml/logical-replication.sgml | 5 +-
src/backend/commands/subscriptioncmds.c | 27 ++--
.../replication/logical/sequencesync.c | 148 +++++++++++++-----
src/include/replication/logicalworker.h | 5 +
src/test/subscription/t/036_sequences.pl | 49 ++++++
5 files changed, 176 insertions(+), 58 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 3865f617816..e3472cc952e 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1791,8 +1791,9 @@ Publications:
<para>
A <firstterm>sequence synchronization worker</firstterm> will be started
- after executing any of the above subscriber commands. The worker will
- remain running for the life of the subscription, periodically
+ after executing <command>CREATE SUBSCRIPTION</command> or
+ <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command> command. The
+ worker will remain running for the life of the subscription, periodically
synchronizing all published sequences.
</para>
<para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 7375e214cb4..bf4fef91f56 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1288,25 +1288,20 @@ AlterSubscription_refresh_seq(Subscription *sub)
PG_TRY();
{
- List *subrel_states;
-
check_publications_origin_sequences(wrconn, sub->publications, true,
sub->origin, NULL, 0, sub->name);
- /* Get local sequence list. */
- subrel_states = GetSubscriptionRelations(sub->oid, false, true, false);
- foreach_ptr(SubscriptionRelState, subrel, subrel_states)
- {
- Oid relid = subrel->relid;
-
- UpdateSubscriptionRelState(sub->oid, relid, SUBREL_STATE_INIT,
- InvalidXLogRecPtr, false);
- ereport(DEBUG1,
- errmsg_internal("sequence \"%s.%s\" of subscription \"%s\" set to INIT state",
- get_namespace_name(get_rel_namespace(relid)),
- get_rel_name(relid),
- sub->name));
- }
+ /*
+ * Stop the sequencesync worker to prevent concurrent updates. This
+ * avoids a race condition where the sequence value could be updated
+ * by this command and then immediately moved backward by a
+ * concurrently running worker. Stopping the worker is safe even if it
+ * attempts to restart, as it will wait on the subscription lock
+ * already held by this ALTER SUBSCRIPTION command.
+ */
+ logicalrep_worker_stop(WORKERTYPE_SEQUENCESYNC, sub->oid, InvalidOid);
+
+ AlterSubSyncSequences(wrconn, sub->oid, sub->name, sub->runasowner);
}
PG_FINALLY();
{
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index d12bd31f09d..bead305156d 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -217,7 +217,7 @@ get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf)
*/
static void
report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
- List *missing_seqs_idx, List *seqinfos)
+ List *missing_seqs_idx, List *seqinfos, char *subname)
{
StringInfo seqstr;
@@ -263,7 +263,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
- MySubscription->name));
+ subname));
}
/*
@@ -420,10 +420,9 @@ check_seq_privileges_and_drift(LogicalRepSequenceInfo *seqinfo,
*/
static CopySeqResult
copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
- bool update_lsn)
+ Oid subid, bool run_as_owner, bool update_lsn)
{
UserContext ucxt;
- bool run_as_owner = MySubscription->runasowner;
Oid seqoid = seqinfo->localrelid;
CopySeqResult result;
bool need_lsn_update = false;
@@ -467,8 +466,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
XLogRecPtr local_page_lsn;
/* Get the local page LSN for comparison with the remote value */
- (void) GetSubscriptionRelState(MySubscription->oid,
- RelationGetRelid(sequence_rel),
+ (void) GetSubscriptionRelState(subid, RelationGetRelid(sequence_rel),
&local_page_lsn);
need_lsn_update = (local_page_lsn != seqinfo->page_lsn);
@@ -480,7 +478,7 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
* cycle (update_lsn is true).
*/
if (seqinfo->relstate == SUBREL_STATE_INIT || need_lsn_update)
- UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+ UpdateSubscriptionRelState(subid, seqoid, SUBREL_STATE_READY,
seqinfo->page_lsn, false);
return COPYSEQ_SUCCESS;
@@ -499,7 +497,8 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
* Returns true/false if any sequences were actually copied.
*/
static bool
-copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
+copy_sequences(WalReceiverConn *conn, List *seqinfos, Oid subid, char *subname,
+ bool runasowner, bool update_lsn)
{
int cur_batch_base_index = 0;
int n_seqinfos = list_length(seqinfos);
@@ -528,11 +527,16 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
int batch_no_drift = 0;
int batch_missing_count;
Relation sequence_rel = NULL;
+ bool started_tx = false;
WalRcvExecResult *res;
TupleTableSlot *slot;
- StartTransactionCommand();
+ if (!IsTransactionState())
+ {
+ StartTransactionCommand();
+ started_tx = true;
+ }
for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
{
@@ -624,14 +628,15 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
&seqinfo, &seqidx, seqinfos);
if (sync_status == COPYSEQ_ALLOWED)
- sync_status = copy_sequence(seqinfo, sequence_rel, update_lsn);
+ sync_status = copy_sequence(seqinfo, sequence_rel, subid,
+ runasowner, update_lsn);
switch (sync_status)
{
case COPYSEQ_SUCCESS:
elog(DEBUG1,
"logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"",
- seqinfo->nspname, seqinfo->seqname, MySubscription->name);
+ seqinfo->nspname, seqinfo->seqname, subname);
batch_succeeded_count++;
sequence_copied = true;
break;
@@ -704,13 +709,17 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
elog(DEBUG1,
"logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift",
- MySubscription->name,
+ subname,
(cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
batch_size, batch_succeeded_count, batch_mismatched_count,
batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift);
- /* Commit this batch, and prepare for next batch */
- CommitTransactionCommand();
+ /*
+ * Commit this batch if we started a transaction, and prepare for the
+ * next batch.
+ */
+ if (started_tx)
+ CommitTransactionCommand();
if (batch_missing_count)
{
@@ -735,7 +744,7 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
/* Report mismatches, permission issues, or missing sequences */
report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
- missing_seqs_idx, seqinfos);
+ missing_seqs_idx, seqinfos, subname);
return sequence_copied;
}
@@ -751,37 +760,28 @@ invalidate_syncing_sequence_infos(Datum arg, SysCacheIdentifier cacheid,
}
/*
- * Get the list of sequence information for the current subscription.
+ * Get the list of sequence information for the given subscription from
+ * catalog.
*
- * Return cached sequence states if valid; otherwise fetches them from the
- * catalog, caches the result, and return it.
+ * All entries in the returned list are allocated in the specified memory
+ * context.
*/
static List *
-fetch_sequence_infos(void)
+fetch_sequences_from_catalog(Oid subid, MemoryContext ctx)
{
Relation rel;
HeapTuple tup;
ScanKeyData skey[1];
SysScanDesc scan;
- Oid subid = MyLogicalRepWorker->subid;
- List *tmp_seqinfos = NIL;
+ List *seqinfos = NIL;
+ bool started_tx = false;
- if (sequence_infos_valid)
- return sequence_infos;
-
- /* Free the existing invalid cache entries */
- foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+ if (!IsTransactionState())
{
- pfree(seqinfo->nspname);
- pfree(seqinfo->seqname);
- pfree(seqinfo);
+ StartTransactionCommand();
+ started_tx = true;
}
- list_free(sequence_infos);
- sequence_infos = NIL;
-
- StartTransactionCommand();
-
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
/* Scan for all sequences belonging to this subscription */
@@ -822,14 +822,14 @@ fetch_sequence_infos(void)
Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
- /* Cache the information in a permanent memory context */
- oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+ /* Cache the information in the given memory context */
+ oldctx = MemoryContextSwitchTo(ctx);
seq = palloc0_object(LogicalRepSequenceInfo);
seq->localrelid = subrel->srrelid;
seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
seq->relstate = relstate;
- tmp_seqinfos = lappend(tmp_seqinfos, seq);
+ seqinfos = lappend(seqinfos, seq);
MemoryContextSwitchTo(oldctx);
table_close(sequence_rel, NoLock);
@@ -839,10 +839,38 @@ fetch_sequence_infos(void)
systable_endscan(scan);
table_close(rel, AccessShareLock);
- sequence_infos = tmp_seqinfos;
- sequence_infos_valid = true;
+ if (started_tx)
+ CommitTransactionCommand();
- CommitTransactionCommand();
+ return seqinfos;
+}
+
+/*
+ * Get the list of sequence information for the current subscription.
+ *
+ * Return cached sequence states if valid; otherwise fetch them from the
+ * catalog, cache the result, and return it.
+ */
+static List *
+fetch_sequence_infos(void)
+{
+ if (sequence_infos_valid)
+ return sequence_infos;
+
+ /* Free the existing invalid cache entries */
+ foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+ {
+ pfree(seqinfo->nspname);
+ pfree(seqinfo->seqname);
+ pfree(seqinfo);
+ }
+
+ list_free(sequence_infos);
+ sequence_infos = NIL;
+
+ sequence_infos = fetch_sequences_from_catalog(MySubscription->oid,
+ CacheMemoryContext);
+ sequence_infos_valid = true;
return sequence_infos;
}
@@ -947,6 +975,9 @@ start_sequence_sync(void)
seqinfos = fetch_sequence_infos();
sequence_copied = copy_sequences(LogRepWorkerWalRcvConn, seqinfos,
+ MySubscription->oid,
+ MySubscription->name,
+ MySubscription->runasowner,
update_lsn);
MemoryContextReset(SequenceSyncContext);
@@ -1016,3 +1047,40 @@ SequenceSyncWorkerMain(Datum main_arg)
FinishSyncWorker();
}
+
+/*
+ * Wrapper around copy_sequences() to synchronize all sequences of a
+ * subscription from outside the sequencesync worker.
+ */
+void
+AlterSubSyncSequences(WalReceiverConn *conn, Oid subid, char *subname,
+ bool runasowner)
+{
+ List *seqinfos;
+
+ Assert(!SequenceSyncContext);
+
+ /*
+ * Fetch sequences directly from the catalog rather than using the
+ * sequence cache, which is maintained only for the sequence sync
+ * worker.
+ */
+ seqinfos = fetch_sequences_from_catalog(subid, CurrentMemoryContext);
+
+ PG_TRY();
+ {
+ /*
+ * Use the current memory context for synchronization. Since this should
+ * be a short-lived command context that will be cleaned up automatically,
+ * we can simply assign it as the synchronization context.
+ */
+ SequenceSyncContext = CurrentMemoryContext;
+
+ (void) copy_sequences(conn, seqinfos, subid, subname, runasowner, true);
+ }
+ PG_FINALLY();
+ {
+ SequenceSyncContext = NULL;
+ }
+ PG_END_TRY();
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index 7d748a28da8..73afd7853d0 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -14,6 +14,8 @@
#include <signal.h>
+#include "replication/walreceiver.h"
+
extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending;
extern void ApplyWorkerMain(Datum main_arg);
@@ -31,4 +33,7 @@ extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+extern void AlterSubSyncSequences(WalReceiverConn *conn, Oid subid,
+ char *subname, bool runasowner);
+
#endif /* LOGICALWORKER_H */
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index af190713b2b..8d25ac40ce0 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -176,4 +176,53 @@ $node_subscriber->wait_for_log(
qr/WARNING: ( [A-Z0-9]+:)? missing sequence on publisher \("public.regress_s4"\)/,
$log_offset);
+##########
+# ALTER SUBSCRIPTION ... REFRESH SEQUENCES synchronizes sequences online,
+# eliminating the need to launch a sequencesync worker.
+##########
+
+# Set max_logical_replication_workers to 0 so that no sequencesync worker can run
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_logical_replication_workers = 0));
+$node_subscriber->restart;
+
+# Verify there is no logical replication apply worker running
+$result = $node_subscriber->safe_psql(
+ 'postgres',
+ "SELECT count(*) FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'");
+
+is($result, '0', 'no logical replication worker is running');
+
+# Increment sequence on publisher
+$node_publisher->safe_psql('postgres',
+ qq(SELECT nextval('regress_s1');));
+
+# The command should fail because sequence regress_s4 is missing on the publisher
+my ($cmdret, $stdout, $stderr) = $node_subscriber->psql('postgres',
+ "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;");
+
+like(
+ $stderr,
+ qr/WARNING: missing sequence on publisher \("public.regress_s4"\)/,
+ "output the warning for the missing sequence regress_s4");
+
+like(
+ $stderr,
+ qr/ERROR: logical replication sequence synchronization failed for subscription \"regress_seq_sub\"/,
+ "the command failed due to the missing sequence regress_s4");
+
+# Refresh the publication to remove the missing sequence
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION;");
+
+# Sync the sequence regress_s1
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;");
+
+# Get the current sequence value on subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ qq(SELECT last_value FROM regress_s1;));
+
+is($result, '201', 'sequence regress_s1 is synced now');
+
done_testing();
--
2.47.3
[application/octet-stream] v12-0001-Support-automatic-sequence-replication.patch (46.3K, 3-v12-0001-Support-automatic-sequence-replication.patch)
From 39164ea8103efa803d93af06ac2ae9be4c6037fe Mon Sep 17 00:00:00 2001
From: Ajin Cherian <[email protected]>
Date: Thu, 26 Mar 2026 14:15:21 +1100
Subject: [PATCH v12 1/3] Support automatic sequence replication.
Currently, sequence values are synchronized from publisher to subscriber only
when the user manually runs ALTER SUBSCRIPTION ... REFRESH PUBLICATION (which
affects only newly subscribed sequences) or REFRESH SEQUENCES. The sequence sync
worker exits immediately after completing each synchronization round.
The primary use case for sequence replication is during upgrades, where it's
recommended that users ensure sequences are in sync by running REFRESH SEQUENCES
before finishing the upgrade. However, this command can be slow when
synchronizing a large number of sequences, potentially increasing downtime.
To address this, this commit makes the sequence sync worker long-lived,
continuously monitoring sequences and resynchronizing them when drift is
detected. The worker uses an adaptive sleep interval: it starts at 2 seconds,
doubles up to a maximum of 30 seconds when no drift is observed, and resets to
the minimum interval once drift is found.
With this change, most sequences are silently synchronized in the background,
eliminating the need to run REFRESH SEQUENCES in the majority of cases.
However, frequently updated sequences may still lag behind, requiring a final
REFRESH SEQUENCES before upgrade completion. Users can monitor progress by
checking whether sequence states transition from INIT to READY in
pg_subscription_rel.
The REFRESH SEQUENCES command is retained for this final synchronization step,
though it currently updates all sequence states to INIT, which has room for
improvement. A future patch will enhance this command to synchronize sequences
directly without launching a worker, reducing catalog overhead.
---
doc/src/sgml/logical-replication.sgml | 39 +-
doc/src/sgml/ref/alter_subscription.sgml | 5 -
src/backend/commands/sequence.c | 41 ++
.../replication/logical/sequencesync.c | 424 +++++++++++++-----
src/backend/replication/logical/syncutils.c | 46 +-
src/backend/replication/logical/worker.c | 11 +
.../utils/activity/wait_event_names.txt | 1 +
src/include/catalog/pg_subscription_rel.h | 1 +
src/include/commands/sequence.h | 1 +
src/include/replication/worker_internal.h | 2 +-
src/test/subscription/t/026_stats.pl | 2 +
src/test/subscription/t/036_sequences.pl | 81 +---
12 files changed, 437 insertions(+), 217 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 72c8d3d59bd..3865f617816 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1791,8 +1791,9 @@ Publications:
<para>
A <firstterm>sequence synchronization worker</firstterm> will be started
- after executing any of the above subscriber commands, and will exit once the
- sequences are synchronized.
+ after executing any of the above subscriber commands. The worker will
+ remain running for the life of the subscription, periodically
+ synchronizing all published sequences.
</para>
<para>
The ability to launch a sequence synchronization worker is limited by the
@@ -1821,18 +1822,26 @@ Publications:
<sect2 id="sequences-out-of-sync">
<title>Refreshing Out-of-Sync Sequences</title>
<para>
- Subscriber sequence values will become out of sync as the publisher
- advances them.
+ Subscriber sequence values can become out of sync as the publisher advances
+ them and the sequence synchronization worker has not yet caught up.
</para>
<para>
To detect this, compare the
<link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsublsn</structfield>
on the subscriber with the <structfield>page_lsn</structfield> obtained
from the <link linkend="func-pg-get-sequence-data"><function>pg_get_sequence_data</function></link>
- function for the sequence on the publisher. Then run
+ function for the sequence on the publisher. If drift is detected, the user
+ can wait for the sequence synchronization worker to catch up (by
+ periodically checking whether the LSNs match).
+ </para>
+ <para>
+ Note that the sequence synchronization worker updates
+ <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsublsn</structfield>
+ periodically. To reduce the delay and force an immediate update, execute the
<link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link> to
- re-synchronize if necessary.
+ <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link> command,
+ which updates the page LSN immediately after synchronizing the sequence
+ value.
</para>
<warning>
<para>
@@ -2339,15 +2348,13 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
<listitem>
<para>
- Incremental sequence changes are not replicated. Although the data in
- serial or identity columns backed by sequences will be replicated as part
- of the table, the sequences themselves do not replicate ongoing changes.
- On the subscriber, a sequence will retain the last value it synchronized
- from the publisher. If the subscriber is used as a read-only database,
- then this should typically not be a problem. If, however, some kind of
- switchover or failover to the subscriber database is intended, then the
- sequences would need to be updated to the latest values, either by
- executing <link linkend="sql-altersubscription-params-refresh-sequences">
+ Sequence changes are synchronized periodically by the sequence
+ synchronization worker. If, however, some kind of switchover or
+ failover to the subscriber database is intended, sequence replication
+ could be lagging behind, so the subscriber's sequences should be compared
+ with those on the publisher to make sure they are up to date. If they
+ are not, they need to be updated to the latest values, either by executing
+ <link linkend="sql-altersubscription-params-refresh-sequences">
<command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>
or by copying the current data from the publisher (perhaps using
<command>pg_dump</command>) or by determining a sufficiently high value
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index f215fb0e5a2..ee96e4823a3 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -202,11 +202,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
Previously subscribed tables are not copied, even if a table's row
filter <literal>WHERE</literal> clause has since been modified.
</para>
- <para>
- Previously subscribed sequences are not re-synchronized. To do that,
- use <link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>.
- </para>
<para>
See <xref linkend="sequence-definition-mismatches"/> for recommendations on how
to handle any warnings about sequence definition differences between
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index 551667650ba..9a9fa6e25e6 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -929,6 +929,47 @@ lastval(PG_FUNCTION_ARGS)
PG_RETURN_INT64(result);
}
+/*
+ * Read the current sequence values (last_value and is_called)
+ *
+ * This is a read-only operation used by logical replication sequence
+ * synchronization to detect drift. The caller must hold a lock on the sequence.
+ *
+ * Return false if the caller does not have sufficient privileges to access the
+ * sequence, true otherwise.
+ */
+bool
+GetSequence(Relation seqrel, int64 *last_value, bool *is_called)
+{
+ Buffer buf;
+ HeapTupleData seqtuple;
+ Form_pg_sequence_data seq;
+ Oid relid = RelationGetRelid(seqrel);
+
+ /* Confirm that the relation is a sequence and is locked */
+ Assert(seqrel->rd_rel->relkind == RELKIND_SEQUENCE);
+ Assert(CheckRelationLockedByMe(seqrel, AccessShareLock, true));
+
+ if (pg_class_aclcheck(relid, GetUserId(), ACL_SELECT) != ACLCHECK_OK)
+ {
+ *last_value = 0;
+ *is_called = false;
+ return false;
+ }
+
+ /* Read the sequence tuple */
+ seq = read_seq_tuple(seqrel, &buf, &seqtuple);
+
+ /* Extract the values */
+ *last_value = seq->last_value;
+ *is_called = seq->is_called;
+
+ /* Release buffer */
+ UnlockReleaseBuffer(buf);
+
+ return true;
+}
+
/*
* Main internal procedure that handles 2 & 3 arg forms of SETVAL.
*
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index a4fb6783ba9..18321d3b316 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -19,25 +19,32 @@
* CREATE SUBSCRIPTION
* ALTER SUBSCRIPTION ... REFRESH PUBLICATION
*
- * Executing the following command resets all sequences in the subscription to
- * state INIT, triggering re-synchronization:
- * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
- *
- * The apply worker periodically scans pg_subscription_rel for sequences in
- * INIT state. When such sequences are found, it spawns a sequencesync worker
- * to handle synchronization.
- *
- * A single sequencesync worker is responsible for synchronizing all sequences.
- * It begins by retrieving the list of sequences that are flagged for
- * synchronization, i.e., those in the INIT state. These sequences are then
- * processed in batches, allowing multiple entries to be synchronized within a
- * single transaction. The worker fetches the current sequence values and page
- * LSNs from the remote publisher, updates the corresponding sequences on the
- * local subscriber, and finally marks each sequence as READY upon successful
+ * The apply worker periodically scans pg_subscription_rel for sequences.
+ * When sequences are found, it spawns a sequencesync worker to handle
* synchronization.
*
+ * A single sequencesync worker is responsible for synchronizing all sequences
+ * for a subscription. It begins by retrieving the list of sequences. These
+ * sequences are then processed in batches, allowing multiple entries to be
+ * synchronized within a single transaction. The worker fetches the current
+ * sequence values and page LSNs from the remote publisher and updates the
+ * corresponding sequences on the local subscriber. Sequences in the INIT
+ * state are unconditionally updated to the latest values from the publisher
+ * and then moved to the READY state. For sequences already in the READY
+ * state, the worker checks for drift and updates them only when needed.
+ *
* Sequence state transitions follow this pattern:
- * INIT -> READY
+ *
+ *         (synchronize)
+ *   INIT --------------> READY ->-+
+ *                          ^      | (check-drift/synchronize)
+ *                          |      |
+ *                          +--<---+
+ *
+ * Between cycles, the worker sleeps for SEQSYNC_MIN_SLEEP_MS. If no drift is
+ * observed in any sequence, the sleep interval doubles after each wake cycle
+ * up to SEQSYNC_MAX_SLEEP_MS. When drift is detected, the interval resets to
+ * the minimum to ensure timely updates.
*
* To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
* sequences are synchronized per transaction. The locks on the sequence
@@ -60,6 +67,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicalworker.h"
#include "replication/worker_internal.h"
+#include "storage/latch.h"
#include "storage/lwlock.h"
#include "utils/acl.h"
#include "utils/builtins.h"
@@ -71,31 +79,42 @@
#include "utils/pg_lsn.h"
#include "utils/syscache.h"
#include "utils/usercontext.h"
+#include "utils/wait_event.h"
#define REMOTE_SEQ_COL_COUNT 10
typedef enum CopySeqResult
{
COPYSEQ_SUCCESS,
+ COPYSEQ_ALLOWED,
COPYSEQ_MISMATCH,
COPYSEQ_INSUFFICIENT_PERM,
- COPYSEQ_SKIPPED
+ COPYSEQ_SKIPPED,
+ COPYSEQ_NO_DRIFT,
} CopySeqResult;
-static List *seqinfos = NIL;
+/* Sleep intervals for sync */
+#define SEQSYNC_MIN_SLEEP_MS 2000 /* 2 seconds */
+#define SEQSYNC_MAX_SLEEP_MS 30000 /* 30 seconds */
+
+static MemoryContext SequenceSyncContext = NULL;
/*
- * Apply worker determines if sequence synchronization is needed.
+ * Apply worker determines whether a sequence sync worker is needed.
*
* Start a sequencesync worker if one is not already running. The active
* sequencesync worker will handle all pending sequence synchronization. If any
* sequences remain unsynchronized after it exits, a new worker can be started
* in the next iteration.
+ *
+ * The pointer to the sequencesync worker is cached to avoid scanning the
+ * workers array each time via logicalrep_worker_find().
*/
void
-ProcessSequencesForSync(void)
+MaybeLaunchSequenceSyncWorker(void)
{
- LogicalRepWorker *sequencesync_worker;
+ static LogicalRepWorker *sequencesync_worker = NULL;
+
int nsyncworkers;
bool has_pending_sequences;
bool started_tx;
@@ -113,6 +132,19 @@ ProcessSequencesForSync(void)
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ /*
+ * Quick exit if the sequence sync worker for the current subscription is
+ * already alive.
+ */
+ if (sequencesync_worker &&
+ sequencesync_worker->proc &&
+ isSequenceSyncWorker(sequencesync_worker) &&
+ sequencesync_worker->subid == MyLogicalRepWorker->subid)
+ {
+ LWLockRelease(LogicalRepWorkerLock);
+ return;
+ }
+
/* Check if there is a sequencesync worker already running? */
sequencesync_worker = logicalrep_worker_find(WORKERTYPE_SEQUENCESYNC,
MyLogicalRepWorker->subid,
@@ -145,7 +177,7 @@ ProcessSequencesForSync(void)
* for the given list of sequence indexes.
*/
static void
-get_sequences_string(List *seqindexes, StringInfo buf)
+get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf)
{
resetStringInfo(buf);
foreach_int(seqidx, seqindexes)
@@ -172,7 +204,7 @@ get_sequences_string(List *seqindexes, StringInfo buf)
*/
static void
report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
- List *missing_seqs_idx)
+ List *missing_seqs_idx, List *seqinfos)
{
StringInfo seqstr;
@@ -184,7 +216,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
if (mismatched_seqs_idx)
{
- get_sequences_string(mismatched_seqs_idx, seqstr);
+ get_sequences_string(mismatched_seqs_idx, seqinfos, seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
@@ -195,7 +227,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
if (insuffperm_seqs_idx)
{
- get_sequences_string(insuffperm_seqs_idx, seqstr);
+ get_sequences_string(insuffperm_seqs_idx, seqinfos, seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("insufficient privileges on sequence (%s)",
@@ -206,7 +238,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
if (missing_seqs_idx)
{
- get_sequences_string(missing_seqs_idx, seqstr);
+ get_sequences_string(missing_seqs_idx, seqinfos, seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("missing sequence on publisher (%s)",
@@ -230,7 +262,8 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
*/
static CopySeqResult
get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
- LogicalRepSequenceInfo **seqinfo, int *seqidx)
+ LogicalRepSequenceInfo **seqinfo, int *seqidx,
+ List *seqinfos)
{
bool isnull;
int col = 0;
@@ -241,7 +274,7 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
int64 remote_min;
int64 remote_max;
bool remote_cycle;
- CopySeqResult result = COPYSEQ_SUCCESS;
+ CopySeqResult result = COPYSEQ_ALLOWED;
HeapTuple tup;
Form_pg_sequence local_seq;
LogicalRepSequenceInfo *seqinfo_local;
@@ -326,32 +359,77 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
}
/*
- * Apply remote sequence state to local sequence and mark it as
- * synchronized (READY).
+ * Check whether the user has required privileges on the sequence and
+ * whether the sequence has drifted.
*/
static CopySeqResult
-copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
+check_seq_privileges_and_drift(LogicalRepSequenceInfo *seqinfo,
+ Relation sequence_rel)
{
- UserContext ucxt;
AclResult aclresult;
+ Oid seqoid = seqinfo->localrelid;
+
+ /* Perform drift check if it's not the initial sync */
+ if (seqinfo->relstate == SUBREL_STATE_READY)
+ {
+ int64 local_last_value;
+ bool local_is_called;
+
+ /*
+ * Skip synchronization if the current user does not have sufficient
+ * privileges to read the sequence data.
+ */
+ if (!GetSequence(sequence_rel, &local_last_value, &local_is_called))
+ return COPYSEQ_INSUFFICIENT_PERM;
+
+ /*
+ * Skip synchronization if the sequence has not drifted from the
+ * publisher's value.
+ */
+ if (local_last_value == seqinfo->last_value &&
+ local_is_called == seqinfo->is_called)
+ return COPYSEQ_NO_DRIFT;
+ }
+
+ /* Verify that the current user can update the sequence */
+ aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
+
+ if (aclresult != ACLCHECK_OK)
+ return COPYSEQ_INSUFFICIENT_PERM;
+
+ return COPYSEQ_ALLOWED;
+}
+
+/*
+ * Apply remote sequence state to local sequence. For sequences in INIT state,
+ * always synchronize and then move them to READY state upon completion. For
+ * sequences already in READY state, synchronize only if drift is detected.
+ */
+static CopySeqResult
+copy_sequence(LogicalRepSequenceInfo *seqinfo, Relation sequence_rel,
+ bool update_lsn)
+{
+ UserContext ucxt;
bool run_as_owner = MySubscription->runasowner;
Oid seqoid = seqinfo->localrelid;
+ CopySeqResult result;
+ bool need_lsn_update = false;
/*
* If the user did not opt to run as the owner of the subscription
* ('run_as_owner'), then copy the sequence as the owner of the sequence.
*/
if (!run_as_owner)
- SwitchToUntrustedUser(seqowner, &ucxt);
+ SwitchToUntrustedUser(sequence_rel->rd_rel->relowner, &ucxt);
- aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
+ result = check_seq_privileges_and_drift(seqinfo, sequence_rel);
- if (aclresult != ACLCHECK_OK)
+ if (result != COPYSEQ_ALLOWED)
{
if (!run_as_owner)
RestoreUserContext(&ucxt);
- return COPYSEQ_INSUFFICIENT_PERM;
+ return result;
}
/*
@@ -368,20 +446,47 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
RestoreUserContext(&ucxt);
/*
- * Record the remote sequence's LSN in pg_subscription_rel and mark the
- * sequence as READY.
+ * If LSN updates are requested, check whether the remote LSN differs from
+ * the locally stored value.
*/
- UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
- seqinfo->page_lsn, false);
+ if (update_lsn)
+ {
+ XLogRecPtr local_page_lsn;
+
+ /* Fetch the LSN recorded in pg_subscription_rel to compare with the remote one */
+ (void) GetSubscriptionRelState(MySubscription->oid,
+ RelationGetRelid(sequence_rel),
+ &local_page_lsn);
+
+ need_lsn_update = (local_page_lsn != seqinfo->page_lsn);
+ }
+
+ /*
+ * Update the catalog if either the sequence is in INIT state and needs to
+ * transition to READY, or the LSN has changed and this is an LSN update
+ * cycle (update_lsn is true).
+ */
+ if (seqinfo->relstate == SUBREL_STATE_INIT || need_lsn_update)
+ UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+ seqinfo->page_lsn, false);
return COPYSEQ_SUCCESS;
}
/*
* Copy existing data of sequences from the publisher.
+ *
+ * Sequences in INIT state are always synchronized. Sequences in READY state are
+ * synchronized only when drift is detected.
+ *
+ * When 'update_lsn' is true, update the page LSN in pg_subscription_rel for any
+ * synchronized sequences whose LSN has changed. When false, the LSN is updated
+ * only for sequences transitioning from INIT to READY state.
+ *
+ * Returns true if any sequences were actually copied, false otherwise.
*/
-static void
-copy_sequences(WalReceiverConn *conn)
+static bool
+copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
{
int cur_batch_base_index = 0;
int n_seqinfos = list_length(seqinfos);
@@ -391,13 +496,10 @@ copy_sequences(WalReceiverConn *conn)
StringInfo seqstr = makeStringInfo();
StringInfo cmd = makeStringInfo();
MemoryContext oldctx;
+ bool sequence_copied = false;
#define MAX_SEQUENCES_SYNC_PER_BATCH 100
- elog(DEBUG1,
- "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
- MySubscription->name, n_seqinfos);
-
while (cur_batch_base_index < n_seqinfos)
{
Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
@@ -407,6 +509,7 @@ copy_sequences(WalReceiverConn *conn)
int batch_mismatched_count = 0;
int batch_skipped_count = 0;
int batch_insuffperm_count = 0;
+ int batch_no_drift = 0;
int batch_missing_count;
Relation sequence_rel = NULL;
@@ -502,28 +605,28 @@ copy_sequences(WalReceiverConn *conn)
}
sync_status = get_and_validate_seq_info(slot, &sequence_rel,
- &seqinfo, &seqidx);
- if (sync_status == COPYSEQ_SUCCESS)
- sync_status = copy_sequence(seqinfo,
- sequence_rel->rd_rel->relowner);
+ &seqinfo, &seqidx, seqinfos);
+
+ if (sync_status == COPYSEQ_ALLOWED)
+ sync_status = copy_sequence(seqinfo, sequence_rel, update_lsn);
switch (sync_status)
{
case COPYSEQ_SUCCESS:
elog(DEBUG1,
- "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
- MySubscription->name, seqinfo->nspname,
- seqinfo->seqname);
+ "logical replication synchronization has updated sequence \"%s.%s\" in subscription \"%s\"",
+ seqinfo->nspname, seqinfo->seqname, MySubscription->name);
batch_succeeded_count++;
+ sequence_copied = true;
break;
case COPYSEQ_MISMATCH:
/*
- * Remember mismatched sequences in a long-lived memory
- * context since these will be used after the transaction
- * is committed.
+ * Remember mismatched sequences in SequenceSyncContext
+ * since these will be used after the transaction is
+ * committed.
*/
- oldctx = MemoryContextSwitchTo(ApplyContext);
+ oldctx = MemoryContextSwitchTo(SequenceSyncContext);
mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
seqidx);
MemoryContextSwitchTo(oldctx);
@@ -532,11 +635,11 @@ copy_sequences(WalReceiverConn *conn)
case COPYSEQ_INSUFFICIENT_PERM:
/*
- * Remember sequences with insufficient privileges in a
- * long-lived memory context since these will be used
- * after the transaction is committed.
+ * Remember sequences with insufficient privileges in
+ * SequenceSyncContext since these will be used after the
+ * transaction is committed.
*/
- oldctx = MemoryContextSwitchTo(ApplyContext);
+ oldctx = MemoryContextSwitchTo(SequenceSyncContext);
insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
seqidx);
MemoryContextSwitchTo(oldctx);
@@ -559,6 +662,13 @@ copy_sequences(WalReceiverConn *conn)
batch_skipped_count++;
}
break;
+ case COPYSEQ_NO_DRIFT:
+ /* Nothing to do */
+ batch_no_drift++;
+ break;
+ default:
+ elog(ERROR, "unrecognized sequence replication result: %d",
+ (int) sync_status);
}
if (sequence_rel)
@@ -573,14 +683,15 @@ copy_sequences(WalReceiverConn *conn)
batch_missing_count = batch_size - (batch_succeeded_count +
batch_mismatched_count +
batch_insuffperm_count +
- batch_skipped_count);
+ batch_skipped_count +
+ batch_no_drift);
elog(DEBUG1,
- "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
+ "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift",
MySubscription->name,
(cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
batch_size, batch_succeeded_count, batch_mismatched_count,
- batch_insuffperm_count, batch_missing_count, batch_skipped_count);
+ batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift);
/* Commit this batch, and prepare for next batch */
CommitTransactionCommand();
@@ -608,47 +719,50 @@ copy_sequences(WalReceiverConn *conn)
/* Report mismatches, permission issues, or missing sequences */
report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
- missing_seqs_idx);
+ missing_seqs_idx, seqinfos);
+
+ return sequence_copied;
}
/*
* Identifies sequences that require synchronization and initiates the
* synchronization process.
+ *
+ * Returns true if sequences have been updated.
*/
-static void
-LogicalRepSyncSequences(void)
+static bool
+LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
{
- char *err;
- bool must_use_password;
Relation rel;
HeapTuple tup;
- ScanKeyData skey[2];
+ ScanKeyData skey[1];
SysScanDesc scan;
Oid subid = MyLogicalRepWorker->subid;
- StringInfoData app_name;
+ bool sequence_copied = false;
+ List *seqinfos = NIL;
+ MemoryContext oldctx;
+
+ Assert(SequenceSyncContext);
StartTransactionCommand();
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+ /* Scan all pg_subscription_rel entries belonging to this subscription */
ScanKeyInit(&skey[0],
Anum_pg_subscription_rel_srsubid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(subid));
- ScanKeyInit(&skey[1],
- Anum_pg_subscription_rel_srsubstate,
- BTEqualStrategyNumber, F_CHAREQ,
- CharGetDatum(SUBREL_STATE_INIT));
-
scan = systable_beginscan(rel, InvalidOid, false,
- NULL, 2, skey);
+ NULL, 1, skey);
+
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_subscription_rel subrel;
LogicalRepSequenceInfo *seq;
Relation sequence_rel;
- MemoryContext oldctx;
+ char relstate;
CHECK_FOR_INTERRUPTS();
@@ -667,18 +781,21 @@ LogicalRepSyncSequences(void)
continue;
}
+ relstate = subrel->srsubstate;
+
+ Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
+
/*
* Worker needs to process sequences across transaction boundary, so
- * allocate them under long-lived context.
+ * allocate them under SequenceSyncContext.
*/
- oldctx = MemoryContextSwitchTo(ApplyContext);
-
+ oldctx = MemoryContextSwitchTo(SequenceSyncContext);
seq = palloc0_object(LogicalRepSequenceInfo);
seq->localrelid = subrel->srrelid;
seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
+ seq->relstate = relstate;
seqinfos = lappend(seqinfos, seq);
-
MemoryContextSwitchTo(oldctx);
table_close(sequence_rel, NoLock);
@@ -694,36 +811,16 @@ LogicalRepSyncSequences(void)
* Exit early if no catalog entries found, likely due to concurrent drops.
*/
if (!seqinfos)
- return;
-
- /* Is the use of a password mandatory? */
- must_use_password = MySubscription->passwordrequired &&
- !MySubscription->ownersuperuser;
+ return false;
- initStringInfo(&app_name);
- appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
- MySubscription->oid, GetSystemIdentifier());
+ /* Process sequences */
+ sequence_copied = copy_sequences(conn, seqinfos, update_lsn);
- /*
- * Establish the connection to the publisher for sequence synchronization.
- */
- LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true, true,
- must_use_password,
- app_name.data, &err);
- if (LogRepWorkerWalRcvConn == NULL)
- ereport(ERROR,
- errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
- MySubscription->name, err));
-
- pfree(app_name.data);
-
- copy_sequences(LogRepWorkerWalRcvConn);
+ return sequence_copied;
}
/*
- * Execute the initial sync with error handling. Disable the subscription,
+ * Execute the sequence sync with error handling. Disable the subscription,
* if required.
*
* Note that we don't handle FATAL errors which are probably because of system
@@ -736,8 +833,117 @@ start_sequence_sync(void)
PG_TRY();
{
- /* Call initial sync. */
- LogicalRepSyncSequences();
+ char *err;
+ bool must_use_password;
+ StringInfoData app_name;
+ long sleep_ms = SEQSYNC_MIN_SLEEP_MS;
+ TimestampTz next_lsn_update = GetCurrentTimestamp();
+
+ /* Is the use of a password mandatory? */
+ must_use_password = MySubscription->passwordrequired &&
+ !MySubscription->ownersuperuser;
+
+ initStringInfo(&app_name);
+ appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
+ MySubscription->oid, GetSystemIdentifier());
+
+ /*
+ * Establish the connection to the publisher for sequence
+ * synchronization.
+ */
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, true,
+ must_use_password,
+ app_name.data, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
+ MySubscription->name, err));
+
+ pfree(app_name.data);
+
+ /*
+ * Create SequenceSyncContext, which is reset after each synchronization
+ * cycle.
+ */
+ SequenceSyncContext = AllocSetContextCreate(ApplyContext,
+ "SequenceSyncContext",
+ ALLOCSET_DEFAULT_SIZES);
+
+ for (;;)
+ {
+ bool sequence_copied = false;
+ MemoryContext oldctx;
+ bool update_lsn;
+ TimestampTz now = GetCurrentTimestamp();
+
+ CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Process any invalidation messages that might have accumulated */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+
+ /*
+ * We avoid updating pg_subscription_rel's page LSN in every cycle
+ * to prevent excessive catalog invalidations, which would slow
+ * the apply worker that relies on this cache (see
+ * FetchRelationStates). Instead, such updates are made at most once
+ * every SEQSYNC_MAX_SLEEP_MS (30 seconds).
+ */
+ update_lsn = (now >= next_lsn_update);
+
+ /*
+ * Perform sequence synchronization under SequenceSyncContext and
+ * reset it each cycle to avoid manual memory management.
+ */
+ oldctx = MemoryContextSwitchTo(SequenceSyncContext);
+
+ /*
+ * Synchronize all sequences (both READY and INIT states).
+ */
+ sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn,
+ update_lsn);
+
+ MemoryContextReset(SequenceSyncContext);
+ MemoryContextSwitchTo(oldctx);
+
+ /*
+ * Adjust sleep interval based on sync activity. If sequences were
+ * copied, reset to the minimum interval to poll more frequently.
+ * Otherwise, exponentially back off up to the maximum interval.
+ */
+ if (sequence_copied)
+ sleep_ms = SEQSYNC_MIN_SLEEP_MS;
+ else
+ sleep_ms = Min(sleep_ms * 2, SEQSYNC_MAX_SLEEP_MS);
+
+ /* Refresh timestamp after potentially time-consuming sync work */
+ now = GetCurrentTimestamp();
+
+ /*
+ * Schedule the next LSN update. If we just performed an update,
+ * set the next update time to 30 seconds from now. Otherwise,
+ * ensure the sleep interval doesn't exceed the time remaining
+ * until the next update.
+ */
+ if (update_lsn)
+ next_lsn_update = TimestampTzPlusMilliseconds(now, SEQSYNC_MAX_SLEEP_MS);
+ else
+ sleep_ms = Min(sleep_ms, TimestampDifferenceMilliseconds(now, next_lsn_update));
+
+ /* Sleep for the computed interval, or until the latch is set */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_ms,
+ WAIT_EVENT_LOGICAL_SEQSYNC_MAIN);
+ ResetLatch(MyLatch);
+ }
}
PG_CATCH();
{
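The adaptive sleep computed at the end of each iteration of the loop in start_sequence_sync() can be modelled in isolation. The following is a minimal standalone C sketch under simplifying assumptions: next_sleep_ms() and its parameters are hypothetical, timestamps are replaced by a precomputed "milliseconds until the next LSN update" value, and no PostgreSQL APIs are called.

```c
#include <assert.h>
#include <stdbool.h>

/* Same values as the constants added by the patch. */
#define SEQSYNC_MIN_SLEEP_MS 2000
#define SEQSYNC_MAX_SLEEP_MS 30000

/*
 * Hypothetical helper: compute the next sleep interval in milliseconds.
 *
 * sleep_ms:            interval used for the previous cycle
 * sequence_copied:     whether the last cycle updated any sequence
 * lsn_updated:         whether the last cycle was an LSN-update cycle
 * ms_until_lsn_update: time left until the next scheduled LSN update
 */
static long
next_sleep_ms(long sleep_ms, bool sequence_copied, bool lsn_updated,
              long ms_until_lsn_update)
{
    /* TimestampDifferenceMilliseconds() never reports a negative interval. */
    assert(ms_until_lsn_update >= 0);

    /* Poll quickly after activity; otherwise back off exponentially. */
    if (sequence_copied)
        sleep_ms = SEQSYNC_MIN_SLEEP_MS;
    else if (sleep_ms * 2 < SEQSYNC_MAX_SLEEP_MS)
        sleep_ms = sleep_ms * 2;
    else
        sleep_ms = SEQSYNC_MAX_SLEEP_MS;

    /*
     * If this cycle did not update LSNs, wake up no later than the next
     * scheduled LSN update.  After an update, the next one is scheduled
     * SEQSYNC_MAX_SLEEP_MS away, which already bounds sleep_ms.
     */
    if (!lsn_updated && ms_until_lsn_update < sleep_ms)
        sleep_ms = ms_until_lsn_update;

    return sleep_ms;
}
```

Ignoring the LSN-update clamp, starting from the 2 s minimum, successive idle cycles give 4 s, 8 s and 16 s, and the next one is capped at 30 s; any cycle that copies a sequence drops the interval back to 2 s.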
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index ef61ca0437d..3a0dc8669f9 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -172,7 +172,7 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
- ProcessSequencesForSync();
+ MaybeLaunchSequenceSyncWorker();
break;
case WORKERTYPE_SEQUENCESYNC:
@@ -191,13 +191,13 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
*
* The pg_subscription_rel catalog is shared by tables and sequences. Changes
* to either sequences or tables can affect the validity of relation states, so
- * we identify non-READY tables and non-READY sequences together to ensure
+ * we identify non-READY tables and sequences (in any state) together to ensure
* consistency.
*
* has_pending_subtables: true if the subscription has one or more tables that
* are not in READY state, otherwise false.
* has_pending_subsequences: true if the subscription has one or more sequences
- * that are not in READY state, otherwise false.
+ * (in any state), otherwise false.
*/
void
FetchRelationStates(bool *has_pending_subtables,
@@ -205,23 +205,22 @@ FetchRelationStates(bool *has_pending_subtables,
bool *started_tx)
{
/*
- * has_subtables and has_subsequences_non_ready are declared as static,
- * since the same value can be used until the system table is invalidated.
+ * has_subtables and has_subsequences are declared as static, since the
+ * same value can be used until the system table is invalidated.
*/
static bool has_subtables = false;
- static bool has_subsequences_non_ready = false;
+ static bool has_subsequences = false;
*started_tx = false;
-
if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
{
MemoryContext oldctx;
List *rstates;
+ List *seq_states;
SubscriptionRelState *rstate;
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
- has_subsequences_non_ready = false;
-
+ has_subsequences = false;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
@@ -231,27 +230,28 @@ FetchRelationStates(bool *has_pending_subtables,
StartTransactionCommand();
*started_tx = true;
}
-
- /* Fetch tables and sequences that are in non-READY state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
+ /* Fetch tables that are in non-READY state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
true);
-
+ /* Fetch all sequences (regardless of state). */
+ seq_states = GetSubscriptionRelations(MySubscription->oid, false, true,
+ false);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach_ptr(SubscriptionRelState, subrel, rstates)
{
- if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
- has_subsequences_non_ready = true;
- else
- {
- rstate = palloc_object(SubscriptionRelState);
- memcpy(rstate, subrel, sizeof(SubscriptionRelState));
- table_states_not_ready = lappend(table_states_not_ready,
- rstate);
- }
+ rstate = palloc_object(SubscriptionRelState);
+ memcpy(rstate, subrel, sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready,
+ rstate);
}
+
+ /* Check if there are any sequences. */
+ has_subsequences = (seq_states != NIL);
MemoryContextSwitchTo(oldctx);
+ list_free_deep(seq_states);
+
/*
* Does the subscription have tables?
*
@@ -277,5 +277,5 @@ FetchRelationStates(bool *has_pending_subtables,
*has_pending_subtables = has_subtables;
if (has_pending_subsequences)
- *has_pending_subsequences = has_subsequences_non_ready;
+ *has_pending_subsequences = has_subsequences;
}
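MaybeLaunchSequenceSyncWorker(), which the apply worker now calls from ProcessSyncingRelations(), caches a pointer to the sequencesync worker's slot and revalidates it before trusting it, falling back to logicalrep_worker_find() only when the cached slot no longer matches. A minimal standalone sketch of that pattern follows; the WorkerSlot type, the slots array and find_seqsync_worker() are hypothetical simplifications, and the locking done under LogicalRepWorkerLock in the real code is omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, much-simplified worker slot (not PostgreSQL's struct). */
typedef enum { WORKER_NONE, WORKER_APPLY, WORKER_SEQSYNC } WorkerType;

typedef struct
{
    bool       in_use;          /* stands in for a non-NULL proc pointer */
    WorkerType type;
    unsigned   subid;
} WorkerSlot;

#define NSLOTS 4
static WorkerSlot slots[NSLOTS];
static int linear_scans = 0;    /* counts slow-path lookups */

/* Linear search, standing in for logicalrep_worker_find(). */
static WorkerSlot *
find_seqsync_worker(unsigned subid)
{
    linear_scans++;
    for (int i = 0; i < NSLOTS; i++)
        if (slots[i].in_use && slots[i].type == WORKER_SEQSYNC &&
            slots[i].subid == subid)
            return &slots[i];
    return NULL;
}

/* Returns true if a sequencesync worker is running for subid. */
static bool
seqsync_worker_running(unsigned subid)
{
    static WorkerSlot *cached = NULL;

    /* Fast path: the cached slot still describes our worker. */
    if (cached && cached->in_use && cached->type == WORKER_SEQSYNC &&
        cached->subid == subid)
        return true;

    /* Slow path: rescan and remember whatever we find (possibly NULL). */
    cached = find_seqsync_worker(subid);
    return cached != NULL;
}
```

The fast path only saves a linear scan; correctness still depends on re-checking the slot's type and subscription OID, because a slot can be freed and reused by another worker after the pointer was cached.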
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 27d398d576d..c8841831530 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -5102,6 +5102,9 @@ maybe_reread_subscription(void)
* worker won't restart if the streaming option's value is changed from
* 'parallel' to any other value or the server decides not to stream the
* in-progress transaction.
+ *
+ * Note: some of these parameters are not relevant to the sequence sync
+ * worker, but it exits anyway.
*/
if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
strcmp(newsub->name, MySubscription->name) != 0 ||
@@ -5117,6 +5120,10 @@ maybe_reread_subscription(void)
ereport(LOG,
(errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because of a parameter change",
MySubscription->name)));
+ else if (am_sequencesync_worker())
+ ereport(LOG,
+ (errmsg("logical replication sequence synchronization worker for subscription \"%s\" will stop because of a parameter change",
+ MySubscription->name)));
else
ereport(LOG,
(errmsg("logical replication worker for subscription \"%s\" will restart because of a parameter change",
@@ -5135,6 +5142,10 @@ maybe_reread_subscription(void)
ereport(LOG,
errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
MySubscription->name));
+ else if (am_sequencesync_worker())
+ ereport(LOG,
+ errmsg("logical replication sequence synchronization worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
+ MySubscription->name));
else
ereport(LOG,
errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4aa864fe3c3..a91085e7723 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -61,6 +61,7 @@ IO_WORKER_MAIN "Waiting in main loop of IO Worker process."
LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process."
LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process."
LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process."
+LOGICAL_SEQSYNC_MAIN "Waiting in main loop of logical replication sequence sync process."
RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot synchronization."
REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down."
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index 502640d3018..86574b69169 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -96,6 +96,7 @@ typedef struct LogicalRepSequenceInfo
char *seqname;
char *nspname;
Oid localrelid;
+ char relstate;
/* Sequence information retrieved from the publisher node */
XLogRecPtr page_lsn;
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 2c3c4a3f074..fd4f69bdd1c 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -47,6 +47,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern bool GetSequence(Relation seqrel, int64 *last_value, bool *is_called);
extern void SetSequence(Oid relid, int64 next, bool iscalled);
extern void ResetSequenceCaches(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 745b7d9e969..4c7b6f4634b 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -285,7 +285,7 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
-extern void ProcessSequencesForSync(void);
+extern void MaybeLaunchSequenceSyncWorker(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, SysCacheIdentifier cacheid,
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 5d457060a02..2fe209f461f 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,8 @@ $node_publisher->start;
# Create subscriber node.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ "max_logical_replication_workers = 10");
$node_subscriber->start;
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index 471780a3585..af190713b2b 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -75,18 +75,14 @@ is($result, '100|t',
##########
## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new
-# sequences of the publisher, but changes to existing sequences should
-# not be synced.
+# sequences of the publisher.
##########
-# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1'
+# Create a new sequence 'regress_s2'
$node_publisher->safe_psql(
'postgres', qq(
CREATE SEQUENCE regress_s2;
INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
-
- -- Existing sequence
- INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
));
# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION
@@ -97,19 +93,6 @@ $result = $node_subscriber->safe_psql(
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
-$result = $node_publisher->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s1;
-));
-is($result, '200|t', 'Check sequence value in the publisher');
-
-# Check - existing sequence ('regress_s1') is not synced
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s1;
-));
-is($result, '100|t', 'REFRESH PUBLICATION will not sync existing sequence');
-
# Check - newly published sequence ('regress_s2') is synced
$result = $node_subscriber->safe_psql(
'postgres', qq(
@@ -119,16 +102,13 @@ is($result, '100|t',
'REFRESH PUBLICATION will sync newly published sequence');
##########
-# Test: REFRESH SEQUENCES and REFRESH PUBLICATION (copy_data = false)
-#
-# 1. ALTER SUBSCRIPTION ... REFRESH SEQUENCES should re-synchronize all
-# existing sequences, but not synchronize newly added ones.
-# 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = false) should
-# also not update sequence values for newly added sequences.
+# Test:
+# 1. Existing sequences are automatically re-synchronized when their values
+#    change on the publisher.
+# 2. Sequences newly created on the publisher are not synchronized automatically.
##########
-# Create a new sequence 'regress_s3', and update the existing sequence
-# 'regress_s2'.
+# Create a new sequence 'regress_s3', and update the existing sequences
+# 'regress_s2' and 'regress_s1'.
$node_publisher->safe_psql(
'postgres', qq(
CREATE SEQUENCE regress_s3;
@@ -136,53 +116,28 @@ $node_publisher->safe_psql(
-- Existing sequence
INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
+ INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
));
-# 1. Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;
-));
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-
# Check - existing sequences ('regress_s1' and 'regress_s2') are synced
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s1;
-));
-is($result, '200|t', 'REFRESH SEQUENCES will sync existing sequences');
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s2;
-));
-is($result, '200|t', 'REFRESH SEQUENCES will sync existing sequences');
-# Check - newly published sequence ('regress_s3') is not synced
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s3;
-));
-is($result, '1|f',
- 'REFRESH SEQUENCES will not sync newly published sequence');
+# Poll until regress_s1 reflects the updated sequence value
+$node_subscriber->poll_query_until('postgres',
+ qq(SELECT last_value = 200 AND is_called = 't' FROM regress_s1;))
+ or die "Timed out while waiting for regress_s1 sequence to sync";
-# 2. Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data as false
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION WITH (copy_data = false);
-));
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Poll until regress_s2 reflects the updated sequence value
+$node_subscriber->poll_query_until('postgres',
+ qq(SELECT last_value = 200 AND is_called = 't' FROM regress_s2;))
+ or die "Timed out while waiting for regress_s2 sequence to sync";
-# Check - newly published sequence ('regress_s3') is not synced with copy_data
-# as false.
+# Check - newly published sequence ('regress_s3') is not synced
$result = $node_subscriber->safe_psql(
'postgres', qq(
SELECT last_value, is_called FROM regress_s3;
));
is($result, '1|f',
- 'REFRESH PUBLICATION will not sync newly published sequence with copy_data as false'
-);
+ 'Newly published sequences are not synced automatically');
##########
# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should report an error when:
--
2.47.3
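In the first patch, check_seq_privileges_and_drift() and the tail of copy_sequence() decide, per sequence, whether to write the sequence and whether to touch pg_subscription_rel. The sketch below restates that decision as plain C under stated assumptions: RelState, CheckResult and SeqState are hypothetical stand-ins, can_read stands for the result of GetSequence(), can_update for the ACL_UPDATE check, and LSNs are plain 64-bit integers.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for SUBREL_STATE_* and CopySeqResult. */
typedef enum { REL_INIT, REL_READY } RelState;
typedef enum { SEQ_ALLOWED, SEQ_NO_DRIFT, SEQ_INSUFFICIENT_PERM } CheckResult;

typedef struct
{
    int64_t last_value;
    bool    is_called;
} SeqState;

/* Should this sequence be written? */
static CheckResult
check_drift(RelState relstate, bool can_read, bool can_update,
            SeqState local, SeqState remote)
{
    assert(relstate == REL_INIT || relstate == REL_READY);

    /* Already-synchronized sequences are rewritten only if they drifted. */
    if (relstate == REL_READY)
    {
        if (!can_read)
            return SEQ_INSUFFICIENT_PERM;
        if (local.last_value == remote.last_value &&
            local.is_called == remote.is_called)
            return SEQ_NO_DRIFT;
    }

    return can_update ? SEQ_ALLOWED : SEQ_INSUFFICIENT_PERM;
}

/* After a successful copy, should pg_subscription_rel be updated? */
static bool
needs_catalog_update(RelState relstate, bool update_lsn,
                     uint64_t stored_lsn, uint64_t remote_lsn)
{
    bool need_lsn_update = update_lsn && stored_lsn != remote_lsn;

    return relstate == REL_INIT || need_lsn_update;
}
```

With update_lsn false, a READY sequence that has drifted is rewritten but its recorded LSN is left alone, which is how the patch confines pg_subscription_rel updates for READY sequences to the periodic LSN-update cycles.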
[application/octet-stream] v12-0002-Cache-sequence-information-in-the-sequence-sync-.patch (6.2K, 4-v12-0002-Cache-sequence-information-in-the-sequence-sync-.patch)
From 4c9c11ee6e07c799ef5cd2845a248d8a70e8d09e Mon Sep 17 00:00:00 2001
From: Ajin Cherian <[email protected]>
Date: Thu, 26 Mar 2026 14:26:52 +1100
Subject: [PATCH v12 2/3] Cache sequence information in the sequence sync
worker.

Previously, the sequence sync worker would fetch sequence metadata from
the catalog each time it needed to synchronize sequences. This could be
inefficient when many sequences are involved, as the worker would need
to repeatedly open and scan pg_subscription_rel.

To improve this, introduce a cache for sequence information in the sequence sync
worker. The cache is populated on first use and kept across synchronization
cycles. It is invalidated when pg_subscription_rel is modified, ensuring that
changes to subscription relations are reflected promptly.
---
.../replication/logical/sequencesync.c | 93 +++++++++++++------
1 file changed, 66 insertions(+), 27 deletions(-)
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index 18321d3b316..d12bd31f09d 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -99,6 +99,19 @@ typedef enum CopySeqResult
static MemoryContext SequenceSyncContext = NULL;
+/*
+ * Cached list of sequence information (LogicalRepSequenceInfo) for the current
+ * subscription. The cache is invalidated when pg_subscription_rel is modified.
+ *
+ * Note: To avoid the cost of searching for a specific sequence on relcache
+ * invalidation, we do not invalidate the cache immediately when a sequence is
+ * altered (e.g., renamed or moved to another namespace). Instead, we validate
+ * the sequence name and namespace when next attempting to sync it, at which
+ * point we verify the local sequence state.
+ */
+static List *sequence_infos = NIL;
+static bool sequence_infos_valid = false;
+
/*
* Apply worker determines whether a sequence sync worker is needed.
*
@@ -500,6 +513,9 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
#define MAX_SEQUENCES_SYNC_PER_BATCH 100
+ if (seqinfos == NIL)
+ return false;
+
while (cur_batch_base_index < n_seqinfos)
{
Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
@@ -725,24 +741,44 @@ copy_sequences(WalReceiverConn *conn, List *seqinfos, bool update_lsn)
}
/*
- * Identifies sequences that require synchronization and initiates the
- * synchronization process.
+ * Callback from syscache invalidation.
+ */
+static void
+invalidate_syncing_sequence_infos(Datum arg, SysCacheIdentifier cacheid,
+ uint32 hashvalue)
+{
+ sequence_infos_valid = false;
+}
+
+/*
+ * Get the list of sequence information for the current subscription.
*
- * Returns true if sequences have been updated.
+ * Returns cached sequence states if valid; otherwise fetches them from the
+ * catalog, caches the result, and returns it.
*/
-static bool
-LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
+static List *
+fetch_sequence_infos(void)
{
Relation rel;
HeapTuple tup;
ScanKeyData skey[1];
SysScanDesc scan;
Oid subid = MyLogicalRepWorker->subid;
- bool sequence_copied = false;
- List *seqinfos = NIL;
- MemoryContext oldctx;
+ List *tmp_seqinfos = NIL;
+
+ if (sequence_infos_valid)
+ return sequence_infos;
- Assert(SequenceSyncContext);
+ /* Free the existing invalid cache entries */
+ foreach_ptr(LogicalRepSequenceInfo, seqinfo, sequence_infos)
+ {
+ pfree(seqinfo->nspname);
+ pfree(seqinfo->seqname);
+ pfree(seqinfo);
+ }
+
+ list_free(sequence_infos);
+ sequence_infos = NIL;
StartTransactionCommand();
@@ -763,6 +799,7 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
LogicalRepSequenceInfo *seq;
Relation sequence_rel;
char relstate;
+ MemoryContext oldctx;
CHECK_FOR_INTERRUPTS();
@@ -785,17 +822,14 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
- /*
- * Worker needs to process sequences across transaction boundary, so
- * allocate them under SequenceSyncContext.
- */
- oldctx = MemoryContextSwitchTo(SequenceSyncContext);
+ /* Cache the information in a permanent memory context */
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
seq = palloc0_object(LogicalRepSequenceInfo);
seq->localrelid = subrel->srrelid;
seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
seq->relstate = relstate;
- seqinfos = lappend(seqinfos, seq);
+ tmp_seqinfos = lappend(tmp_seqinfos, seq);
MemoryContextSwitchTo(oldctx);
table_close(sequence_rel, NoLock);
@@ -805,18 +839,12 @@ LogicalRepSyncSequences(WalReceiverConn *conn, bool update_lsn)
systable_endscan(scan);
table_close(rel, AccessShareLock);
- CommitTransactionCommand();
-
- /*
- * Exit early if no catalog entries found, likely due to concurrent drops.
- */
- if (!seqinfos)
- return false;
+ sequence_infos = tmp_seqinfos;
+ sequence_infos_valid = true;
- /* Process sequences */
- sequence_copied = copy_sequences(conn, seqinfos, update_lsn);
+ CommitTransactionCommand();
- return sequence_copied;
+ return sequence_infos;
}
/*
@@ -831,6 +859,14 @@ start_sequence_sync(void)
{
Assert(am_sequencesync_worker());
+ /*
+ * Setup callback for syscache so that we know when something changes in
+ * the subscription relation state.
+ */
+ CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
+ invalidate_syncing_sequence_infos,
+ (Datum) 0);
+
PG_TRY();
{
char *err;
@@ -876,6 +912,7 @@ start_sequence_sync(void)
bool sequence_copied = false;
MemoryContext oldctx;
bool update_lsn;
+ List *seqinfos;
TimestampTz now = GetCurrentTimestamp();
CHECK_FOR_INTERRUPTS();
@@ -907,8 +944,10 @@ start_sequence_sync(void)
/*
* Synchronize all sequences (both READY and INIT states).
*/
- sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn,
- update_lsn);
+ seqinfos = fetch_sequence_infos();
+
+ sequence_copied = copy_sequences(LogRepWorkerWalRcvConn, seqinfos,
+ update_lsn);
MemoryContextReset(SequenceSyncContext);
MemoryContextSwitchTo(oldctx);
--
2.47.3
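The caching scheme in 0002 follows a common shape. It keeps a module-level list plus a validity flag. A syscache callback only clears the flag, and the fetch function rebuilds the list lazily on its next call. The standalone C sketch below mimics that shape outside the server and is only an illustration under stated assumptions. `seq_info`, `catalog_row`, `catalog_scans`, and `copy_string` are hypothetical stand-ins. Plain `malloc`/`free` replace `palloc` and `CacheMemoryContext`, and a fake array replaces the scan of pg_subscription_rel.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for LogicalRepSequenceInfo. */
typedef struct seq_info
{
    unsigned int     localrelid;
    char            *seqname;
    struct seq_info *next;
} seq_info;

/* Hypothetical stand-in for one pg_subscription_rel row of the subscription. */
typedef struct
{
    unsigned int relid;
    const char  *name;
} catalog_row;

/* Fake catalog contents, set by the caller. */
static const catalog_row *catalog_rows = NULL;
static size_t n_catalog_rows = 0;
static int catalog_scans = 0;       /* number of rebuilds, for illustration */

/* The cache: a list plus a validity flag. */
static seq_info *sequence_infos = NULL;
static bool sequence_infos_valid = false;

/* Invalidation callback: cheap, it only marks the cache stale. */
static void
invalidate_sequence_infos(void)
{
    sequence_infos_valid = false;
}

/* Hypothetical helper: heap-copy a string (strdup is not ISO C before C23). */
static char *
copy_string(const char *s)
{
    size_t len;
    char  *copy;

    assert(s != NULL);
    len = strlen(s) + 1;
    copy = malloc(len);
    if (copy == NULL)
        abort();
    memcpy(copy, s, len);
    return copy;
}

/* Release every entry of the cached list and mark the cache stale. */
static void
free_sequence_infos(void)
{
    seq_info *cur = sequence_infos;

    while (cur != NULL)
    {
        seq_info *next = cur->next;

        free(cur->seqname);
        free(cur);
        cur = next;
    }
    sequence_infos = NULL;
    sequence_infos_valid = false;
}

/* Return the cached list, rescanning the fake catalog only when stale. */
static seq_info *
fetch_sequence_infos(void)
{
    seq_info  *head = NULL;
    seq_info **tail = &head;

    if (sequence_infos_valid)
        return sequence_infos;

    free_sequence_infos();
    catalog_scans++;

    for (size_t i = 0; i < n_catalog_rows; i++)
    {
        seq_info *seq = calloc(1, sizeof(seq_info));

        if (seq == NULL)
            abort();
        seq->localrelid = catalog_rows[i].relid;
        seq->seqname = copy_string(catalog_rows[i].name);
        *tail = seq;            /* append, preserving catalog order */
        tail = &seq->next;
    }

    sequence_infos = head;
    sequence_infos_valid = true;
    return sequence_infos;
}
```

Repeated calls between invalidations return the same list without rescanning. Only the first call after `invalidate_sequence_infos()` pays for a new scan. In the server, invalidation callbacks may also run while the rebuild itself is in progress, because catalog access can process pending invalidation messages. Setting the flag to valid unconditionally at the end of the rebuild could therefore, in principle, hide an invalidation that arrived mid-scan. Tracking a separate "rebuild started" state is one possible guard.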