public inbox for [email protected]  
help / color / mirror / Atom feed
From: Mark Dilger <[email protected]>
To: Pg Hackers <[email protected]>
Subject: Bug in logical decoding with DDL and subtransactions
Date: Wed, 29 Apr 2026 21:59:04 -0700
Message-ID: <CAHgHdKu5e3XY5e90Tuaxq_R4WrKxSV734Q+Lwo5y39Omp2A-Gg@mail.gmail.com> (raw)

There is a bug in logical decoding involving CREATE and subtransactions.
Consider this sequence:

  1. A CREATE statement inserts a row into a catalog during a subtransaction.
  2. That subtransaction is rolled back (ROLLBACK TO SAVEPOINT).
  3. Other activity triggers page pruning on the catalog page.
  4. The original transaction (perhaps in a new subtransaction) performs
     another CREATE.

In that case a new row can be inserted into the same catalog at the same
TID.  During logical decoding this can trigger an assertion failure.  In
non-assert builds it can silently corrupt the decoder's catalog visibility,
which can cause it to produce incorrect output (wrong column mappings,
etc.).

This bug appears to go all the way back to 9.4, where logical decoding was
introduced.

Arseny Sher hit the cmax variant of this exact bug, and Alvaro fixed the
cmax version of it.  However, he appears not to have seen that the same
danger existed for cmin, and instead wrote the comment "if so it must have
the same cmin." (commit 350cdcd5e6d, 2019)

Creating a short deterministic reproducer is difficult, because the catalog
table must be set up so that page pruning will happen.  (I have a 24K-line
reproducer, which seems too big to attach to a list post.)  A fuzz tester is
attached instead.

The attached patch fixes the problem without addressing the fundamental
architectural shortcut the code is taking.  The comment in xl_heap_new_cid
("store toplevel xid so we don't have to merge cids from different
transactions") indicates an intentional design choice.  A more complete fix
could also be considered, but is not included here.

-- 

*Mark Dilger*


Attachments:

  [application/octet-stream] fix-tuplecid-cmin-upstream.patch (5.3K, 3-fix-tuplecid-cmin-upstream.patch)
  download | inline diff:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 11139a910b8..6c0125818cd 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -145,6 +145,7 @@ typedef struct ReorderBufferTupleCidEnt
 	CommandId	cmin;
 	CommandId	cmax;
 	CommandId	combocid;		/* just for debugging */
+	TransactionId subxid;		/* subtransaction that wrote this entry */
 } ReorderBufferTupleCidEnt;
 
 /* Virtual file descriptor with file offset tracking */
@@ -1878,16 +1879,60 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
 			ent->cmin = change->data.tuplecid.cmin;
 			ent->cmax = change->data.tuplecid.cmax;
 			ent->combocid = change->data.tuplecid.combocid;
+			ent->subxid = change->data.tuplecid.subxid;
 		}
-		else
+		else if (ent->cmin != change->data.tuplecid.cmin)
 		{
 			/*
-			 * Maybe we already saw this tuple before in this transaction, but
-			 * if so it must have the same cmin.
+			 * The same TID appears with a different cmin.  This happens when
+			 * a subtransaction inserts a catalog tuple, then a ROLLBACK TO
+			 * SAVEPOINT aborts that subtransaction, and a later operation
+			 * reuses the same TID (after page pruning reclaims the dead
+			 * tuple's slot).  Both xl_heap_new_cid records remain in the WAL
+			 * under the top-level xid.
+			 *
+			 * We resolve this by checking which entry's subtransaction
+			 * aborted.  The entry from the aborted subtransaction is stale
+			 * and should be discarded.
 			 */
-			Assert(ent->cmin == change->data.tuplecid.cmin);
-
+			if (TransactionIdDidAbort(ent->subxid))
+			{
+				/* Existing entry is from an aborted subtxn; replace it. */
+				ent->cmin = change->data.tuplecid.cmin;
+				ent->cmax = change->data.tuplecid.cmax;
+				ent->combocid = change->data.tuplecid.combocid;
+				ent->subxid = change->data.tuplecid.subxid;
+			}
+			else if (TransactionIdDidAbort(change->data.tuplecid.subxid))
+			{
+				/* New entry is from an aborted subtxn; skip it. */
+			}
+			else
+			{
+				/*
+				 * Neither subtransaction aborted — this shouldn't happen.
+				 * Keep the existing entry but log a warning.
+				 */
+				elog(WARNING, "tuplecid cmin mismatch with no aborted "
+					 "subtransaction: rel %u/%u/%u tid (%u,%u) "
+					 "existing cmin %u (subxid %u) "
+					 "new cmin %u (subxid %u)",
+					 key.rlocator.spcOid,
+					 key.rlocator.dbOid,
+					 key.rlocator.relNumber,
+					 ItemPointerGetBlockNumber(&key.tid),
+					 ItemPointerGetOffsetNumber(&key.tid),
+					 ent->cmin, ent->subxid,
+					 change->data.tuplecid.cmin,
+					 change->data.tuplecid.subxid);
+			}
+		}
+		else
+		{
 			/*
+			 * Same cmin — this is the normal case where the same tuple is
+			 * seen multiple times (e.g. insert then update).  Update cmax.
+			 *
 			 * cmax may be initially invalid, but once set it can only grow,
 			 * and never become invalid again.
 			 */
@@ -3440,7 +3485,8 @@ void
 ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 							 XLogRecPtr lsn, RelFileLocator locator,
 							 ItemPointerData tid, CommandId cmin,
-							 CommandId cmax, CommandId combocid)
+							 CommandId cmax, CommandId combocid,
+							 TransactionId subxid)
 {
 	ReorderBufferChange *change = ReorderBufferAllocChange(rb);
 	ReorderBufferTXN *txn;
@@ -3452,6 +3498,7 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 	change->data.tuplecid.cmin = cmin;
 	change->data.tuplecid.cmax = cmax;
 	change->data.tuplecid.combocid = combocid;
+	change->data.tuplecid.subxid = subxid;
 	change->lsn = lsn;
 	change->txn = txn;
 	change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index adf18c397db..3c9eba3dbaa 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -700,7 +700,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
 								 xlrec->target_locator, xlrec->target_tid,
 								 xlrec->cmin, xlrec->cmax,
-								 xlrec->combocid);
+								 xlrec->combocid, xid);
 
 	/* figure out new command id */
 	if (xlrec->cmin != InvalidCommandId &&
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index fa0745552f8..0072c7e5aa1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -146,6 +146,7 @@ typedef struct ReorderBufferChange
 			CommandId	cmin;
 			CommandId	cmax;
 			CommandId	combocid;
+			TransactionId subxid;
 		}			tuplecid;
 
 		/* Invalidation. */
@@ -748,7 +749,9 @@ extern void ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
 extern void ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 										 XLogRecPtr lsn, RelFileLocator locator,
 										 ItemPointerData tid,
-										 CommandId cmin, CommandId cmax, CommandId combocid);
+										 CommandId cmin, CommandId cmax,
+										 CommandId combocid,
+										 TransactionId subxid);
 extern void ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 										  Size nmsgs, SharedInvalidationMessage *msgs);
 extern void ReorderBufferAddDistributedInvalidations(ReorderBuffer *rb, TransactionId xid,


  [text/x-perl-script] fuzz_cmin.pl (12.5K, 4-fuzz_cmin.pl)
  download | inline:
#!/usr/bin/env perl
#
# fuzz_cmin.pl — Fuzzer for tuplecid cmin mismatch in logical decoding
#
# Demonstrates a bug in ReorderBufferBuildTupleCidHash where catalog
# TID reuse after savepoint rollback causes an Assert failure (cassert
# builds) or silent catalog visibility corruption (production builds).
#
# WHAT IT DOES
#
#   1. Initializes a fresh PostgreSQL cluster with wal_level=logical.
#   2. Creates a database, a logical replication slot (test_decoding),
#      and 30 seed tables to populate catalog pages.
#   3. Runs up to N iterations.  Each iteration:
#      a. Generates a random transaction with CREATE TABLE, DROP TABLE,
#         SAVEPOINT, ROLLBACK TO SAVEPOINT, and ALTER TABLE operations.
#      b. Commits the transaction.
#      c. Calls pg_logical_slot_get_changes() to trigger logical decoding.
#   4. If the server crashes (Assert or PANIC), reports the iteration
#      number and the path of that iteration's SQL (every generated
#      transaction is saved under fuzz_cases/ before it is run).
#
# WHAT TO EXPECT
#
#   On an UNPATCHED --enable-cassert build, the fuzzer typically triggers
#   within 300-1000 iterations (15-60 seconds) with output like:
#
#     !!! CMIN MISMATCH on iteration 389 !!!
#     Reproducer: fuzz_cases/case_389.sql
#     Panic details:
#     TRAP: failed Assert("ent->cmin == change->data.tuplecid.cmin"), ...
#
#   On a PATCHED build, it runs to completion with no crashes.
#
#   On a production (non-assert) build, the bug is silent — no crash,
#   but the logical decoder may produce incorrect output.
#
# REQUIREMENTS
#
#   - A PostgreSQL installation (--enable-cassert recommended).
#     Set PGINSTALL to the installation prefix, e.g.:
#       export PGINSTALL=/usr/local/pgsql
#     The directory must contain bin/initdb, bin/pg_ctl, bin/psql.
#   - The test_decoding contrib module must be installed.
#   - No other PostgreSQL instance using port 54329 (or set PGPORT).
#
# USAGE
#
#   perl fuzz_cmin.pl [max_iterations]
#
#   Default is 100000 iterations.  The fuzzer stops early on crash.
#   Data directory and test cases are written to fuzz_data/ and
#   fuzz_cases/ in the current directory.
#
# ENVIRONMENT VARIABLES
#
#   PGINSTALL   PostgreSQL installation prefix (default: /usr/local/pgsql)
#   PGPORT      Port number (default: 54329)
#

use strict;
use warnings;
use File::Path qw(rmtree);
use POSIX qw(WIFEXITED WEXITSTATUS);

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Command-line and environment inputs.  The iteration count and the port are
# interpolated into loop bounds and shell command lines further down.
# Anything other than a plain positive integer is therefore rejected up
# front, rather than letting a typo silently change the loop bounds or the
# commands that get run.
my $max_iterations = $ARGV[0] || 100_000;
my $pg_install     = $ENV{PGINSTALL} || '/usr/local/pgsql';
my $port           = $ENV{PGPORT}    || 54329;
my $base_dir       = 'fuzz_data';
my $cases_dir      = 'fuzz_cases';
my $dbname         = 'fuzzdb';
my $slot_name      = 'fuzz_slot';

die "usage: $0 [max_iterations]  (max_iterations must be a positive integer)\n"
    unless $max_iterations =~ /^[1-9][0-9]*$/;
die "PGPORT must be a TCP port number (1-65535), got '$port'\n"
    unless $port =~ /^[1-9][0-9]*$/ && $port <= 65535;

my $pg_ctl  = "$pg_install/bin/pg_ctl";
my $initdb  = "$pg_install/bin/initdb";
my $psql    = "$pg_install/bin/psql";

die "PGINSTALL=$pg_install does not contain bin/initdb\n"
    unless -x $initdb;

# Bounds and per-operation probabilities used by generate_test_case().
my $min_ops_per_txn     = 10;
my $max_ops_per_txn     = 80;
my $min_cols_per_table  = 5;
my $max_cols_per_table  = 25;
my $max_live_tables     = 30;
my $savepoint_prob      = 0.15;
my $rollback_prob       = 0.40;
my $alter_add_prob      = 0.10;
my $alter_drop_prob     = 0.05;

# Column types for generated tables.  Entries repeated with "x N" are
# proportionally more likely to be returned by pick().
my @col_types = (
    ('integer')  x 5,
    ('bigint')   x 3,
    ('text')     x 4,
    ('boolean')  x 3,
    ('real')     x 2,
    'double precision',
    'numeric',
    'date',
    'timestamp',
    'bytea',
    'varchar(100)',
    'char(10)',
    'smallint',
    'oid',
    'name',
);

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Return one element of the argument list, chosen uniformly at random.
# Yields undef when called with an empty list.
sub pick {
    my @choices = @_;
    my $slot    = int(rand(scalar @choices));
    return $choices[$slot];
}

# Run $sql through psql against database $db and capture combined
# stdout/stderr.
#
# Options:
#   timeout => seconds to wait before giving up (default 30).
#
# Returns ($ok, $output):
#   $ok      true iff psql exited with status 0.  ON_ERROR_STOP=1 is set so
#            that an SQL error also yields a non-zero exit, not just a lost
#            connection.  Without it, "psql -f" exits 0 even when statements
#            in the script fail.
#   $output  psql's combined stdout/stderr.
# On timeout it returns (undef, "TIMEOUT after Ns").  The psql child is not
# explicitly killed in that case.  Any other error inside the capture
# returns (undef, <error text>).
sub run_psql {
    my ($db, $sql, %opts) = @_;
    my $timeout = $opts{timeout} || 30;

    # Create the script file with an unpredictable name, opened exclusively.
    # A fixed /tmp/fuzz_cmin_$$.sql path could be pre-created as a symlink.
    require File::Temp;
    my ($fh, $tmpfile) = File::Temp::tempfile('fuzz_cmin_XXXXXX',
        DIR => '/tmp', SUFFIX => '.sql', UNLINK => 0);
    print {$fh} $sql;
    close($fh) or die "Cannot write $tmpfile: $!\n";

    my $cmd = "$psql -X -v ON_ERROR_STOP=1 -h /tmp -p $port -d $db "
            . "-f $tmpfile 2>&1";
    my $output;
    my $exit_status;
    eval {
        local $SIG{ALRM} = sub { die "timeout\n" };
        alarm($timeout);
        $output = `$cmd`;
        $exit_status = $?;    # capture before anything else can reset $?
        alarm(0);
    };
    my $err = $@;
    alarm(0);

    unlink($tmpfile);

    if ($err) {
        return (undef, "TIMEOUT after ${timeout}s") if $err eq "timeout\n";
        return (undef, $err);
    }

    my $ok = WIFEXITED($exit_status) && WEXITSTATUS($exit_status) == 0;
    return ($ok, $output);
}

# Probe the server with a trivial query (10 s timeout).  Returns the success
# flag reported by run_psql().
sub server_alive {
    my @result = run_psql('postgres', "SELECT 1;", timeout => 10);
    return $result[0];
}

# Start the cluster in $base_dir on $port, with its Unix socket in /tmp and
# its log in $base_dir/logfile.  Dies unless the server answers a query
# afterwards.
sub start_server {
    print "Starting server on port $port...\n";
    my $server_opts = "-o '-p $port -k /tmp'";
    my $start_cmd   = "$pg_ctl -D $base_dir -l $base_dir/logfile -w start "
                    . "$server_opts >/dev/null 2>&1";
    system($start_cmd);
    server_alive() or die "Failed to start server\n";
    print "Server running.\n";
}

# Fast-shutdown the cluster in $base_dir and discard pg_ctl's output.  Does
# not check whether the stop succeeded.
sub stop_server {
    my $stop_cmd = join(' ',
        $pg_ctl, '-D', $base_dir, '-m', 'fast', 'stop', '>/dev/null', '2>&1');
    system($stop_cmd);
}

# Scan the entire server log and return, concatenated, every line that
# matches one of the failure markers.  These include the "tuplecid cmin
# mismatch" WARNING from the patched reorderbuffer.c, TRAP (assertion
# failure) and PANIC.  The whole log is read on every call, so matches from
# earlier iterations are included.  Returns "" if the log cannot be opened
# or nothing matches.
sub check_log_for_panic {
    my $log_path = "$base_dir/logfile";
    open(my $log, '<', $log_path) or return "";
    my $marker = qr/tuplecid cmin mismatch|matching entry #|PANIC|TRAP/;
    my @hits = grep { $_ =~ $marker } <$log>;
    close($log);
    return join('', @hits);
}

# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------

print "=== cmin mismatch fuzzer ===\n";
print "PG install: $pg_install\n";
print "Data dir:   $base_dir\n";
print "Port:       $port\n\n";

# Clean up any previous run
stop_server();
if (-d $base_dir) {
    rmtree($base_dir) or die "Cannot remove $base_dir: $!\n";
}
unless (-d $cases_dir) {
    mkdir($cases_dir) or die "Cannot create $cases_dir: $!\n";
}

# Initialize cluster
print "Initializing cluster...\n";
system("$initdb -D $base_dir --no-locale -A trust >/dev/null 2>&1") == 0
    or die "initdb failed\n";

# Configure for logical decoding
open(my $conf, '>>', "$base_dir/postgresql.conf") or die $!;
print $conf <<'CONF';
wal_level = logical
max_replication_slots = 4
max_wal_senders = 4
shared_buffers = '64MB'
log_min_messages = warning
log_line_prefix = '%m [%p] '
CONF
close($conf) or die "Cannot write $base_dir/postgresql.conf: $!\n";

start_server();

# Create the database and the logical replication slot.  Both steps must
# succeed.  Without the slot, every later pg_logical_slot_get_changes() call
# fails, nothing is ever decoded, and the run would still end looking clean.
# The output is also checked for "ERROR:" because psql can exit 0 after an
# SQL error when ON_ERROR_STOP is not set.
print "Creating database and replication slot...\n";
for my $step (
    [ 'postgres', "CREATE DATABASE $dbname;" ],
    [ $dbname,
      "SELECT pg_create_logical_replication_slot('$slot_name', 'test_decoding');" ],
) {
    my ($db, $step_sql) = @$step;
    my ($ok, $out) = run_psql($db, $step_sql);
    $out = '' unless defined $out;
    die "Setup failed running: $step_sql\n$out\n"
        unless $ok && $out !~ /\bERROR:/;
}

# Pre-seed: create tables to populate catalog pages, so that subsequent
# CREATE/DROP cycles within a transaction can trigger page pruning and
# TID reuse on catalog pages.
print "Pre-seeding catalog pages (30 tables)...\n";
for my $i (1..30) {
    my @cols = map { "c$_ integer" } 1..20;
    run_psql($dbname, "CREATE TABLE seed_$i (" . join(', ', @cols) . ");");
}
# Consume the seeding WAL
run_psql($dbname,
    "SELECT count(*) FROM pg_logical_slot_get_changes('$slot_name', NULL, NULL);");

print "Setup complete. Starting fuzzer.\n\n";

# ---------------------------------------------------------------------------
# Test case generator
# ---------------------------------------------------------------------------

# Table ids keep increasing across calls, so every generated CREATE TABLE
# uses a fresh name.  A per-call counter reused fuzz_t1, fuzz_t2, ... on
# every iteration.  Tables committed by earlier iterations still existed
# (they are only dropped by the periodic cleanup), so the first colliding
# CREATE TABLE aborted the whole generated transaction.
my $next_table_id = 0;

# Uniform random integer in the inclusive range [$lo, $hi].
sub rand_between {
    my ($lo, $hi) = @_;
    return $lo + int(rand($hi - $lo + 1));
}

# Build one random transaction (BEGIN ... COMMIT) and return it as an SQL
# script.  The transaction mixes CREATE TABLE, DROP TABLE, ALTER TABLE
# ADD/DROP COLUMN, SAVEPOINT and ROLLBACK TO SAVEPOINT.  Operation and
# column counts are drawn from the inclusive ranges
# [$min_ops_per_txn, $max_ops_per_txn] and
# [$min_cols_per_table, $max_cols_per_table].
#
# The generator tracks which tables exist, so that DROP and ALTER target
# real tables.  ROLLBACK TO always targets the innermost savepoint (which
# stays defined afterwards):
#   - tables created since that savepoint disappear;
#   - tables that existed before it but were dropped after it come back.
sub generate_test_case {
    my $sql = "";
    my @live_tables;
    my @savepoints;
    my %sp_created;    # savepoint => tables created since it was (re)set
    my %sp_dropped;    # savepoint => pre-existing tables dropped since then
    my $sp_counter = 0;
    my $col_counter = 0;

    my $num_ops = rand_between($min_ops_per_txn, $max_ops_per_txn);

    $sql .= "BEGIN;\n";

    for my $op_num (1..$num_ops) {
        # Maybe create a savepoint
        if (rand() < $savepoint_prob && @savepoints < 3) {
            $sp_counter++;
            my $sp = "sp_$sp_counter";
            push @savepoints, $sp;
            $sp_created{$sp} = [];
            $sp_dropped{$sp} = [];
            $sql .= "SAVEPOINT $sp;\n";
            next;
        }

        # Maybe roll back to the innermost savepoint
        if (rand() < $rollback_prob && @savepoints > 0) {
            my $sp = $savepoints[-1];
            my %undone = map { $_ => 1 } @{$sp_created{$sp}};
            @live_tables = grep { !$undone{$_} } @live_tables;
            push @live_tables, @{$sp_dropped{$sp}};
            $sql .= "ROLLBACK TO SAVEPOINT $sp;\n";
            $sp_created{$sp} = [];
            $sp_dropped{$sp} = [];
            next;
        }

        # Maybe ALTER TABLE ADD COLUMN
        if (rand() < $alter_add_prob && @live_tables > 0) {
            my $tbl = pick(@live_tables);
            $col_counter++;
            my $typ = pick(@col_types);
            $sql .= "ALTER TABLE $tbl ADD COLUMN extra_$col_counter $typ;\n";
            next;
        }

        # Maybe ALTER TABLE DROP COLUMN.  The chosen column may not exist on
        # the chosen table (or may have been rolled back), so the statement
        # is wrapped in a DO block that ignores any error.
        if (rand() < $alter_drop_prob && @live_tables > 0) {
            my $tbl = pick(@live_tables);
            if ($col_counter > 0) {
                my $c = 1 + int(rand($col_counter));
                $sql .= "DO \$\$ BEGIN "
                       . "ALTER TABLE $tbl DROP COLUMN IF EXISTS extra_$c; "
                       . "EXCEPTION WHEN OTHERS THEN NULL; "
                       . "END \$\$;\n";
            }
            next;
        }

        # Main choice: CREATE or DROP
        my $do_create = (@live_tables < 3)                 ? 1 :
                        (@live_tables >= $max_live_tables) ? 0 :
                        (rand() < 0.55)                    ? 1 : 0;

        if ($do_create) {
            $next_table_id++;
            my $tbl = "fuzz_t$next_table_id";
            my $ncols = rand_between($min_cols_per_table, $max_cols_per_table);
            my @cols = map { "c$_ " . pick(@col_types) } 1..$ncols;
            $sql .= "CREATE TABLE $tbl (" . join(', ', @cols) . ");\n";
            push @live_tables, $tbl;
            push @{$sp_created{$savepoints[-1]}}, $tbl if @savepoints;
        }
        else {
            my $idx = int(rand(@live_tables));
            my $tbl = splice(@live_tables, $idx, 1);
            $sql .= "DROP TABLE $tbl;\n";
            if (@savepoints) {
                # A table that already existed at the savepoint comes back
                # if we later roll back to it.
                my $sp = $savepoints[-1];
                push @{$sp_dropped{$sp}}, $tbl
                    unless grep { $_ eq $tbl } @{$sp_created{$sp}};
            }
        }
    }

    $sql .= "COMMIT;\n";
    return $sql;
}

# ---------------------------------------------------------------------------
# Main fuzzer loop
# ---------------------------------------------------------------------------

my $start_time = time();
my $failure_found = 0;    # drives the process exit status below

for my $iter (1..$max_iterations) {
    my $test_sql = generate_test_case();

    # Save to file before executing (in case server crashes hard)
    my $case_file = "$cases_dir/case_$iter.sql";
    open(my $fh, '>', $case_file) or die "Cannot write $case_file: $!\n";
    print $fh $test_sql;
    close($fh) or die "Cannot write $case_file: $!\n";

    # Execute the transaction
    my ($txn_ok) = run_psql($dbname, $test_sql, timeout => 60);

    if (!$txn_ok) {
        if (!server_alive()) {
            my $panic = check_log_for_panic();
            print "\n!!! SERVER CRASHED on iteration $iter !!!\n";
            print "Reproducer: $case_file\n";
            print "Server log extract:\n$panic\n" if $panic;
            $failure_found = 1;
            last;
        }
        # Server alive, txn just failed -- consume WAL and continue
        run_psql($dbname,
            "SELECT count(*) FROM pg_logical_slot_get_changes("
            . "'$slot_name', NULL, NULL);", timeout => 30);
        next;
    }

    # psql reported success -- trigger logical decoding
    my ($dec_ok, $dec_out) = run_psql($dbname,
        "SELECT count(*) FROM pg_logical_slot_get_changes("
        . "'$slot_name', NULL, NULL);", timeout => 120);

    if (!$dec_ok) {
        sleep(2);
        my $panic = check_log_for_panic();

        if ($panic =~ /tuplecid cmin mismatch|TRAP.*Assert/) {
            print "\n!!! CMIN MISMATCH on iteration $iter !!!\n";
            print "Reproducer: $case_file\n";
            print "Server log extract:\n$panic\n";
            $failure_found = 1;
            last;
        }

        if (!server_alive()) {
            print "\n!!! SERVER CRASHED during decode, iteration $iter !!!\n";
            print "Reproducer: $case_file\n";
            print "Server log extract:\n$panic\n" if $panic;
            $failure_found = 1;
            last;
        }

        # Decoding failed but the server survived and the log shows no
        # crash.  Report it instead of moving on silently: a decoding ERROR
        # may be a symptom of the catalog-visibility corruption this fuzzer
        # targets in non-assert builds.
        print "  [iteration $iter: decoding failed without a crash; "
            . "see $case_file]\n";
        print "  $dec_out\n" if defined $dec_out;
    }

    # Periodic cleanup: drop fuzz_ tables to prevent unbounded catalog growth.
    # The "_" is escaped because it is a single-character wildcard in LIKE,
    # and format(%I) quotes the schema and table identifiers.
    if ($iter % 50 == 0) {
        run_psql($dbname, <<'CLEANUP');
DO $$ DECLARE r record;
BEGIN
    FOR r IN SELECT schemaname, tablename
             FROM pg_tables WHERE tablename LIKE 'fuzz\_%'
    LOOP
        EXECUTE format('DROP TABLE IF EXISTS %I.%I CASCADE',
                       r.schemaname, r.tablename);
    END LOOP;
END $$;
CLEANUP
        run_psql($dbname,
            "SELECT count(*) FROM pg_logical_slot_get_changes("
            . "'$slot_name', NULL, NULL);", timeout => 30);
    }

    # Progress
    if ($iter % 100 == 0) {
        my $elapsed = time() - $start_time;
        printf "  [%d iterations, %ds elapsed, %.1f iter/s]\n",
               $iter, $elapsed, $iter / ($elapsed || 1);
    }
}

my $elapsed = time() - $start_time;
print "\nFuzzer finished after $elapsed seconds.\n";

# Cleanup
stop_server();

# A non-zero exit status lets scripts and CI detect that a failure was found.
exit($failure_found ? 1 : 0);

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected]
  Subject: Re: Bug in logical decoding with DDL and subtransactions
  In-Reply-To: <CAHgHdKu5e3XY5e90Tuaxq_R4WrKxSV734Q+Lwo5y39Omp2A-Gg@mail.gmail.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox