pgjdbc/pgjdbc GitHub issues and pull requests (mirror)
help / color / mirror / Atom feedFrom: vlsi (@vlsi) <[email protected]>
To: pgjdbc/pgjdbc <[email protected]>
Subject: [pgjdbc/pgjdbc] PR #4196: fix(protocol): defer flushes until response processing
Date: Thu, 18 Jun 2026 16:18:03 +0000
Message-ID: <[email protected]> (raw)
## Why
Simple-query messages were flushed as soon as they were encoded. An implicit `BEGIN` followed by a user query therefore caused separate socket flushes, increasing latency for workloads with many short transactions.
Java does not guarantee that an empty `OutputStream.flush()` is unobservable or harmless, so response reads must not add redundant flush calls.
## What
- Remove the flush from simple-query encoding.
- Flush explicitly at each send-then-read call site, including cursor fetches.
- Read already-sent savepoint-cleanup responses without another flush.
- Keep explicit flushes where the driver returns without reading responses.
- Count raw socket flush calls and verify transaction and fastpath paths.
- Add a short timeout to the cursor-fetch regression test so a missing flush fails promptly.
Fixes #3894.
## Socket flushes per short transaction (`BEGIN` + `SELECT 1`)
Flush count measured with `CountingSocketFactory` on the `BEGIN` + first query of a fresh transaction (`autoCommit=false`), comparing the fix against its parent commit.
A flush maps to one TCP segment because `tcpNoDelay` is on.
| Query mode | `autosave` | `cleanupSavepoints` | Baseline | With fix | Saved |
| ---------- | ---------- | ------------------- | -------: | -------: | ----: |
| EXTENDED | NEVER | false | 2 | 1 | −1 |
| EXTENDED | ALWAYS | false | 3 | 1 | −2 |
| EXTENDED | ALWAYS | true | 4 | 2 | −2 |
| SIMPLE | NEVER | false | 2 | 1 | −1 |
| SIMPLE | ALWAYS | false | 3 | 1 | −2 |
| SIMPLE | ALWAYS | true | 4 | 2 | −2 |
The fix saves one flush by pairing `BEGIN` with the following query, and a second flush when `autosave=ALWAYS` also defers the `SAVEPOINT`. Latency saved per transaction is the number of saved flushes times the round-trip time to the server.
## How to verify
- `./gradlew :postgresql:test --tests org.postgresql.test.jdbc2.ConcurrentStatementFetchTest --quiet`
- `./gradlew :postgresql:testClasses :postgresql:checkstyleMain :postgresql:checkstyleTest :postgresql:autostyleCheck --quiet`
- `./gradlew :postgresql:test --tests 'org.postgresql.test.jdbc2.*' --no-daemon --quiet` progressed through all 7,397 tests; only the environment-sensitive `ConnectTimeoutTest` failed because a 5 s timeout took 10 s locally.
The original stall was reproduced locally. A thread dump showed `ConcurrentStatementFetchTest` blocked in `QueryExecutorImpl.fetch()` while PostgreSQL was waiting for client input, identifying the missing flush after `sendSync()`.
diff --git a/CHANGELOG.md b/CHANGELOG.md
index bcf26928ff..dd92252d91 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
* chore: `commitPrepared` / `rollback`-of-prepared now return `XAER_RMFAIL` instead of `XAER_RMERR` when the underlying connection is left in a non-idle `TransactionState`. Transaction managers (Geronimo, Narayana, Atomikos) treat `XAER_RMFAIL` as retryable on a fresh `XAResource`; the prepared transaction is no longer abandoned.
### Fixed
+* perf: defer simple-query flushes until the driver reads the response, allowing `BEGIN` and the following query to share a network flush [Issue #3894](https://github.com/pgjdbc/pgjdbc/issues/3894)
* fix: the driver no longer nulls the `contextClassLoader` of shared `ForkJoinPool.commonPool()` worker threads, which previously left unrelated tasks on those threads running with a `null` classloader [Issue #4155](https://github.com/pgjdbc/pgjdbc/issues/4155)
* fix: getCharacterStream wraps String in StringReader [PR #4063](https://github.com/pgjdbc/pgjdbc/pull/4063)
* fix: `PGXAConnection` no longer saves and restores the underlying connection's JDBC `autoCommit` flag. All XA-protocol SQL (`BEGIN`, `PREPARE TRANSACTION`, `COMMIT`, `ROLLBACK`, `COMMIT PREPARED`, `ROLLBACK PREPARED`, the `recover()` SELECT) is sent through `QUERY_SUPPRESS_BEGIN`, so the caller's `autoCommit` value is invariant across every `XAResource` call. Fixes the "2nd phase commit must be issued using an idle connection" failure during recovery on managed datasources that pool connections with `autoCommit=false` (TomEE, WildFly, WebSphere Liberty).
diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
index 304a55f9cc..4504f44e4c 100644
--- a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
+++ b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
@@ -435,6 +435,7 @@ public void execute(Query query, @Nullable ParameterList parameters,
} else {
sendSync();
}
+ pgStream.flush();
processResults(handler, flags, adaptiveFetch);
estimatedReceiveBufferBytes = 0;
} catch (PGBindException se) {
@@ -453,6 +454,7 @@ public void execute(Query query, @Nullable ParameterList parameters,
// transaction in progress?
//
sendSync();
+ pgStream.flush();
processResults(handler, flags, adaptiveFetch);
estimatedReceiveBufferBytes = 0;
handler
@@ -540,6 +542,8 @@ private void releaseSavePoint(boolean autosave) throws SQLException {
try {
sendOneQuery(releaseAutoSave, SimpleQuery.NO_PARAMETERS, 1, 0,
QUERY_NO_RESULTS | QUERY_NO_METADATA | QUERY_EXECUTE_AS_SIMPLE);
+ // No response processing follows, so flush the deferred cleanup before returning.
+ pgStream.flush();
} catch (IOException ex) {
throw new PSQLException(GT.tr("Error releasing savepoint"), PSQLState.IO_ERROR);
}
@@ -683,6 +687,7 @@ public void execute(Query[] queries, @Nullable ParameterList[] parameterLists,
if ((flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
sendSync();
}
+ pgStream.flush();
processResults(handler, flags, adaptiveFetch);
estimatedReceiveBufferBytes = 0;
}
@@ -828,6 +833,7 @@ public void handleWarning(SQLWarning warning) {
beginFlags = updateQueryMode(beginFlags);
sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);
sendSync();
+ pgStream.flush();
processResults(handler, 0);
estimatedReceiveBufferBytes = 0;
} catch (IOException ioe) {
@@ -1659,6 +1665,7 @@ private void flushIfDeadlockRisk(SimpleQuery query,
} else {
LOGGER.log(Level.FINEST, "Forcing Sync, receive buffer full or batching disallowed");
sendSync();
+ pgStream.flush();
processResults(resultHandler, flags);
// We've processed incoming bytes, and the query to be executed would consume receive buffer
estimatedReceiveBufferBytes = resultBytes;
@@ -1723,18 +1730,11 @@ private void sendQuery(Query query, V3ParameterList parameters, int maxRows, int
//
private void sendSync() throws IOException {
- sendSync(true);
- }
-
- private void sendSync(boolean flush) throws IOException {
inExtendedProtocol = false;
LOGGER.log(Level.FINEST, " FE=> Sync");
pgStream.sendChar(PgMessageType.SYNC_REQUEST); // Sync
pgStream.sendInteger4(4); // Length
- if (flush) {
- pgStream.flush();
- }
// Below "add queues" are likely not required at all
pendingExecuteQueue.add(new ExecuteRequest(sync, null, true));
pendingDescribePortalQueue.add(sync);
@@ -2243,7 +2243,7 @@ private void sendSimpleQuery(SimpleQuery query, SimpleParameterList params) thro
if (inExtendedProtocol) {
// A sync message is required when switching from extended protocol to a simple query protocol
// See https://github.com/pgjdbc/pgjdbc/issues/3107
- sendSync(false);
+ sendSync();
}
String nativeSql = query.toString(
@@ -2258,7 +2258,6 @@ private void sendSimpleQuery(SimpleQuery query, SimpleParameterList params) thro
pgStream.sendInteger4(encoded.length + 4 + 1);
pgStream.send(encoded);
pgStream.sendChar(0);
- pgStream.flush();
pendingExecuteQueue.add(new ExecuteRequest(query, null, true));
pendingDescribePortalQueue.add(query);
}
@@ -2754,8 +2753,8 @@ && castNonNull(pendingExecuteQueue.peekFirst()).asSimple) {
pgStream.sendInteger4(buf.length + 4 + 1);
pgStream.send(buf);
pgStream.sendChar(0);
- pgStream.flush();
sendSync(); // send sync message
+ pgStream.flush();
skipMessage(); // skip the response message
break;
@@ -2826,6 +2825,7 @@ public void handleCommandStatus(String status, long updateCount, long insertOID)
sendExecute(query, portal, fetchSize);
sendSync();
+ pgStream.flush();
processResults(handler, 0, adaptiveFetch);
estimatedReceiveBufferBytes = 0;
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
index dc3657bbb3..471018688a 100644
--- a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
@@ -6,12 +6,14 @@
package org.postgresql.test.jdbc2;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
+import org.postgresql.test.util.CountingSocketFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedClass;
@@ -21,6 +23,7 @@
import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
@@ -41,6 +44,8 @@
@ParameterizedClass
@MethodSource("data")
class CleanupSavepointsWithFastpathTest extends BaseTest4 {
+ private CountingSocketFactory.Counters socketCounters = CountingSocketFactory.register();
+
CleanupSavepointsWithFastpathTest(BinaryMode binaryMode) {
setBinaryMode(binaryMode);
}
@@ -58,6 +63,17 @@ protected void updateProperties(Properties props) {
super.updateProperties(props);
PGProperty.AUTOSAVE.set(props, "always");
PGProperty.CLEANUP_SAVEPOINTS.set(props, true);
+ PGProperty.SOCKET_FACTORY.set(props, CountingSocketFactory.class.getName());
+ PGProperty.SOCKET_FACTORY_ARG.set(props, socketCounters.key());
+ }
+
+ @Override
+ protected void tearDown() throws SQLException {
+ try {
+ super.tearDown();
+ } finally {
+ CountingSocketFactory.unregister(socketCounters);
+ }
}
/**
@@ -81,9 +97,12 @@ void testLargeObjectWithCleanupSavepoints() throws Exception {
// The bug: the RELEASE SAVEPOINT response ('C') is still in the buffer
// and receiveFastpathResult() will read it instead of the expected response
LargeObjectManager lom = con.unwrap(PGConnection.class).getLargeObjectAPI();
+ long flushesBefore = socketCounters.flushes.get();
// This should NOT throw "Unknown Response Type C."
long oid = lom.createLO();
+ assertEquals(1, socketCounters.flushes.get() - flushesBefore,
+ "Consuming a pending savepoint response should not add an empty flush");
try {
try (LargeObject lo = lom.open(oid)) {
byte[] data = "Test data for issue #3910".getBytes(StandardCharsets.UTF_8);
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
index 860b4e521b..9384f0d859 100644
--- a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
@@ -14,6 +14,7 @@
import org.postgresql.test.TestUtil;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;
@@ -21,6 +22,7 @@
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
@ParameterizedClass
@MethodSource("data")
@@ -54,6 +56,7 @@ public void setUp() throws Exception {
}
@Test
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
public void testFetchTwoStatements() throws Exception {
// This test definitely fails at 8.2 in autocommit=false, and works with 8.4+
assumeTrue(autoCommit == AutoCommit.YES
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java
new file mode 100644
index 0000000000..81691a92a9
--- /dev/null
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2026, PostgreSQL Global Development Group
+ * See the LICENSE file in the project root for more information.
+ */
+
+package org.postgresql.test.jdbc2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.postgresql.PGProperty;
+import org.postgresql.jdbc.AutoSave;
+import org.postgresql.jdbc.PreferQueryMode;
+import org.postgresql.test.util.CountingSocketFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Tests the number of network operations required to start and use a transaction.
+ */
+@ParameterizedClass
+@MethodSource("data")
+public class TransactionRoundtripTest extends BaseTest4 {
+ private CountingSocketFactory.Counters socketCounters = CountingSocketFactory.register();
+ private final AutoSave autosave;
+ private final boolean cleanupSavepoints;
+
+ public TransactionRoundtripTest(PreferQueryMode preferQueryMode, AutoSave autosave,
+ boolean cleanupSavepoints) {
+ setPreferQueryMode(preferQueryMode);
+ this.autosave = autosave;
+ this.cleanupSavepoints = cleanupSavepoints;
+ }
+
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][]{
+ {PreferQueryMode.EXTENDED, AutoSave.NEVER, false},
+ {PreferQueryMode.EXTENDED, AutoSave.ALWAYS, false},
+ {PreferQueryMode.EXTENDED, AutoSave.ALWAYS, true},
+ {PreferQueryMode.SIMPLE, AutoSave.NEVER, false},
+ {PreferQueryMode.SIMPLE, AutoSave.ALWAYS, false},
+ {PreferQueryMode.SIMPLE, AutoSave.ALWAYS, true}
+ });
+ }
+
+ @Override
+ protected void updateProperties(Properties props) {
+ super.updateProperties(props);
+ PGProperty.AUTOSAVE.set(props, autosave.value());
+ PGProperty.CLEANUP_SAVEPOINTS.set(props, cleanupSavepoints);
+ PGProperty.SOCKET_FACTORY.set(props, CountingSocketFactory.class.getName());
+ PGProperty.SOCKET_FACTORY_ARG.set(props, socketCounters.key());
+ }
+
+ @Override
+ protected void tearDown() throws SQLException {
+ try {
+ super.tearDown();
+ } finally {
+ CountingSocketFactory.unregister(socketCounters);
+ }
+ }
+
+ @Test
+ void beginAndQueryAreFlushedTogether() throws SQLException {
+ con.setAutoCommit(false);
+ long flushesBefore = socketCounters.flushes.get();
+ long roundtripsBefore = socketCounters.roundtrips.get();
+
+ try (Statement statement = con.createStatement()) {
+ statement.executeQuery("SELECT 1").close();
+ }
+
+ int expectedFlushes = cleanupSavepoints ? 2 : 1;
+ assertEquals(expectedFlushes, socketCounters.flushes.get() - flushesBefore,
+ "BEGIN and the query should share one flush; savepoint cleanup needs another");
+ assertEquals(1, socketCounters.roundtrips.get() - roundtripsBefore,
+ "BEGIN and the query should complete in one roundtrip");
+ }
+}
diff --git a/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java b/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
index 6c3f56dfab..6467dffc1f 100644
--- a/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
+++ b/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
@@ -47,6 +47,7 @@ public class CountingSocketFactory extends SocketFactory {
*/
public static final class Counters {
public final AtomicLong roundtrips = new AtomicLong();
+ public final AtomicLong flushes = new AtomicLong();
public final AtomicLong bytesOut = new AtomicLong();
public final AtomicLong bytesIn = new AtomicLong();
@@ -214,6 +215,7 @@ public void write(byte[] b, int off, int len) throws IOException {
@Override
public void flush() throws IOException {
out.flush();
+ socket.counters.flushes.incrementAndGet();
}
@Override
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: github://pgjdbc/pgjdbc
Cc: [email protected], [email protected]
Subject: Re: [pgjdbc/pgjdbc] PR #4196: fix(protocol): defer flushes until response processing
In-Reply-To: <<[email protected]>>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox