pgjdbc/pgjdbc GitHub issues and pull requests (mirror)  
help / color / mirror / Atom feed
From: vlsi (@vlsi) <[email protected]>
To: pgjdbc/pgjdbc <[email protected]>
Subject: [pgjdbc/pgjdbc] PR #4038: fix: keep LazyCleanerImpl cleanup task alive across transient empty queue
Date: Wed, 22 Apr 2026 10:15:48 +0000
Message-ID: <[email protected]> (raw)

This fix reduces the likelihood of `OutOfMemoryError` on Java 8 when `PreparedStatement` objects are leaked at a high rate.

When the reference queue briefly empties, the cleanup task previously exited immediately (the loop was guarded by `while (!checkEmpty())`), forcing the next `register()` call to re-submit a fresh ForkJoinPool task. Under bursty load with constrained CPU (reproduces with `-XX:ActiveProcessorCount=2`), the FJP submit + ManagedBlocker compensation overhead dominates and cleanup falls so far behind that Node entries — each holding the cleanup action lambda and the resources it captures — accumulate until the JVM throws `OutOfMemoryError`.

After the registry empties, the cleanup task now waits on the reference queue for the full `threadTtl` before terminating, matching the prior dedicated-thread behavior and amortizing FJP startup cost across register bursts.

diff --git a/.github/workflows/matrix.mjs b/.github/workflows/matrix.mjs
index 6e37b04bca..c62824f748 100644
--- a/.github/workflows/matrix.mjs
+++ b/.github/workflows/matrix.mjs
@@ -243,6 +243,20 @@ matrix.addAxis({
   ]
 });
 
+// Occasionally constrain the JVM to a single CPU via -XX:ActiveProcessorCount=1. This shrinks
+// JVM-internal pools (ForkJoinPool common pool, GC threads, etc.) and has caught regressions
+// where bursty work cannot keep up with producers — e.g. LazyCleaner backed by FJP, see
+// https://github.com/pgjdbc/pgjdbc/issues/4037
+// Both HotSpot and OpenJ9 support the flag.
+matrix.addAxis({
+  name: 'cpu_count',
+  title: x => x.value === '1' ? 'ActiveProcessorCount=1' : '',
+  values: [
+    {value: '1', weight: 1},
+    {value: 'default', weight: 10},
+  ]
+});
+
 function lessThan(minVersion) {
     return value => Number(value) < Number(minVersion);
 }
@@ -252,7 +266,7 @@ matrix.setNamePattern([
     'server_tz', 'tz', 'locale',
     'check_anorm_sbt', 'gss', 'replication', 'slow_tests',
     'adaptive_fetch', 'rewrite_batch_inserts', 'query_timeout',
-    'autosave', 'cleanupSavepoints'
+    'autosave', 'cleanupSavepoints', 'cpu_count'
 ]);
 
 // We take EA builds from Oracle
@@ -302,6 +316,7 @@ matrix.ensureAllAxisValuesCovered('xa');
 matrix.ensureAllAxisValuesCovered('ssl');
 matrix.ensureAllAxisValuesCovered('replication');
 matrix.ensureAllAxisValuesCovered('os');
+matrix.ensureAllAxisValuesCovered('cpu_count');
 // Ensure at least one job with autosave=always
 matrix.generateRow({autosave: {value: 'always'}});
 const include = matrix.generateRows(process.env.MATRIX_JOBS || 5);
@@ -429,6 +444,13 @@ include.forEach(v => {
   if (v.gss === 'no') {
       testJvmArgs.push('-DskipGssEncryption=true');
   }
+  if (v.cpu_count.value === '1') {
+      // Constrains ForkJoinPool common pool to a single worker, exposing FJP submit/compensation
+      // overhead in code paths like LazyCleaner. See https://github.com/pgjdbc/pgjdbc/issues/4037
+      // Both HotSpot and OpenJ9 support -XX:ActiveProcessorCount.
+      testJvmArgs.push('-XX:ActiveProcessorCount=1');
+  }
+  delete v.cpu_count;
   v.extraJvmArgs = jvmArgs.join(' ');
   v.testExtraJvmArgs = testJvmArgs.join(' ::: ');
   delete v.hash;
diff --git a/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java b/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java
index 995f0c1bdf..6e6fbfa677 100644
--- a/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java
+++ b/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java
@@ -29,7 +29,6 @@
 import java.lang.ref.ReferenceQueue;
 import java.time.Duration;
 import java.util.concurrent.ForkJoinPool;
-import java.util.function.BooleanSupplier;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -126,23 +125,24 @@ private static class RefQueueBlocker<T> implements ForkJoinPool.ManagedBlocker {
     private final String threadName;
     private @Nullable Reference<? extends T> ref;
     private final long blockTimeoutMillis;
-    private final BooleanSupplier shouldTerminate;
 
-    RefQueueBlocker(ReferenceQueue<T> queue, String threadName, Duration blockTimeout, BooleanSupplier shouldTerminate) {
+    RefQueueBlocker(ReferenceQueue<T> queue, String threadName, Duration blockTimeout) {
       this.queue = queue;
       this.threadName = threadName;
-      this.blockTimeoutMillis = blockTimeout.toMillis();
-      this.shouldTerminate = shouldTerminate;
+      // ReferenceQueue.remove(0) blocks indefinitely; clamp to a small positive timeout so a
+      // user-configured threadTtl of 0/negative still yields a quickly-terminating cleanup task
+      // rather than a thread that parks forever waiting for the next reference.
+      long millis = blockTimeout.toMillis();
+      this.blockTimeoutMillis = millis > 0 ? millis : 1;
     }
 
     @Override
     public boolean isReleasable() {
-      if (ref != null || shouldTerminate.getAsBoolean()) {
+      if (ref != null) {
         return true; // already have a ref from a previous call
       }
       // non-blocking check
       ref = queue.poll();
-      // no need to block if we already have a ref from a previous call
       return ref != null;
     }
 
@@ -156,12 +156,13 @@ public boolean block() throws InterruptedException {
       String oldName = currentThread.getName();
       try {
         currentThread.setName(threadName);
-        // Perform blocking operation
+        // Wait the full blockTimeoutMillis for a reference. Returning true unconditionally lets
+        // managedBlock exit so the outer loop can decide whether to terminate the cleanup task.
         ref = queue.remove(blockTimeoutMillis);
       } finally {
         currentThread.setName(oldName);
       }
-      return false;
+      return true;
     }
 
     @Nullable Reference<? extends T> drainOne() {
@@ -188,16 +189,22 @@ private boolean startThread() {
             // InnocuousForkJoinWorkerThread or a SecurityManager forbids setContextClassLoader
           }
           RefQueueBlocker<Object> blocker =
-              new RefQueueBlocker<>(queue, threadName, threadTtl, this::checkEmpty);
-          while (!checkEmpty()) {
+              new RefQueueBlocker<>(queue, threadName, threadTtl);
+          while (true) {
             try {
               ForkJoinPool.managedBlock(blocker);
               Node<?> ref = (Node<?>) blocker.drainOne();
               if (ref != null) {
                 ref.onClean(true);
+              } else if (checkEmpty()) {
+                // The blocker waited the full threadTtl without receiving a ref, and there are
+                // no pending registrations, so the cleanup task can terminate. Keeping the task
+                // alive across transient empties amortizes ForkJoinPool submit/compensation
+                // overhead under bursty workloads.
+                break;
               }
             } catch (InterruptedException e) {
-              if (!blocker.isReleasable()) {
+              if (checkEmpty()) {
                 LOGGER.log(Level.FINE, "Got interrupt and the cleanup queue is empty, will terminate the cleanup thread");
                 break;
               }


reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: github://pgjdbc/pgjdbc
  Cc: [email protected], [email protected]
  Subject: Re: [pgjdbc/pgjdbc] PR #4038: fix: keep LazyCleanerImpl cleanup task alive across transient empty queue
  In-Reply-To: <<[email protected]>>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox