public inbox for [email protected]  
From: Joel Jacobson <[email protected]>
To: Thomas Munro <[email protected]>
Cc: pgsql-hackers <[email protected]>
Cc: Heikki Linnakangas <[email protected]>
Cc: Rishu Bagga <[email protected]>
Subject: Re: Optimize LISTEN/NOTIFY
Date: Thu, 24 Jul 2025 23:03:27 +0200
Message-ID: <[email protected]> (raw)
In-Reply-To: <CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>
References: <[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CAK80=jhmE40KVqQ3ho37MArS7cAED1p9m7uikDxcnDmqdW7t8A@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CA+hUKGLrMGkWDB0cwTa0RqD+AF7O-Ywgck8aVYKwOQnZgYRRug@mail.gmail.com>

On Wed, Jul 23, 2025, at 04:44, Thomas Munro wrote:
> On Wed, Jul 23, 2025 at 1:39 PM Joel Jacobson <[email protected]> wrote:
>> In their patch, in async.c's SignalBackends(), they do
>> SendInterrupt(INTERRUPT_ASYNC_NOTIFY, procno) instead of
>> SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]). They don't
>> seem to check if the backend is already signalled or not, but maybe
>> SendInterrupt() has signal coalescing built-in so it would be a noop
>> with almost no cost?
>
> Yeah:
>
> + old_pending = pg_atomic_fetch_or_u32(&proc->pendingInterrupts, interruptMask);
> +
> + /*
> + * If the process is currently blocked waiting for an interrupt to arrive,
> + * and the interrupt wasn't already pending, wake it up.
> + */
> + if ((old_pending & (interruptMask | SLEEPING_ON_INTERRUPTS)) ==
> +     SLEEPING_ON_INTERRUPTS)
> +     WakeupOtherProc(proc);

Thanks for confirming the coalescing logic in SendInterrupt. That's a
great low-level optimization. It's clear we're both targeting the same
problem of redundant wake-ups under contention, but approaching it from
different architectural levels.
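
To make the coalescing concrete, here is a minimal standalone sketch of
the same fetch-or test. It is an illustration only: it uses C11
`<stdatomic.h>` instead of PostgreSQL's `pg_atomic_*` API, and `DemoProc`,
`demo_send_interrupt`, the `wakeups` counter and the bit values are
hypothetical names invented for this example, not taken from the patch.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical bit assignments, chosen only for this sketch. */
#define SLEEPING_ON_INTERRUPTS  (UINT32_C(1) << 31)
#define INTERRUPT_ASYNC_NOTIFY  (UINT32_C(1) << 0)

/* Stand-in for the per-process pending-interrupts word. */
typedef struct DemoProc
{
    _Atomic uint32_t pendingInterrupts;
    int              wakeups;   /* simulated wakeups; demo only, not thread-safe */
} DemoProc;

/*
 * Set the interrupt bit and report whether a wakeup was issued.
 * A wakeup is needed only if the target was sleeping AND the bit was not
 * already set, so repeated senders coalesce into a single wakeup.
 */
static bool
demo_send_interrupt(DemoProc *proc, uint32_t interruptMask)
{
    uint32_t old_pending =
        atomic_fetch_or(&proc->pendingInterrupts, interruptMask);

    if ((old_pending & (interruptMask | SLEEPING_ON_INTERRUPTS)) ==
        SLEEPING_ON_INTERRUPTS)
    {
        proc->wakeups++;        /* stands in for WakeupOtherProc(proc) */
        return true;
    }
    return false;
}
```

With a sleeping target, the first call returns true and any later call
with the same mask returns false until the receiver clears the bit.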

The core difference, as I see it, is *where* the state management
resides. The "Interrupts vs signals" patch set creates a unified
machinery where the 'pending' state for all subsystems is combined into
a single atomic bitmask. This is a valid approach.

However, I've been exploring an alternative pattern that decouples the
state management from the signaling machinery, allowing each subsystem
to manage its own state independently. I believe this leads to a
simpler, more modular migration path. I've developed a two-patch series
for `async.c` to demonstrate this concept.

1. The first patch introduces a lock-free, atomic finite state machine
   (FSM) entirely within async.c. By using a subsystem-specific atomic
   integer and CAS operations, async.c can now robustly manage its own
   listener states (IDLE, SIGNALLED, PROCESSING). This solves the
   redundant signal problem at the source, as notifiers can now observe
   a listener's state and refrain from sending a wakeup if one is
   already pending.

2. The second patch demonstrates that once state is managed locally, the
   wakeup mechanism becomes trivial. The expensive `SendProcSignal`
   call is replaced with a direct `SetLatch`. This leverages the
   existing, highly-optimized `WaitEventSet` infrastructure as a simple,
   efficient "poke."
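
The two steps above can be sketched in isolation roughly as follows.
This is a simplified illustration, not the code in the patches: it uses
C11 `<stdatomic.h>` rather than `pg_atomic_*`, the `LISTENER_*` constants
and all function names are hypothetical, and the notifier side retries
its CAS in a loop, which is a choice made for this sketch. A `true`
return from `notifier_should_wake` is where the real code would issue
the wakeup (`SendProcSignal` in the first patch, `SetLatch` in the
second).

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical names mirroring the IDLE / SIGNALLED / PROCESSING states. */
enum
{
    LISTENER_IDLE = 0,
    LISTENER_SIGNALLED = 1,
    LISTENER_PROCESSING = 2
};

/*
 * Notifier side: move the listener to SIGNALLED.  Returns true only for
 * the caller that actually performed the transition, i.e. the one that
 * should send the wakeup.  Everyone else sees SIGNALLED and skips it.
 */
static bool
notifier_should_wake(_Atomic uint32_t *state)
{
    uint32_t observed = atomic_load(state);

    for (;;)
    {
        if (observed == LISTENER_SIGNALLED)
            return false;       /* a wakeup is already pending */
        if (atomic_compare_exchange_weak(state, &observed,
                                         LISTENER_SIGNALLED))
            return true;        /* we made the transition */
        /* CAS failed: 'observed' now holds the fresh value, so retry */
    }
}

/* Listener side: SIGNALLED -> PROCESSING before reading the queue. */
static bool
listener_begin(_Atomic uint32_t *state)
{
    uint32_t expected = LISTENER_SIGNALLED;

    return atomic_compare_exchange_strong(state, &expected,
                                          LISTENER_PROCESSING);
}

/*
 * Listener side: PROCESSING -> IDLE when done.  Returns true if a
 * notifier re-signalled us meanwhile, so another pass is required.
 */
static bool
listener_finish(_Atomic uint32_t *state)
{
    uint32_t expected = LISTENER_PROCESSING;

    if (atomic_compare_exchange_strong(state, &expected, LISTENER_IDLE))
        return false;
    return expected == LISTENER_SIGNALLED;
}
```

Because only one notifier can win the transition into SIGNALLED, at
most one wakeup is sent per idle listener regardless of how many
backends commit a NOTIFY concurrently.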

This suggests a powerful, incremental migration pattern: first, fix a
subsystem's state management internally; second, replace its wakeup
mechanism. This vertical, module-by-module approach seems complementary
to the horizontal, layer-by-layer refactoring in the "Interrupts vs
signals" thread.

I'll post a more detailed follow-up in that thread to discuss the
broader architectural implications. Attached are the two patches,
reframed to better illustrate this two-step pattern.

/Joel
#!/bin/bash

# Configuration for the PostgreSQL instances using absolute paths.
# This script does NOT modify the shell's PATH variable.

# --- Master Config ---
MASTER_NAME="master"
MASTER_PORT=5432
MASTER_BIN_PATH="$HOME/pg-master/bin"
MASTER_DATA="$HOME/pg-master-data"
MASTER_LOG="/tmp/pg-master.log"

# --- Patch v1 Config ---
PATCH_NAME="patch-v1"
PATCH_PORT=5432
PATCH_BIN_PATH="$HOME/pg-patch-v1/bin"
PATCH_DATA="$HOME/pg-patch-v1-data"
PATCH_LOG="/tmp/pg-patch-v1.log"

# --- Patch v2 Config ---
PATCH_V2_NAME="patch-v2"
PATCH_V2_PORT=5432
PATCH_V2_BIN_PATH="$HOME/pg-patch-v2/bin"
PATCH_V2_DATA="$HOME/pg-patch-v2-data"
PATCH_V2_LOG="/tmp/pg-patch-v2.log"

# Benchmark settings
CHANNEL_NAME="mychannel"
CONNECTIONS=(64 128)
DURATION=10 # Benchmark duration in seconds for each run
MEASUREMENTS=3 # Number of measurements per configuration

# CSV output file
CSV_OUTPUT="benchmark_results.csv"

# Temporary files
PGBENCH_SCRIPT=$(mktemp)

# --- Cleanup Function ---
# Ensures that servers are stopped and temp files are removed on script exit.
cleanup() {
  echo ""
  echo "Cleaning up..."
  # Ensure all servers are stopped, silencing errors if they are not running.
  # Use absolute paths and explicit data directories.
  "$MASTER_BIN_PATH/pg_ctl" -D "$MASTER_DATA" -m fast stop &> /dev/null
  "$PATCH_BIN_PATH/pg_ctl" -D "$PATCH_DATA" -m fast stop &> /dev/null
  "$PATCH_V2_BIN_PATH/pg_ctl" -D "$PATCH_V2_DATA" -m fast stop &> /dev/null
  rm -f "$PGBENCH_SCRIPT"
  echo "Cleanup complete."
}

# Trap the script's exit (normal or interrupted) to run the cleanup function
trap cleanup EXIT

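# Initialize CSV file with headers.
# Commented out, presumably so that successive runs append to the same file.
# Run it once before the first benchmark: the report step loads the file with
# "CSV HEADER" and therefore skips the first line.
# echo "version,connections,jobs,tps,run" > "$CSV_OUTPUT"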

# --- Benchmark Function ---
# A generic function to run the benchmark for a given configuration.
# It starts, benchmarks, and then stops the specified server instance.
run_benchmark() {
  local name=$1
  local port=$2
  local bin_path=$3
  local data_path=$4
  local log_file=$5

  echo "--- Starting benchmark for: $name ---"

  # Set PGPORT for client tools (pgbench, psql) for this run
  export PGPORT=$port

  # 1. Start the server using absolute path and explicit data directory
  echo "Starting $name server on port $port..."
  "$bin_path/pg_ctl" -D "$data_path" -l "$log_file" -o "-p $port" start
  sleep 2 # Give server a moment to become available

  # Create the pgbench script content
  cat > "$PGBENCH_SCRIPT" << EOF
NOTIFY ${CHANNEL_NAME};
EOF

  # 2. Start the listener in the background for this server
  (echo "LISTEN ${CHANNEL_NAME};"; sleep 100000) | "$bin_path/psql" -d postgres &> /dev/null &
  local listener_pid=$!
  
  # 3. Run the benchmark loop
  echo "Running pgbench for connection counts: ${CONNECTIONS[*]}"
  for c in "${CONNECTIONS[@]}"; do
    echo "  Testing with $c connections ($MEASUREMENTS measurements per run)..."
    # Run multiple measurements for each connection count
    for m in $(seq 1 $MEASUREMENTS); do
      # Run pgbench and extract TPS value
      tps=$("$bin_path/pgbench" -d postgres -f "$PGBENCH_SCRIPT" -c "$c" -j "$c" -T "$DURATION" -n \
        | grep -E '^tps' \
        | awk '{printf "%.0f", $3}')
      
      # Write to CSV: version,connections,jobs,tps,run
      echo "$name,$c,$c,$tps,$m" >> "$CSV_OUTPUT"
    done
  done
  
  # 4. Stop the listener and the server
  kill "$listener_pid" &> /dev/null
  echo "Stopping $name server..."
  "$bin_path/pg_ctl" -D "$data_path" -m fast stop &> /dev/null
  echo "--- Benchmark for $name complete ---"
  echo ""
}

# --- Main Execution ---

# 1. Run benchmark for master
# run_benchmark "$MASTER_NAME" "$MASTER_PORT" "$MASTER_BIN_PATH" "$MASTER_DATA" "$MASTER_LOG"

# 2. Run benchmark for patch-v1
# run_benchmark "$PATCH_NAME" "$PATCH_PORT" "$PATCH_BIN_PATH" "$PATCH_DATA" "$PATCH_LOG"

# 3. Run benchmark for patch-v2
run_benchmark "$PATCH_V2_NAME" "$PATCH_V2_PORT" "$PATCH_V2_BIN_PATH" "$PATCH_V2_DATA" "$PATCH_V2_LOG"

# 4. Generate report using PostgreSQL
echo ""
echo "# BENCHMARK"
echo ""
echo "## TPS"

# Start the master server to run the analysis
export PGPORT=$MASTER_PORT
"$MASTER_BIN_PATH/pg_ctl" -D "$MASTER_DATA" -l "$MASTER_LOG" -o "-p $MASTER_PORT" start &> /dev/null
sleep 2

# Create analysis database and load data
"$MASTER_BIN_PATH/psql" -d postgres -q << EOF
-- Create a temporary database for analysis
DROP DATABASE IF EXISTS bench_analysis;
CREATE DATABASE bench_analysis;
\c bench_analysis

-- Create table for benchmark results
CREATE TABLE benchmark_results (
    version TEXT,
    connections INT,
    jobs INT,
    tps NUMERIC,
    run INT
);

-- Load CSV data
\COPY benchmark_results FROM '$CSV_OUTPUT' CSV HEADER

-- Generate comparison report
WITH avg_results AS (
    SELECT 
        version,
        connections,
        AVG(tps) AS avg_tps,
        STDDEV(tps) AS stddev_tps,
        COUNT(*) AS runs
    FROM benchmark_results
    GROUP BY version, connections
),
comparison AS (
    SELECT 
        m.connections,
        m.avg_tps AS master_tps,
        p1.avg_tps AS patch_v1_tps,
        p2.avg_tps AS patch_v2_tps,
        CASE 
            WHEN m.avg_tps > 0 THEN ((p1.avg_tps - m.avg_tps) / m.avg_tps * 100)
            ELSE 0
        END AS relative_diff_patch_v1_pct,
        CASE 
            WHEN m.avg_tps > 0 THEN ((p2.avg_tps - m.avg_tps) / m.avg_tps * 100)
            ELSE 0
        END AS relative_diff_patch_v2_pct
    FROM avg_results m
    JOIN avg_results p1 ON m.connections = p1.connections
    JOIN avg_results p2 ON m.connections = p2.connections
    WHERE m.version = 'master' AND p1.version = 'patch-v1' AND p2.version = 'patch-v2'
    ORDER BY m.connections
)

SELECT 
    connections AS "N backends",
    ROUND(master_tps) AS "master",
    ROUND(patch_v1_tps) AS "patch-v1",
    ROUND(patch_v2_tps) AS "patch-v2"
FROM comparison
ORDER BY connections;
EOF

echo ""
echo "## TPS speed-up vs master"

"$MASTER_BIN_PATH/psql" -d bench_analysis -q << EOF
SELECT 
    connections AS "N backends",
    CASE WHEN relative_diff_patch_v1_pct >= 0 THEN '+' ELSE '' END || 
    ROUND(relative_diff_patch_v1_pct) || '%' AS "patch-v1",
    CASE WHEN relative_diff_patch_v2_pct >= 0 THEN '+' ELSE '' END || 
    ROUND(relative_diff_patch_v2_pct) || '%' AS "patch-v2"
FROM (
    WITH avg_results AS (
        SELECT 
            version,
            connections,
            AVG(tps) AS avg_tps
        FROM benchmark_results
        GROUP BY version, connections
    )
    SELECT 
        m.connections,
        CASE 
            WHEN m.avg_tps > 0 THEN ((p1.avg_tps - m.avg_tps) / m.avg_tps * 100)
            ELSE 0
        END AS relative_diff_patch_v1_pct,
        CASE 
            WHEN m.avg_tps > 0 THEN ((p2.avg_tps - m.avg_tps) / m.avg_tps * 100)
            ELSE 0
        END AS relative_diff_patch_v2_pct
    FROM avg_results m
    JOIN avg_results p1 ON m.connections = p1.connections
    JOIN avg_results p2 ON m.connections = p2.connections
    WHERE m.version = 'master' AND p1.version = 'patch-v1' AND p2.version = 'patch-v2'
) AS comparison
ORDER BY connections;
EOF

# Stop the server
"$MASTER_BIN_PATH/pg_ctl" -D "$MASTER_DATA" -m fast stop &> /dev/null

echo ""
echo "CSV results saved to: $CSV_OUTPUT"

# BENCHMARK

A single backend does `LISTEN mychannel;` and stays idle,
then pgbench is run 3 times for each <N backends>.

script.sql: NOTIFY mychannel;

% pgbench -f script.sql -c <N backends> -j <N backends> -T 10 -n

## TPS

 N backends | master | patch-v1 | patch-v2
------------+--------+----------+----------
          1 | 117343 |   151422 |   150735
          2 | 158427 |   236705 |   239004
          4 | 177454 |   250783 |   250782
          8 | 116521 |   155466 |   180418
         16 |  45627 |   144740 |   163491
         32 |  37281 |   135602 |   146659
         64 |  36608 |   123870 |   131202
        128 |  34798 |   120302 |   119041
(8 rows)


## TPS speed-up vs master

 N backends | patch-v1 | patch-v2
------------+----------+----------
          1 | +29%     | +28%
          2 | +49%     | +51%
          4 | +41%     | +41%
          8 | +33%     | +55%
         16 | +217%    | +258%
         32 | +264%    | +293%
         64 | +238%    | +258%
        128 | +246%    | +242%
(8 rows)


Attachments:

  [text/plain] pgbench-script.txt (7.0K, 2-pgbench-script.txt)

  [text/plain] pgbench-results.txt (1020B, 3-pgbench-results.txt)

  [application/octet-stream] 0001-Optimize-LISTEN-NOTIFY-signaling-with-a-lock-free-at.patch (14.1K, 4-0001-Optimize-LISTEN-NOTIFY-signaling-with-a-lock-free-at.patch)
From 17777283bda5fa41b430e4f71a7246d3f04a94bf Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Tue, 22 Jul 2025 10:32:34 +0200
Subject: [PATCH 1/2] Optimize LISTEN/NOTIFY signaling with a lock-free atomic
 state machine

This commit introduces a powerful pattern for modernizing inter-process
communication by refactoring the LISTEN/NOTIFY subsystem to use a
lock-free, atomic finite state machine (FSM). This directly addresses
the historical lack of safe, efficient state synchronization primitives.

Previously, if multiple transactions sent notifications concurrently,
each would unconditionally attempt to signal all listening backends.
This resulted in a storm of superfluous signals to listeners that were
already pending a wakeup, causing unnecessary system call overhead.

By introducing an atomic per-backend state (IDLE, SIGNALLED, PROCESSING)
in shared memory and manipulated via compare-and-swap (CAS), this
inefficiency is eliminated. A notifier can now atomically transition a
listener's state from IDLE to SIGNALLED, ensuring that only the first
notifier for a given idle listener dispatches a wakeup. The FSM also
robustly handles race conditions where new notifications arrive while a
listener is PROCESSING, guaranteeing no work is ever missed.

This FSM pattern is a generalizable solution for managing concurrency in
PostgreSQL. By modeling inter-process interactions as explicit state
transitions, we can build more robust and performant subsystems. This
commit demonstrates the pattern's effectiveness within async.c, and by
cleanly solving the state management problem first, it enables a
subsequent, trivial optimization of the wakeup mechanism itself.
---
 src/backend/commands/async.c | 209 ++++++++++++++++++++++++++++++-----
 src/backend/tcop/postgres.c  |   4 +-
 src/include/commands/async.h |   4 +-
 3 files changed, 185 insertions(+), 32 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4bd37d5beb5..ae20017af9b 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -150,8 +150,19 @@
 #include "utils/ps_status.h"
 #include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "port/atomics.h"
 
 
+/*
+ * Async notification state machine states
+ */
+typedef enum AsyncListenerState
+{
+	ASYNC_STATE_IDLE = 0,		/* Backend is idle, waiting for signal */
+	ASYNC_STATE_SIGNALLED = 1,	/* Backend has been signaled, will process soon */
+	ASYNC_STATE_PROCESSING = 2	/* Backend is actively processing notifications */
+} AsyncListenerState;
+
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
  * must be kept small enough so that a notification message fits on one
@@ -246,6 +257,7 @@ typedef struct QueueBackendStatus
 	Oid			dboid;			/* backend's database OID, or InvalidOid */
 	ProcNumber	nextListener;	/* id of next listener, or INVALID_PROC_NUMBER */
 	QueuePosition pos;			/* backend has read queue up to here */
+	pg_atomic_uint32 state;		/* async state machine state */
 } QueueBackendStatus;
 
 /*
@@ -301,6 +313,7 @@ static AsyncQueueControl *asyncQueueControl;
 #define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
 #define QUEUE_NEXT_LISTENER(i)		(asyncQueueControl->backend[i].nextListener)
 #define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
+#define QUEUE_BACKEND_STATE(i)		(asyncQueueControl->backend[i].state)
 
 /*
  * The SLRU buffer area through which we access the notification queue
@@ -405,12 +418,10 @@ static NotificationList *pendingNotifies = NULL;
 
 /*
  * Inbound notifications are initially processed by HandleNotifyInterrupt(),
- * called from inside a signal handler. That just sets the
- * notifyInterruptPending flag and sets the process
+ * called from inside a signal handler. That just sets the process
  * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to
  * actually deal with the interrupt.
  */
-volatile sig_atomic_t notifyInterruptPending = false;
 
 /* True if we've registered an on_shmem_exit cleanup */
 static bool unlistenExitRegistered = false;
@@ -527,6 +538,7 @@ AsyncShmemInit(void)
 			QUEUE_BACKEND_DBOID(i) = InvalidOid;
 			QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER;
 			SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
+			pg_atomic_init_u32(&QUEUE_BACKEND_STATE(i), ASYNC_STATE_IDLE);
 		}
 	}
 
@@ -1099,6 +1111,8 @@ Exec_ListenPreCommit(void)
 	QUEUE_BACKEND_POS(MyProcNumber) = max;
 	QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId;
+	/* Initialize the atomic state to IDLE */
+	pg_atomic_write_u32(&QUEUE_BACKEND_STATE(MyProcNumber), ASYNC_STATE_IDLE);
 	/* Insert backend into list of listeners at correct position */
 	if (prevListener != INVALID_PROC_NUMBER)
 	{
@@ -1242,6 +1256,8 @@ asyncQueueUnregister(void)
 	/* Mark our entry as invalid */
 	QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid;
 	QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid;
+	/* Reset state to IDLE to prevent zombie listeners */
+	pg_atomic_write_u32(&QUEUE_BACKEND_STATE(MyProcNumber), ASYNC_STATE_IDLE);
 	/* and remove it from the list */
 	if (QUEUE_FIRST_LISTENER == MyProcNumber)
 		QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber);
@@ -1634,25 +1650,84 @@ SignalBackends(void)
 	for (int i = 0; i < count; i++)
 	{
 		int32		pid = pids[i];
+		ProcNumber	procno = procnos[i];
+		uint32		expected;
+		bool		signal_needed = false;
 
 		/*
-		 * If we are signaling our own process, no need to involve the kernel;
-		 * just set the flag directly.
+		 * Implement state machine transitions for the notifier.
+		 * We use a loop to handle race conditions where the state
+		 * changes between our read and the CAS operation.
 		 */
-		if (pid == MyProcPid)
+		uint32	current_state = pg_atomic_read_membarrier_u32(&QUEUE_BACKEND_STATE(procno));
+
+		switch (current_state)
 		{
-			notifyInterruptPending = true;
-			continue;
+			case ASYNC_STATE_IDLE:
+				/* Try to transition from IDLE to SIGNALLED */
+				expected = ASYNC_STATE_IDLE;
+				if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(procno),
+													&expected,
+													ASYNC_STATE_SIGNALLED))
+				{
+					/* Success - need to send signal */
+					signal_needed = true;
+					if (Trace_notify)
+						elog(DEBUG1, "SignalBackends: transitioned backend %d from IDLE to SIGNALLED", pid);
+				}
+				/* Another notifier already signaled - we're done */
+				break;
+
+			case ASYNC_STATE_SIGNALLED:
+				/* Backend is already signaled - nothing to do */
+				if (Trace_notify)
+					elog(DEBUG1, "SignalBackends: backend %d already in SIGNALLED state, skipping", pid);
+				break;
+
+			case ASYNC_STATE_PROCESSING:
+				/* Try to transition from PROCESSING to SIGNALLED */
+				expected = ASYNC_STATE_PROCESSING;
+				if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(procno),
+													&expected,
+													ASYNC_STATE_SIGNALLED))
+				{
+					/* Success - need to send signal for re-scan */
+					signal_needed = true;
+					if (Trace_notify)
+						elog(DEBUG1, "SignalBackends: transitioned backend %d from PROCESSING to SIGNALLED for re-scan", pid);
+					break;
+				}
+				/* Another notifier already signaled - we're done */
+				break;
+
+			default:
+				/* Should never happen */
+				elog(ERROR, "unexpected async state %u for backend %d",
+						current_state, pid);
 		}
 
-		/*
-		 * Note: assuming things aren't broken, a signal failure here could
-		 * only occur if the target backend exited since we released
-		 * NotifyQueueLock; which is unlikely but certainly possible. So we
-		 * just log a low-level debug message if it happens.
-		 */
-		if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0)
-			elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
+		/* Send signal if needed */
+		if (signal_needed)
+		{
+			/*
+			 * For our own process, no need to involve the kernel
+			 */
+			if (pid == MyProcPid)
+			{
+				SetLatch(MyLatch);
+			}
+			else
+			{
+				/*
+				 * Note: assuming things aren't broken, a signal failure here could
+				 * only occur if the target backend exited since we released
+				 * NotifyQueueLock; which is unlikely but certainly possible. So we
+				 * just log a low-level debug message if it happens.
+				 */
+				if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procno) < 0)
+					elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
+			}
+		}
 	}
 
 	pfree(pids);
@@ -1805,20 +1880,43 @@ HandleNotifyInterrupt(void)
 {
 	/*
 	 * Note: this is called by a SIGNAL HANDLER. You must be very wary what
-	 * you do here.
+	 * you do here. The actual state transition has already been done by
+	 * the notifier before sending the signal, so we only need to set the
+	 * latch to ensure the backend wakes up and processes the notification.
 	 */
 
-	/* signal that work needs to be done */
-	notifyInterruptPending = true;
-
 	/* make sure the event is processed in due course */
 	SetLatch(MyLatch);
 }
 
+/*
+ * IsNotifyInterruptPending
+ *
+ *		Check if there's a pending notify interrupt for this backend
+ */
+bool
+IsNotifyInterruptPending(void)
+{
+	uint32		state;
+
+	/* If not registered as a listener, no notifications are pending */
+	if (!amRegisteredListener)
+		return false;
+
+	/*
+	 * Read the current state with a memory barrier to ensure we see
+	 * the most recent value written by notifiers.
+	 */
+	state = pg_atomic_read_membarrier_u32(&QUEUE_BACKEND_STATE(MyProcNumber));
+
+	/* Notification is pending if state is SIGNALLED */
+	return (state == ASYNC_STATE_SIGNALLED);
+}
+
 /*
  * ProcessNotifyInterrupt
  *
- *		This is called if we see notifyInterruptPending set, just before
+ *		This is called if we see a notification interrupt is pending, just before
  *		transmitting ReadyForQuery at the end of a frontend command, and
  *		also if a notify signal occurs while reading from the frontend.
  *		HandleNotifyInterrupt() will cause the read to be interrupted
@@ -1837,7 +1935,7 @@ ProcessNotifyInterrupt(bool flush)
 		return;					/* not really idle */
 
 	/* Loop in case another signal arrives while sending messages */
-	while (notifyInterruptPending)
+	while (IsNotifyInterruptPending())
 		ProcessIncomingNotify(flush);
 }
 
@@ -2182,28 +2280,81 @@ asyncQueueAdvanceTail(void)
 static void
 ProcessIncomingNotify(bool flush)
 {
-	/* We *must* reset the flag */
-	notifyInterruptPending = false;
+	uint32		expected;
 
-	/* Do nothing else if we aren't actively listening */
+	/* Do nothing if we aren't actively listening */
 	if (listenChannels == NIL)
 		return;
 
+	/*
+	 * Perform state transition from SIGNALLED to PROCESSING.
+	 * This is the "acquire lock" operation for the listener.
+	 */
+	expected = ASYNC_STATE_SIGNALLED;
+	if (!pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+										&expected,
+										ASYNC_STATE_PROCESSING))
+	{
+		/*
+		 * CAS failed - the state was not SIGNALLED. This should not happen
+		 * as ProcessNotifyInterrupt only calls us when state is SIGNALLED.
+		 */
+		elog(ERROR, "unexpected async state %u in ProcessIncomingNotify, expected SIGNALLED",
+			 expected);
+	}
+
 	if (Trace_notify)
-		elog(DEBUG1, "ProcessIncomingNotify");
+		elog(DEBUG1, "ProcessIncomingNotify: transitioned to PROCESSING");
 
 	set_ps_display("notify interrupt");
 
 	/*
-	 * We must run asyncQueueReadAllNotifications inside a transaction, else
-	 * bad things happen if it gets an error.
-	 */
+	 * We must run asyncQueueReadAllNotifications inside a transaction, else
+	 * bad things happen if it gets an error.
+	 */
 	StartTransactionCommand();
 
 	asyncQueueReadAllNotifications();
 
 	CommitTransactionCommand();
 
+	/*
+	 * Try to transition from PROCESSING back to IDLE.
+	 * This is the "release lock" operation for the listener.
+	 */
+	expected = ASYNC_STATE_PROCESSING;
+	if (pg_atomic_compare_exchange_u32(&QUEUE_BACKEND_STATE(MyProcNumber),
+										&expected,
+										ASYNC_STATE_IDLE))
+	{
+		/* Success - we're done, transitioned to IDLE */
+		if (Trace_notify)
+			elog(DEBUG1, "ProcessIncomingNotify: transitioned to IDLE");
+	}
+	else
+	{
+		/* CAS failed - check what the new state is */
+		if (expected == ASYNC_STATE_SIGNALLED)
+		{
+			/*
+			 * A notifier set our state to SIGNALLED while we were processing.
+			 * We are done with this batch of work, but we know there is more
+			 * to do. Rather than loop here and risk starving other backend
+			 * activity, we set our own latch to ensure we are woken up again
+			 * to re-process, and then exit. The state is left as SIGNALLED.
+			 */
+			if (Trace_notify)
+				elog(DEBUG1, "ProcessIncomingNotify: signalled while processing");
+			SetLatch(MyLatch);
+		}
+		else
+		{
+			/* Any other state is an error */
+			elog(ERROR, "unexpected async state %u when trying to return to IDLE",
+					expected);
+		}
+	}
+
 	/*
 	 * If this isn't an end-of-command case, we must flush the notify messages
 	 * to ensure frontend gets them promptly.
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index a297606cdd7..e1d80cbefea 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -512,7 +512,7 @@ ProcessClientReadInterrupt(bool blocked)
 			ProcessCatchupInterrupt();
 
 		/* Process notify interrupts, if any */
-		if (notifyInterruptPending)
+		if (IsNotifyInterruptPending())
 			ProcessNotifyInterrupt(true);
 	}
 	else if (ProcDiePending)
@@ -4604,7 +4604,7 @@ PostgresMain(const char *dbname, const char *username)
 				 * were received during the just-finished transaction, they'll
 				 * be seen by the client before ReadyForQuery is.
 				 */
-				if (notifyInterruptPending)
+				if (IsNotifyInterruptPending())
 					ProcessNotifyInterrupt(false);
 
 				/*
diff --git a/src/include/commands/async.h b/src/include/commands/async.h
index f75c3df9556..7f2e0ac0b9f 100644
--- a/src/include/commands/async.h
+++ b/src/include/commands/async.h
@@ -17,7 +17,6 @@
 
 extern PGDLLIMPORT bool Trace_notify;
 extern PGDLLIMPORT int max_notify_queue_pages;
-extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;
 
 extern Size AsyncShmemSize(void);
 extern void AsyncShmemInit(void);
@@ -46,4 +45,7 @@ extern void HandleNotifyInterrupt(void);
 /* process interrupts */
 extern void ProcessNotifyInterrupt(bool flush);
 
+/* check if notification interrupt is pending */
+extern bool IsNotifyInterruptPending(void);
+
 #endif							/* ASYNC_H */
-- 
2.47.1
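
For readers who want to follow the listener-side transitions outside the
PostgreSQL tree, here is a minimal standalone sketch of the three-state
machine using C11 atomics. It is not code from the patch: the Listener
type, the STATE_* constants and every function name are hypothetical
stand-ins, and the notifier side (notifier_signal) is an assumption about
how the SIGNALLED state gets set, inferred only from the listener code
above.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-ins for the patch's ASYNC_STATE_* values. */
enum { STATE_IDLE = 0, STATE_SIGNALLED = 1, STATE_PROCESSING = 2 };

typedef struct
{
	_Atomic uint32_t state;
} Listener;

void
listener_init(Listener *l)
{
	atomic_init(&l->state, STATE_IDLE);
}

/*
 * Notifier side (assumed behaviour): mark the listener SIGNALLED and
 * report whether a wakeup must be sent.  Only an IDLE listener needs
 * one; a PROCESSING listener notices the change when it tries to
 * release, and a SIGNALLED listener already has a wakeup pending.
 */
bool
notifier_signal(Listener *l)
{
	uint32_t	old = atomic_exchange(&l->state, STATE_SIGNALLED);

	return old == STATE_IDLE;
}

/* Listener "acquire": SIGNALLED -> PROCESSING.  False if not signalled. */
bool
listener_begin(Listener *l)
{
	uint32_t	expected = STATE_SIGNALLED;

	return atomic_compare_exchange_strong(&l->state, &expected,
										  STATE_PROCESSING);
}

/*
 * Listener "release": PROCESSING -> IDLE.  Returns true if a notifier
 * re-signalled us in the meantime, i.e. another pass is needed and the
 * state has been left as SIGNALLED.
 */
bool
listener_finish(Listener *l)
{
	uint32_t	expected = STATE_PROCESSING;

	if (atomic_compare_exchange_strong(&l->state, &expected, STATE_IDLE))
		return false;

	/* On failure, expected holds the value that was actually found. */
	assert(expected == STATE_SIGNALLED);
	return true;
}
```

In this model, a burst of notifier_signal() calls against the same
listener yields at most one "send a wakeup" result until the listener has
gone back to IDLE, which is the coalescing effect the state machine is
meant to provide.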



  [application/octet-stream] 0002-Optimize-LISTEN-NOTIFY-wakeup-by-replacing-signal-wi.patch (3.3K, 5-0002-Optimize-LISTEN-NOTIFY-wakeup-by-replacing-signal-wi.patch)
From 31e419747ab92dbc29d0d9db58d88ff2d2caf5c9 Mon Sep 17 00:00:00 2001
From: Joel Jacobson <[email protected]>
Date: Thu, 24 Jul 2025 21:17:19 +0200
Subject: [PATCH 2/2] Optimize LISTEN/NOTIFY wakeup by replacing signal with
 direct SetLatch

Building upon the robust atomic state machine introduced in the previous
commit, this change completes the modernization of NOTIFY IPC by
replacing its wakeup mechanism. With inter-process state now managed
reliably, the heavyweight SIGUSR1 signal is no longer necessary and is
replaced with a much more efficient, direct "poke."

The async.c notifier now replaces its call to SendProcSignal with a
direct call to SetLatch on the target backend's procLatch. This is a
significant optimization because WaitLatch, which listeners already use
for blocking, is underpinned by the modern WaitEventSet abstraction
(kqueue, epoll, etc.). We now leverage this existing, highly efficient
infrastructure for the wakeup, completely bypassing the kill() syscall
and the SIGUSR1 signal handler for all NOTIFY events.

This demonstrates a powerful, two-step migration pattern:

1. First, solve a subsystem's state synchronization problem with a
   lock-free, atomic FSM to eliminate redundant signaling.

2. Then, with state management handled, make the wakeup itself cheaper
   by replacing the expensive signal with a direct SetLatch.

This staged approach allows us to modernize subsystems incrementally and
safely. By applying this pattern to async.c, we prove its viability and
simplicity, creating a clear template for other parts of the system to
follow in moving towards a more performant, signal-free IPC model.
---
 src/backend/commands/async.c | 25 +++++++++++++++++++------
 1 file changed, 19 insertions(+), 6 deletions(-)

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index ae20017af9b..c871774b72c 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -142,6 +142,7 @@
 #include "miscadmin.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/proc.h"
 #include "storage/procsignal.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
@@ -1719,13 +1720,25 @@ SignalBackends(void)
 			else
 			{
 				/*
-				 * Note: assuming things aren't broken, a signal failure here could
-				 * only occur if the target backend exited since we released
-				 * NotifyQueueLock; which is unlikely but certainly possible. So we
-				 * just log a low-level debug message if it happens.
+				 * Get the target backend's PGPROC and set its latch.
+				 *
+				 * Note: The target backend might exit after we released
+				 * NotifyQueueLock but before we set the latch. We need to
+				 * handle the race condition where the PGPROC slot might be
+				 * recycled by a new process with a different PID.
 				 */
-				if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procno) < 0)
-					elog(DEBUG3, "could not signal backend with PID %d: %m", pid);
+				PGPROC *proc = GetPGProcByNumber(procno);
+
+				/* Verify the PID hasn't changed (backend hasn't exited) */
+				if (proc->pid == pid)
+				{
+					SetLatch(&proc->procLatch);
+				}
+				else
+				{
+					/* Backend exited and slot was recycled */
+					elog(DEBUG3, "could not signal backend with PID %d: process no longer exists", pid);
+				}
 			}
 		}
 	}
-- 
2.47.1


