pgjdbc/pgjdbc GitHub issues and pull requests (mirror)  
From: vlsi (@vlsi) <[email protected]>
To: pgjdbc/pgjdbc <[email protected]>
Subject: [pgjdbc/pgjdbc] PR #4196: fix(protocol): defer flushes until response processing
Date: Thu, 18 Jun 2026 16:18:03 +0000
Message-ID: <[email protected]>

## Why

Simple-query messages were flushed as soon as they were encoded. An implicit `BEGIN` followed by a user query therefore caused separate socket flushes, increasing latency for workloads with many short transactions.

Java does not guarantee that calling `OutputStream.flush()` with nothing buffered is unobservable or harmless, so the response-reading paths must not issue redundant flush calls.
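
The effect described above can be reproduced outside the driver. The following is a minimal sketch, not pgjdbc code: `DeferredFlushSketch`, `CountingStream`, and `encodeSimpleQuery` are hypothetical names, and a `ByteArrayOutputStream` stands in for the socket. It counts how many flushes reach the underlying stream when each simple-query message is flushed as it is encoded, versus once before the response would be read.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class DeferredFlushSketch {
  /** Hypothetical stand-in for a socket stream; counts flush() calls that reach it. */
  static final class CountingStream extends FilterOutputStream {
    int flushes;

    CountingStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len); // pass bulk writes through unchanged
    }

    @Override
    public void flush() throws IOException {
      flushes++;
      out.flush();
    }
  }

  /** Encodes a simple-query message: 'Q', Int32 length, SQL bytes, NUL terminator. */
  static void encodeSimpleQuery(OutputStream out, String sql, boolean flushAfterEncode)
      throws IOException {
    byte[] encoded = sql.getBytes(StandardCharsets.UTF_8);
    int length = encoded.length + 4 + 1;
    out.write('Q');
    out.write(length >>> 24);
    out.write(length >>> 16);
    out.write(length >>> 8);
    out.write(length);
    out.write(encoded);
    out.write(0);
    if (flushAfterEncode) {
      out.flush(); // old behaviour: flush as soon as the message is encoded
    }
  }

  /** Returns the number of flushes that reached the stand-in socket for BEGIN + SELECT 1. */
  static int flushesFor(boolean flushAfterEncode) throws IOException {
    CountingStream socket = new CountingStream(new ByteArrayOutputStream());
    OutputStream buffered = new BufferedOutputStream(socket, 8192);
    encodeSimpleQuery(buffered, "BEGIN", flushAfterEncode);
    encodeSimpleQuery(buffered, "SELECT 1", flushAfterEncode);
    if (!flushAfterEncode) {
      buffered.flush(); // new behaviour: one flush at the send-then-read call site
    }
    return socket.flushes;
  }

  public static void main(String[] args) throws IOException {
    System.out.println("flush per message: " + flushesFor(true));
    System.out.println("deferred flush:    " + flushesFor(false));
  }
}
```

In this sketch the per-message variant reaches the stand-in socket twice and the deferred variant once, which mirrors the SIMPLE / NEVER row of the table in this description.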

## What

- Remove the flush from simple-query encoding.
- Flush explicitly at each send-then-read call site, including cursor fetches.
- Read already-sent savepoint-cleanup responses without another flush.
- Keep explicit flushes where the driver returns without reading responses.
- Count raw socket flush calls and verify transaction and fastpath paths.
- Add a short timeout to the cursor-fetch regression test so a missing flush fails promptly.

Fixes #3894.

## Socket flushes per short transaction (`BEGIN` + `SELECT 1`)

Flush count measured with `CountingSocketFactory` on the `BEGIN` + first query of a fresh transaction (`autoCommit=false`), comparing the fix against its parent commit.
With `tcpNoDelay` enabled, each flush of a small message is typically sent immediately as its own TCP segment.

| Query mode | `autosave` | `cleanupSavepoints` | Baseline | With fix | Saved |
| ---------- | ---------- | ------------------- | -------: | -------: | ----: |
| EXTENDED   | NEVER      | false               | 2        | 1        | 1     |
| EXTENDED   | ALWAYS     | false               | 3        | 1        | 2     |
| EXTENDED   | ALWAYS     | true                | 4        | 2        | 2     |
| SIMPLE     | NEVER      | false               | 2        | 1        | 1     |
| SIMPLE     | ALWAYS     | false               | 3        | 1        | 2     |
| SIMPLE     | ALWAYS     | true                | 4        | 2        | 2     |

The fix saves one flush by pairing `BEGIN` with the following query, and a second flush when `autosave=ALWAYS` also defers the `SAVEPOINT`. Latency saved per transaction is the number of saved flushes times the round-trip time to the server.
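
As a worked example of that arithmetic, the sketch below recomputes the saved flushes from the baseline and with-fix counts in the table and applies the model stated above (saved flushes times round-trip time). The class name `FlushSavingsSketch` and the 500 µs round-trip time are illustrative assumptions, not measurements from this PR.

```java
public class FlushSavingsSketch {
  // Row labels and {baseline, withFix} flush counts copied from the table above.
  static final String[] LABELS = {
      "EXTENDED/NEVER/false", "EXTENDED/ALWAYS/false", "EXTENDED/ALWAYS/true",
      "SIMPLE/NEVER/false", "SIMPLE/ALWAYS/false", "SIMPLE/ALWAYS/true"
  };
  static final int[][] COUNTS = {{2, 1}, {3, 1}, {4, 2}, {2, 1}, {3, 1}, {4, 2}};

  static int savedFlushes(int baseline, int withFix) {
    return baseline - withFix;
  }

  /** Applies the description's model: saved flushes times round-trip time. */
  static long estimatedSavingMicros(int savedFlushes, long rttMicros) {
    return savedFlushes * rttMicros;
  }

  public static void main(String[] args) {
    long assumedRttMicros = 500; // hypothetical round-trip time, for illustration only
    for (int i = 0; i < COUNTS.length; i++) {
      int saved = savedFlushes(COUNTS[i][0], COUNTS[i][1]);
      System.out.println(LABELS[i] + ": saved flushes=" + saved
          + ", estimated saving=" + estimatedSavingMicros(saved, assumedRttMicros) + " us");
    }
  }
}
```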

## How to verify

- `./gradlew :postgresql:test --tests org.postgresql.test.jdbc2.ConcurrentStatementFetchTest --quiet`
- `./gradlew :postgresql:testClasses :postgresql:checkstyleMain :postgresql:checkstyleTest :postgresql:autostyleCheck --quiet`
- `./gradlew :postgresql:test --tests 'org.postgresql.test.jdbc2.*' --no-daemon --quiet` ran all 7,397 tests; only the environment-sensitive `ConnectTimeoutTest` failed, because its 5 s timeout took 10 s to elapse locally.

The original stall was reproduced locally. A thread dump showed `ConcurrentStatementFetchTest` blocked in `QueryExecutorImpl.fetch()` while PostgreSQL was waiting for client input, identifying the missing flush after `sendSync()`.
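
The shape of that stall, and why a bounded wait surfaces it quickly, can be sketched without a database. This is an illustration only: `MissingFlushStallSketch` and `requestArrives` are hypothetical names, a `PipedInputStream`/`PipedOutputStream` pair stands in for the socket, and a background task stands in for the server waiting for client input.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class MissingFlushStallSketch {
  /** Returns true if the stand-in server sees the request within the timeout. */
  static boolean requestArrives(boolean flushBeforeRead, long timeoutMillis) throws Exception {
    PipedInputStream serverIn = new PipedInputStream();
    OutputStream clientOut = new BufferedOutputStream(new PipedOutputStream(serverIn), 8192);

    // Stand-in server: blocks until the first request byte arrives.
    CompletableFuture<Integer> firstByte = CompletableFuture.supplyAsync(() -> {
      try {
        return serverIn.read();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    });

    // Client encodes a Sync message: type byte 'S' followed by Int32 length 4.
    clientOut.write('S');
    clientOut.write(new byte[] {0, 0, 0, 4});
    if (flushBeforeRead) {
      clientOut.flush(); // without this, the bytes stay in the client-side buffer
    }

    try {
      // Stands in for response processing; bounded so a missing flush fails promptly.
      firstByte.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } finally {
      clientOut.close(); // releases the blocked stand-in server
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("with flush:    " + requestArrives(true, 2000));
    System.out.println("without flush: " + requestArrives(false, 200));
  }
}
```

Without the flush, both sides wait on each other until the timeout fires, which is the same role the `@Timeout` on `testFetchTwoStatements` plays in the regression test.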

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bcf26928ff..dd92252d91 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 * chore: `commitPrepared` / `rollback`-of-prepared now return `XAER_RMFAIL` instead of `XAER_RMERR` when the underlying connection is left in a non-idle `TransactionState`. Transaction managers (Geronimo, Narayana, Atomikos) treat `XAER_RMFAIL` as retryable on a fresh `XAResource`; the prepared transaction is no longer abandoned.
 
 ### Fixed
+* perf: defer simple-query flushes until the driver reads the response, allowing `BEGIN` and the following query to share a network flush [Issue #3894](https://github.com/pgjdbc/pgjdbc/issues/3894)
 * fix: the driver no longer nulls the `contextClassLoader` of shared `ForkJoinPool.commonPool()` worker threads, which previously left unrelated tasks on those threads running with a `null` classloader [Issue #4155](https://github.com/pgjdbc/pgjdbc/issues/4155)
 * fix: getCharacterStream wraps String in StringReader [PR #4063](https://github.com/pgjdbc/pgjdbc/pull/4063)
 * fix: `PGXAConnection` no longer saves and restores the underlying connection's JDBC `autoCommit` flag. All XA-protocol SQL (`BEGIN`, `PREPARE TRANSACTION`, `COMMIT`, `ROLLBACK`, `COMMIT PREPARED`, `ROLLBACK PREPARED`, the `recover()` SELECT) is sent through `QUERY_SUPPRESS_BEGIN`, so the caller's `autoCommit` value is invariant across every `XAResource` call. Fixes the "2nd phase commit must be issued using an idle connection" failure during recovery on managed datasources that pool connections with `autoCommit=false` (TomEE, WildFly, WebSphere Liberty).
diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
index 304a55f9cc..4504f44e4c 100644
--- a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
+++ b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
@@ -435,6 +435,7 @@ public void execute(Query query, @Nullable ParameterList parameters,
           } else {
             sendSync();
           }
+          pgStream.flush();
           processResults(handler, flags, adaptiveFetch);
           estimatedReceiveBufferBytes = 0;
         } catch (PGBindException se) {
@@ -453,6 +454,7 @@ public void execute(Query query, @Nullable ParameterList parameters,
           // transaction in progress?
           //
           sendSync();
+          pgStream.flush();
           processResults(handler, flags, adaptiveFetch);
           estimatedReceiveBufferBytes = 0;
           handler
@@ -540,6 +542,8 @@ private void releaseSavePoint(boolean autosave) throws SQLException {
     try {
       sendOneQuery(releaseAutoSave, SimpleQuery.NO_PARAMETERS, 1, 0,
           QUERY_NO_RESULTS | QUERY_NO_METADATA | QUERY_EXECUTE_AS_SIMPLE);
+      // No response processing follows, so flush the deferred cleanup before returning.
+      pgStream.flush();
     } catch (IOException ex) {
       throw new PSQLException(GT.tr("Error releasing savepoint"), PSQLState.IO_ERROR);
     }
@@ -683,6 +687,7 @@ public void execute(Query[] queries, @Nullable ParameterList[] parameterLists,
           if ((flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
             sendSync();
           }
+          pgStream.flush();
           processResults(handler, flags, adaptiveFetch);
           estimatedReceiveBufferBytes = 0;
         }
@@ -828,6 +833,7 @@ public void handleWarning(SQLWarning warning) {
         beginFlags = updateQueryMode(beginFlags);
         sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);
         sendSync();
+        pgStream.flush();
         processResults(handler, 0);
         estimatedReceiveBufferBytes = 0;
       } catch (IOException ioe) {
@@ -1659,6 +1665,7 @@ private void flushIfDeadlockRisk(SimpleQuery query,
     } else {
       LOGGER.log(Level.FINEST, "Forcing Sync, receive buffer full or batching disallowed");
       sendSync();
+      pgStream.flush();
       processResults(resultHandler, flags);
       // We've processed incoming bytes, and the query to be executed would consume receive buffer
       estimatedReceiveBufferBytes = resultBytes;
@@ -1723,18 +1730,11 @@ private void sendQuery(Query query, V3ParameterList parameters, int maxRows, int
   //
 
   private void sendSync() throws IOException {
-    sendSync(true);
-  }
-
-  private void sendSync(boolean flush) throws IOException {
     inExtendedProtocol = false;
     LOGGER.log(Level.FINEST, " FE=> Sync");
 
     pgStream.sendChar(PgMessageType.SYNC_REQUEST); // Sync
     pgStream.sendInteger4(4); // Length
-    if (flush) {
-      pgStream.flush();
-    }
     // Below "add queues" are likely not required at all
     pendingExecuteQueue.add(new ExecuteRequest(sync, null, true));
     pendingDescribePortalQueue.add(sync);
@@ -2243,7 +2243,7 @@ private void sendSimpleQuery(SimpleQuery query, SimpleParameterList params) thro
     if (inExtendedProtocol) {
       // A sync message is required when switching from extended protocol to a simple query protocol
       // See https://github.com/pgjdbc/pgjdbc/issues/3107
-      sendSync(false);
+      sendSync();
     }
 
     String nativeSql = query.toString(
@@ -2258,7 +2258,6 @@ private void sendSimpleQuery(SimpleQuery query, SimpleParameterList params) thro
     pgStream.sendInteger4(encoded.length + 4 + 1);
     pgStream.send(encoded);
     pgStream.sendChar(0);
-    pgStream.flush();
     pendingExecuteQueue.add(new ExecuteRequest(query, null, true));
     pendingDescribePortalQueue.add(query);
   }
@@ -2754,8 +2753,8 @@ && castNonNull(pendingExecuteQueue.peekFirst()).asSimple) {
           pgStream.sendInteger4(buf.length + 4 + 1);
           pgStream.send(buf);
           pgStream.sendChar(0);
-          pgStream.flush();
           sendSync(); // send sync message
+          pgStream.flush();
           skipMessage(); // skip the response message
           break;
 
@@ -2826,6 +2825,7 @@ public void handleCommandStatus(String status, long updateCount, long insertOID)
 
         sendExecute(query, portal, fetchSize);
         sendSync();
+        pgStream.flush();
 
         processResults(handler, 0, adaptiveFetch);
         estimatedReceiveBufferBytes = 0;
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
index dc3657bbb3..471018688a 100644
--- a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
@@ -6,12 +6,14 @@
 package org.postgresql.test.jdbc2;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 import org.postgresql.PGConnection;
 import org.postgresql.PGProperty;
 import org.postgresql.largeobject.LargeObject;
 import org.postgresql.largeobject.LargeObjectManager;
+import org.postgresql.test.util.CountingSocketFactory;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedClass;
@@ -21,6 +23,7 @@
 import java.nio.charset.StandardCharsets;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -41,6 +44,8 @@
 @ParameterizedClass
 @MethodSource("data")
 class CleanupSavepointsWithFastpathTest extends BaseTest4 {
+  private CountingSocketFactory.Counters socketCounters = CountingSocketFactory.register();
+
   CleanupSavepointsWithFastpathTest(BinaryMode binaryMode) {
     setBinaryMode(binaryMode);
   }
@@ -58,6 +63,17 @@ protected void updateProperties(Properties props) {
     super.updateProperties(props);
     PGProperty.AUTOSAVE.set(props, "always");
     PGProperty.CLEANUP_SAVEPOINTS.set(props, true);
+    PGProperty.SOCKET_FACTORY.set(props, CountingSocketFactory.class.getName());
+    PGProperty.SOCKET_FACTORY_ARG.set(props, socketCounters.key());
+  }
+
+  @Override
+  protected void tearDown() throws SQLException {
+    try {
+      super.tearDown();
+    } finally {
+      CountingSocketFactory.unregister(socketCounters);
+    }
   }
 
   /**
@@ -81,9 +97,12 @@ void testLargeObjectWithCleanupSavepoints() throws Exception {
     // The bug: the RELEASE SAVEPOINT response ('C') is still in the buffer
     // and receiveFastpathResult() will read it instead of the expected response
     LargeObjectManager lom = con.unwrap(PGConnection.class).getLargeObjectAPI();
+    long flushesBefore = socketCounters.flushes.get();
 
     // This should NOT throw "Unknown Response Type C."
     long oid = lom.createLO();
+    assertEquals(1, socketCounters.flushes.get() - flushesBefore,
+        "Consuming a pending savepoint response should not add an empty flush");
     try {
       try (LargeObject lo = lom.open(oid)) {
         byte[] data = "Test data for issue #3910".getBytes(StandardCharsets.UTF_8);
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
index 860b4e521b..9384f0d859 100644
--- a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
@@ -14,6 +14,7 @@
 import org.postgresql.test.TestUtil;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedClass;
 import org.junit.jupiter.params.provider.MethodSource;
 
@@ -21,6 +22,7 @@
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 @ParameterizedClass
 @MethodSource("data")
@@ -54,6 +56,7 @@ public void setUp() throws Exception {
   }
 
   @Test
+  @Timeout(value = 10, unit = TimeUnit.SECONDS)
   public void testFetchTwoStatements() throws Exception {
     // This test definitely fails at 8.2 in autocommit=false, and works with 8.4+
     assumeTrue(autoCommit == AutoCommit.YES
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java
new file mode 100644
index 0000000000..81691a92a9
--- /dev/null
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2026, PostgreSQL Global Development Group
+ * See the LICENSE file in the project root for more information.
+ */
+
+package org.postgresql.test.jdbc2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.postgresql.PGProperty;
+import org.postgresql.jdbc.AutoSave;
+import org.postgresql.jdbc.PreferQueryMode;
+import org.postgresql.test.util.CountingSocketFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Tests the number of network operations required to start and use a transaction.
+ */
+@ParameterizedClass
+@MethodSource("data")
+public class TransactionRoundtripTest extends BaseTest4 {
+  private CountingSocketFactory.Counters socketCounters = CountingSocketFactory.register();
+  private final AutoSave autosave;
+  private final boolean cleanupSavepoints;
+
+  public TransactionRoundtripTest(PreferQueryMode preferQueryMode, AutoSave autosave,
+      boolean cleanupSavepoints) {
+    setPreferQueryMode(preferQueryMode);
+    this.autosave = autosave;
+    this.cleanupSavepoints = cleanupSavepoints;
+  }
+
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {PreferQueryMode.EXTENDED, AutoSave.NEVER, false},
+        {PreferQueryMode.EXTENDED, AutoSave.ALWAYS, false},
+        {PreferQueryMode.EXTENDED, AutoSave.ALWAYS, true},
+        {PreferQueryMode.SIMPLE, AutoSave.NEVER, false},
+        {PreferQueryMode.SIMPLE, AutoSave.ALWAYS, false},
+        {PreferQueryMode.SIMPLE, AutoSave.ALWAYS, true}
+    });
+  }
+
+  @Override
+  protected void updateProperties(Properties props) {
+    super.updateProperties(props);
+    PGProperty.AUTOSAVE.set(props, autosave.value());
+    PGProperty.CLEANUP_SAVEPOINTS.set(props, cleanupSavepoints);
+    PGProperty.SOCKET_FACTORY.set(props, CountingSocketFactory.class.getName());
+    PGProperty.SOCKET_FACTORY_ARG.set(props, socketCounters.key());
+  }
+
+  @Override
+  protected void tearDown() throws SQLException {
+    try {
+      super.tearDown();
+    } finally {
+      CountingSocketFactory.unregister(socketCounters);
+    }
+  }
+
+  @Test
+  void beginAndQueryAreFlushedTogether() throws SQLException {
+    con.setAutoCommit(false);
+    long flushesBefore = socketCounters.flushes.get();
+    long roundtripsBefore = socketCounters.roundtrips.get();
+
+    try (Statement statement = con.createStatement()) {
+      statement.executeQuery("SELECT 1").close();
+    }
+
+    int expectedFlushes = cleanupSavepoints ? 2 : 1;
+    assertEquals(expectedFlushes, socketCounters.flushes.get() - flushesBefore,
+        "BEGIN and the query should share one flush; savepoint cleanup needs another");
+    assertEquals(1, socketCounters.roundtrips.get() - roundtripsBefore,
+        "BEGIN and the query should complete in one roundtrip");
+  }
+}
diff --git a/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java b/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
index 6c3f56dfab..6467dffc1f 100644
--- a/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
+++ b/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
@@ -47,6 +47,7 @@ public class CountingSocketFactory extends SocketFactory {
    */
   public static final class Counters {
     public final AtomicLong roundtrips = new AtomicLong();
+    public final AtomicLong flushes = new AtomicLong();
     public final AtomicLong bytesOut = new AtomicLong();
     public final AtomicLong bytesIn = new AtomicLong();
 
@@ -214,6 +215,7 @@ public void write(byte[] b, int off, int len) throws IOException {
     @Override
     public void flush() throws IOException {
       out.flush();
+      socket.counters.flushes.incrementAndGet();
     }
 
     @Override

