From: Ajin Cherian <[email protected]>
To: shveta malik <[email protected]>
Cc: Ashutosh Sharma <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: Amit Kapila <[email protected]>
Subject: Re: [PATCH] Support automatic sequence replication
Date: Fri, 20 Feb 2026 21:57:16 +1100
Message-ID: <CAFPTHDZiWYXoKoo4VcBYNH9a=gxDZhfkcBeXt5w6cLw4_ysyKw@mail.gmail.com>
In-Reply-To: <CAJpy0uAOuu-M6wobH2wHOdTymm-cX9+MqwPyRNoOt=sPKBdCew@mail.gmail.com>
References: <CAFPTHDZXX9WQ_X1ZfEvS248T+pKuk6SmCnXcvgPM059N1xPUfA@mail.gmail.com>
<CAJpy0uDLUEjHHME8om1vAf6qkXCeRR6cBvkpK8yWBAC=T0ZFLA@mail.gmail.com>
<CAFPTHDZg1JrunGgOj332hr+gUuH_Jm7skqPpYSvd-QE3yEdRDQ@mail.gmail.com>
<CAJpy0uBz7MCSUkvFJD9ij65vBahNmY+bfCgdGKRqXovYs+K_TA@mail.gmail.com>
<CAJpy0uDsuNqjWd-TmGBxqSS1rnVCJ3B8=SYrtxQ=Vs8kb71QFA@mail.gmail.com>
<CAJpy0uAMWg3KcXtVBS7B0rnchLNrCCVYBByJCzAp=u5LERgtfA@mail.gmail.com>
<CAFPTHDZwEhxhDAeqcPi0GuYN6xBs8gFXHOMUnbg3u2Xigcz4Zg@mail.gmail.com>
<CAE9k0PmTyCU1A9YEf+MRgfeZ8yK1bAYJu=o0bH8DNUTzXejQyQ@mail.gmail.com>
<CAA4eK1L6czEzG4mLNZSyjYC5nX0FMSjjk3csKuxPD3Ph5-7Yvw@mail.gmail.com>
<CAJpy0uAhGQJ=msVsn2GsqWXr+YESJK6x9NBvrUtKvtvp1OVuKQ@mail.gmail.com>
<CAJpy0uAOuu-M6wobH2wHOdTymm-cX9+MqwPyRNoOt=sPKBdCew@mail.gmail.com>
On Fri, Feb 13, 2026 at 4:43 PM Peter Smith <[email protected]> wrote:
>
> Hi Ajin.
>
> Some review comments for patch v4-0001
>
> ======
> src/backend/commands/sequence.c
>
> GetSequence:
>
> 1.
> +/*
> + * Read the current sequence values (last_value and is_called)
> + *
> + * This is a read-only operation that acquires AccessShareLock on the sequence.
> + * Used by logical replication sequence synchronization to detect drift.
> + */
>
> The comment seems stale. e.g. the function is not acquiring a lock
> anymore, contrary to what that comment says.
>
Fixed.
>
> ======
> .../replication/logical/sequencesync.c
>
> 2.
> -static List *seqinfos = NIL;
>
> The removal of this global was not strictly part of this patch; it is
> more like a prerequisite to make everything tidier, so your new code
> does not go further down that track of side-affecting a global. From
> that POV, I thought this global removal should be implemented as a
> first/separate (0001) patch so that it might be quickly reviewed and
> committed independently of the new seq-sync logic.
>
Keeping it as is for the time being. Let's see what everybody else thinks.
> ~~~
>
> LogicalRepSyncSequences:
>
> 3.
> + /* Error on unexpected states */
> + if (relstate != SUBREL_STATE_INIT && relstate != SUBREL_STATE_READY)
> + {
> + table_close(sequence_rel, NoLock);
> + ereport(ERROR,
> + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> + errmsg("unexpected relstate '%c' for sequence \"%s.%s\" in
> subscription \"%s\"",
> + relstate,
> + get_namespace_name(RelationGetNamespace(sequence_rel)),
> + RelationGetRelationName(sequence_rel),
> + MySubscription->name)));
> + }
> +
>
> How is this possible? Should it just be Assert?
>
Fixed.
> ~~~
>
> start_sequence_sync:
>
> 4.
> + /*
> + * Synchronize all sequences (both READY and INIT states).
> + * The function will process INIT sequences first, then READY sequences.
> + */
> + sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn);
>
> Why is talking about the processing order relevant?
>
>
Removed.
>
> ======
> src/backend/replication/logical/syncutils.c
>
> 5.
> + /* Check if any new sequences need syncing */
> + ProcessSequencesForSync();
> +
>
> Maybe don't say "new" because IIUC it also handles older sequences
> where the values have drifted.
>
> ======
> src/test/subscription/t/036_sequences.pl
>
> 6.
> -$result = $node_publisher->safe_psql(
> - 'postgres', qq(
> - SELECT last_value, is_called FROM regress_s1;
> -));
> -is($result, '200|t', 'Check sequence value in the publisher');
> -
> -# Check - existing sequence ('regress_s1') is not synced
> -$result = $node_subscriber->safe_psql(
> - 'postgres', qq(
> - SELECT last_value, is_called FROM regress_s1;
> -));
> -is($result, '100|t', 'REFRESH PUBLICATION will not sync existing sequence');
> -
>
> Since you are no longer testing "existing sequences" in this test
> part, should you also remove the earlier INSERT for 'regress_s1'?
>
Fixed both the above comments.
On Tue, Feb 17, 2026 at 10:21 PM Ashutosh Sharma <[email protected]> wrote:
>
>
> How about retaining ALTER SUBSCRIPTION ... REFRESH SEQUENCES command?
>
> It can be useful in scenarios where automatic sequence replication
> cannot be enabled, for example, due to the max_worker_processes limit
> on the server preventing a new worker from being registered. Since
> increasing max_worker_processes requires a server restart, which the
> user may not want to perform immediately, this command would provide a
> way to manually synchronize the sequences.
>
Retained ALTER SUBSCRIPTION ... REFRESH SEQUENCES with behaviour similar
to HEAD: it only changes the state of the sequences to INIT. The
sequence worker will then update these sequences unconditionally.
>
> One minor comment:
>
> * Sequence state transitions follow this pattern:
> - * INIT -> READY
> + * INIT --> READY ->-+
> + * ^ | (check/synchronzize)
> + * | |
> + * +--<---+
>
>
> "synchronzize" → "synchronize"
>
Fixed.
On Wed, Feb 18, 2026 at 9:04 PM shveta malik <[email protected]> wrote:
>
> Few comments:
>
> 1)
> + /* Check if any new sequences need syncing */
> + ProcessSequencesForSync();
> +
>
> Shall we name it 'maybe_start_seqsync_worker()' (or something better?)
> and move it immediately after 'maybe_advance_nonremovable_xid()'.
> Thoughts? This is because we do not process sequences here, we just
> check if the subscription includes sequences and start worker.
>
Since snake_case is used for static functions, I've renamed it to
MaybeLaunchSequenceSyncWorker and called it immediately after
maybe_advance_nonremovable_xid().
> 2)
> We can also change the comment atop ProcessSequencesForSync to mention
> that. Currently it says:
> /*
> * Apply worker determines if sequence synchronization is needed.
> *
> * Start a sequencesync worker if one is not already running.
>
> Now, we shall change it to:
> Apply worker determines whether a sequence sync worker is needed.
>
> Check if the subscription includes sequences and start a sequencesync
> worker if one is not already running.
>
Fixed.
> 3)
> I noticed that copy_sequences is invoked twice per sync cycle—once for
> sequences in the INIT state, and once for sequences in the READY state
> to detect drift. This means we ping the primary twice during each sync
> cycle. We should consider combining this logic into a single
> copy_sequences call. Since we already have the state information
> (INIT, READY, etc.) for each local sequence, copy_sequences can
> internally check the current state and decide whether to transition a
> sequence to READY based on its previous state. This approach would
> allow us to fetch all required information from the primary in a
> single call.
>
Changed it.
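As an illustration only (not code from the patch), the per-sequence decision
inside a single copy_sequences call can be sketched as below. The struct
SeqSnapshot, the STATE_* macros, and the helpers has_drifted() and
needs_copy() are hypothetical stand-ins for this sketch; the patch itself
works with LogicalRepSequenceInfo and the SUBREL_STATE_* constants.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the catalog state characters; illustration only. */
#define STATE_INIT  'i'
#define STATE_READY 'r'

/* Hypothetical, simplified view of one sequence's local and remote values. */
typedef struct SeqSnapshot
{
    char    relstate;           /* state recorded for the local sequence */
    int64_t local_last_value;
    bool    local_is_called;
    int64_t remote_last_value;  /* values fetched from the publisher */
    bool    remote_is_called;
} SeqSnapshot;

/* True if the local values differ from what the publisher reported. */
static bool
has_drifted(const SeqSnapshot *seq)
{
    return seq->local_last_value != seq->remote_last_value ||
           seq->local_is_called != seq->remote_is_called;
}

/* INIT sequences are always copied; READY ones only when they have drifted. */
static bool
needs_copy(const SeqSnapshot *seq)
{
    assert(seq->relstate == STATE_INIT || seq->relstate == STATE_READY);
    return seq->relstate == STATE_INIT || has_drifted(seq);
}
```

Because the state travels with each sequence, one remote query per batch is
enough to cover both the INIT and READY cases.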
> I also think that we do not even need to mention the states here and
> we can give a single message instead of 2:
> DEBUG: logical replication sequence synchronization for subscription
> "sub1" - for sequences in INIT state batch #1 = 5 attempted, 5
> succeeded, 0 mismatched, 0 insufficient permission, 0 missing from
> publisher, 0 skipped, 0 no drift
> DEBUG: logical replication sequence synchronization for subscription
> "sub1" - for sequences in READY state batch #1 = 5 attempted, 0
> succeeded, 0 mismatched, 0 insufficient permission, 0 missing from
> publisher, 0 skipped, 5 no drift
>
Yes, since states are now specific to each sequence and not to
batches, I've removed it.
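A rough sketch of how the counters in the combined message relate to each
other: every attempted sequence lands in exactly one bucket, and whatever is
left over is reported as missing from the publisher. The BatchCounts struct
and missing_from_publisher() are hypothetical names for this illustration;
the patch keeps these as local int variables in copy_sequences.

```c
#include <assert.h>

/* Hypothetical grouping of the per-batch counters; illustration only. */
typedef struct BatchCounts
{
    int attempted;          /* sequences requested in this batch */
    int succeeded;
    int mismatched;
    int insufficient_perm;
    int skipped;
    int no_drift;           /* READY sequences that already matched */
} BatchCounts;

/* Sequences not accounted for by any other bucket were missing remotely. */
static int
missing_from_publisher(const BatchCounts *c)
{
    int accounted = c->succeeded + c->mismatched + c->insufficient_perm +
                    c->skipped + c->no_drift;

    assert(accounted <= c->attempted);
    return c->attempted - accounted;
}
```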
I've also found that the sequence worker was not being stopped when the
subscription was disabled. I've added checks to do this as well.
All the above changes are part of the attached v5 patch.
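For readers skimming the thread, the adaptive sleep interval described in the
patch's commit message (start at 2 seconds, double up to a maximum of 30
seconds when no drift is observed, reset to the minimum once drift is found)
can be sketched as below. This is a standalone illustration, not the patch
code: next_sleep_ms() is a hypothetical helper, whereas the patch updates a
static sleep_ms variable inline in the worker loop.

```c
#include <assert.h>
#include <stdbool.h>

#define SEQSYNC_MIN_SLEEP_MS 2000   /* 2 seconds */
#define SEQSYNC_MAX_SLEEP_MS 30000  /* 30 seconds */

/*
 * Hypothetical helper: compute the next sleep interval from the current one.
 * Reset to the minimum when drift was found; otherwise double, capped at
 * the maximum.
 */
static long
next_sleep_ms(long current_ms, bool drift_found)
{
    long doubled;

    assert(current_ms >= SEQSYNC_MIN_SLEEP_MS &&
           current_ms <= SEQSYNC_MAX_SLEEP_MS);

    if (drift_found)
        return SEQSYNC_MIN_SLEEP_MS;

    doubled = current_ms * 2;
    return doubled < SEQSYNC_MAX_SLEEP_MS ? doubled : SEQSYNC_MAX_SLEEP_MS;
}
```

With no drift, the interval under these assumptions goes 2s, 4s, 8s, 16s, 30s
and then stays at 30s until a sequence is updated.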
regards,
Ajin Cherian
Fujitsu Australia
Attachments:
[application/octet-stream] v5-0001-Support-automatic-sequence-replication.patch (38.8K, 2-v5-0001-Support-automatic-sequence-replication.patch)
From e947256178a88887db2f9c046b8a675c0ae87d24 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <[email protected]>
Date: Fri, 20 Feb 2026 21:10:42 +1100
Subject: [PATCH v5] Support automatic sequence replication.
Logical replication sequences can drift between publisher and
subscriber as values are consumed independently on each node.
Previously, the sequence sync worker exited after the initial
synchronization, allowing sequences to diverge over time.
This change keeps the sequence sync worker running continuously
so it can monitor sequences and resynchronize them when drift
is detected. The worker uses an adaptive sleep interval:
it starts at 2 seconds, doubles up to a maximum of 30 seconds
when no drift is observed, and resets to the minimum interval
once drift is found.
Sequences remain in the READY state during continuous
synchronization.
Author: Ajin Cherian <[email protected]>
Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Peter Smith <[email protected]>
Reviewed-by: Ashutosh Sharma <[email protected]>
---
doc/src/sgml/logical-replication.sgml | 56 +---
doc/src/sgml/ref/alter_subscription.sgml | 9 -
src/backend/commands/sequence.c | 27 ++
.../replication/logical/sequencesync.c | 302 +++++++++++++-----
src/backend/replication/logical/syncutils.c | 48 ++-
src/backend/replication/logical/worker.c | 4 +
src/bin/psql/tab-complete.in.c | 4 +-
src/include/catalog/pg_subscription_rel.h | 1 +
src/include/commands/sequence.h | 1 +
src/include/replication/worker_internal.h | 2 +-
src/test/subscription/t/026_stats.pl | 2 +
src/test/subscription/t/036_sequences.pl | 79 +----
12 files changed, 300 insertions(+), 235 deletions(-)
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 5028fe9af09..cfe9c2788c4 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1775,20 +1775,14 @@ Publications:
to synchronize only newly added sequences.
</para>
</listitem>
- <listitem>
- <para>
- use <link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>
- to re-synchronize all sequences currently known to the subscription.
- </para>
- </listitem>
</itemizedlist>
</para>
<para>
A <firstterm>sequence synchronization worker</firstterm> will be started
- after executing any of the above subscriber commands, and will exit once the
- sequences are synchronized.
+ after executing any of the above subscriber commands. The worker will
+ remain running for the life of the subscription, periodically
+ synchronizing all published sequences.
</para>
<para>
The ability to launch a sequence synchronization worker is limited by the
@@ -1815,21 +1809,7 @@ Publications:
</sect2>
<sect2 id="sequences-out-of-sync">
- <title>Refreshing Out-of-Sync Sequences</title>
- <para>
- Subscriber sequence values will become out of sync as the publisher
- advances them.
- </para>
- <para>
- To detect this, compare the
- <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsublsn</structfield>
- on the subscriber with the <structfield>page_lsn</structfield> obtained
- from the <link linkend="func-pg-get-sequence-data"><function>pg_get_sequence_data</function></link>
- function for the sequence on the publisher. Then run
- <link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link> to
- re-synchronize if necessary.
- </para>
+ <title>Out-of-Sync Sequences</title>
<warning>
<para>
Each sequence caches a block of values (typically 32) in memory before
@@ -1961,16 +1941,6 @@ Publications:
------------+-----------+------------
610 | t | 0/017CEDF8
(1 row)
-</programlisting></para>
-
- <para>
- The difference between the sequence page LSNs on the publisher and the
- sequence page LSNs on the subscriber indicates that the sequences are out
- of sync. Re-synchronize all sequences known to the subscriber using
- <link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>.
-<programlisting>
-/* sub # */ ALTER SUBSCRIPTION sub1 REFRESH SEQUENCES;
</programlisting></para>
<para>
@@ -2333,24 +2303,6 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
</para>
</listitem>
- <listitem>
- <para>
- Incremental sequence changes are not replicated. Although the data in
- serial or identity columns backed by sequences will be replicated as part
- of the table, the sequences themselves do not replicate ongoing changes.
- On the subscriber, a sequence will retain the last value it synchronized
- from the publisher. If the subscriber is used as a read-only database,
- then this should typically not be a problem. If, however, some kind of
- switchover or failover to the subscriber database is intended, then the
- sequences would need to be updated to the latest values, either by
- executing <link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>
- or by copying the current data from the publisher (perhaps using
- <command>pg_dump</command>) or by determining a sufficiently high value
- from the tables themselves.
- </para>
- </listitem>
-
<listitem>
<para>
Replication of <command>TRUNCATE</command> commands is supported, but
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 27c06439f4f..385f685456b 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -190,11 +190,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
Previously subscribed tables are not copied, even if a table's row
filter <literal>WHERE</literal> clause has since been modified.
</para>
- <para>
- Previously subscribed sequences are not re-synchronized. To do that,
- use <link linkend="sql-altersubscription-params-refresh-sequences">
- <command>ALTER SUBSCRIPTION ... REFRESH SEQUENCES</command></link>.
- </para>
<para>
See <xref linkend="sequence-definition-mismatches"/> for recommendations on how
to handle any warnings about sequence definition differences between
@@ -236,10 +231,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
recommendations on how to handle any warnings about sequence definition
differences between the publisher and the subscriber.
</para>
- <para>
- See <xref linkend="sequences-out-of-sync"/> for recommendations on how to
- identify and handle out-of-sync sequences.
- </para>
</listitem>
</varlistentry>
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index e1b808bbb60..aa815dd19af 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -929,6 +929,33 @@ lastval(PG_FUNCTION_ARGS)
PG_RETURN_INT64(result);
}
+/*
+ * Read the current sequence values (last_value and is_called)
+ *
+ * This is a read-only operation used by logical replication sequence
+ * synchronization to detect drift.
+ */
+void
+GetSequence(Relation seqrel, int64 *last_value, bool *is_called)
+{
+ Buffer buf;
+ HeapTupleData seqtuple;
+ Form_pg_sequence_data seq;
+
+ /* Confirm that the relation is a sequence */
+ Assert(seqrel->rd_rel->relkind == RELKIND_SEQUENCE);
+
+ /* Read the sequence tuple */
+ seq = read_seq_tuple(seqrel, &buf, &seqtuple);
+
+ /* Extract the values */
+ *last_value = seq->last_value;
+ *is_called = seq->is_called;
+
+ /* Release buffer */
+ UnlockReleaseBuffer(buf);
+}
+
/*
* Main internal procedure that handles 2 & 3 arg forms of SETVAL.
*
diff --git a/src/backend/replication/logical/sequencesync.c b/src/backend/replication/logical/sequencesync.c
index 165f909b3ba..9b502a89dd6 100644
--- a/src/backend/replication/logical/sequencesync.c
+++ b/src/backend/replication/logical/sequencesync.c
@@ -19,10 +19,6 @@
* CREATE SUBSCRIPTION
* ALTER SUBSCRIPTION ... REFRESH PUBLICATION
*
- * Executing the following command resets all sequences in the subscription to
- * state INIT, triggering re-synchronization:
- * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
- *
* The apply worker periodically scans pg_subscription_rel for sequences in
* INIT state. When such sequences are found, it spawns a sequencesync worker
* to handle synchronization.
@@ -36,8 +32,24 @@
* local subscriber, and finally marks each sequence as READY upon successful
* synchronization.
*
+ * The sequencesync worker then fetches all sequences that are
+ * in the READY state, queries the publisher for current sequence values, and
+ * updates any sequences that have drifted and then goes to sleep. The sleep
+ * interval starts as SEQSYNC_MIN_SLEEP_MS and doubles after each wake cycle
+ * (up to SEQSYNC_MAX_SLEEP_MS). When drift is detected, the interval resets to
+ * the minimum to ensure timely updates.
+ *
+ * After CREATE SUBSCRIPTION, sequences begin in the INIT state. Sequences
+ * added through ALTER SUBSCRIPTION.. REFRESH PUBLICATION also start in the INIT
+ * state. All INIT sequences are synchronized unconditionally, then transition
+ * to the READY state. Once in the READY state, sequences are checked for drift
+ * from the publisher and synchronized only when drift is detected.
+ *
* Sequence state transitions follow this pattern:
- * INIT -> READY
+ * INIT --> READY ->-+
+ * ^ | (check/synchronize)
+ * | |
+ * +--<---+
*
* To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
* sequences are synchronized per transaction. The locks on the sequence
@@ -60,6 +72,7 @@
#include "postmaster/interrupt.h"
#include "replication/logicalworker.h"
#include "replication/worker_internal.h"
+#include "storage/ipc.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -78,21 +91,27 @@ typedef enum CopySeqResult
COPYSEQ_SUCCESS,
COPYSEQ_MISMATCH,
COPYSEQ_INSUFFICIENT_PERM,
- COPYSEQ_SKIPPED
+ COPYSEQ_SKIPPED,
+ COPYSEQ_NOWORK,
} CopySeqResult;
-static List *seqinfos = NIL;
+/* Sleep intervals for sync */
+#define SEQSYNC_MIN_SLEEP_MS 2000 /* 2 seconds */
+#define SEQSYNC_MAX_SLEEP_MS 30000 /* 30 seconds */
+
+static long sleep_ms = SEQSYNC_MIN_SLEEP_MS;
/*
- * Apply worker determines if sequence synchronization is needed.
+ * Apply worker determines whether a sequence sync worker is needed.
*
- * Start a sequencesync worker if one is not already running. The active
- * sequencesync worker will handle all pending sequence synchronization. If any
- * sequences remain unsynchronized after it exits, a new worker can be started
- * in the next iteration.
+ * Check if the subscription includes sequences and start a sequencesync
+ * worker if one is not already running. The active sequencesync worker will
+ * handle all pending sequence synchronization. If any sequences remain
+ * unsynchronized after it exits, a new worker can be started in the next
+ * iteration.
*/
void
-ProcessSequencesForSync(void)
+MaybeLaunchSequenceSyncWorker(void)
{
LogicalRepWorker *sequencesync_worker;
int nsyncworkers;
@@ -144,7 +163,7 @@ ProcessSequencesForSync(void)
* for the given list of sequence indexes.
*/
static void
-get_sequences_string(List *seqindexes, StringInfo buf)
+get_sequences_string(List *seqindexes, List *seqinfos, StringInfo buf)
{
resetStringInfo(buf);
foreach_int(seqidx, seqindexes)
@@ -171,7 +190,7 @@ get_sequences_string(List *seqindexes, StringInfo buf)
*/
static void
report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
- List *missing_seqs_idx)
+ List *missing_seqs_idx, List *seqinfos)
{
StringInfo seqstr;
@@ -183,7 +202,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
if (mismatched_seqs_idx)
{
- get_sequences_string(mismatched_seqs_idx, seqstr);
+ get_sequences_string(mismatched_seqs_idx, seqinfos, seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
@@ -194,7 +213,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
if (insuffperm_seqs_idx)
{
- get_sequences_string(insuffperm_seqs_idx, seqstr);
+ get_sequences_string(insuffperm_seqs_idx, seqinfos, seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("insufficient privileges on sequence (%s)",
@@ -205,7 +224,7 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
if (missing_seqs_idx)
{
- get_sequences_string(missing_seqs_idx, seqstr);
+ get_sequences_string(missing_seqs_idx, seqinfos, seqstr);
ereport(WARNING,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg_plural("missing sequence on publisher (%s)",
@@ -229,7 +248,8 @@ report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
*/
static CopySeqResult
get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
- LogicalRepSequenceInfo **seqinfo, int *seqidx)
+ LogicalRepSequenceInfo **seqinfo, int *seqidx,
+ List *seqinfos)
{
bool isnull;
int col = 0;
@@ -325,11 +345,12 @@ get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel,
}
/*
- * Apply remote sequence state to local sequence and mark it as
- * synchronized (READY).
+ * Apply remote sequence state to local sequence. If we are doing this
+ * for sequences in the INIT state, move them to the READY state once
+ * synchronized.
*/
static CopySeqResult
-copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
+copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner, char relstate)
{
UserContext ucxt;
AclResult aclresult;
@@ -368,19 +389,46 @@ copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
/*
* Record the remote sequence's LSN in pg_subscription_rel and mark the
- * sequence as READY.
+ * sequence as READY if updating a sequence that is in INIT state.
*/
- UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
- seqinfo->page_lsn, false);
+ if (relstate == SUBREL_STATE_INIT)
+ UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
+ seqinfo->page_lsn, false);
return COPYSEQ_SUCCESS;
}
+/*
+ * check_sequence_drift
+ *
+ * Check if the remote sequence values differ from the local sequence.
+ * Returns true/false if any sequences drifted.
+ */
+static bool
+check_sequence_drift(Relation sequence_rel, LogicalRepSequenceInfo *seqinfo)
+{
+ int64 local_last_value;
+ bool local_is_called;
+
+ /* Get current local sequence state */
+ GetSequence(sequence_rel, &local_last_value, &local_is_called);
+
+ /* Check if values have drifted and return accordingly */
+ return (local_last_value != seqinfo->last_value ||
+ local_is_called != seqinfo->is_called);
+}
+
/*
* Copy existing data of sequences from the publisher.
+ *
+ * If relstate is SUBREL_STATE_READY, only synchronize sequences that
+ * have drifted from their publisher values. Otherwise, synchronize
+ * all sequences.
+ *
+ * Returns true/false if any sequences were actually copied.
*/
-static void
-copy_sequences(WalReceiverConn *conn)
+static bool
+copy_sequences(WalReceiverConn *conn, List *seqinfos)
{
int cur_batch_base_index = 0;
int n_seqinfos = list_length(seqinfos);
@@ -390,13 +438,10 @@ copy_sequences(WalReceiverConn *conn)
StringInfo seqstr = makeStringInfo();
StringInfo cmd = makeStringInfo();
MemoryContext oldctx;
+ bool sequence_copied = false;
#define MAX_SEQUENCES_SYNC_PER_BATCH 100
- elog(DEBUG1,
- "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
- MySubscription->name, n_seqinfos);
-
while (cur_batch_base_index < n_seqinfos)
{
Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
@@ -406,6 +451,7 @@ copy_sequences(WalReceiverConn *conn)
int batch_mismatched_count = 0;
int batch_skipped_count = 0;
int batch_insuffperm_count = 0;
+ int batch_no_drift = 0;
int batch_missing_count;
Relation sequence_rel = NULL;
@@ -501,20 +547,34 @@ copy_sequences(WalReceiverConn *conn)
}
sync_status = get_and_validate_seq_info(slot, &sequence_rel,
- &seqinfo, &seqidx);
+ &seqinfo, &seqidx, seqinfos);
+
+ /*
+ * For sequences in INIT state, always sync.
+ * Otherwise, for sequences in READY state, only sync if there's drift.
+ */
if (sync_status == COPYSEQ_SUCCESS)
- sync_status = copy_sequence(seqinfo,
- sequence_rel->rd_rel->relowner);
+ {
+ if ((seqinfo->relstate == SUBREL_STATE_INIT) ||
+ check_sequence_drift(sequence_rel, seqinfo))
+ sync_status = copy_sequence(seqinfo,
+ sequence_rel->rd_rel->relowner,
+ seqinfo->relstate);
+ else
+ sync_status = COPYSEQ_NOWORK;
+ }
switch (sync_status)
{
case COPYSEQ_SUCCESS:
elog(DEBUG1,
- "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
+ "logical replication sync for subscription \"%s\", sequence \"%s.%s\" has been updated",
MySubscription->name, seqinfo->nspname,
seqinfo->seqname);
batch_succeeded_count++;
+ sequence_copied = true;
break;
+
case COPYSEQ_MISMATCH:
/*
@@ -528,6 +588,7 @@ copy_sequences(WalReceiverConn *conn)
MemoryContextSwitchTo(oldctx);
batch_mismatched_count++;
break;
+
case COPYSEQ_INSUFFICIENT_PERM:
/*
@@ -541,6 +602,7 @@ copy_sequences(WalReceiverConn *conn)
MemoryContextSwitchTo(oldctx);
batch_insuffperm_count++;
break;
+
case COPYSEQ_SKIPPED:
/*
@@ -558,6 +620,15 @@ copy_sequences(WalReceiverConn *conn)
batch_skipped_count++;
}
break;
+
+ case COPYSEQ_NOWORK:
+ /* Nothing to do */
+ batch_no_drift++;
+ break;
+
+ default:
+ elog(ERROR, "unrecognized Sequence replication result: %d", (int) sync_status);
+
}
if (sequence_rel)
@@ -572,14 +643,15 @@ copy_sequences(WalReceiverConn *conn)
batch_missing_count = batch_size - (batch_succeeded_count +
batch_mismatched_count +
batch_insuffperm_count +
- batch_skipped_count);
+ batch_skipped_count +
+ batch_no_drift);
elog(DEBUG1,
- "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
- MySubscription->name,
- (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
- batch_size, batch_succeeded_count, batch_mismatched_count,
- batch_insuffperm_count, batch_missing_count, batch_skipped_count);
+ "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped, %d no drift",
+ MySubscription->name,
+ (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
+ batch_size, batch_succeeded_count, batch_mismatched_count,
+ batch_insuffperm_count, batch_missing_count, batch_skipped_count, batch_no_drift);
/* Commit this batch, and prepare for next batch */
CommitTransactionCommand();
@@ -607,51 +679,53 @@ copy_sequences(WalReceiverConn *conn)
/* Report mismatches, permission issues, or missing sequences */
report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
- missing_seqs_idx);
+ missing_seqs_idx, seqinfos);
+
+ return sequence_copied;
}
/*
* Identifies sequences that require synchronization and initiates the
* synchronization process.
+ *
+ * Returns true if sequences have been updated.
*/
-static void
-LogicalRepSyncSequences(void)
+static bool
+LogicalRepSyncSequences(WalReceiverConn *conn)
{
- char *err;
- bool must_use_password;
Relation rel;
HeapTuple tup;
- ScanKeyData skey[2];
+ ScanKeyData skey[1];
SysScanDesc scan;
Oid subid = MyLogicalRepWorker->subid;
- StringInfoData app_name;
+ bool sequence_copied = false;
+ List *seqinfos = NIL;
StartTransactionCommand();
rel = table_open(SubscriptionRelRelationId, AccessShareLock);
+ /* Scan for all sequences belonging to this subscription */
ScanKeyInit(&skey[0],
Anum_pg_subscription_rel_srsubid,
BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(subid));
- ScanKeyInit(&skey[1],
- Anum_pg_subscription_rel_srsubstate,
- BTEqualStrategyNumber, F_CHAREQ,
- CharGetDatum(SUBREL_STATE_INIT));
-
scan = systable_beginscan(rel, InvalidOid, false,
- NULL, 2, skey);
+ NULL, 1, skey);
+
while (HeapTupleIsValid(tup = systable_getnext(scan)))
{
Form_pg_subscription_rel subrel;
LogicalRepSequenceInfo *seq;
Relation sequence_rel;
MemoryContext oldctx;
+ char relstate;
CHECK_FOR_INTERRUPTS();
subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
+ relstate = subrel->srsubstate;
sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
@@ -666,6 +740,8 @@ LogicalRepSyncSequences(void)
continue;
}
+ Assert(relstate == SUBREL_STATE_INIT || relstate == SUBREL_STATE_READY);
+
/*
* Worker needs to process sequences across transaction boundary, so
* allocate them under long-lived context.
@@ -676,6 +752,7 @@ LogicalRepSyncSequences(void)
seq->localrelid = subrel->srrelid;
seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
+ seq->relstate = relstate;
seqinfos = lappend(seqinfos, seq);
MemoryContextSwitchTo(oldctx);
@@ -693,36 +770,19 @@ LogicalRepSyncSequences(void)
* Exit early if no catalog entries found, likely due to concurrent drops.
*/
if (!seqinfos)
- return;
+ return false;
- /* Is the use of a password mandatory? */
- must_use_password = MySubscription->passwordrequired &&
- !MySubscription->ownersuperuser;
+ /* Process sequences */
+ sequence_copied = copy_sequences(conn, seqinfos);
- initStringInfo(&app_name);
- appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
- MySubscription->oid, GetSystemIdentifier());
+ /* Clean up */
+ list_free(seqinfos);
- /*
- * Establish the connection to the publisher for sequence synchronization.
- */
- LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true, true,
- must_use_password,
- app_name.data, &err);
- if (LogRepWorkerWalRcvConn == NULL)
- ereport(ERROR,
- errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
- MySubscription->name, err));
-
- pfree(app_name.data);
-
- copy_sequences(LogRepWorkerWalRcvConn);
+ return sequence_copied;
}
/*
- * Execute the initial sync with error handling. Disable the subscription,
+ * Execute the sequence sync with error handling. Disable the subscription,
* if required.
*
* Note that we don't handle FATAL errors which are probably because of system
@@ -735,8 +795,90 @@ start_sequence_sync(void)
PG_TRY();
{
- /* Call initial sync. */
- LogicalRepSyncSequences();
+ char *err;
+ bool must_use_password;
+ StringInfoData app_name;
+
+ /* Is the use of a password mandatory? */
+ must_use_password = MySubscription->passwordrequired &&
+ !MySubscription->ownersuperuser;
+
+ initStringInfo(&app_name);
+ appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
+ MySubscription->oid, GetSystemIdentifier());
+
+ /*
+ * Establish the connection to the publisher for sequence synchronization.
+ */
+ LogRepWorkerWalRcvConn =
+ walrcv_connect(MySubscription->conninfo, true, true,
+ must_use_password,
+ app_name.data, &err);
+ if (LogRepWorkerWalRcvConn == NULL)
+ ereport(ERROR,
+ errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
+ MySubscription->name, err));
+
+ pfree(app_name.data);
+
+ for (;;)
+ {
+ bool sequence_copied = false;
+ MemoryContext oldctx;
+
+ CHECK_FOR_INTERRUPTS();
+
+
+ /* Need to start transaction for cache lookup */
+ StartTransactionCommand();
+
+ /*
+ * Check if subscription has been disabled.
+ * Switch to ApplyContext that outlives the transaction so that
+ * MySubscription remains valid even after CommitTransactionCommand().
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
+ MemoryContextSwitchTo(oldctx);
+
+ if (!MySubscription->enabled)
+ {
+ ereport(LOG,
+ (errmsg("logical replication sequencesync worker for subscription \"%s\" will stop because the subscription was disabled",
+ MySubscription->name)));
+ proc_exit(0);
+ }
+
+ CommitTransactionCommand();
+
+ /*
+ * Synchronize all sequences (both READY and INIT states).
+ */
+ sequence_copied = LogicalRepSyncSequences(LogRepWorkerWalRcvConn);
+
+ /* Adjust sleep interval based on whether sequences were copied over */
+ if (sequence_copied)
+ {
+ sleep_ms = SEQSYNC_MIN_SLEEP_MS;
+ }
+ else
+ {
+
+ /*
+ * Double the sleep time, but not beyond
+ * the maximum allowable value.
+ */
+ sleep_ms = Min(sleep_ms * 2, SEQSYNC_MAX_SLEEP_MS);
+ }
+
+ /* Sleep for the configured interval */
+ (void) WaitLatch(MyLatch,
+ WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+ sleep_ms,
+ WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
+ ResetLatch(MyLatch);
+ }
}
PG_CATCH();
{
diff --git a/src/backend/replication/logical/syncutils.c b/src/backend/replication/logical/syncutils.c
index 535ffb6f09e..ac7669bc273 100644
--- a/src/backend/replication/logical/syncutils.c
+++ b/src/backend/replication/logical/syncutils.c
@@ -171,7 +171,6 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
case WORKERTYPE_APPLY:
ProcessSyncingTablesForApply(current_lsn);
- ProcessSequencesForSync();
break;
case WORKERTYPE_SEQUENCESYNC:
@@ -190,13 +189,13 @@ ProcessSyncingRelations(XLogRecPtr current_lsn)
*
* The pg_subscription_rel catalog is shared by tables and sequences. Changes
* to either sequences or tables can affect the validity of relation states, so
- * we identify non-READY tables and non-READY sequences together to ensure
+ * we identify non-READY tables and sequences (in any state) together to ensure
* consistency.
*
* has_pending_subtables: true if the subscription has one or more tables that
* are not in READY state, otherwise false.
* has_pending_subsequences: true if the subscription has one or more sequences
- * that are not in READY state, otherwise false.
+ * (in any state), otherwise false.
*/
void
FetchRelationStates(bool *has_pending_subtables,
@@ -204,53 +203,47 @@ FetchRelationStates(bool *has_pending_subtables,
bool *started_tx)
{
/*
- * has_subtables and has_subsequences_non_ready are declared as static,
+ * has_subtables and has_subsequences are declared as static,
* since the same value can be used until the system table is invalidated.
*/
static bool has_subtables = false;
- static bool has_subsequences_non_ready = false;
-
+ static bool has_subsequences = false;
*started_tx = false;
-
if (relation_states_validity != SYNC_RELATIONS_STATE_VALID)
{
MemoryContext oldctx;
List *rstates;
+ List *seq_states;
SubscriptionRelState *rstate;
-
relation_states_validity = SYNC_RELATIONS_STATE_REBUILD_STARTED;
- has_subsequences_non_ready = false;
-
+ has_subsequences = false;
/* Clean the old lists. */
list_free_deep(table_states_not_ready);
table_states_not_ready = NIL;
-
if (!IsTransactionState())
{
StartTransactionCommand();
*started_tx = true;
}
-
- /* Fetch tables and sequences that are in non-READY state. */
- rstates = GetSubscriptionRelations(MySubscription->oid, true, true,
+ /* Fetch tables that are in non-READY state. */
+ rstates = GetSubscriptionRelations(MySubscription->oid, true, false,
true);
-
+ /* Fetch all sequences (regardless of state). */
+ seq_states = GetSubscriptionRelations(MySubscription->oid, false, true,
+ false);
/* Allocate the tracking info in a permanent memory context. */
oldctx = MemoryContextSwitchTo(CacheMemoryContext);
foreach_ptr(SubscriptionRelState, subrel, rstates)
{
- if (get_rel_relkind(subrel->relid) == RELKIND_SEQUENCE)
- has_subsequences_non_ready = true;
- else
- {
- rstate = palloc_object(SubscriptionRelState);
- memcpy(rstate, subrel, sizeof(SubscriptionRelState));
- table_states_not_ready = lappend(table_states_not_ready,
- rstate);
- }
+ rstate = palloc_object(SubscriptionRelState);
+ memcpy(rstate, subrel, sizeof(SubscriptionRelState));
+ table_states_not_ready = lappend(table_states_not_ready,
+ rstate);
}
+ /* Check if there are any sequences. */
+ has_subsequences = (seq_states != NIL);
MemoryContextSwitchTo(oldctx);
-
+ list_free_deep(seq_states);
/*
* Does the subscription have tables?
*
@@ -260,7 +253,6 @@ FetchRelationStates(bool *has_pending_subtables,
*/
has_subtables = (table_states_not_ready != NIL) ||
HasSubscriptionTables(MySubscription->oid);
-
/*
* If the subscription relation cache has been invalidated since we
* entered this routine, we still use and return the relations we just
@@ -271,10 +263,8 @@ FetchRelationStates(bool *has_pending_subtables,
if (relation_states_validity == SYNC_RELATIONS_STATE_REBUILD_STARTED)
relation_states_validity = SYNC_RELATIONS_STATE_VALID;
}
-
if (has_pending_subtables)
*has_pending_subtables = has_subtables;
-
if (has_pending_subsequences)
- *has_pending_subsequences = has_subsequences_non_ready;
+ *has_pending_subsequences = has_subsequences;
}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 32725c48623..5b479be7ec7 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4219,6 +4219,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
ProcessConfigFile(PGC_SIGHUP);
}
+
if (rc & WL_TIMEOUT)
{
/*
@@ -4266,6 +4267,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
maybe_advance_nonremovable_xid(&rdt_data, false);
+ /* Check if any new sequences need syncing */
+ MaybeLaunchSequenceSyncWorker();
+
/*
* Force reporting to ensure long idle periods don't lead to
* arbitrarily delayed stats. Stats can only be reported outside
diff --git a/src/bin/psql/tab-complete.in.c b/src/bin/psql/tab-complete.in.c
index 8b91bc00062..b8bdfebb15a 100644
--- a/src/bin/psql/tab-complete.in.c
+++ b/src/bin/psql/tab-complete.in.c
@@ -2331,8 +2331,8 @@ match_previous_words(int pattern_id,
/* ALTER SUBSCRIPTION <name> */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny))
COMPLETE_WITH("CONNECTION", "ENABLE", "DISABLE", "OWNER TO",
- "RENAME TO", "REFRESH PUBLICATION", "REFRESH SEQUENCES",
- "SET", "SKIP (", "ADD PUBLICATION", "DROP PUBLICATION");
+ "RENAME TO", "REFRESH PUBLICATION", "SET", "SKIP (",
+ "ADD PUBLICATION", "DROP PUBLICATION");
/* ALTER SUBSCRIPTION <name> REFRESH */
else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "REFRESH"))
COMPLETE_WITH("PUBLICATION", "SEQUENCES");
diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h
index f810b34c78d..a7c808acf23 100644
--- a/src/include/catalog/pg_subscription_rel.h
+++ b/src/include/catalog/pg_subscription_rel.h
@@ -92,6 +92,7 @@ typedef struct LogicalRepSequenceInfo
char *seqname;
char *nspname;
Oid localrelid;
+ char relstate;
/* Sequence information retrieved from the publisher node */
XLogRecPtr page_lsn;
diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h
index 2c3c4a3f074..f00eea9fbd1 100644
--- a/src/include/commands/sequence.h
+++ b/src/include/commands/sequence.h
@@ -47,6 +47,7 @@ extern ObjectAddress AlterSequence(ParseState *pstate, AlterSeqStmt *stmt);
extern void SequenceChangePersistence(Oid relid, char newrelpersistence);
extern void DeleteSequenceTuple(Oid relid);
extern void ResetSequence(Oid seq_relid);
+extern void GetSequence(Relation seqrel, int64 *last_value, bool *is_called);
extern void SetSequence(Oid relid, int64 next, bool iscalled);
extern void ResetSequenceCaches(void);
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index c1285fdd1bc..537e13fe9b8 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -286,7 +286,7 @@ extern void UpdateTwoPhaseState(Oid suboid, char new_state);
extern void ProcessSyncingTablesForSync(XLogRecPtr current_lsn);
extern void ProcessSyncingTablesForApply(XLogRecPtr current_lsn);
-extern void ProcessSequencesForSync(void);
+extern void MaybeLaunchSequenceSyncWorker(void);
pg_noreturn extern void FinishSyncWorker(void);
extern void InvalidateSyncingRelStates(Datum arg, int cacheid, uint32 hashvalue);
diff --git a/src/test/subscription/t/026_stats.pl b/src/test/subscription/t/026_stats.pl
index 5d457060a02..2fe209f461f 100644
--- a/src/test/subscription/t/026_stats.pl
+++ b/src/test/subscription/t/026_stats.pl
@@ -16,6 +16,8 @@ $node_publisher->start;
# Create subscriber node.
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+ "max_logical_replication_workers = 10");
$node_subscriber->start;
diff --git a/src/test/subscription/t/036_sequences.pl b/src/test/subscription/t/036_sequences.pl
index 471780a3585..1d81518fe22 100644
--- a/src/test/subscription/t/036_sequences.pl
+++ b/src/test/subscription/t/036_sequences.pl
@@ -75,8 +75,7 @@ is($result, '100|t',
##########
## ALTER SUBSCRIPTION ... REFRESH PUBLICATION should cause sync of new
-# sequences of the publisher, but changes to existing sequences should
-# not be synced.
+# sequences of the publisher.
##########
# Create a new sequence 'regress_s2', and update existing sequence 'regress_s1'
@@ -84,9 +83,6 @@ $node_publisher->safe_psql(
'postgres', qq(
CREATE SEQUENCE regress_s2;
INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
-
- -- Existing sequence
- INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
));
# Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION
@@ -97,19 +93,6 @@ $result = $node_subscriber->safe_psql(
$node_subscriber->poll_query_until('postgres', $synced_query)
or die "Timed out while waiting for subscriber to synchronize data";
-$result = $node_publisher->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s1;
-));
-is($result, '200|t', 'Check sequence value in the publisher');
-
-# Check - existing sequence ('regress_s1') is not synced
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s1;
-));
-is($result, '100|t', 'REFRESH PUBLICATION will not sync existing sequence');
-
# Check - newly published sequence ('regress_s2') is synced
$result = $node_subscriber->safe_psql(
'postgres', qq(
@@ -119,16 +102,13 @@ is($result, '100|t',
'REFRESH PUBLICATION will sync newly published sequence');
##########
-# Test: REFRESH SEQUENCES and REFRESH PUBLICATION (copy_data = false)
-#
-# 1. ALTER SUBSCRIPTION ... REFRESH SEQUENCES should re-synchronize all
-# existing sequences, but not synchronize newly added ones.
-# 2. ALTER SUBSCRIPTION ... REFRESH PUBLICATION with (copy_data = false) should
-# also not update sequence values for newly added sequences.
+# Test:
+# 1. Automatic update of existing sequence values
+# 2. Newly added sequences are not automatically updated.
##########
-# Create a new sequence 'regress_s3', and update the existing sequence
-# 'regress_s2'.
+# Create a new sequence 'regress_s3', and update the existing sequences
+# 'regress_s2' and 'regress_s1'.
$node_publisher->safe_psql(
'postgres', qq(
CREATE SEQUENCE regress_s3;
@@ -136,53 +116,28 @@ $node_publisher->safe_psql(
-- Existing sequence
INSERT INTO regress_seq_test SELECT nextval('regress_s2') FROM generate_series(1,100);
+ INSERT INTO regress_seq_test SELECT nextval('regress_s1') FROM generate_series(1,100);
));
-# 1. Do ALTER SUBSCRIPTION ... REFRESH SEQUENCES
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- ALTER SUBSCRIPTION regress_seq_sub REFRESH SEQUENCES;
-));
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
-
# Check - existing sequences ('regress_s1' and 'regress_s2') are synced
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s1;
-));
-is($result, '200|t', 'REFRESH SEQUENCES will sync existing sequences');
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s2;
-));
-is($result, '200|t', 'REFRESH SEQUENCES will sync existing sequences');
-# Check - newly published sequence ('regress_s3') is not synced
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- SELECT last_value, is_called FROM regress_s3;
-));
-is($result, '1|f',
- 'REFRESH SEQUENCES will not sync newly published sequence');
+# Poll until regress_s1 reflects the updated sequence value
+$node_subscriber->poll_query_until('postgres',
+ qq(SELECT last_value = 200 AND is_called = 't' FROM regress_s1;))
+ or die "Timed out while waiting for regress_s1 sequence to sync";
-# 2. Do ALTER SUBSCRIPTION ... REFRESH PUBLICATION with copy_data as false
-$result = $node_subscriber->safe_psql(
- 'postgres', qq(
- ALTER SUBSCRIPTION regress_seq_sub REFRESH PUBLICATION WITH (copy_data = false);
-));
-$node_subscriber->poll_query_until('postgres', $synced_query)
- or die "Timed out while waiting for subscriber to synchronize data";
+# Poll until regress_s2 reflects the updated sequence value
+$node_subscriber->poll_query_until('postgres',
+ qq(SELECT last_value = 200 AND is_called = 't' FROM regress_s2;))
+ or die "Timed out while waiting for regress_s2 sequence to sync";
-# Check - newly published sequence ('regress_s3') is not synced with copy_data
-# as false.
+# Check - newly published sequence ('regress_s3') is not synced
$result = $node_subscriber->safe_psql(
'postgres', qq(
SELECT last_value, is_called FROM regress_s3;
));
is($result, '1|f',
- 'REFRESH PUBLICATION will not sync newly published sequence with copy_data as false'
-);
+ 'Newly published sequences are not synced automatically');
##########
# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should report an error when:
--
2.47.3