Message-ID:
From: "vlsi (@vlsi)"
To: "pgjdbc/pgjdbc"
Date: Mon, 15 Jun 2026 07:58:08 +0000
Subject: [pgjdbc/pgjdbc] PR #4175: test: fix flaky sentLocationEqualToLastReceiveLSN replication test
List-Id:
X-GitHub-Additions: 63
X-GitHub-Author-Id: 213894
X-GitHub-Author-Login: vlsi
X-GitHub-Base: master
X-GitHub-Changed-Files: 1
X-GitHub-Commits: 1
X-GitHub-Deletions: 29
X-GitHub-Head-Branch: fix/flaky-sent-location-replication-test
X-GitHub-Head-SHA: 6c6f1418d9450c2a7366446b51cb64d4da7b1e2f
X-GitHub-Issue: 4175
X-GitHub-Labels: building-and-testing
X-GitHub-Merge-SHA: d0c6269cfa35da25663e022dd03c242fa16a65f3
X-GitHub-Merged-By: vlsi
X-GitHub-Repo: pgjdbc/pgjdbc
X-GitHub-State: merged
X-GitHub-Type: pull_request
X-GitHub-Url: https://github.com/pgjdbc/pgjdbc/pull/4175
Content-Type: text/plain; charset=utf-8

## Why

`LogicalReplicationStatusTest.sentLocationEqualToLastReceiveLSN` is flaky in CI (for example, [this run](https://github.com/pgjdbc/pgjdbc/actions/runs/27528618073/job/81361261865)):

```
java.lang.AssertionError: When changes absent on server last receive by stream LSN should be equal to last sent by server LSN
Expected:
     but: was
```

The test asserted that `pg_stat_replication.sent_lsn` exactly equals the last LSN the stream received. That equality is racy. When no decodable data is pending, the walsender advances `sent_lsn` with keepalive messages to the current end of WAL, so any unrelated WAL (the second test database, autovacuum, other background activity) pushes `sent_lsn` past the client's position. The polling helper added in 8133a98dc cannot recover once `sent_lsn` has overshot, because it waits for an exact equality that can no longer occur.

A client can never receive more than the server sent, so the robust invariant is that `sent_lsn` is at least the received LSN.

## What

- Assert `greaterThanOrEqualTo(lastReceivedLSN)` instead of strict equality, with a comment explaining why `sent_lsn` runs ahead.
- Poll the view until `sent_lsn` reaches that bound via a new `getLSNFromViewAtLeast` helper.
- Factor the single-row read out of the poll loop and drive both the equality and at-least pollers through one predicate, so the other tests keep their exact-match behaviour.

## How to verify

`./gradlew :postgresql:compileTestJava :postgresql:checkstyleTest` passes. The replication tests need a server with replication enabled (`@Replication`, `@EnabledForServerVersionRange(gte = "9.4")`); run `LogicalReplicationStatusTest` against such a server to confirm `sentLocationEqualToLastReceiveLSN` is stable under concurrent WAL activity.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

diff --git a/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java b/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
index 55c965eb2f..64e15f731f 100644
--- a/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
@@ -6,7 +6,9 @@
 package org.postgresql.replication;
 
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 import org.postgresql.PGConnection;
@@ -30,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 @Replication
 @EnabledForServerVersionRange(gte = "9.4")
@@ -90,11 +93,15 @@ void sentLocationEqualToLastReceiveLSN() throws Exception {
     LogSequenceNumber lastReceivedLSN = stream.getLastReceiveLSN();
     stream.forceUpdateStatus();
 
-    LogSequenceNumber sentByServer = getLSNFromView(sentColumnName(), lastReceivedLSN);
+    LogSequenceNumber sentByServer = getLSNFromViewAtLeast(sentColumnName(), lastReceivedLSN);
 
-    assertThat("When changes absent on server last receive by stream LSN "
-            + "should be equal to last sent by server LSN",
-        sentByServer, equalTo(lastReceivedLSN)
+    assertThat(
+        "The server cannot send less than the stream received, so the sent LSN is at least the "
+            + "last received LSN. It may be greater: with no decodable data pending, the walsender "
+            + "advances the sent LSN with keepalive messages to the current end of WAL, and "
+            + "unrelated activity (other databases, autovacuum) keeps that position moving, so "
+            + "strict equality would be racy",
+        sentByServer, greaterThanOrEqualTo(lastReceivedLSN)
     );
   }
 
@@ -517,40 +524,27 @@ private static String toString(ByteBuffer buffer) {
   }
 
   /**
-   * Reads an LSN column from pg_stat_replication, polling until a non-null value appears.
-   * If {@code expected} is non-null, keeps polling until the column matches that value
-   * (or a timeout expires). This is necessary because {@code forceUpdateStatus()} only
+   * Reads an LSN column from pg_stat_replication, polling until {@code accept} is satisfied
+   * (or a timeout expires). Polling is necessary because {@code forceUpdateStatus()} only
    * flushes data to the TCP socket; the server needs a short time to process the standby
    * status update and reflect it in pg_stat_replication.
    *
-   * @param expected if non-null, poll until the column equals this value; if null, return
-   *                 the first non-null value seen (or null on timeout)
+   * @param accept predicate the column value must satisfy to stop polling early
+   * @return the value that satisfied {@code accept}; on timeout, the last value seen, or null
+   *         if the column never produced one
    */
-  private LogSequenceNumber getLSNFromView(String columnName,
-      LogSequenceNumber expected) throws Exception {
+  private LogSequenceNumber pollLSNFromView(String columnName,
+      Predicate<LogSequenceNumber> accept) throws Exception {
     long start = System.nanoTime();
     long timeout = TimeUnit.SECONDS.toNanos(2);
     LogSequenceNumber last = null;
 
     while (System.nanoTime() - start < timeout) {
-      try (
-          PreparedStatement st = sqlConnection.prepareStatement(
-              "select r.* from pg_stat_replication r"
-                  + " join pg_replication_slots s on r.pid = s.active_pid"
-                  + " where s.slot_name = ?")
-      ) {
-        st.setString(1, SLOT_NAME);
-        try (ResultSet rs = st.executeQuery()) {
-          String result = null;
-          if (rs.next()) {
-            result = rs.getString(columnName);
-          }
-          if (result != null && !result.isEmpty()) {
-            last = LogSequenceNumber.valueOf(result);
-            if (expected == null || last.equals(expected)) {
-              return last;
-            }
-          }
+      LogSequenceNumber current = readLSNFromView(columnName);
+      if (current != null) {
+        last = current;
+        if (accept.test(current)) {
+          return current;
         }
       }
       TimeUnit.MILLISECONDS.sleep(10L);
@@ -558,6 +552,46 @@ private LogSequenceNumber getLSNFromView(String columnName,
     return last;
   }
 
+  private LogSequenceNumber readLSNFromView(String columnName) throws SQLException {
+    try (
+        PreparedStatement st = sqlConnection.prepareStatement(
+            "select r.* from pg_stat_replication r"
+                + " join pg_replication_slots s on r.pid = s.active_pid"
+                + " where s.slot_name = ?")
+    ) {
+      st.setString(1, SLOT_NAME);
+      try (ResultSet rs = st.executeQuery()) {
+        String result = rs.next() ? rs.getString(columnName) : null;
+        return result != null && !result.isEmpty() ? LogSequenceNumber.valueOf(result) : null;
+      }
+    }
+  }
+
+  /**
+   * Polls an LSN column until it equals {@code expected}, or returns the last value seen on
+   * timeout. Use when the column is expected to settle on an exact value the client set.
+   *
+   * @param expected if non-null, poll until the column equals this value; if null, return
+   *                 the first non-null value seen (or null on timeout)
+   */
+  private LogSequenceNumber getLSNFromView(String columnName,
+      LogSequenceNumber expected) throws Exception {
+    return pollLSNFromView(columnName, lsn -> expected == null || lsn.equals(expected));
+  }
+
+  /**
+   * Polls an LSN column until it reaches at least {@code atLeast}, or returns the last value
+   * seen on timeout. Use for columns the server may advance past the client's position, such
+   * as sent_lsn driven by keepalive messages.
+   */
+  private LogSequenceNumber getLSNFromViewAtLeast(String columnName,
+      LogSequenceNumber atLeast) throws Exception {
+    LogSequenceNumber last = pollLSNFromView(columnName, lsn -> lsn.compareTo(atLeast) >= 0);
+    assertThat("pg_stat_replication has no row for slot " + SLOT_NAME + " within the poll timeout",
+        last, notNullValue());
+    return last;
+  }
+
   private LogSequenceNumber getLSNFromView(String columnName) throws Exception {
     return getLSNFromView(columnName, null);
   }
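
The predicate-driven polling pattern in this change can be tried without a database. The following is a minimal sketch, not the code in the PR: `LsnPollSketch`, `poll`, and the `AtomicLong` counter are hypothetical names for illustration, plain `long` values stand in for `LogSequenceNumber`, and the counter simulates a `sent_lsn` that jumps past the client's position. It shows why an at-least predicate terminates where an equality predicate can only time out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical stand-alone sketch; not part of pgjdbc.
final class LsnPollSketch {

  /**
   * Polls {@code read} until {@code accept} matches or the timeout expires.
   * Returns the accepted value; on timeout, the last non-null value seen, or null.
   */
  static <T> T poll(Supplier<T> read, Predicate<T> accept, long timeoutMillis) {
    long start = System.nanoTime();
    long timeout = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    T last = null;
    while (System.nanoTime() - start < timeout) {
      T current = read.get();
      if (current != null) {
        last = current;
        if (accept.test(current)) {
          return current;
        }
      }
      try {
        TimeUnit.MILLISECONDS.sleep(1L);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up early with what we have.
        Thread.currentThread().interrupt();
        return last;
      }
    }
    return last;
  }

  public static void main(String[] args) {
    long received = 100L;                   // assumed client position
    AtomicLong sent = new AtomicLong(90L);  // simulated server position
    // Each read advances by 7 (97, 104, 111, ...), so the value skips over 100.
    Supplier<Long> read = () -> sent.addAndGet(7L);

    // At-least predicate: satisfied as soon as the value passes the bound.
    Long atLeast = LsnPollSketch.<Long>poll(read, v -> v >= received, 200L);
    System.out.println("at-least poll stopped at " + atLeast);

    // Equality predicate: the value never equals 100, so this only times out.
    sent.set(90L);
    Long exact = LsnPollSketch.<Long>poll(read, v -> v == received, 50L);
    System.out.println("equality poll timed out with last value " + exact);
  }
}
```

Keeping the loop generic over the predicate mirrors the design choice described in the PR: exact-match callers and at-least callers share one timeout and one read path, and only the stop condition differs.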