Message-ID: From: "vlsi (@vlsi)" To: "pgjdbc/pgjdbc" Date: Wed, 22 Apr 2026 10:15:48 +0000 Subject: [pgjdbc/pgjdbc] PR #4038: fix: keep LazyCleanerImpl cleanup task alive across transient empty queue List-Id: X-GitHub-Additions: 42 X-GitHub-Author-Id: 213894 X-GitHub-Author-Login: vlsi X-GitHub-Base: master X-GitHub-Changed-Files: 2 X-GitHub-Commits: 2 X-GitHub-Deletions: 13 X-GitHub-Head-Branch: improve_cleaner X-GitHub-Head-SHA: a75e06efc3678fe27255cb3a456a908e909b7161 X-GitHub-Issue: 4038 X-GitHub-Labels: bug X-GitHub-Merge-SHA: e06c64995ea8e930e467c97bdd625c3598fe9ecf X-GitHub-Merged-By: vlsi X-GitHub-Repo: pgjdbc/pgjdbc X-GitHub-State: merged X-GitHub-Type: pull_request X-GitHub-Url: https://github.com/pgjdbc/pgjdbc/pull/4038 Content-Type: text/plain; charset=utf-8 The fix reduces the likelihood of `OutOfMemoryError` on Java 8 when `PreparedStatement` objects are leaked at a high rate. Previously, whenever the reference queue briefly emptied, the cleanup task exited immediately (the loop was guarded by `while (!checkEmpty())`), forcing the next `register()` call to submit a fresh ForkJoinPool task. Under bursty load with constrained CPU (reproducible with `-XX:ActiveProcessorCount=2`), the FJP submit + ManagedBlocker compensation overhead dominates, and cleanup falls so far behind that Node entries — each holding the cleanup action lambda and the resources it captures — accumulate until they cause an `OutOfMemoryError`. The cleanup task now waits the full `threadTtl` on the reference queue after the registry empties before terminating, matching the prior dedicated-thread behavior and amortizing FJP startup cost across register bursts. diff --git a/.github/workflows/matrix.mjs b/.github/workflows/matrix.mjs index 6e37b04bca..c62824f748 100644 --- a/.github/workflows/matrix.mjs +++ b/.github/workflows/matrix.mjs @@ -243,6 +243,20 @@ matrix.addAxis({ ] }); +// Occasionally constrain the JVM to a single CPU via -XX:ActiveProcessorCount=1. 
This shrinks +// JVM-internal pools (ForkJoinPool common pool, GC threads, etc.) and has caught regressions +// where bursty work cannot keep up with producers — e.g. LazyCleaner backed by FJP, see +// https://github.com/pgjdbc/pgjdbc/issues/4037 +// Both HotSpot and OpenJ9 support the flag. +matrix.addAxis({ + name: 'cpu_count', + title: x => x.value === '1' ? 'ActiveProcessorCount=1' : '', + values: [ + {value: '1', weight: 1}, + {value: 'default', weight: 10}, + ] +}); + function lessThan(minVersion) { return value => Number(value) < Number(minVersion); } @@ -252,7 +266,7 @@ matrix.setNamePattern([ 'server_tz', 'tz', 'locale', 'check_anorm_sbt', 'gss', 'replication', 'slow_tests', 'adaptive_fetch', 'rewrite_batch_inserts', 'query_timeout', - 'autosave', 'cleanupSavepoints' + 'autosave', 'cleanupSavepoints', 'cpu_count' ]); // We take EA builds from Oracle @@ -302,6 +316,7 @@ matrix.ensureAllAxisValuesCovered('xa'); matrix.ensureAllAxisValuesCovered('ssl'); matrix.ensureAllAxisValuesCovered('replication'); matrix.ensureAllAxisValuesCovered('os'); +matrix.ensureAllAxisValuesCovered('cpu_count'); // Ensure at least one job with autosave=always matrix.generateRow({autosave: {value: 'always'}}); const include = matrix.generateRows(process.env.MATRIX_JOBS || 5); @@ -429,6 +444,13 @@ include.forEach(v => { if (v.gss === 'no') { testJvmArgs.push('-DskipGssEncryption=true'); } + if (v.cpu_count.value === '1') { + // Constrains ForkJoinPool common pool to a single worker, exposing FJP submit/compensation + // overhead in code paths like LazyCleaner. See https://github.com/pgjdbc/pgjdbc/issues/4037 + // Both HotSpot and OpenJ9 support -XX:ActiveProcessorCount. 
+ testJvmArgs.push('-XX:ActiveProcessorCount=1'); + } + delete v.cpu_count; v.extraJvmArgs = jvmArgs.join(' '); v.testExtraJvmArgs = testJvmArgs.join(' ::: '); delete v.hash; diff --git a/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java b/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java index 995f0c1bdf..6e6fbfa677 100644 --- a/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java +++ b/pgjdbc/src/main/java/org/postgresql/util/LazyCleanerImpl.java @@ -29,7 +29,6 @@ import java.lang.ref.ReferenceQueue; import java.time.Duration; import java.util.concurrent.ForkJoinPool; -import java.util.function.BooleanSupplier; import java.util.logging.Level; import java.util.logging.Logger; @@ -126,23 +125,24 @@ private static class RefQueueBlocker implements ForkJoinPool.ManagedBlocker { private final String threadName; private @Nullable Reference ref; private final long blockTimeoutMillis; - private final BooleanSupplier shouldTerminate; - RefQueueBlocker(ReferenceQueue queue, String threadName, Duration blockTimeout, BooleanSupplier shouldTerminate) { + RefQueueBlocker(ReferenceQueue queue, String threadName, Duration blockTimeout) { this.queue = queue; this.threadName = threadName; - this.blockTimeoutMillis = blockTimeout.toMillis(); - this.shouldTerminate = shouldTerminate; + // ReferenceQueue.remove(0) blocks indefinitely; clamp to a small positive timeout so a + // user-configured threadTtl of 0/negative still yields a quickly-terminating cleanup task + // rather than a thread that parks forever waiting for the next reference. + long millis = blockTimeout.toMillis(); + this.blockTimeoutMillis = millis > 0 ? 
millis : 1; } @Override public boolean isReleasable() { - if (ref != null || shouldTerminate.getAsBoolean()) { + if (ref != null) { return true; // already have a ref from a previous call } // non-blocking check ref = queue.poll(); - // no need to block if we already have a ref from a previous call return ref != null; } @@ -156,12 +156,13 @@ public boolean block() throws InterruptedException { String oldName = currentThread.getName(); try { currentThread.setName(threadName); - // Perform blocking operation + // Wait the full blockTimeoutMillis for a reference. Returning true unconditionally lets + // managedBlock exit so the outer loop can decide whether to terminate the cleanup task. ref = queue.remove(blockTimeoutMillis); } finally { currentThread.setName(oldName); } - return false; + return true; } @Nullable Reference drainOne() { @@ -188,16 +189,22 @@ private boolean startThread() { // InnocuousForkJoinWorkerThread or a SecurityManager forbids setContextClassLoader } RefQueueBlocker blocker = - new RefQueueBlocker<>(queue, threadName, threadTtl, this::checkEmpty); - while (!checkEmpty()) { + new RefQueueBlocker<>(queue, threadName, threadTtl); + while (true) { try { ForkJoinPool.managedBlock(blocker); Node ref = (Node) blocker.drainOne(); if (ref != null) { ref.onClean(true); + } else if (checkEmpty()) { + // The blocker waited the full threadTtl without receiving a ref, and there are + // no pending registrations, so the cleanup task can terminate. Keeping the task + // alive across transient empties amortizes ForkJoinPool submit/compensation + // overhead under bursty workloads. + break; } } catch (InterruptedException e) { - if (!blocker.isReleasable()) { + if (checkEmpty()) { LOGGER.log(Level.FINE, "Got interrupt and the cleanup queue is empty, will terminate the cleanup thread"); break; }