pgjdbc/pgjdbc GitHub issues and pull requests (mirror)
From: vlsi (@vlsi) <[email protected]>
To: pgjdbc/pgjdbc <[email protected]>
Subject: [pgjdbc/pgjdbc] PR #4175: test: fix flaky sentLocationEqualToLastReceiveLSN replication test
Date: Mon, 15 Jun 2026 07:58:08 +0000
Message-ID: <[email protected]>
## Why
`LogicalReplicationStatusTest.sentLocationEqualToLastReceiveLSN` is flaky in CI (for example, [this run](https://github.com/pgjdbc/pgjdbc/actions/runs/27528618073/job/81361261865)):
```
java.lang.AssertionError: When changes absent on server last receive by stream LSN should be equal to last sent by server LSN
Expected: <LSN{0/220660F0}>
but: was <LSN{0/220EFA70}>
```
The test asserted that `pg_stat_replication.sent_lsn` exactly equals the last LSN the stream received. That equality is racy. When no decodable data is pending, the walsender advances `sent_lsn` with keepalive messages to the current end of WAL, so any unrelated WAL (the second test database, autovacuum, other background activity) pushes `sent_lsn` past the client's position. The polling helper added in 8133a98dc cannot recover once `sent_lsn` has overshot: it waits for an exact match that can no longer occur, then returns the overshot value on timeout and the assertion fails.
A client can never receive more than the server sent, so the robust invariant is that `sent_lsn` is at least the received LSN.
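As a rough illustration of the invariant, the sketch below replays the two LSNs from the failing run. `LsnInvariantSketch`, `parse`, and `sentCoversReceived` are hypothetical names introduced here as a stand-in for `LogSequenceNumber`; they are not part of pgjdbc. The sketch assumes the usual textual LSN form of two hexadecimal numbers separated by a slash.

```java
// Hypothetical stand-in for pgjdbc's LogSequenceNumber, for illustration only.
final class LsnInvariantSketch {

    /** Parses the textual "hi/lo" hex form, e.g. "0/220660F0", into a 64-bit position. */
    static long parse(String text) {
        String[] parts = text.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not an LSN: " + text);
        }
        long hi = Long.parseLong(parts[0], 16);
        long lo = Long.parseLong(parts[1], 16);
        return (hi << 32) | lo;
    }

    /** The invariant the test can rely on: the server has sent at least what the client received. */
    static boolean sentCoversReceived(long sent, long received) {
        // Unsigned comparison, because the high half may set the sign bit of the long.
        return Long.compareUnsigned(sent, received) >= 0;
    }

    public static void main(String[] args) {
        long received = parse("0/220660F0"); // "Expected" in the CI failure
        long sent = parse("0/220EFA70");     // "was" in the CI failure

        System.out.println("equal:    " + (sent == received));
        System.out.println("at least: " + sentCoversReceived(sent, received));
    }
}
```

With the values from the failure, the equality check is false while the at-least check is true, which is the behaviour the new assertion depends on.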
## What
- Assert `greaterThanOrEqualTo(lastReceivedLSN)` instead of strict equality, with a comment explaining why `sent_lsn` runs ahead.
- Poll the view until `sent_lsn` reaches that bound via a new `getLSNFromViewAtLeast` helper.
- Factor the single-row read out of the poll loop and drive both the equality and at-least pollers through one predicate, so the other tests keep their exact-match behaviour.
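The shape of that refactoring can be sketched independently of JDBC. The example below is a simplified approximation, not the code in this PR: `PollSketch` and `poll` are hypothetical names, the view read is replaced by a `Supplier`, and the timeout is a parameter rather than the fixed two seconds used in the test.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, JDBC-free sketch of a predicate-driven poller.
final class PollSketch {

    /**
     * Calls read until accept is satisfied or the timeout expires.
     * Returns the accepted value, or the last non-null value seen on timeout
     * (null if read never produced one).
     */
    static <T> T poll(Supplier<T> read, Predicate<T> accept, long timeoutNanos) {
        long start = System.nanoTime();
        T last = null;
        while (System.nanoTime() - start < timeoutNanos) {
            T current = read.get();
            if (current != null) {
                last = current;
                if (accept.test(current)) {
                    return current;
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return last;
            }
        }
        return last;
    }

    public static void main(String[] args) {
        // A source that starts at 1 and then jumps to 5, like sent_lsn overshooting.
        long[] calls = {0};
        Supplier<Long> source = () -> calls[0]++ == 0 ? 1L : 5L;

        // An at-least predicate accepts the overshot value as soon as it appears.
        Long atLeast = poll(source, v -> v >= 3L, TimeUnit.SECONDS.toNanos(1));
        System.out.println("at least 3 -> " + atLeast);
    }
}
```

An exact-match predicate such as `v -> v == 3L` over the same source would never be satisfied once the value has jumped past 3, so the poller would run to the timeout and return the last value seen. That is the failure mode the at-least variant avoids.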
## How to verify
`./gradlew :postgresql:compileTestJava :postgresql:checkstyleTest` passes. The replication tests need a server with replication enabled (`@Replication`, `@EnabledForServerVersionRange(gte = "9.4")`); run `LogicalReplicationStatusTest` against such a server to confirm `sentLocationEqualToLastReceiveLSN` is stable under concurrent WAL activity.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
diff --git a/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java b/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
index 55c965eb2f..64e15f731f 100644
--- a/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
@@ -6,7 +6,9 @@
package org.postgresql.replication;
import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.IsEqual.equalTo;
import org.postgresql.PGConnection;
@@ -30,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
@Replication
@EnabledForServerVersionRange(gte = "9.4")
@@ -90,11 +93,15 @@ void sentLocationEqualToLastReceiveLSN() throws Exception {
LogSequenceNumber lastReceivedLSN = stream.getLastReceiveLSN();
stream.forceUpdateStatus();
- LogSequenceNumber sentByServer = getLSNFromView(sentColumnName(), lastReceivedLSN);
+ LogSequenceNumber sentByServer = getLSNFromViewAtLeast(sentColumnName(), lastReceivedLSN);
- assertThat("When changes absent on server last receive by stream LSN "
- + "should be equal to last sent by server LSN",
- sentByServer, equalTo(lastReceivedLSN)
+ assertThat(
+ "The server cannot send less than the stream received, so the sent LSN is at least the "
+ + "last received LSN. It may be greater: with no decodable data pending, the walsender "
+ + "advances the sent LSN with keepalive messages to the current end of WAL, and "
+ + "unrelated activity (other databases, autovacuum) keeps that position moving, so "
+ + "strict equality would be racy",
+ sentByServer, greaterThanOrEqualTo(lastReceivedLSN)
);
}
@@ -517,40 +524,27 @@ private static String toString(ByteBuffer buffer) {
}
/**
- * Reads an LSN column from pg_stat_replication, polling until a non-null value appears.
- * If {@code expected} is non-null, keeps polling until the column matches that value
- * (or a timeout expires). This is necessary because {@code forceUpdateStatus()} only
+ * Reads an LSN column from pg_stat_replication, polling until {@code accept} is satisfied
+ * (or a timeout expires). Polling is necessary because {@code forceUpdateStatus()} only
* flushes data to the TCP socket; the server needs a short time to process the standby
* status update and reflect it in pg_stat_replication.
*
- * @param expected if non-null, poll until the column equals this value; if null, return
- * the first non-null value seen (or null on timeout)
+ * @param accept predicate the column value must satisfy to stop polling early
+ * @return the value that satisfied {@code accept}; on timeout, the last value seen, or null
+ * if the column never produced one
*/
- private LogSequenceNumber getLSNFromView(String columnName,
- LogSequenceNumber expected) throws Exception {
+ private LogSequenceNumber pollLSNFromView(String columnName,
+ Predicate<LogSequenceNumber> accept) throws Exception {
long start = System.nanoTime();
long timeout = TimeUnit.SECONDS.toNanos(2);
LogSequenceNumber last = null;
while (System.nanoTime() - start < timeout) {
- try (
- PreparedStatement st = sqlConnection.prepareStatement(
- "select r.* from pg_stat_replication r"
- + " join pg_replication_slots s on r.pid = s.active_pid"
- + " where s.slot_name = ?")
- ) {
- st.setString(1, SLOT_NAME);
- try (ResultSet rs = st.executeQuery()) {
- String result = null;
- if (rs.next()) {
- result = rs.getString(columnName);
- }
- if (result != null && !result.isEmpty()) {
- last = LogSequenceNumber.valueOf(result);
- if (expected == null || last.equals(expected)) {
- return last;
- }
- }
+ LogSequenceNumber current = readLSNFromView(columnName);
+ if (current != null) {
+ last = current;
+ if (accept.test(current)) {
+ return current;
}
}
TimeUnit.MILLISECONDS.sleep(10L);
@@ -558,6 +552,46 @@ private LogSequenceNumber getLSNFromView(String columnName,
return last;
}
+ private LogSequenceNumber readLSNFromView(String columnName) throws SQLException {
+ try (
+ PreparedStatement st = sqlConnection.prepareStatement(
+ "select r.* from pg_stat_replication r"
+ + " join pg_replication_slots s on r.pid = s.active_pid"
+ + " where s.slot_name = ?")
+ ) {
+ st.setString(1, SLOT_NAME);
+ try (ResultSet rs = st.executeQuery()) {
+ String result = rs.next() ? rs.getString(columnName) : null;
+ return result != null && !result.isEmpty() ? LogSequenceNumber.valueOf(result) : null;
+ }
+ }
+ }
+
+ /**
+ * Polls an LSN column until it equals {@code expected}, or returns the last value seen on
+ * timeout. Use when the column is expected to settle on an exact value the client set.
+ *
+ * @param expected if non-null, poll until the column equals this value; if null, return
+ * the first non-null value seen (or null on timeout)
+ */
+ private LogSequenceNumber getLSNFromView(String columnName,
+ LogSequenceNumber expected) throws Exception {
+ return pollLSNFromView(columnName, lsn -> expected == null || lsn.equals(expected));
+ }
+
+ /**
+ * Polls an LSN column until it reaches at least {@code atLeast}, or returns the last value
+ * seen on timeout. Use for columns the server may advance past the client's position, such
+ * as sent_lsn driven by keepalive messages.
+ */
+ private LogSequenceNumber getLSNFromViewAtLeast(String columnName,
+ LogSequenceNumber atLeast) throws Exception {
+ LogSequenceNumber last = pollLSNFromView(columnName, lsn -> lsn.compareTo(atLeast) >= 0);
+ assertThat("pg_stat_replication has no row for slot " + SLOT_NAME + " within the poll timeout",
+ last, notNullValue());
+ return last;
+ }
+
private LogSequenceNumber getLSNFromView(String columnName) throws Exception {
return getLSNFromView(columnName, null);
}