public inbox for [email protected]
help / color / mirror / Atom feedFrom: Antonin Houska <[email protected]>
Subject: [PATCH] Reproduce filtering issue.
Date: Mon, 23 Mar 2026 12:50:15 +0100
---
contrib/test_decoding/expected/filtering.out | 74 ++++++++++++++
contrib/test_decoding/specs/filtering.spec | 101 +++++++++++++++++++
src/backend/executor/nodeModifyTable.c | 2 +
src/backend/replication/logical/snapbuild.c | 3 +
src/test/isolation/isolationtester.c | 9 +-
5 files changed, 188 insertions(+), 1 deletion(-)
create mode 100644 contrib/test_decoding/expected/filtering.out
create mode 100644 contrib/test_decoding/specs/filtering.spec
diff --git a/contrib/test_decoding/expected/filtering.out b/contrib/test_decoding/expected/filtering.out
new file mode 100644
index 00000000000..6ba9690509f
--- /dev/null
+++ b/contrib/test_decoding/expected/filtering.out
@@ -0,0 +1,74 @@
+Parsed test spec with 5 sessions
+
+starting permutation: s1_assign_xid s3_repack s2_assign_xid s1_rollback s4_insert s5_wakeup_snapbuild s2_rollback s5_wakeup_insert_speculative s4_insert_commit s5_wakeup_repack
+injection_points_attach
+-----------------------
+
+(1 row)
+
+injection_points_attach
+-----------------------
+
+(1 row)
+
+step s1_assign_xid:
+ BEGIN;
+ CREATE TABLE c(i int);
+
+step s3_repack:
+ REPACK (CONCURRENTLY) a;
+ <waiting ...>
+step s2_assign_xid:
+ BEGIN;
+ CREATE TABLE d(i int);
+
+step s1_rollback:
+ ROLLBACK;
+
+step s4_insert:
+ BEGIN;
+ INSERT INTO t(i)
+ SELECT max(i) + 1 FROM t ON CONFLICT (i) DO UPDATE SET i=EXCLUDED.i;
+ <waiting ...>
+step s5_wakeup_snapbuild:
+ SELECT injection_points_wakeup('snapbuild-full');
+
+injection_points_wakeup
+-----------------------
+
+(1 row)
+
+step s2_rollback:
+ ROLLBACK;
+
+step s5_wakeup_insert_speculative:
+ SELECT injection_points_wakeup('insert-speculative-before-confirm');
+
+injection_points_wakeup
+-----------------------
+
+(1 row)
+
+step s4_insert: <... completed>
+step s4_insert_commit:
+ COMMIT;
+
+step s5_wakeup_repack:
+ SELECT injection_points_wakeup('repack-concurrently-before-lock');
+
+injection_points_wakeup
+-----------------------
+
+(1 row)
+
+step s3_repack: <... completed>
+injection_points_detach
+-----------------------
+
+(1 row)
+
+injection_points_detach
+-----------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/specs/filtering.spec b/contrib/test_decoding/specs/filtering.spec
new file mode 100644
index 00000000000..a02c9f8facb
--- /dev/null
+++ b/contrib/test_decoding/specs/filtering.spec
@@ -0,0 +1,101 @@
+setup
+{
+ CREATE TABLE a(i int primary key, j int) WITH (autovacuum_enabled = off);
+ INSERT INTO a(i, j) VALUES (1, 1), (2, 2);
+ CREATE TABLE t(i int primary key);
+ INSERT INTO t(i) VALUES (1);
+ CREATE EXTENSION injection_points;
+}
+
+session s1
+step s1_assign_xid
+{
+ BEGIN;
+ CREATE TABLE c(i int);
+}
+step s1_rollback
+{
+ ROLLBACK;
+}
+
+session s2
+step s2_assign_xid
+{
+ BEGIN;
+ CREATE TABLE d(i int);
+}
+step s2_rollback
+{
+ ROLLBACK;
+}
+
+session s3
+setup
+{
+ SELECT injection_points_attach('snapbuild-full', 'wait');
+ SELECT injection_points_attach('repack-concurrently-before-lock', 'wait');
+}
+step s3_repack
+{
+ REPACK (CONCURRENTLY) a;
+}
+teardown
+{
+ SELECT injection_points_detach('repack-concurrently-before-lock');
+ SELECT injection_points_detach('snapbuild-full');
+}
+
+session s4
+setup
+{
+ SELECT injection_points_set_local();
+ SELECT injection_points_attach('insert-speculative-before-confirm', 'wait');
+}
+step s4_insert
+{
+ BEGIN;
+ INSERT INTO t(i)
+ SELECT max(i) + 1 FROM t ON CONFLICT (i) DO UPDATE SET i=EXCLUDED.i;
+}
+step s4_insert_commit
+{
+ COMMIT;
+}
+teardown
+{
+ SELECT injection_points_detach('insert-speculative-before-confirm');
+}
+
+session s5
+step s5_wakeup_snapbuild
+{
+ SELECT injection_points_wakeup('snapbuild-full');
+}
+step s5_wakeup_insert_speculative
+{
+ SELECT injection_points_wakeup('insert-speculative-before-confirm');
+}
+step s5_wakeup_repack
+{
+ SELECT injection_points_wakeup('repack-concurrently-before-lock');
+}
+
+permutation
+# Bring the snapshot builder to the FULL_SNAPSHOT state.
+s1_assign_xid
+s3_repack
+s2_assign_xid
+s1_rollback
+# Perform the speculative insert, but no confirmation so far. The snapshot
+# builder should decode it.
+s4_insert
+# Let the snapshout builder achieve CONSISTENT state and finish the setup.
+s5_wakeup_snapbuild
+s2_rollback
+# While REPACK is waiting on repack-concurrently-before-lock, let the insert
+# get confirmed. Relation filtering is now enabled.
+s5_wakeup_insert_speculative
+s4_insert_commit
+# REPACK should now decode the speculative insert and decode the speculative
+# insert (with the confirmation record filtered out).
+s5_wakeup_repack
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 680c29f35d5..6d5482e5746 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -1232,6 +1232,8 @@ ExecInsert(ModifyTableContext *context,
slot, arbiterIndexes,
&specConflict);
+ INJECTION_POINT("insert-speculative-before-confirm", NULL);
+
/* adjust the tuple's state accordingly */
table_tuple_complete_speculative(resultRelationDesc, slot,
specToken, !specConflict);
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index fbdd4600a2b..883e5b6e18f 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -141,6 +141,7 @@
#include "storage/procarray.h"
#include "storage/standby.h"
#include "utils/builtins.h"
+#include "utils/injection_point.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/snapshot.h"
@@ -1390,6 +1391,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->state = SNAPBUILD_FULL_SNAPSHOT;
builder->next_phase_at = running->nextXid;
+ INJECTION_POINT("snapbuild-full", NULL);
+
ereport(LOG,
errmsg("logical decoding found initial consistent point at %X/%08X",
LSN_FORMAT_ARGS(lsn)),
diff --git a/src/test/isolation/isolationtester.c b/src/test/isolation/isolationtester.c
index 440c875b8ac..8f17ee412c9 100644
--- a/src/test/isolation/isolationtester.c
+++ b/src/test/isolation/isolationtester.c
@@ -216,15 +216,22 @@ main(int argc, char **argv)
* exactly expect concurrent use of test tables. However, autovacuum will
* occasionally take AccessExclusiveLock to truncate a table, and we must
* ignore that transient wait.
+ *
+ * If the session's backend is blocked, and if its background worker is
+ * waiting on an injection point, we assume that the injection point is
+ * the reason for the backend to be blocked. That's what we check in the
+ * second query of the UNION. XXX Should we use a separate query for that?
*/
initPQExpBuffer(&wait_query);
appendPQExpBufferStr(&wait_query,
+ "WITH blocking(res) AS ("
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
/* The spec syntax requires at least one session; assume that here. */
appendPQExpBufferStr(&wait_query, conns[1].backend_pid_str);
for (i = 2; i < nconns; i++)
appendPQExpBuffer(&wait_query, ",%s", conns[i].backend_pid_str);
- appendPQExpBufferStr(&wait_query, "}')");
+ appendPQExpBufferStr(&wait_query, "}') UNION "
+ "SELECT pg_catalog.pg_isolation_test_session_is_blocked(pid, '{}') FROM pg_stat_activity WHERE leader_pid=$1) SELECT bool_or(res) FROM blocking");
res = PQprepare(conns[0].conn, PREP_WAITING, wait_query.data, 0, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
--
2.47.3
--=-=-=--
view thread (728+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected]
Subject: Re: [PATCH] Reproduce filtering issue.
In-Reply-To: <no-message-id-498517@localhost>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox