pgjdbc/pgjdbc GitHub issues and pull requests (mirror)  
From: vlsi (@vlsi) <[email protected]>
To: pgjdbc/pgjdbc <[email protected]>
Subject: [pgjdbc/pgjdbc] PR #4175: test: fix flaky sentLocationEqualToLastReceiveLSN replication test
Date: Mon, 15 Jun 2026 07:58:08 +0000
Message-ID: <[email protected]>

## Why

`LogicalReplicationStatusTest.sentLocationEqualToLastReceiveLSN` is flaky in CI (for example, [this run](https://github.com/pgjdbc/pgjdbc/actions/runs/27528618073/job/81361261865)):

```
java.lang.AssertionError: When changes absent on server last receive by stream LSN should be equal to last sent by server LSN
Expected: <LSN{0/220660F0}>
     but: was <LSN{0/220EFA70}>
```

The test asserted that `pg_stat_replication.sent_lsn` exactly equals the last LSN the stream received. That equality is racy. When no decodable data is pending, the walsender advances `sent_lsn` with keepalive messages to the current end of WAL, so any unrelated WAL (the second test database, autovacuum, other background activity) pushes `sent_lsn` past the client's position. The polling helper added in 8133a98dc cannot recover once `sent_lsn` has overshot, because it waits for an exact match that can no longer occur.

A client can never receive more than the server sent, so the robust invariant is that `sent_lsn` is at least the received LSN.
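
To make the failure above concrete, here is a minimal sketch that compares the two LSNs from the CI log. It assumes the usual PostgreSQL textual form (two hexadecimal words separated by `/`, the first being the high 32 bits). `LsnInvariantSketch`, `parseLsn`, and `isAtLeast` are hypothetical names used only for illustration; the test itself works with `LogSequenceNumber`.

```java
final class LsnInvariantSketch {
  private LsnInvariantSketch() {
  }

  // Assumption: the LSN text is "hi/lo", two hex words; hi holds the upper 32 bits.
  static long parseLsn(String text) {
    String[] parts = text.split("/");
    long hi = Long.parseLong(parts[0], 16);
    long lo = Long.parseLong(parts[1], 16);
    return (hi << 32) | lo;
  }

  // Unsigned comparison, so positions with the top bit set still order correctly.
  static boolean isAtLeast(long lsn, long bound) {
    return Long.compareUnsigned(lsn, bound) >= 0;
  }

  public static void main(String[] args) {
    long received = parseLsn("0/220660F0"); // "Expected" in the CI failure
    long sent = parseLsn("0/220EFA70");     // "was" in the CI failure

    System.out.println("sent == received: " + (sent == received));
    System.out.println("sent >= received: " + isAtLeast(sent, received));
  }
}
```

With the values from the log, the equality check fails while the at-least check holds, which is the relation the new assertion relies on.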

## What

- Assert `greaterThanOrEqualTo(lastReceivedLSN)` instead of strict equality, with a comment explaining why `sent_lsn` runs ahead.
- Poll the view until `sent_lsn` reaches that bound via a new `getLSNFromViewAtLeast` helper.
- Factor the single-row read out of the poll loop and drive both the equality and at-least pollers through one predicate, so the other tests keep their exact-match behaviour.
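
The shape of that refactoring can be sketched without a database. The example below is a simplified stand-in, not the code in the patch: `PollSketch`, `poll`, `pollUntilEqual`, `pollUntilAtLeast`, and `replay` are hypothetical names, an in-memory `Supplier` replaces the `pg_stat_replication` query, and the sketch stops polling on interruption rather than propagating the exception as the real helper does.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class PollSketch {
  private PollSketch() {
  }

  // Reads repeatedly until accept is satisfied; on timeout returns the last non-null value seen.
  static <T> T poll(Supplier<T> read, Predicate<T> accept, long timeoutMillis) {
    long start = System.nanoTime();
    long timeout = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    T last = null;
    while (System.nanoTime() - start < timeout) {
      T current = read.get();
      if (current != null) {
        last = current;
        if (accept.test(current)) {
          return current;
        }
      }
      try {
        TimeUnit.MILLISECONDS.sleep(10L);
      } catch (InterruptedException e) {
        // Sketch-only choice: restore the flag and stop polling.
        Thread.currentThread().interrupt();
        break;
      }
    }
    return last;
  }

  // Exact-match poller: a null expectation accepts the first non-null value.
  static <T> T pollUntilEqual(Supplier<T> read, T expected, long timeoutMillis) {
    return poll(read, value -> expected == null || value.equals(expected), timeoutMillis);
  }

  // At-least poller: accepts the first value that has reached the bound.
  static <T extends Comparable<T>> T pollUntilAtLeast(
      Supplier<T> read, T bound, long timeoutMillis) {
    return poll(read, value -> value.compareTo(bound) >= 0, timeoutMillis);
  }

  // Hypothetical test double: returns the given values in order, then repeats the last one.
  static Supplier<Long> replay(Long... values) {
    int[] next = {0};
    return () -> values[Math.min(next[0]++, values.length - 1)];
  }

  public static void main(String[] args) {
    // The simulated column is empty first, then overshoots the client's position of 10.
    System.out.println(pollUntilAtLeast(replay(null, 5L, 12L), 10L, 1000));
    System.out.println(pollUntilEqual(replay(null, 5L, 12L), 10L, 300));
  }
}
```

In this sketch the at-least poller returns as soon as the simulated column passes the bound, while the equality poller runs to its timeout and hands back the overshot value, which mirrors the flaky assertion described above.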

## How to verify

`./gradlew :postgresql:compileTestJava :postgresql:checkstyleTest` passes. The replication tests need a server with replication enabled (`@Replication`, `@EnabledForServerVersionRange(gte = "9.4")`); run `LogicalReplicationStatusTest` against such a server to confirm `sentLocationEqualToLastReceiveLSN` is stable under concurrent WAL activity.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

diff --git a/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java b/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
index 55c965eb2f..64e15f731f 100644
--- a/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/replication/LogicalReplicationStatusTest.java
@@ -6,7 +6,9 @@
 package org.postgresql.replication;
 
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 import org.postgresql.PGConnection;
@@ -30,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 
 @Replication
 @EnabledForServerVersionRange(gte = "9.4")
@@ -90,11 +93,15 @@ void sentLocationEqualToLastReceiveLSN() throws Exception {
     LogSequenceNumber lastReceivedLSN = stream.getLastReceiveLSN();
     stream.forceUpdateStatus();
 
-    LogSequenceNumber sentByServer = getLSNFromView(sentColumnName(), lastReceivedLSN);
+    LogSequenceNumber sentByServer = getLSNFromViewAtLeast(sentColumnName(), lastReceivedLSN);
 
-    assertThat("When changes absent on server last receive by stream LSN "
-            + "should be equal to last sent by server LSN",
-        sentByServer, equalTo(lastReceivedLSN)
+    assertThat(
+        "The server cannot send less than the stream received, so the sent LSN is at least the "
+            + "last received LSN. It may be greater: with no decodable data pending, the walsender "
+            + "advances the sent LSN with keepalive messages to the current end of WAL, and "
+            + "unrelated activity (other databases, autovacuum) keeps that position moving, so "
+            + "strict equality would be racy",
+        sentByServer, greaterThanOrEqualTo(lastReceivedLSN)
     );
   }
 
@@ -517,40 +524,27 @@ private static String toString(ByteBuffer buffer) {
   }
 
   /**
-   * Reads an LSN column from pg_stat_replication, polling until a non-null value appears.
-   * If {@code expected} is non-null, keeps polling until the column matches that value
-   * (or a timeout expires).  This is necessary because {@code forceUpdateStatus()} only
+   * Reads an LSN column from pg_stat_replication, polling until {@code accept} is satisfied
+   * (or a timeout expires).  Polling is necessary because {@code forceUpdateStatus()} only
    * flushes data to the TCP socket; the server needs a short time to process the standby
    * status update and reflect it in pg_stat_replication.
    *
-   * @param expected if non-null, poll until the column equals this value; if null, return
-   *                 the first non-null value seen (or null on timeout)
+   * @param accept predicate the column value must satisfy to stop polling early
+   * @return the value that satisfied {@code accept}; on timeout, the last value seen, or null
+   *         if the column never produced one
    */
-  private LogSequenceNumber getLSNFromView(String columnName,
-      LogSequenceNumber expected) throws Exception {
+  private LogSequenceNumber pollLSNFromView(String columnName,
+      Predicate<LogSequenceNumber> accept) throws Exception {
     long start = System.nanoTime();
     long timeout = TimeUnit.SECONDS.toNanos(2);
 
     LogSequenceNumber last = null;
     while (System.nanoTime() - start < timeout) {
-      try (
-          PreparedStatement st = sqlConnection.prepareStatement(
-              "select r.* from pg_stat_replication r"
-                  + " join pg_replication_slots s on r.pid = s.active_pid"
-                  + " where s.slot_name = ?")
-      ) {
-        st.setString(1, SLOT_NAME);
-        try (ResultSet rs = st.executeQuery()) {
-          String result = null;
-          if (rs.next()) {
-            result = rs.getString(columnName);
-          }
-          if (result != null && !result.isEmpty()) {
-            last = LogSequenceNumber.valueOf(result);
-            if (expected == null || last.equals(expected)) {
-              return last;
-            }
-          }
+      LogSequenceNumber current = readLSNFromView(columnName);
+      if (current != null) {
+        last = current;
+        if (accept.test(current)) {
+          return current;
         }
       }
       TimeUnit.MILLISECONDS.sleep(10L);
@@ -558,6 +552,46 @@ private LogSequenceNumber getLSNFromView(String columnName,
     return last;
   }
 
+  private LogSequenceNumber readLSNFromView(String columnName) throws SQLException {
+    try (
+        PreparedStatement st = sqlConnection.prepareStatement(
+            "select r.* from pg_stat_replication r"
+                + " join pg_replication_slots s on r.pid = s.active_pid"
+                + " where s.slot_name = ?")
+    ) {
+      st.setString(1, SLOT_NAME);
+      try (ResultSet rs = st.executeQuery()) {
+        String result = rs.next() ? rs.getString(columnName) : null;
+        return result != null && !result.isEmpty() ? LogSequenceNumber.valueOf(result) : null;
+      }
+    }
+  }
+
+  /**
+   * Polls an LSN column until it equals {@code expected}, or returns the last value seen on
+   * timeout. Use when the column is expected to settle on an exact value the client set.
+   *
+   * @param expected if non-null, poll until the column equals this value; if null, return
+   *                 the first non-null value seen (or null on timeout)
+   */
+  private LogSequenceNumber getLSNFromView(String columnName,
+      LogSequenceNumber expected) throws Exception {
+    return pollLSNFromView(columnName, lsn -> expected == null || lsn.equals(expected));
+  }
+
+  /**
+   * Polls an LSN column until it reaches at least {@code atLeast}, or returns the last value
+   * seen on timeout. Use for columns the server may advance past the client's position, such
+   * as sent_lsn driven by keepalive messages.
+   */
+  private LogSequenceNumber getLSNFromViewAtLeast(String columnName,
+      LogSequenceNumber atLeast) throws Exception {
+    LogSequenceNumber last = pollLSNFromView(columnName, lsn -> lsn.compareTo(atLeast) >= 0);
+    assertThat("pg_stat_replication has no row for slot " + SLOT_NAME + " within the poll timeout",
+        last, notNullValue());
+    return last;
+  }
+
   private LogSequenceNumber getLSNFromView(String columnName) throws Exception {
     return getLSNFromView(columnName, null);
   }

