Message-ID:
From: "vlsi (@vlsi)"
To: "pgjdbc/pgjdbc"
Date: Thu, 18 Jun 2026 16:18:03 +0000
Subject: [pgjdbc/pgjdbc] PR #4196: fix(protocol): defer flushes until response processing
List-Id:
X-GitHub-Additions: 121
X-GitHub-Author-Id: 213894
X-GitHub-Author-Login: vlsi
X-GitHub-Base: master
X-GitHub-Changed-Files: 6
X-GitHub-Commits: 1
X-GitHub-Deletions: 10
X-GitHub-Head-Branch: codex/fix-transaction-start-flush
X-GitHub-Head-SHA: f2d251d7588a9829f632ef8c74f2785155613f9f
X-GitHub-Issue: 4196
X-GitHub-Labels: performance
X-GitHub-Merge-SHA: e9bb252d935a996a7c033bb69ee59ac203016b48
X-GitHub-Repo: pgjdbc/pgjdbc
X-GitHub-State: open
X-GitHub-Type: pull_request
X-GitHub-Url: https://github.com/pgjdbc/pgjdbc/pull/4196
Content-Type: text/plain; charset=utf-8

## Why

Simple-query messages were flushed as soon as they were encoded. An implicit `BEGIN` followed by a user query therefore caused separate socket flushes, increasing latency for workloads with many short transactions.

Java does not guarantee that an `OutputStream.flush()` call with nothing buffered is unobservable or harmless, so response reads must not add redundant flush calls.

## What

- Remove the flush from simple-query encoding.
- Flush explicitly at each send-then-read call site, including cursor fetches.
- Read already-sent savepoint-cleanup responses without another flush.
- Keep explicit flushes where the driver returns without reading responses.
- Count raw socket flush calls and verify transaction and fastpath paths.
- Add a short timeout to the cursor-fetch regression test so a missing flush fails promptly.

Fixes #3894.

## Socket flushes per short transaction (`BEGIN` + `SELECT 1`)

Flush counts were measured with `CountingSocketFactory` on the `BEGIN` + first query of a fresh transaction (`autoCommit=false`), comparing the fix against its parent commit. Because `tcpNoDelay` is on, each flush of these small messages typically goes out as its own TCP segment.

| Query mode | `autosave` | `cleanupSavepoints` | Baseline | With fix | Saved |
| ---------- | ---------- | ------------------- | -------: | -------: | ----: |
| EXTENDED   | NEVER      | false               |        2 |        1 |    −1 |
| EXTENDED   | ALWAYS     | false               |        3 |        1 |    −2 |
| EXTENDED   | ALWAYS     | true                |        4 |        2 |    −2 |
| SIMPLE     | NEVER      | false               |        2 |        1 |    −1 |
| SIMPLE     | ALWAYS     | false               |        3 |        1 |    −2 |
| SIMPLE     | ALWAYS     | true                |        4 |        2 |    −2 |

The fix saves one flush by pairing `BEGIN` with the following query, and a second flush when `autosave=ALWAYS` also defers the `SAVEPOINT`. Latency saved per transaction is the number of saved flushes times the round-trip time to the server.

## How to verify

- `./gradlew :postgresql:test --tests org.postgresql.test.jdbc2.ConcurrentStatementFetchTest --quiet`
- `./gradlew :postgresql:testClasses :postgresql:checkstyleMain :postgresql:checkstyleTest :postgresql:autostyleCheck --quiet`
- `./gradlew :postgresql:test --tests 'org.postgresql.test.jdbc2.*' --no-daemon --quiet` progressed through all 7,397 tests; only the environment-sensitive `ConnectTimeoutTest` failed because a 5 s timeout took 10 s locally.

The original stall was reproduced locally. A thread dump showed `ConcurrentStatementFetchTest` blocked in `QueryExecutorImpl.fetch()` while PostgreSQL was waiting for client input, identifying the missing flush after `sendSync()`.

diff --git a/CHANGELOG.md b/CHANGELOG.md
index bcf26928ff..dd92252d91 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 * chore: `commitPrepared` / `rollback`-of-prepared now return `XAER_RMFAIL` instead of `XAER_RMERR` when the underlying connection is left in a non-idle `TransactionState`. Transaction managers (Geronimo, Narayana, Atomikos) treat `XAER_RMFAIL` as retryable on a fresh `XAResource`; the prepared transaction is no longer abandoned.

 ### Fixed
+* perf: defer simple-query flushes until the driver reads the response, allowing `BEGIN` and the following query to share a network flush [Issue #3894](https://github.com/pgjdbc/pgjdbc/issues/3894)
 * fix: the driver no longer nulls the `contextClassLoader` of shared `ForkJoinPool.commonPool()` worker threads, which previously left unrelated tasks on those threads running with a `null` classloader [Issue #4155](https://github.com/pgjdbc/pgjdbc/issues/4155)
 * fix: getCharacterStream wraps String in StringReader [PR #4063](https://github.com/pgjdbc/pgjdbc/pull/4063)
 * fix: `PGXAConnection` no longer saves and restores the underlying connection's JDBC `autoCommit` flag. All XA-protocol SQL (`BEGIN`, `PREPARE TRANSACTION`, `COMMIT`, `ROLLBACK`, `COMMIT PREPARED`, `ROLLBACK PREPARED`, the `recover()` SELECT) is sent through `QUERY_SUPPRESS_BEGIN`, so the caller's `autoCommit` value is invariant across every `XAResource` call. Fixes the "2nd phase commit must be issued using an idle connection" failure during recovery on managed datasources that pool connections with `autoCommit=false` (TomEE, WildFly, WebSphere Liberty).
diff --git a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
index 304a55f9cc..4504f44e4c 100644
--- a/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
+++ b/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
@@ -435,6 +435,7 @@ public void execute(Query query, @Nullable ParameterList parameters,
           } else {
             sendSync();
           }
+          pgStream.flush();
           processResults(handler, flags, adaptiveFetch);
           estimatedReceiveBufferBytes = 0;
         } catch (PGBindException se) {
@@ -453,6 +454,7 @@ public void execute(Query query, @Nullable ParameterList parameters,
           // transaction in progress?
           //
           sendSync();
+          pgStream.flush();
           processResults(handler, flags, adaptiveFetch);
           estimatedReceiveBufferBytes = 0;
           handler
@@ -540,6 +542,8 @@ private void releaseSavePoint(boolean autosave) throws SQLException {
       try {
         sendOneQuery(releaseAutoSave, SimpleQuery.NO_PARAMETERS, 1, 0,
             QUERY_NO_RESULTS | QUERY_NO_METADATA | QUERY_EXECUTE_AS_SIMPLE);
+        // No response processing follows, so flush the deferred cleanup before returning.
+        pgStream.flush();
       } catch (IOException ex) {
         throw new PSQLException(GT.tr("Error releasing savepoint"), PSQLState.IO_ERROR);
       }
@@ -683,6 +687,7 @@ public void execute(Query[] queries, @Nullable ParameterList[] parameterLists,
           if ((flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
             sendSync();
           }
+          pgStream.flush();
           processResults(handler, flags, adaptiveFetch);
           estimatedReceiveBufferBytes = 0;
         }
@@ -828,6 +833,7 @@ public void handleWarning(SQLWarning warning) {
         beginFlags = updateQueryMode(beginFlags);
         sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);
         sendSync();
+        pgStream.flush();
         processResults(handler, 0);
         estimatedReceiveBufferBytes = 0;
       } catch (IOException ioe) {
@@ -1659,6 +1665,7 @@ private void flushIfDeadlockRisk(SimpleQuery query,
     } else {
       LOGGER.log(Level.FINEST, "Forcing Sync, receive buffer full or batching disallowed");
       sendSync();
+      pgStream.flush();
       processResults(resultHandler, flags);
       // We've processed incoming bytes, and the query to be executed would consume receive buffer
       estimatedReceiveBufferBytes = resultBytes;
@@ -1723,18 +1730,11 @@ private void sendQuery(Query query, V3ParameterList parameters, int maxRows, int
   //
 
   private void sendSync() throws IOException {
-    sendSync(true);
-  }
-
-  private void sendSync(boolean flush) throws IOException {
     inExtendedProtocol = false;
     LOGGER.log(Level.FINEST, " FE=> Sync");
 
     pgStream.sendChar(PgMessageType.SYNC_REQUEST); // Sync
     pgStream.sendInteger4(4); // Length
-    if (flush) {
-      pgStream.flush();
-    }
     // Below "add queues" are likely not required at all
     pendingExecuteQueue.add(new ExecuteRequest(sync, null, true));
     pendingDescribePortalQueue.add(sync);
@@ -2243,7 +2243,7 @@ private void sendSimpleQuery(SimpleQuery query, SimpleParameterList params) thro
     if (inExtendedProtocol) {
       // A sync message is required when switching from extended protocol to a simple query protocol
       // See https://github.com/pgjdbc/pgjdbc/issues/3107
-      sendSync(false);
+      sendSync();
     }
 
     String nativeSql = query.toString(
@@ -2258,7 +2258,6 @@ private void sendSimpleQuery(SimpleQuery query, SimpleParameterList params) thro
     pgStream.sendInteger4(encoded.length + 4 + 1);
     pgStream.send(encoded);
     pgStream.sendChar(0);
-    pgStream.flush();
     pendingExecuteQueue.add(new ExecuteRequest(query, null, true));
     pendingDescribePortalQueue.add(query);
   }
@@ -2754,8 +2753,8 @@ && castNonNull(pendingExecuteQueue.peekFirst()).asSimple) {
           pgStream.sendInteger4(buf.length + 4 + 1);
           pgStream.send(buf);
           pgStream.sendChar(0);
-          pgStream.flush();
           sendSync(); // send sync message
+          pgStream.flush();
           skipMessage(); // skip the response message
           break;
 
@@ -2826,6 +2825,7 @@ public void handleCommandStatus(String status, long updateCount, long insertOID)
 
         sendExecute(query, portal, fetchSize);
         sendSync();
+        pgStream.flush();
 
         processResults(handler, 0, adaptiveFetch);
         estimatedReceiveBufferBytes = 0;
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
index dc3657bbb3..471018688a 100644
--- a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/CleanupSavepointsWithFastpathTest.java
@@ -6,12 +6,14 @@
 package org.postgresql.test.jdbc2;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 import org.postgresql.PGConnection;
 import org.postgresql.PGProperty;
 import org.postgresql.largeobject.LargeObject;
 import org.postgresql.largeobject.LargeObjectManager;
+import org.postgresql.test.util.CountingSocketFactory;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedClass;
@@ -21,6 +23,7 @@
 import java.nio.charset.StandardCharsets;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -41,6 +44,8 @@
 @ParameterizedClass
 @MethodSource("data")
 class CleanupSavepointsWithFastpathTest extends BaseTest4 {
+  private CountingSocketFactory.Counters socketCounters = CountingSocketFactory.register();
+
   CleanupSavepointsWithFastpathTest(BinaryMode binaryMode) {
     setBinaryMode(binaryMode);
   }
@@ -58,6 +63,17 @@ protected void updateProperties(Properties props) {
     super.updateProperties(props);
     PGProperty.AUTOSAVE.set(props, "always");
     PGProperty.CLEANUP_SAVEPOINTS.set(props, true);
+    PGProperty.SOCKET_FACTORY.set(props, CountingSocketFactory.class.getName());
+    PGProperty.SOCKET_FACTORY_ARG.set(props, socketCounters.key());
+  }
+
+  @Override
+  protected void tearDown() throws SQLException {
+    try {
+      super.tearDown();
+    } finally {
+      CountingSocketFactory.unregister(socketCounters);
+    }
   }
 
   /**
@@ -81,9 +97,12 @@ void testLargeObjectWithCleanupSavepoints() throws Exception {
     // The bug: the RELEASE SAVEPOINT response ('C') is still in the buffer
     // and receiveFastpathResult() will read it instead of the expected response
     LargeObjectManager lom = con.unwrap(PGConnection.class).getLargeObjectAPI();
+    long flushesBefore = socketCounters.flushes.get();
     // This should NOT throw "Unknown Response Type C."
     long oid = lom.createLO();
+    assertEquals(1, socketCounters.flushes.get() - flushesBefore,
+        "Consuming a pending savepoint response should not add an empty flush");
 
     try {
       try (LargeObject lo = lom.open(oid)) {
         byte[] data = "Test data for issue #3910".getBytes(StandardCharsets.UTF_8);
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
index 860b4e521b..9384f0d859 100644
--- a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/ConcurrentStatementFetchTest.java
@@ -14,6 +14,7 @@
 import org.postgresql.test.TestUtil;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedClass;
 import org.junit.jupiter.params.provider.MethodSource;
 
@@ -21,6 +22,7 @@
 import java.sql.ResultSet;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.concurrent.TimeUnit;
 
 @ParameterizedClass
 @MethodSource("data")
@@ -54,6 +56,7 @@ public void setUp() throws Exception {
   }
 
   @Test
+  @Timeout(value = 10, unit = TimeUnit.SECONDS)
   public void testFetchTwoStatements() throws Exception {
     // This test definitely fails at 8.2 in autocommit=false, and works with 8.4+
     assumeTrue(autoCommit == AutoCommit.YES
diff --git a/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java
new file mode 100644
index 0000000000..81691a92a9
--- /dev/null
+++ b/pgjdbc/src/test/java/org/postgresql/test/jdbc2/TransactionRoundtripTest.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2026, PostgreSQL Global Development Group
+ * See the LICENSE file in the project root for more information.
+ */
+
+package org.postgresql.test.jdbc2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.postgresql.PGProperty;
+import org.postgresql.jdbc.AutoSave;
+import org.postgresql.jdbc.PreferQueryMode;
+import org.postgresql.test.util.CountingSocketFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Properties;
+
+/**
+ * Tests the number of network operations required to start and use a transaction.
+ */
+@ParameterizedClass
+@MethodSource("data")
+public class TransactionRoundtripTest extends BaseTest4 {
+  private CountingSocketFactory.Counters socketCounters = CountingSocketFactory.register();
+  private final AutoSave autosave;
+  private final boolean cleanupSavepoints;
+
+  public TransactionRoundtripTest(PreferQueryMode preferQueryMode, AutoSave autosave,
+      boolean cleanupSavepoints) {
+    setPreferQueryMode(preferQueryMode);
+    this.autosave = autosave;
+    this.cleanupSavepoints = cleanupSavepoints;
+  }
+
+  public static Iterable data() {
+    return Arrays.asList(new Object[][]{
+        {PreferQueryMode.EXTENDED, AutoSave.NEVER, false},
+        {PreferQueryMode.EXTENDED, AutoSave.ALWAYS, false},
+        {PreferQueryMode.EXTENDED, AutoSave.ALWAYS, true},
+        {PreferQueryMode.SIMPLE, AutoSave.NEVER, false},
+        {PreferQueryMode.SIMPLE, AutoSave.ALWAYS, false},
+        {PreferQueryMode.SIMPLE, AutoSave.ALWAYS, true}
+    });
+  }
+
+  @Override
+  protected void updateProperties(Properties props) {
+    super.updateProperties(props);
+    PGProperty.AUTOSAVE.set(props, autosave.value());
+    PGProperty.CLEANUP_SAVEPOINTS.set(props, cleanupSavepoints);
+    PGProperty.SOCKET_FACTORY.set(props, CountingSocketFactory.class.getName());
+    PGProperty.SOCKET_FACTORY_ARG.set(props, socketCounters.key());
+  }
+
+  @Override
+  protected void tearDown() throws SQLException {
+    try {
+      super.tearDown();
+    } finally {
+      CountingSocketFactory.unregister(socketCounters);
+    }
+  }
+
+  @Test
+  void beginAndQueryAreFlushedTogether() throws SQLException {
+    con.setAutoCommit(false);
+    long flushesBefore = socketCounters.flushes.get();
+    long roundtripsBefore = socketCounters.roundtrips.get();
+
+    try (Statement statement = con.createStatement()) {
+      statement.executeQuery("SELECT 1").close();
+    }
+
+    int expectedFlushes = cleanupSavepoints ? 2 : 1;
+    assertEquals(expectedFlushes, socketCounters.flushes.get() - flushesBefore,
+        "BEGIN and the query should share one flush; savepoint cleanup needs another");
+    assertEquals(1, socketCounters.roundtrips.get() - roundtripsBefore,
+        "BEGIN and the query should complete in one roundtrip");
+  }
+}
diff --git a/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java b/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
index 6c3f56dfab..6467dffc1f 100644
--- a/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
+++ b/testkit/src/main/java/org/postgresql/test/util/CountingSocketFactory.java
@@ -47,6 +47,7 @@ public class CountingSocketFactory extends SocketFactory {
    */
   public static final class Counters {
     public final AtomicLong roundtrips = new AtomicLong();
+    public final AtomicLong flushes = new AtomicLong();
     public final AtomicLong bytesOut = new AtomicLong();
     public final AtomicLong bytesIn = new AtomicLong();
 
@@ -214,6 +215,7 @@ public void write(byte[] b, int off, int len) throws IOException {
     @Override
     public void flush() throws IOException {
       out.flush();
+      socket.counters.flushes.incrementAndGet();
     }
 
     @Override
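
Not part of the patch: a rough standalone illustration of the deferral described under "What". It counts how many `flush()` calls reach the underlying stream when `BEGIN` and `SELECT 1` are encoded as simple-query messages, once with a flush per message and once with a single flush before the response would be read. It uses no driver code. `FlushDeferralSketch`, `CountingOutputStream`, `sendSimpleQuery`, and `countFlushes` are hypothetical names, and a `ByteArrayOutputStream` stands in for the socket.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical, self-contained sketch; none of these names exist in pgjdbc.
final class FlushDeferralSketch {

  /** Stand-in for the socket stream: counts the flush() calls that reach it. */
  static final class CountingOutputStream extends FilterOutputStream {
    int flushes;

    CountingOutputStream(OutputStream out) {
      super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      out.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
      flushes++;
      out.flush();
    }
  }

  /** Encodes one simple-query ('Q') message into the buffer without flushing. */
  static void sendSimpleQuery(OutputStream buffered, String sql) throws IOException {
    byte[] encoded = sql.getBytes(StandardCharsets.UTF_8);
    int length = encoded.length + 4 + 1; // query bytes + length field + NUL terminator
    buffered.write('Q');
    buffered.write(length >>> 24); // big-endian int32; write(int) keeps the low 8 bits
    buffered.write(length >>> 16);
    buffered.write(length >>> 8);
    buffered.write(length);
    buffered.write(encoded);
    buffered.write(0);
  }

  /** Sends BEGIN and SELECT 1, flushing either per message or once at the end. */
  static int countFlushes(boolean flushPerMessage) {
    CountingOutputStream socket = new CountingOutputStream(new ByteArrayOutputStream());
    OutputStream buffered = new BufferedOutputStream(socket, 8192);
    try {
      for (String sql : new String[] {"BEGIN", "SELECT 1"}) {
        sendSimpleQuery(buffered, sql);
        if (flushPerMessage) {
          buffered.flush(); // old behavior: flush as soon as the message is encoded
        }
      }
      if (!flushPerMessage) {
        buffered.flush(); // deferred: one flush right before responses would be read
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return socket.flushes;
  }

  public static void main(String[] args) {
    System.out.println("flush per message: " + countFlushes(true));
    System.out.println("deferred flush:    " + countFlushes(false));
  }
}
```

Under these assumptions the per-message variant reaches the underlying stream twice and the deferred variant once, which mirrors the `NEVER` / `false` rows of the table. The real driver additionally has to flush at every send-then-read call site, which this sketch reduces to a single final `flush()`.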