PoC: Add condition variable support to WaitEventSetWait()
From: Chao Li @ 2026-03-31 07:28 UTC
To: PostgreSQL Hackers <[email protected]>
Hi,
There is an XXX comment in WalSndWait():
```
* XXX: A desirable future improvement would be to add support for CVs
* into WaitEventSetWait().
```
I have been exploring a possible approach for that. This patch is a PoC that adds ConditionVariable support to WaitEventSet. This v1 is mainly intended to gather feedback on the design, so I have only done some basic testing so far, such as a normal logical replication workflow.
I’d like to highlight a few key points about the design:
1. In the current WalSndWait(), although it prepares to sleep on a ConditionVariable, it does not actually check whether the CV has been signaled. This PoC keeps that behavior: WalSndWait() still acts only on WL_POSTMASTER_DEATH and ignores a returned WL_CONDITION_VARIABLE event. However, I tried to make the WaitEventSet support for CVs generic, so that actual signal checking can be added in the future if we want it.
2. To keep the design generic, this patch introduces a new wait event type, WL_CONDITION_VARIABLE. A WL_CONDITION_VARIABLE event occupies a position in the event array, just like latch and socket events, and a set can contain at most one condition variable. When the CV is signaled, the corresponding WL_CONDITION_VARIABLE event is returned in occurred_events.
3. The WaitEventSet APIs AddWaitEventToSet() and ModifyWaitEvent() are extended to support CVs by adding a "cv" parameter to both. The downside of this approach is that every call site of these two APIs has to be updated. I also considered adding separate APIs for CVs, such as AddWaitEventToSetForCV() and ModifyWaitEventForCV(): since CVs do not rely on the kernel, it might make sense to decouple them from socket and latch handling. For v1, however, I chose the more generic approach. I’d be interested in hearing comments on this part of the design.
4. One important point is that this patch extends the WaitEventSet abstraction, not the underlying kernel wait primitives. A ConditionVariable is still a userspace/shared-memory concept, but with this design it can participate in the same waiting framework as sockets and latches. I think that is useful because it allows mixed waits to be handled through one interface.
Here is the v1 patch.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
Attachments:
[application/octet-stream] v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch (22.7K, 2-v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch)
Inline diff:
From 8fe7d4382f4a6c28be01b40fdc3c1b834a214e09 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <[email protected]>
Date: Tue, 31 Mar 2026 08:25:12 +0800
Subject: [PATCH v1] Add condition variable support to WaitEventSetWait()
WalSndWait() currently combines WaitEventSetWait() with manual
ConditionVariablePrepareToSleep() / ConditionVariableCancelSleep()
calls.
This patch teaches WaitEventSetWait() to cooperate with condition
variables directly. A wait-event set can now include a condition
variable event, and WaitEventSetWait() takes care of preparing to
sleep on the condition variable before blocking and canceling the
sleep afterwards.
With that in place, WalSndWait() no longer needs to manage the
condition-variable sleep state manually, and can express the wakeup
source through the wait-event set itself.
Author: Chao Li <[email protected]>
---
src/backend/executor/nodeAppend.c | 4 +-
src/backend/libpq/be-secure.c | 4 +-
src/backend/libpq/pqcomm.c | 12 +-
src/backend/postmaster/postmaster.c | 4 +-
src/backend/postmaster/syslogger.c | 4 +-
src/backend/replication/walsender.c | 17 ++-
src/backend/storage/ipc/latch.c | 16 +--
src/backend/storage/ipc/waiteventset.c | 116 +++++++++++++++++-
src/backend/storage/lmgr/condition_variable.c | 33 +++--
src/backend/utils/init/miscinit.c | 4 +-
src/include/libpq/libpq.h | 3 +-
src/include/storage/condition_variable.h | 1 +
src/include/storage/waiteventset.h | 7 +-
13 files changed, 179 insertions(+), 46 deletions(-)
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 85c85569b5e..27895b96b50 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1045,7 +1045,7 @@ ExecAppendAsyncEventWait(AppendState *node)
Assert(node->as_eventset == NULL);
node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
@@ -1081,7 +1081,7 @@ ExecAppendAsyncEventWait(AppendState *node)
* extensions too.
*/
AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
/* Return at most EVENT_BUFFER_SIZE events in one call. */
if (nevents > EVENT_BUFFER_SIZE)
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 617704bb993..0a406a050d9 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -217,7 +217,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_READ);
@@ -342,7 +342,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_WRITE);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4a442f22df6..5d3a5c22db0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -177,6 +177,7 @@ pq_init(ClientSocket *client_sock)
Port *port;
int socket_pos PG_USED_FOR_ASSERTS_ONLY;
int latch_pos PG_USED_FOR_ASSERTS_ONLY;
+ int cv_pos PG_USED_FOR_ASSERTS_ONLY;
/* allocate the Port struct and copy the ClientSocket contents to it */
port = palloc0_object(Port);
@@ -307,11 +308,13 @@ pq_init(ClientSocket *client_sock)
FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents);
socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
- port->sock, NULL, NULL);
+ port->sock, NULL, NULL, NULL);
latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
+ cv_pos = AddWaitEventToSet(FeBeWaitSet, WL_CONDITION_VARIABLE, PGINVALID_SOCKET,
+ NULL, NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/*
* The event positions match the order we added them, but let's sanity
@@ -319,6 +322,7 @@ pq_init(ClientSocket *client_sock)
*/
Assert(socket_pos == FeBeWaitSetSocketPos);
Assert(latch_pos == FeBeWaitSetLatchPos);
+ Assert(cv_pos == FeBeWaitSetCVPos);
return port;
}
@@ -2063,7 +2067,7 @@ pq_check_connection(void)
* It's OK to modify the socket event filter without restoring, because
* all FeBeWaitSet socket wait sites do the same.
*/
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL, NULL);
retry:
rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index abf0c97569e..f96cce8730d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1638,13 +1638,13 @@ ConfigurePostmasterWaitSet(bool accept_connections)
pm_wait_set = CreateWaitEventSet(NULL,
accept_connections ? (1 + NumListenSockets) : 1);
AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
- NULL);
+ NULL, NULL);
if (accept_connections)
{
for (int i = 0; i < NumListenSockets; i++)
AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
- NULL, NULL);
+ NULL, NULL, NULL);
}
}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 0c2a7bc8578..4db7df8e4a5 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -338,9 +338,9 @@ SysLoggerMain(const void *startup_data, size_t startup_data_len)
* (including the postmaster).
*/
wes = CreateWaitEventSet(NULL, 2);
- AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL, NULL);
#ifndef WIN32
- AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
+ AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL, NULL);
#endif
/* main worker loop */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66507e9c2dd..d2749a2ca67 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3846,8 +3846,9 @@ static void
WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
{
WaitEvent event;
+ ConditionVariable *cv = NULL;
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL, NULL);
/*
* We use a condition variable to efficiently wake up walsenders in
@@ -3867,9 +3868,6 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* release for every iteration, just to wake up only the waiting
* walsenders. It makes WalSndWakeup() callers' life easy.
*
- * XXX: A desirable future improvement would be to add support for CVs
- * into WaitEventSetWait().
- *
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
*
@@ -3878,20 +3876,19 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* the receipt of the LSN.
*/
if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ cv = &WalSndCtl->wal_confirm_rcv_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ cv = &WalSndCtl->wal_flush_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+ cv = &WalSndCtl->wal_replay_cv;
+
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetCVPos, WL_CONDITION_VARIABLE, NULL, cv);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
{
- ConditionVariableCancelSleep();
proc_exit(1);
}
-
- ConditionVariableCancelSleep();
}
/*
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 8537e9fef2d..9b148d0f52b 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -41,7 +41,7 @@ InitializeLatchWaitSet(void)
/* Set up the WaitEventSet used by WaitLatch(). */
LatchWaitSet = CreateWaitEventSet(NULL, 2);
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
Assert(latch_pos == LatchWaitSetLatchPos);
/*
@@ -51,7 +51,7 @@ InitializeLatchWaitSet(void)
if (IsUnderPostmaster)
{
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH,
- PGINVALID_SOCKET, NULL, NULL);
+ PGINVALID_SOCKET, NULL, NULL, NULL);
Assert(latch_pos == LatchWaitSetPostmasterDeathPos);
}
}
@@ -186,12 +186,12 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
*/
if (!(wakeEvents & WL_LATCH_SET))
latch = NULL;
- ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
+ ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch, NULL);
if (IsUnderPostmaster)
ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
(wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
- NULL);
+ NULL, NULL);
if (WaitEventSetWait(LatchWaitSet,
(wakeEvents & WL_TIMEOUT) ? timeout : -1,
@@ -235,7 +235,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if (wakeEvents & WL_LATCH_SET)
AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
- latch, NULL);
+ latch, NULL, NULL);
/* Postmaster-managed callers must handle postmaster death somehow. */
Assert(!IsUnderPostmaster ||
@@ -244,18 +244,18 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if (wakeEvents & WL_SOCKET_MASK)
{
int ev;
ev = wakeEvents & WL_SOCKET_MASK;
- AddWaitEventToSet(set, ev, sock, NULL, NULL);
+ AddWaitEventToSet(set, ev, sock, NULL, NULL, NULL);
}
rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
diff --git a/src/backend/storage/ipc/waiteventset.c b/src/backend/storage/ipc/waiteventset.c
index 0f228e1e7b8..2382e0cae2a 100644
--- a/src/backend/storage/ipc/waiteventset.c
+++ b/src/backend/storage/ipc/waiteventset.c
@@ -73,6 +73,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/waiteventset.h"
#include "utils/memutils.h"
@@ -137,6 +138,15 @@ struct WaitEventSet
Latch *latch;
int latch_pos;
+ /*
+ * If WL_CONDITION_VARIABLE is specified in any wait event, cv is a
+ * pointer to said condition variable, and cv_pos the offset in the
+ * ->events array.
+ */
+ ConditionVariable *cv;
+ int cv_pos;
+ bool cv_maybe_signaled; /* re-check the CV after polling? */
+
/*
* WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag
* is set so that we'll exit immediately if postmaster death is detected,
@@ -414,6 +424,8 @@ CreateWaitEventSet(ResourceOwner resowner, int nevents)
#endif
set->latch = NULL;
+ set->cv = NULL;
+ set->cv_maybe_signaled = false;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
@@ -550,6 +562,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
* platforms, this is the same as WL_SOCKET_READABLE)
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
+ * - WL_CONDITION_VARIABLE: Wait for a condition variable to be signaled.
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
* used to modify previously added wait events using ModifyWaitEvent().
@@ -568,7 +581,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
*/
int
AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
- void *user_data)
+ ConditionVariable *cv, void *user_data)
{
WaitEvent *event;
@@ -596,6 +609,18 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
elog(ERROR, "cannot wait on latch without a specified latch");
}
+ /*
+ * When adding a CV event, a NULL cv is allowed, because it can be set
+ * later with ModifyWaitEvent().
+ */
+ if (cv)
+ {
+ if (set->cv)
+ elog(ERROR, "cannot wait on more than one condition variable");
+ if ((events & WL_CONDITION_VARIABLE) != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot specify a condition variable without WL_CONDITION_VARIABLE");
+ }
+
/* waiting for socket readiness without a socket indicates a bug */
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
@@ -624,6 +649,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#endif
#endif
}
+ else if (events == WL_CONDITION_VARIABLE)
+ {
+ set->cv = cv;
+ set->cv_pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ return event->pos;
+ }
else if (events == WL_POSTMASTER_DEATH)
{
#ifndef WIN32
@@ -653,14 +685,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
* 'pos' is the id returned by AddWaitEventToSet.
*/
void
-ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch,
+ ConditionVariable *cv)
{
WaitEvent *event;
#if defined(WAIT_USE_KQUEUE)
int old_events;
#endif
- Assert(pos < set->nevents);
+ Assert(pos >= 0 && pos < set->nevents);
event = &set->events[pos];
#if defined(WAIT_USE_KQUEUE)
@@ -682,6 +715,21 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
return;
}
+ /*
+ * Change the condition variable associated with the event. Since a CV
+ * does not rely on the kernel, take a fast path and only update the CV
+ * pointer.
+ */
+ if (event->events == WL_CONDITION_VARIABLE)
+ {
+ if (events != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot change event type of a condition variable event");
+ if (cv == NULL)
+ elog(ERROR, "cannot set condition variable to NULL");
+ set->cv = cv;
+ return;
+ }
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -1042,9 +1090,11 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
uint32 wait_event_info)
{
int returned_events = 0;
+ WaitEvent *first_occurred_event = occurred_events;
instr_time start_time;
instr_time cur_time;
long cur_timeout = -1;
+ bool cv_signaled = false;
Assert(nevents > 0);
@@ -1061,6 +1111,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
else
INSTR_TIME_SET_ZERO(start_time);
+ if (set->cv != NULL)
+ ConditionVariablePrepareToSleep(set->cv);
+
pgstat_report_wait_start(wait_event_info);
#ifndef WIN32
@@ -1072,6 +1125,7 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
while (returned_events == 0)
{
int rc;
+ WaitEvent *block_occurred_events = occurred_events;
/*
* Check if the latch is set already first. If so, we either exit
@@ -1115,6 +1169,7 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
occurred_events->user_data =
set->events[set->latch_pos].user_data;
occurred_events->events = WL_LATCH_SET;
+
occurred_events++;
returned_events++;
@@ -1133,6 +1188,38 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
timeout = 0;
}
+ if (set->cv)
+ cv_signaled = ConditionVariableSignaled(set->cv);
+
+ if (set->cv && !cv_signaled)
+ set->cv_maybe_signaled = true;
+
+ if (set->cv && cv_signaled)
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+
+ occurred_events++;
+ returned_events++;
+
+ /* could have been set above */
+ set->cv_maybe_signaled = false;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+
+ /*
+ * Even though we already have an event, we'll poll just once with
+ * zero timeout to see what non-latch events we can fit into the
+ * output buffer at the same time.
+ */
+ cur_timeout = 0;
+ timeout = 0;
+ }
+
/*
* Wait for events using the readiness primitive chosen at the top of
* this file. If -1 is returned, a timeout has occurred, if 0 we have
@@ -1145,6 +1232,25 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
set->latch->maybe_sleeping)
set->latch->maybe_sleeping = false;
+ /* Check CV again after waiting if not done before waiting */
+ if (set->cv && set->cv_maybe_signaled && ConditionVariableSignaled(set->cv))
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+ }
+
+ if (set->cv && set->cv_maybe_signaled)
+ set->cv_maybe_signaled = false;
+
if (rc == -1)
break; /* timeout occurred */
else
@@ -1166,10 +1272,12 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
pgstat_report_wait_end();
+ if (set->cv != NULL)
+ ConditionVariableCancelSleep();
+
return returned_events;
}
-
#if defined(WAIT_USE_EPOLL)
/*
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 1f16b3f7475..c04bc420bcf 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -182,13 +182,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* by something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid this obvious case.
*/
- SpinLockAcquire(&cv->mutex);
- if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
- {
- done = true;
- proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
- }
- SpinLockRelease(&cv->mutex);
+ done = ConditionVariableSignaled(cv);
/*
* Check for interrupts, and return spuriously if that caused the
@@ -217,6 +211,31 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
}
}
+/*
+ * Check whether this process was removed from cv's wait list by a CV
+ * signal/broadcast. If so, re-add it to preserve wakeups while the caller
+ * checks the predicate.
+ */
+bool
+ConditionVariableSignaled(ConditionVariable *cv)
+{
+ bool signaled = false;
+
+ /* Ignore probes for CVs we are not currently prepared to sleep on. */
+ if (cv_sleep_target != cv)
+ return false;
+
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
+ {
+ signaled = true;
+ proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+
+ return signaled;
+}
+
/*
* Cancel any pending sleep operation.
*
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index ba191977697..b970c1d727c 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -222,7 +222,7 @@ SwitchToSharedLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
/*
* Set the shared latch as the local one might have been set. This
@@ -249,7 +249,7 @@ SwitchBackToLocalLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
SetLatch(MyLatch);
}
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index c9b934d2321..8d13eaf8393 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -65,7 +65,8 @@ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
#define FeBeWaitSetSocketPos 0
#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
+#define FeBeWaitSetCVPos 2
+#define FeBeWaitSetNEvents 4
extern int ListenServerPort(int family, const char *hostName,
unsigned short portNumber, const char *unixSocketDir,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 14bd6dd55c0..fb7e4bb67f2 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -57,6 +57,7 @@ extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
extern bool ConditionVariableCancelSleep(void);
+extern bool ConditionVariableSignaled(ConditionVariable *cv);
/*
* Optionally, ConditionVariablePrepareToSleep can be called before entering
diff --git a/src/include/storage/waiteventset.h b/src/include/storage/waiteventset.h
index 5341267f0a0..f3d569a6aa0 100644
--- a/src/include/storage/waiteventset.h
+++ b/src/include/storage/waiteventset.h
@@ -25,6 +25,7 @@
#ifndef WAITEVENTSET_H
#define WAITEVENTSET_H
+#include "storage/condition_variable.h"
#include "utils/resowner.h"
/*
@@ -50,6 +51,7 @@
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_ACCEPT WL_SOCKET_READABLE
#endif
+#define WL_CONDITION_VARIABLE (1 << 9)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED | \
@@ -81,9 +83,10 @@ extern WaitEventSet *CreateWaitEventSet(ResourceOwner resowner, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern void FreeWaitEventSetAfterFork(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
- struct Latch *latch, void *user_data);
+ struct Latch *latch, ConditionVariable *cv,
+ void *user_data);
extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
- struct Latch *latch);
+ struct Latch *latch, ConditionVariable *cv);
extern int WaitEventSetWait(WaitEventSet *set, long timeout,
WaitEvent *occurred_events, int nevents,
uint32 wait_event_info);
--
2.50.1 (Apple Git-155)
Re: PoC: Add condition variable support to WaitEventSetWait()
From: Chao Li @ 2026-04-02 07:38 UTC
To: PostgreSQL Hackers <[email protected]>
> On Mar 31, 2026, at 16:59, Chao Li <[email protected]> wrote:
> I just noticed that I forgot to commit my last edit before switching to another branch, so I am attaching an updated v1.
> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
PFA v2, which fixes a CI failure in contrib/postgres_fdw: its call to AddWaitEventToSet() had not been updated for the new cv parameter.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
Attachments:
[application/octet-stream] v2-0001-Add-condition-variable-support-to-WaitEventSetWai.patch (23.4K, 2-v2-0001-Add-condition-variable-support-to-WaitEventSetWai.patch)
Inline diff:
From 8e30e9c9d111942dc232617392836727da292197 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <[email protected]>
Date: Tue, 31 Mar 2026 08:25:12 +0800
Subject: [PATCH v2] Add condition variable support to WaitEventSetWait()
WalSndWait() currently combines WaitEventSetWait() with manual
ConditionVariablePrepareToSleep() / ConditionVariableCancelSleep()
calls.
This patch teaches WaitEventSetWait() to cooperate with condition
variables directly. A wait-event set can now include a condition
variable event, and WaitEventSetWait() takes care of preparing to
sleep on the condition variable before blocking and canceling the
sleep afterwards.
With that in place, WalSndWait() no longer needs to manage the
condition-variable sleep state manually, and can express the wakeup
source through the wait-event set itself.
Author: Chao Li <[email protected]>
---
contrib/postgres_fdw/postgres_fdw.c | 2 +-
src/backend/executor/execAsync.c | 2 +-
src/backend/executor/nodeAppend.c | 4 +-
src/backend/libpq/be-secure.c | 4 +-
src/backend/libpq/pqcomm.c | 12 +-
src/backend/postmaster/postmaster.c | 4 +-
src/backend/postmaster/syslogger.c | 4 +-
src/backend/replication/walsender.c | 17 ++-
src/backend/storage/ipc/latch.c | 16 +--
src/backend/storage/ipc/waiteventset.c | 114 +++++++++++++++++-
src/backend/storage/lmgr/condition_variable.c | 33 +++--
src/backend/utils/init/miscinit.c | 4 +-
src/include/libpq/libpq.h | 3 +-
src/include/storage/condition_variable.h | 1 +
src/include/storage/waiteventset.h | 7 +-
15 files changed, 179 insertions(+), 48 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 41e47cc795b..5895c59a5c5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7277,7 +7277,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
Assert(pendingAreq == areq);
AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
- NULL, areq);
+ NULL, NULL, areq);
}
/*
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index cf7ddbb01f4..e048328f583 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -57,7 +57,7 @@ ExecAsyncRequest(AsyncRequest *areq)
* for which it wishes to wait. We expect the node-type specific callback to
* make a single call of the following form:
*
- * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, NULL, areq);
*/
void
ExecAsyncConfigureWait(AsyncRequest *areq)
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 85c85569b5e..27895b96b50 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1045,7 +1045,7 @@ ExecAppendAsyncEventWait(AppendState *node)
Assert(node->as_eventset == NULL);
node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
@@ -1081,7 +1081,7 @@ ExecAppendAsyncEventWait(AppendState *node)
* extensions too.
*/
AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
/* Return at most EVENT_BUFFER_SIZE events in one call. */
if (nevents > EVENT_BUFFER_SIZE)
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 617704bb993..0a406a050d9 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -217,7 +217,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_READ);
@@ -342,7 +342,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_WRITE);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4a442f22df6..5d3a5c22db0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -177,6 +177,7 @@ pq_init(ClientSocket *client_sock)
Port *port;
int socket_pos PG_USED_FOR_ASSERTS_ONLY;
int latch_pos PG_USED_FOR_ASSERTS_ONLY;
+ int cv_pos PG_USED_FOR_ASSERTS_ONLY;
/* allocate the Port struct and copy the ClientSocket contents to it */
port = palloc0_object(Port);
@@ -307,11 +308,13 @@ pq_init(ClientSocket *client_sock)
FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents);
socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
- port->sock, NULL, NULL);
+ port->sock, NULL, NULL, NULL);
latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
+ cv_pos = AddWaitEventToSet(FeBeWaitSet, WL_CONDITION_VARIABLE, PGINVALID_SOCKET,
+ NULL, NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/*
* The event positions match the order we added them, but let's sanity
@@ -319,6 +322,7 @@ pq_init(ClientSocket *client_sock)
*/
Assert(socket_pos == FeBeWaitSetSocketPos);
Assert(latch_pos == FeBeWaitSetLatchPos);
+ Assert(cv_pos == FeBeWaitSetCVPos);
return port;
}
@@ -2063,7 +2067,7 @@ pq_check_connection(void)
* It's OK to modify the socket event filter without restoring, because
* all FeBeWaitSet socket wait sites do the same.
*/
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL, NULL);
retry:
rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index abf0c97569e..f96cce8730d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1638,13 +1638,13 @@ ConfigurePostmasterWaitSet(bool accept_connections)
pm_wait_set = CreateWaitEventSet(NULL,
accept_connections ? (1 + NumListenSockets) : 1);
AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
- NULL);
+ NULL, NULL);
if (accept_connections)
{
for (int i = 0; i < NumListenSockets; i++)
AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
- NULL, NULL);
+ NULL, NULL, NULL);
}
}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 0c2a7bc8578..4db7df8e4a5 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -338,9 +338,9 @@ SysLoggerMain(const void *startup_data, size_t startup_data_len)
* (including the postmaster).
*/
wes = CreateWaitEventSet(NULL, 2);
- AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL, NULL);
#ifndef WIN32
- AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
+ AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL, NULL);
#endif
/* main worker loop */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 66507e9c2dd..d2749a2ca67 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3846,8 +3846,9 @@ static void
WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
{
WaitEvent event;
+ ConditionVariable *cv = NULL;
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL, NULL);
/*
* We use a condition variable to efficiently wake up walsenders in
@@ -3867,9 +3868,6 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* release for every iteration, just to wake up only the waiting
* walsenders. It makes WalSndWakeup() callers' life easy.
*
- * XXX: A desirable future improvement would be to add support for CVs
- * into WaitEventSetWait().
- *
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
*
@@ -3878,20 +3876,19 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* the receipt of the LSN.
*/
if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ cv = &WalSndCtl->wal_confirm_rcv_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ cv = &WalSndCtl->wal_flush_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+ cv = &WalSndCtl->wal_replay_cv;
+
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetCVPos, WL_CONDITION_VARIABLE, NULL, cv);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
{
- ConditionVariableCancelSleep();
proc_exit(1);
}
-
- ConditionVariableCancelSleep();
}
/*
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 8537e9fef2d..9b148d0f52b 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -41,7 +41,7 @@ InitializeLatchWaitSet(void)
/* Set up the WaitEventSet used by WaitLatch(). */
LatchWaitSet = CreateWaitEventSet(NULL, 2);
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
Assert(latch_pos == LatchWaitSetLatchPos);
/*
@@ -51,7 +51,7 @@ InitializeLatchWaitSet(void)
if (IsUnderPostmaster)
{
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH,
- PGINVALID_SOCKET, NULL, NULL);
+ PGINVALID_SOCKET, NULL, NULL, NULL);
Assert(latch_pos == LatchWaitSetPostmasterDeathPos);
}
}
@@ -186,12 +186,12 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
*/
if (!(wakeEvents & WL_LATCH_SET))
latch = NULL;
- ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
+ ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch, NULL);
if (IsUnderPostmaster)
ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
(wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
- NULL);
+ NULL, NULL);
if (WaitEventSetWait(LatchWaitSet,
(wakeEvents & WL_TIMEOUT) ? timeout : -1,
@@ -235,7 +235,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if (wakeEvents & WL_LATCH_SET)
AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
- latch, NULL);
+ latch, NULL, NULL);
/* Postmaster-managed callers must handle postmaster death somehow. */
Assert(!IsUnderPostmaster ||
@@ -244,18 +244,18 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if (wakeEvents & WL_SOCKET_MASK)
{
int ev;
ev = wakeEvents & WL_SOCKET_MASK;
- AddWaitEventToSet(set, ev, sock, NULL, NULL);
+ AddWaitEventToSet(set, ev, sock, NULL, NULL, NULL);
}
rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
diff --git a/src/backend/storage/ipc/waiteventset.c b/src/backend/storage/ipc/waiteventset.c
index 0f228e1e7b8..a685c93a5ad 100644
--- a/src/backend/storage/ipc/waiteventset.c
+++ b/src/backend/storage/ipc/waiteventset.c
@@ -73,6 +73,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/waiteventset.h"
#include "utils/memutils.h"
@@ -137,6 +138,15 @@ struct WaitEventSet
Latch *latch;
int latch_pos;
+ /*
+ * If WL_CONDITION_VARIABLE is specified in any wait event, cv is a
+ * pointer to said condition variable, and cv_pos the offset in the
+ * ->events array.
+ */
+ ConditionVariable *cv;
+ int cv_pos;
+ bool cv_maybe_signaled; /* recheck the CV after polling? */
+
/*
* WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag
* is set so that we'll exit immediately if postmaster death is detected,
@@ -414,6 +424,8 @@ CreateWaitEventSet(ResourceOwner resowner, int nevents)
#endif
set->latch = NULL;
+ set->cv = NULL;
+ set->cv_maybe_signaled = false;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
@@ -550,6 +562,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
* platforms, this is the same as WL_SOCKET_READABLE)
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
+ * - WL_CONDITION_VARIABLE: Wait for a condition variable to be signaled.
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
* used to modify previously added wait events using ModifyWaitEvent().
@@ -568,7 +581,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
*/
int
AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
- void *user_data)
+ ConditionVariable *cv, void *user_data)
{
WaitEvent *event;
@@ -596,6 +609,18 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
elog(ERROR, "cannot wait on latch without a specified latch");
}
+ /*
+ * When adding a CV event, we allow a NULL cv, because it can be set later
+ * with ModifyWaitEvent().
+ */
+ if (cv)
+ {
+ if (set->cv)
+ elog(ERROR, "cannot wait on more than one condition variable");
+ if ((events & WL_CONDITION_VARIABLE) != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot specify a condition variable without WL_CONDITION_VARIABLE");
+ }
+
/* waiting for socket readiness without a socket indicates a bug */
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
@@ -624,6 +649,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#endif
#endif
}
+ else if (events == WL_CONDITION_VARIABLE)
+ {
+ set->cv = cv;
+ set->cv_pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ return event->pos;
+ }
else if (events == WL_POSTMASTER_DEATH)
{
#ifndef WIN32
@@ -653,14 +685,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
* 'pos' is the id returned by AddWaitEventToSet.
*/
void
-ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch,
+ ConditionVariable *cv)
{
WaitEvent *event;
#if defined(WAIT_USE_KQUEUE)
int old_events;
#endif
- Assert(pos < set->nevents);
+ Assert(pos >= 0 && pos < set->nevents);
event = &set->events[pos];
#if defined(WAIT_USE_KQUEUE)
@@ -682,6 +715,21 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
return;
}
+ /*
+ * Change the condition variable associated with the event. As a CV doesn't
+ * rely on the kernel, take a fast path and only update the CV
+ * pointer.
+ */
+ if (event->events == WL_CONDITION_VARIABLE)
+ {
+ if (events != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot change event type of a condition variable event");
+ if (cv == NULL)
+ elog(ERROR, "cannot set condition variable to NULL");
+ set->cv = cv;
+ return;
+ }
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -1061,6 +1109,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
else
INSTR_TIME_SET_ZERO(start_time);
+ if (set->cv != NULL)
+ ConditionVariablePrepareToSleep(set->cv);
+
pgstat_report_wait_start(wait_event_info);
#ifndef WIN32
@@ -1072,6 +1123,7 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
while (returned_events == 0)
{
int rc;
+ bool cv_signaled = false;
/*
* Check if the latch is set already first. If so, we either exit
@@ -1133,6 +1185,37 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
timeout = 0;
}
+ if (set->cv)
+ cv_signaled = ConditionVariableSignaled(set->cv);
+
+ if (set->cv && !cv_signaled)
+ set->cv_maybe_signaled = true;
+
+ if (set->cv && cv_signaled)
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ /* could have been set above */
+ set->cv_maybe_signaled = false;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+
+ /*
+ * Even though we already have an event, we'll poll just once with
+ * zero timeout to see what other events we can fit into the
+ * output buffer at the same time.
+ */
+ cur_timeout = 0;
+ timeout = 0;
+ }
+
/*
* Wait for events using the readiness primitive chosen at the top of
* this file. If -1 is returned, a timeout has occurred, if 0 we have
@@ -1145,11 +1228,32 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
set->latch->maybe_sleeping)
set->latch->maybe_sleeping = false;
+ if (set->cv && set->cv_maybe_signaled)
+ set->cv_maybe_signaled = false;
+
if (rc == -1)
break; /* timeout occurred */
else
+ {
returned_events += rc;
+ /* Check CV again after waiting if not done before waiting */
+ if (set->cv && set->cv_maybe_signaled && returned_events < nevents &&
+ ConditionVariableSignaled(set->cv))
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+ }
+ }
+
/* If we're not done, update cur_timeout for next iteration */
if (returned_events == 0 && timeout >= 0)
{
@@ -1166,10 +1270,12 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
pgstat_report_wait_end();
+ if (set->cv != NULL)
+ ConditionVariableCancelSleep();
+
return returned_events;
}
-
#if defined(WAIT_USE_EPOLL)
/*
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 1f16b3f7475..c04bc420bcf 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -182,13 +182,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* by something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid this obvious case.
*/
- SpinLockAcquire(&cv->mutex);
- if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
- {
- done = true;
- proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
- }
- SpinLockRelease(&cv->mutex);
+ done = ConditionVariableSignaled(cv);
/*
* Check for interrupts, and return spuriously if that caused the
@@ -217,6 +211,31 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
}
}
+/*
+ * Check whether this process was removed from cv's wait list by a CV
+ * signal/broadcast. If so, re-add it to preserve wakeups while the caller
+ * checks the predicate.
+ */
+bool
+ConditionVariableSignaled(ConditionVariable *cv)
+{
+ bool signaled = false;
+
+ /* Ignore probes for CVs we are not currently prepared to sleep on. */
+ if (cv_sleep_target != cv)
+ return false;
+
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
+ {
+ signaled = true;
+ proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+
+ return signaled;
+}
+
/*
* Cancel any pending sleep operation.
*
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index ba191977697..b970c1d727c 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -222,7 +222,7 @@ SwitchToSharedLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
/*
* Set the shared latch as the local one might have been set. This
@@ -249,7 +249,7 @@ SwitchBackToLocalLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
SetLatch(MyLatch);
}
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index c9b934d2321..8d13eaf8393 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -65,7 +65,8 @@ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
#define FeBeWaitSetSocketPos 0
#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
+#define FeBeWaitSetCVPos 2
+#define FeBeWaitSetNEvents 4
extern int ListenServerPort(int family, const char *hostName,
unsigned short portNumber, const char *unixSocketDir,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 14bd6dd55c0..fb7e4bb67f2 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -57,6 +57,7 @@ extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
extern bool ConditionVariableCancelSleep(void);
+extern bool ConditionVariableSignaled(ConditionVariable *cv);
/*
* Optionally, ConditionVariablePrepareToSleep can be called before entering
diff --git a/src/include/storage/waiteventset.h b/src/include/storage/waiteventset.h
index 5341267f0a0..f3d569a6aa0 100644
--- a/src/include/storage/waiteventset.h
+++ b/src/include/storage/waiteventset.h
@@ -25,6 +25,7 @@
#ifndef WAITEVENTSET_H
#define WAITEVENTSET_H
+#include "storage/condition_variable.h"
#include "utils/resowner.h"
/*
@@ -50,6 +51,7 @@
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_ACCEPT WL_SOCKET_READABLE
#endif
+#define WL_CONDITION_VARIABLE (1 << 9)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED | \
@@ -81,9 +83,10 @@ extern WaitEventSet *CreateWaitEventSet(ResourceOwner resowner, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern void FreeWaitEventSetAfterFork(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
- struct Latch *latch, void *user_data);
+ struct Latch *latch, ConditionVariable *cv,
+ void *user_data);
extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
- struct Latch *latch);
+ struct Latch *latch, ConditionVariable *cv);
extern int WaitEventSetWait(WaitEventSet *set, long timeout,
WaitEvent *occurred_events, int nevents,
uint32 wait_event_info);
--
2.50.1 (Apple Git-155)
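For readers following the ConditionVariableSignaled() hunk above: the probe treats "this process is no longer on the CV's wakeup list" as "signaled", and immediately puts the process back on the list. The following is a minimal, single-process C sketch of that idea under simplifying assumptions. ToyCV, toy_prepare_to_sleep(), toy_signal(), toy_signaled() and toy_sleep_target are hypothetical names, and the spinlock-protected proclist of the real ConditionVariable is replaced by a plain array. It illustrates the semantics only and is not the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Toy, single-process model of a condition variable's wakeup list.
 * All names are hypothetical; the real ConditionVariable keeps a
 * proclist protected by a spinlock, which this sketch omits.
 */
#define TOY_MAX_WAITERS 8

typedef struct ToyCV
{
    int         waiters[TOY_MAX_WAITERS];   /* FIFO of waiting proc numbers */
    int         nwaiters;
} ToyCV;

/* CV we are prepared to sleep on, playing the role of cv_sleep_target. */
static ToyCV *toy_sleep_target = NULL;

static bool
toy_contains(const ToyCV *cv, int proc)
{
    for (int i = 0; i < cv->nwaiters; i++)
        if (cv->waiters[i] == proc)
            return true;
    return false;
}

static void
toy_push_tail(ToyCV *cv, int proc)
{
    assert(cv->nwaiters < TOY_MAX_WAITERS);
    cv->waiters[cv->nwaiters++] = proc;
}

/* Analogous to ConditionVariablePrepareToSleep(): join the wakeup list. */
static void
toy_prepare_to_sleep(ToyCV *cv, int proc)
{
    toy_sleep_target = cv;
    if (!toy_contains(cv, proc))
        toy_push_tail(cv, proc);
}

/* Analogous to ConditionVariableSignal(): remove the oldest waiter. */
static void
toy_signal(ToyCV *cv)
{
    if (cv->nwaiters == 0)
        return;
    for (int i = 1; i < cv->nwaiters; i++)
        cv->waiters[i - 1] = cv->waiters[i];
    cv->nwaiters--;
}

/*
 * Mirrors the proposed ConditionVariableSignaled(): "signaled" means a
 * signaler removed us from the list.  We re-add ourselves so that a
 * signal arriving while the caller rechecks its predicate is not lost.
 */
static bool
toy_signaled(ToyCV *cv, int proc)
{
    if (toy_sleep_target != cv)
        return false;           /* not prepared to sleep on this CV */
    if (toy_contains(cv, proc))
        return false;           /* still waiting: no signal yet */
    toy_push_tail(cv, proc);
    return true;
}
```

After toy_prepare_to_sleep(&cv, 42), toy_signaled(&cv, 42) returns false until toy_signal(&cv) runs; it then returns true once and false again, because the probe put process 42 back on the list. That re-add is the sketch's counterpart of the patch comment's goal of preserving wakeups while the caller checks its predicate.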
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-08 03:50 Chao Li <[email protected]>
From: Chao Li @ 2026-04-08 03:50 UTC
To: PostgreSQL Hackers <[email protected]>
> On Apr 2, 2026, at 15:38, Chao Li <[email protected]> wrote:
>
>
>
>> On Mar 31, 2026, at 16:59, Chao Li <[email protected]> wrote:
>>
>>
>>
>>> On Mar 31, 2026, at 15:28, Chao Li <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> There is an XXX comment in WalSndWait():
>>> ```
>>> * XXX: A desirable future improvement would be to add support for CVs
>>> * into WaitEventSetWait().
>>> ```
>>>
>>> I have been exploring a possible approach for that. This patch is a PoC that adds ConditionVariable support to WaitEventSet. This v1 is mainly intended to gather feedback on the design, so I have only done some basic testing so far, such as a normal logical replication workflow.
>>>
>>> I’d like to highlight a few key points about the design:
>>>
>>> 1. In the current WalSndWait(), although it prepares to sleep on a ConditionVariable, it does not actually check whether the CV has been signaled. In this PoC, I kept that same behavior. However, I tried to make the WaitEventSet support for CVs generic, so that if we want to add actual signal checking in the future, that would be possible.
>>>
>>> 2. To keep the design generic, this patch introduces a new wait event type, WL_CONDITION_VARIABLE. A WL_CONDITION_VARIABLE event occupies a position in the event array, similar to latch and socket events. When a CV is signaled, the corresponding WL_CONDITION_VARIABLE event is returned in occurred_events.
>>>
>>> 3. The WaitEventSet APIs AddWaitEventToSet() and ModifyWaitEvent() are extended to support CVs by adding one more parameter, "cv", to both APIs. The downside of this approach is that all call sites of these two APIs need to be updated. I also considered adding separate APIs for CVs, such as AddWaitEventToSetForCV() and ModifyWaitEventForCV(), since CVs do not rely on the kernel and it might therefore make sense to decouple them from socket and latch handling. But for v1, I chose the more generic approach. I’d be interested in hearing comments on this part of the design.
>>>
>>> 4. One important point is that this patch extends the WaitEventSet abstraction, not the underlying kernel wait primitives. A ConditionVariable is still a userspace/shared-memory concept, but with this design it can participate in the same waiting framework as sockets and latches. I think that is useful because it allows mixed waits to be handled through one interface.
>>>
>>> Here is the v1 patch.
>>>
>>> Best regards,
>>> --
>>> Chao Li (Evan)
>>> HighGo Software Co., Ltd.
>>> https://www.highgo.com/
>>>
>>>
>>>
>>>
>>> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
>>
>>
>> I just noticed that I forgot to commit my last edit before switching to another branch, so I am attaching an updated v1.
>>
>> Best regards,
>> --
>> Chao Li (Evan)
>> HighGo Software Co., Ltd.
>> https://www.highgo.com/
>>
>>
>>
>>
>> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
>
> PFA v2 - fixed a CI failure from contrib/postgres_fdw.
>
> Best regards,
> --
> Chao Li (Evan)
> HighGo Software Co., Ltd.
> https://www.highgo.com/
>
>
>
>
> <v2-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
PFA v3: fixed a CI test failure and rebased.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
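Design point 2 above says a WL_CONDITION_VARIABLE event occupies a slot in the event array and is reported through occurred_events. A rough sketch of one pass of such a merged wait follows, under simplifying assumptions: the CV is probed first because no kernel primitive can report it, and if it fired, the kernel poll runs with a zero timeout only to pick up other events that are already ready. ToyEvent, ToyPollFn, toy_wait_once() and the toy_cv_probe()/toy_poll_one_socket() doubles are hypothetical; the latch check, EINTR retry, and timeout bookkeeping of the real WaitEventSetWait() loop are deliberately left out.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag values; the patch itself defines WL_CONDITION_VARIABLE as (1 << 9). */
#define TOY_WL_SOCKET_READABLE      (1 << 1)
#define TOY_WL_CONDITION_VARIABLE   (1 << 9)

typedef struct ToyEvent
{
    int         pos;            /* slot in the wait set's event array */
    unsigned    events;         /* which condition fired */
} ToyEvent;

/* Kernel-poll stand-in: returns the number of ready events, or -1 on timeout. */
typedef int (*ToyPollFn) (long timeout, ToyEvent *out, int space);

/*
 * One pass of a WaitEventSetWait()-style wait that merges a userspace CV
 * probe with kernel readiness.  Returns the number of events stored.
 */
static int
toy_wait_once(bool (*cv_signaled) (void), int cv_pos, ToyPollFn poll_kernel,
              long timeout, ToyEvent *occurred, int nevents)
{
    int         returned = 0;
    int         rc;

    /* The kernel cannot see a CV signal, so probe it before blocking. */
    if (cv_signaled())
    {
        occurred[returned].pos = cv_pos;
        occurred[returned].events = TOY_WL_CONDITION_VARIABLE;
        returned++;
        if (returned == nevents)
            return returned;    /* output buffer full already */
        timeout = 0;            /* only collect what is already ready */
    }

    rc = poll_kernel(timeout, occurred + returned, nevents - returned);
    if (rc > 0)
        returned += rc;         /* rc == -1 means timeout: nothing added */
    return returned;
}

/* Demo doubles: a CV probe driven by a flag, and one always-ready socket in slot 0. */
static bool toy_cv_fired = false;
static long toy_last_timeout = -2;  /* -2 means "poll was never called" */

static bool
toy_cv_probe(void)
{
    return toy_cv_fired;
}

static int
toy_poll_one_socket(long timeout, ToyEvent *out, int space)
{
    toy_last_timeout = timeout;
    if (space < 1)
        return 0;
    out[0].pos = 0;
    out[0].events = TOY_WL_SOCKET_READABLE;
    return 1;
}
```

With the CV flagged and a 4-slot buffer, toy_wait_once(toy_cv_probe, 2, toy_poll_one_socket, 1000, ev, 4) stores the CV event at slot 2 (chosen to match FeBeWaitSetCVPos in the patch) followed by the socket event, and the poll sees timeout 0. With a 1-slot buffer it returns after the CV event without polling at all, which corresponds to the "output buffer full already" early exit in the patch.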
Attachments:
[application/octet-stream] v3-0001-Add-condition-variable-support-to-WaitEventSetWai.patch (23.3K)
From 8a6f7086bc843cf3bcd99943e55ec634cda7d628 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <[email protected]>
Date: Tue, 31 Mar 2026 08:25:12 +0800
Subject: [PATCH v3] Add condition variable support to WaitEventSetWait()

WalSndWait() currently combines WaitEventSetWait() with manual
ConditionVariablePrepareToSleep() / ConditionVariableCancelSleep()
calls.

This patch teaches WaitEventSetWait() to cooperate with condition
variables directly. A wait-event set can now include a condition
variable event, and WaitEventSetWait() takes care of preparing to
sleep on the condition variable before blocking and canceling the
sleep afterwards.

With that in place, WalSndWait() no longer needs to manage the
condition-variable sleep state manually, and can express the wakeup
source through the wait-event set itself.

Author: Chao Li <[email protected]>
---
contrib/postgres_fdw/postgres_fdw.c | 2 +-
src/backend/executor/execAsync.c | 2 +-
src/backend/executor/nodeAppend.c | 4 +-
src/backend/libpq/be-secure.c | 4 +-
src/backend/libpq/pqcomm.c | 12 +-
src/backend/postmaster/postmaster.c | 4 +-
src/backend/postmaster/syslogger.c | 4 +-
src/backend/replication/walsender.c | 17 ++-
src/backend/storage/ipc/latch.c | 16 +--
src/backend/storage/ipc/waiteventset.c | 112 +++++++++++++++++-
src/backend/storage/lmgr/condition_variable.c | 33 ++++--
src/backend/utils/init/miscinit.c | 4 +-
src/include/libpq/libpq.h | 3 +-
src/include/storage/condition_variable.h | 1 +
src/include/storage/waiteventset.h | 7 +-
15 files changed, 177 insertions(+), 48 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index cc8ec24c30e..6ee750e9e21 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7277,7 +7277,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
Assert(pendingAreq == areq);
AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
- NULL, areq);
+ NULL, NULL, areq);
}
/*
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index cf7ddbb01f4..e048328f583 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -57,7 +57,7 @@ ExecAsyncRequest(AsyncRequest *areq)
* for which it wishes to wait. We expect the node-type specific callback to
* make a single call of the following form:
*
- * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, NULL, areq);
*/
void
ExecAsyncConfigureWait(AsyncRequest *areq)
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 85c85569b5e..27895b96b50 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1045,7 +1045,7 @@ ExecAppendAsyncEventWait(AppendState *node)
Assert(node->as_eventset == NULL);
node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
@@ -1081,7 +1081,7 @@ ExecAppendAsyncEventWait(AppendState *node)
* extensions too.
*/
AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
/* Return at most EVENT_BUFFER_SIZE events in one call. */
if (nevents > EVENT_BUFFER_SIZE)
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 617704bb993..0a406a050d9 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -217,7 +217,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_READ);
@@ -342,7 +342,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_WRITE);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4a442f22df6..5d3a5c22db0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -177,6 +177,7 @@ pq_init(ClientSocket *client_sock)
Port *port;
int socket_pos PG_USED_FOR_ASSERTS_ONLY;
int latch_pos PG_USED_FOR_ASSERTS_ONLY;
+ int cv_pos PG_USED_FOR_ASSERTS_ONLY;
/* allocate the Port struct and copy the ClientSocket contents to it */
port = palloc0_object(Port);
@@ -307,11 +308,13 @@ pq_init(ClientSocket *client_sock)
FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents);
socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
- port->sock, NULL, NULL);
+ port->sock, NULL, NULL, NULL);
latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
+ cv_pos = AddWaitEventToSet(FeBeWaitSet, WL_CONDITION_VARIABLE, PGINVALID_SOCKET,
+ NULL, NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/*
* The event positions match the order we added them, but let's sanity
@@ -319,6 +322,7 @@ pq_init(ClientSocket *client_sock)
*/
Assert(socket_pos == FeBeWaitSetSocketPos);
Assert(latch_pos == FeBeWaitSetLatchPos);
+ Assert(cv_pos == FeBeWaitSetCVPos);
return port;
}
@@ -2063,7 +2067,7 @@ pq_check_connection(void)
* It's OK to modify the socket event filter without restoring, because
* all FeBeWaitSet socket wait sites do the same.
*/
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL, NULL);
retry:
rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index ae829747004..28f7c5cd602 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1655,13 +1655,13 @@ ConfigurePostmasterWaitSet(bool accept_connections)
pm_wait_set = CreateWaitEventSet(NULL,
accept_connections ? (1 + NumListenSockets) : 1);
AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
- NULL);
+ NULL, NULL);
if (accept_connections)
{
for (int i = 0; i < NumListenSockets; i++)
AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
- NULL, NULL);
+ NULL, NULL, NULL);
}
}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 0c2a7bc8578..4db7df8e4a5 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -338,9 +338,9 @@ SysLoggerMain(const void *startup_data, size_t startup_data_len)
* (including the postmaster).
*/
wes = CreateWaitEventSet(NULL, 2);
- AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL, NULL);
#ifndef WIN32
- AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
+ AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL, NULL);
#endif
/* main worker loop */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bad45adb004..ff73fba15f8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3979,8 +3979,9 @@ static void
WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
{
WaitEvent event;
+ ConditionVariable *cv = NULL;
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL, NULL);
/*
* We use a condition variable to efficiently wake up walsenders in
@@ -4000,9 +4001,6 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* release for every iteration, just to wake up only the waiting
* walsenders. It makes WalSndWakeup() callers' life easy.
*
- * XXX: A desirable future improvement would be to add support for CVs
- * into WaitEventSetWait().
- *
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
*
@@ -4011,20 +4009,19 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* the receipt of the LSN.
*/
if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ cv = &WalSndCtl->wal_confirm_rcv_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ cv = &WalSndCtl->wal_flush_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+ cv = &WalSndCtl->wal_replay_cv;
+
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetCVPos, WL_CONDITION_VARIABLE, NULL, cv);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
{
- ConditionVariableCancelSleep();
proc_exit(1);
}
-
- ConditionVariableCancelSleep();
}
/*
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 7d4f4cf32bb..391b5218f51 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -41,7 +41,7 @@ InitializeLatchWaitSet(void)
/* Set up the WaitEventSet used by WaitLatch(). */
LatchWaitSet = CreateWaitEventSet(NULL, 2);
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
Assert(latch_pos == LatchWaitSetLatchPos);
/*
@@ -51,7 +51,7 @@ InitializeLatchWaitSet(void)
if (IsUnderPostmaster)
{
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH,
- PGINVALID_SOCKET, NULL, NULL);
+ PGINVALID_SOCKET, NULL, NULL, NULL);
Assert(latch_pos == LatchWaitSetPostmasterDeathPos);
}
}
@@ -186,12 +186,12 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
*/
if (!(wakeEvents & WL_LATCH_SET))
latch = NULL;
- ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
+ ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch, NULL);
if (IsUnderPostmaster)
ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
(wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
- NULL);
+ NULL, NULL);
if (WaitEventSetWait(LatchWaitSet,
(wakeEvents & WL_TIMEOUT) ? timeout : -1,
@@ -235,7 +235,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if (wakeEvents & WL_LATCH_SET)
AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
- latch, NULL);
+ latch, NULL, NULL);
/* Postmaster-managed callers must handle postmaster death somehow. */
Assert(!IsUnderPostmaster ||
@@ -244,18 +244,18 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if (wakeEvents & WL_SOCKET_MASK)
{
int ev;
ev = wakeEvents & WL_SOCKET_MASK;
- AddWaitEventToSet(set, ev, sock, NULL, NULL);
+ AddWaitEventToSet(set, ev, sock, NULL, NULL, NULL);
}
rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
diff --git a/src/backend/storage/ipc/waiteventset.c b/src/backend/storage/ipc/waiteventset.c
index 0f228e1e7b8..6fc681381fc 100644
--- a/src/backend/storage/ipc/waiteventset.c
+++ b/src/backend/storage/ipc/waiteventset.c
@@ -73,6 +73,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/waiteventset.h"
#include "utils/memutils.h"
@@ -137,6 +138,14 @@ struct WaitEventSet
Latch *latch;
int latch_pos;
+ /*
+ * If WL_CONDITION_VARIABLE is specified in any wait event, cv is a
+ * pointer to said condition variable, and cv_pos the offset in the
+ * ->events array.
+ */
+ ConditionVariable *cv;
+ int cv_pos;
+
/*
* WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag
* is set so that we'll exit immediately if postmaster death is detected,
@@ -414,6 +423,7 @@ CreateWaitEventSet(ResourceOwner resowner, int nevents)
#endif
set->latch = NULL;
+ set->cv = NULL;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
@@ -550,6 +560,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
* platforms, this is the same as WL_SOCKET_READABLE)
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
+ * - WL_CONDITION_VARIABLE: Wait for a condition variable to be signaled.
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
* used to modify previously added wait events using ModifyWaitEvent().
@@ -568,7 +579,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
*/
int
AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
- void *user_data)
+ ConditionVariable *cv, void *user_data)
{
WaitEvent *event;
@@ -596,6 +607,18 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
elog(ERROR, "cannot wait on latch without a specified latch");
}
+ /*
+ * When adding a CV event, we allow a NULL cv, because it can be set later
+ * with ModifyWaitEvent.
+ */
+ if (cv)
+ {
+ if (set->cv)
+ elog(ERROR, "cannot wait on more than one condition variable");
+ if ((events & WL_CONDITION_VARIABLE) != WL_CONDITION_VARIABLE)
+ elog(ERROR, "condition variable events only support being set");
+ }
+
/* waiting for socket readiness without a socket indicates a bug */
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
@@ -624,6 +647,13 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#endif
#endif
}
+ else if (events == WL_CONDITION_VARIABLE)
+ {
+ set->cv = cv;
+ set->cv_pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+ return event->pos;
+ }
else if (events == WL_POSTMASTER_DEATH)
{
#ifndef WIN32
@@ -653,14 +683,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
* 'pos' is the id returned by AddWaitEventToSet.
*/
void
-ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch,
+ ConditionVariable *cv)
{
WaitEvent *event;
#if defined(WAIT_USE_KQUEUE)
int old_events;
#endif
- Assert(pos < set->nevents);
+ Assert(pos >= 0 && pos < set->nevents);
event = &set->events[pos];
#if defined(WAIT_USE_KQUEUE)
@@ -682,6 +713,21 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
return;
}
+ /*
+ * Change the condition variable associated with the event. As a CV doesn't
+ * rely on the kernel, take a fast path and only update the CV
+ * pointer.
+ */
+ if (event->events == WL_CONDITION_VARIABLE)
+ {
+ if (events != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot change event type of a condition variable event");
+ if (cv == NULL)
+ elog(ERROR, "cannot set condition variable to NULL");
+ set->cv = cv;
+ return;
+ }
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -1061,6 +1107,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
else
INSTR_TIME_SET_ZERO(start_time);
+ if (set->cv != NULL)
+ ConditionVariablePrepareToSleep(set->cv);
+
pgstat_report_wait_start(wait_event_info);
#ifndef WIN32
@@ -1072,6 +1121,8 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
while (returned_events == 0)
{
int rc;
+ bool cv_signaled = false;
+ bool cv_maybe_signaled = false;
/*
* Check if the latch is set already first. If so, we either exit
@@ -1133,6 +1184,39 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
timeout = 0;
}
+ if (set->cv)
+ cv_signaled = ConditionVariableSignaled(set->cv);
+
+ if (set->cv && !cv_signaled)
+ cv_maybe_signaled = true;
+
+ if (set->cv && cv_signaled)
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ {
+ /* could have been set above */
+ if (set->latch && set->latch->maybe_sleeping)
+ set->latch->maybe_sleeping = false;
+ break; /* output buffer full already */
+ }
+
+ /*
+ * Even though we already have an event, we'll poll just once with
+ * zero timeout to see what non-latch events we can fit into the
+ * output buffer at the same time.
+ */
+ cur_timeout = 0;
+ timeout = 0;
+ }
+
/*
* Wait for events using the readiness primitive chosen at the top of
* this file. If -1 is returned, a timeout has occurred, if 0 we have
@@ -1148,8 +1232,26 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
if (rc == -1)
break; /* timeout occurred */
else
+ {
returned_events += rc;
+ /* Re-check the CV after waiting if it was not signaled before waiting */
+ if (set->cv && cv_maybe_signaled && returned_events < nevents &&
+ ConditionVariableSignaled(set->cv))
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+ }
+ }
+
/* If we're not done, update cur_timeout for next iteration */
if (returned_events == 0 && timeout >= 0)
{
@@ -1166,10 +1268,12 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
pgstat_report_wait_end();
+ if (set->cv != NULL)
+ ConditionVariableCancelSleep();
+
return returned_events;
}
-
#if defined(WAIT_USE_EPOLL)
/*
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 1f16b3f7475..c04bc420bcf 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -182,13 +182,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* by something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid this obvious case.
*/
- SpinLockAcquire(&cv->mutex);
- if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
- {
- done = true;
- proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
- }
- SpinLockRelease(&cv->mutex);
+ done = ConditionVariableSignaled(cv);
/*
* Check for interrupts, and return spuriously if that caused the
@@ -217,6 +211,31 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
}
}
+/*
+ * Check whether this process was removed from cv's wait list by a CV
+ * signal/broadcast. If so, re-add it to preserve wakeups while the caller
+ * checks the predicate.
+ */
+bool
+ConditionVariableSignaled(ConditionVariable *cv)
+{
+ bool signaled = false;
+
+ /* Ignore probes for CVs we are not currently prepared to sleep on. */
+ if (cv_sleep_target != cv)
+ return false;
+
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
+ {
+ signaled = true;
+ proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+
+ return signaled;
+}
+
/*
* Cancel any pending sleep operation.
*
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 7ffc808073a..4bac31ec0d9 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -222,7 +222,7 @@ SwitchToSharedLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
/*
* Set the shared latch as the local one might have been set. This
@@ -249,7 +249,7 @@ SwitchBackToLocalLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
SetLatch(MyLatch);
}
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index c9b934d2321..8d13eaf8393 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -65,7 +65,8 @@ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
#define FeBeWaitSetSocketPos 0
#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
+#define FeBeWaitSetCVPos 2
+#define FeBeWaitSetNEvents 4
extern int ListenServerPort(int family, const char *hostName,
unsigned short portNumber, const char *unixSocketDir,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 14bd6dd55c0..fb7e4bb67f2 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -57,6 +57,7 @@ extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
extern bool ConditionVariableCancelSleep(void);
+extern bool ConditionVariableSignaled(ConditionVariable *cv);
/*
* Optionally, ConditionVariablePrepareToSleep can be called before entering
diff --git a/src/include/storage/waiteventset.h b/src/include/storage/waiteventset.h
index 5341267f0a0..f3d569a6aa0 100644
--- a/src/include/storage/waiteventset.h
+++ b/src/include/storage/waiteventset.h
@@ -25,6 +25,7 @@
#ifndef WAITEVENTSET_H
#define WAITEVENTSET_H
+#include "storage/condition_variable.h"
#include "utils/resowner.h"
/*
@@ -50,6 +51,7 @@
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_ACCEPT WL_SOCKET_READABLE
#endif
+#define WL_CONDITION_VARIABLE (1 << 9)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED | \
@@ -81,9 +83,10 @@ extern WaitEventSet *CreateWaitEventSet(ResourceOwner resowner, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern void FreeWaitEventSetAfterFork(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
- struct Latch *latch, void *user_data);
+ struct Latch *latch, ConditionVariable *cv,
+ void *user_data);
extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
- struct Latch *latch);
+ struct Latch *latch, ConditionVariable *cv);
extern int WaitEventSetWait(WaitEventSet *set, long timeout,
WaitEvent *occurred_events, int nevents,
uint32 wait_event_info);
--
2.50.1 (Apple Git-155)
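To make the bookkeeping in the patch a little more concrete, here is a small single-process C model of how `ConditionVariableSignaled()` and the new checks in `WaitEventSetWait()` appear to interact. It is a sketch under stated assumptions, not PostgreSQL code. `ToyCV`, `toy_prepare_to_sleep()`, `toy_signal()`, `toy_signaled()`, `toy_cancel_sleep()` and `toy_wait_once()` are hypothetical names. The wakeup list is modeled as a per-process boolean array rather than a spinlock-protected `proclist`, and the kernel wait is replaced by a flag that says whether another backend signals while we are blocked.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of a ConditionVariable: membership of each process in
 * the CV's wakeup list, instead of a spinlock-protected proclist. */
#define TOY_MAX_PROCS 8

typedef struct ToyCV
{
    bool on_wakeup_list[TOY_MAX_PROCS];
} ToyCV;

/* Models cv_sleep_target: the CV this process is prepared to sleep on. */
static ToyCV *toy_sleep_target = NULL;

/* Models ConditionVariablePrepareToSleep(): join the wakeup list. */
static void
toy_prepare_to_sleep(ToyCV *cv, int me)
{
    toy_sleep_target = cv;
    cv->on_wakeup_list[me] = true;
}

/* Models ConditionVariableSignal(): remove one waiter from the list
 * (the real function also sets that waiter's latch). */
static bool
toy_signal(ToyCV *cv)
{
    for (int i = 0; i < TOY_MAX_PROCS; i++)
    {
        if (cv->on_wakeup_list[i])
        {
            cv->on_wakeup_list[i] = false;
            return true;
        }
    }
    return false;
}

/* Models the patch's ConditionVariableSignaled(): if we are no longer on the
 * list, someone signaled us; re-add ourselves so later signals aren't lost. */
static bool
toy_signaled(ToyCV *cv, int me)
{
    if (toy_sleep_target != cv)
        return false;           /* not prepared to sleep on this CV */
    if (!cv->on_wakeup_list[me])
    {
        cv->on_wakeup_list[me] = true;
        return true;
    }
    return false;
}

/* Models ConditionVariableCancelSleep(): leave the list and forget the CV. */
static void
toy_cancel_sleep(int me)
{
    if (toy_sleep_target != NULL)
    {
        toy_sleep_target->on_wakeup_list[me] = false;
        toy_sleep_target = NULL;
    }
}

/* Models the CV part of one WaitEventSetWait() call with nevents = 1.
 * Returns the number of WL_CONDITION_VARIABLE events reported (0 or 1). */
static int
toy_wait_once(ToyCV *cv, int me, bool signal_before_check,
              bool signal_during_wait)
{
    int returned_events = 0;

    toy_prepare_to_sleep(cv, me);
    if (signal_before_check)
        toy_signal(cv);

    if (toy_signaled(cv, me))
        returned_events++;      /* reported without blocking */
    else
    {
        /* The kernel wait would happen here; another backend may signal. */
        if (signal_during_wait)
            toy_signal(cv);
        if (toy_signaled(cv, me))   /* re-check after waking up */
            returned_events++;
    }

    toy_cancel_sleep(me);
    return returned_events;
}
```

The model shows why the patch checks the CV both before and after the kernel wait: a signal can land between preparing to sleep and blocking, or while blocked. In the real code the signaler pops the waiter off the shared wakeup list and sets its latch, so the waiter infers "signaled" from no longer being on the list and re-adds itself before returning.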
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-08 08:22 Chao Li <[email protected]>
parent: Chao Li <[email protected]>
From: Chao Li @ 2026-04-08 08:22 UTC
To: PostgreSQL Hackers <[email protected]>
> On Apr 8, 2026, at 11:50, Chao Li <[email protected]> wrote:
>
>
>
>> On Apr 2, 2026, at 15:38, Chao Li <[email protected]> wrote:
>>
>>
>>
>>> On Mar 31, 2026, at 16:59, Chao Li <[email protected]> wrote:
>>>
>>>
>>>
>>>> On Mar 31, 2026, at 15:28, Chao Li <[email protected]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> There is an XXX comment in WalSndWait():
>>>> ```
>>>> * XXX: A desirable future improvement would be to add support for CVs
>>>> * into WaitEventSetWait().
>>>> ```
>>>>
>>>> I have been exploring a possible approach for that. This patch is a PoC that adds ConditionVariable support to WaitEventSet. This v1 is mainly intended to gather feedback on the design, so I have only done some basic testing so far, such as a normal logical replication workflow.
>>>>
>>>> I’d like to highlight a few key points about the design:
>>>>
>>>> 1. In the current WalSndWait(), although it prepares to sleep on a ConditionVariable, it does not actually check whether the CV has been signaled. In this PoC, I kept that same behavior. However, I tried to make the WaitEventSet support for CVs generic, so that if we want to add actual signal checking in the future, that would be possible.
>>>>
>>>> 2. To keep the design generic, this patch introduces a new wait event type, WL_CONDITION_VARIABLE. A WL_CONDITION_VARIABLE event occupies a position in the event array, similar to latch and socket events. When a CV is signaled, the corresponding WL_CONDITION_VARIABLE event is returned in occurred_events.
>>>>
>>>> 3. The WaitEventSet APIs AddWaitEventToSet() and ModifyWaitEvent() are extended to support CVs by adding one more parameter “cv" to both APIs. The downside of this approach is that all call sites of these two APIs need to be updated. I also considered adding separate APIs for CVs, such as AddWaitEventToSetForCV() and ModifyWaitEventForCV(), since CVs do not rely on the kernel and it might therefore make sense to decouple them from socket and latch handling. But for v1, I chose the more generic approach. I’d be interested in hearing comments on this part of the design.
>>>>
>>>> 4. One important point is that this patch extends the WaitEventSet abstraction, not the underlying kernel wait primitives. A ConditionVariable is still a userspace/shared-memory concept, but with this design it can participate in the same waiting framework as sockets and latches. I think that is useful because it allows mixed waits to be handled through one interface.
>>>>
>>>> Here is the v1 patch.
>>>>
>>>> Best regards,
>>>> --
>>>> Chao Li (Evan)
>>>> HighGo Software Co., Ltd.
>>>> https://www.highgo.com/
>>>>
>>>>
>>>>
>>>>
>>>> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
>>>
>>>
>>> I just noticed that I missed checking in my last edit when switching to the other branch, so attaching an updated v1.
>>>
>>> Best regards,
>>> --
>>> Chao Li (Evan)
>>> HighGo Software Co., Ltd.
>>> https://www.highgo.com/
>>>
>>>
>>>
>>>
>>> <v1-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
>>
>> PFA v2 - fixed a CI failure from contrib/postgres_fdw.
>>
>> Best regards,
>> --
>> Chao Li (Evan)
>> HighGo Software Co., Ltd.
>> https://www.highgo.com/
>>
>>
>>
>>
>> <v2-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
>
> Fixed a CI test failure and rebased.
>
> Best regards,
> --
> Chao Li (Evan)
> HighGo Software Co., Ltd.
> https://www.highgo.com/
>
>
>
>
> <v3-0001-Add-condition-variable-support-to-WaitEventSetWai.patch>
PFA v4 - an attempt to fix a test failure on Windows.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
Attachments:
[application/octet-stream] v4-0001-Add-condition-variable-support-to-WaitEventSetWai.patch (24.3K, 2-v4-0001-Add-condition-variable-support-to-WaitEventSetWai.patch)
From 397b57824507ce7f7281dce03a8e83cb1d2ca2e2 Mon Sep 17 00:00:00 2001
From: "Chao Li (Evan)" <[email protected]>
Date: Tue, 31 Mar 2026 08:25:12 +0800
Subject: [PATCH v4] Add condition variable support to WaitEventSetWait()
WalSndWait() currently combines WaitEventSetWait() with manual
ConditionVariablePrepareToSleep() / ConditionVariableCancelSleep()
calls.
This patch teaches WaitEventSetWait() to cooperate with condition
variables directly. A wait-event set can now include a condition
variable event, and WaitEventSetWait() takes care of preparing to
sleep on the condition variable before blocking and canceling the
sleep afterwards.
With that in place, WalSndWait() no longer needs to manage the
condition-variable sleep state manually, and can express the wakeup
source through the wait-event set itself.
Author: Chao Li <[email protected]>
---
contrib/postgres_fdw/postgres_fdw.c | 2 +-
src/backend/executor/execAsync.c | 2 +-
src/backend/executor/nodeAppend.c | 4 +-
src/backend/libpq/be-secure.c | 4 +-
src/backend/libpq/pqcomm.c | 12 +-
src/backend/postmaster/postmaster.c | 4 +-
src/backend/postmaster/syslogger.c | 4 +-
src/backend/replication/walsender.c | 17 +--
src/backend/storage/ipc/latch.c | 16 +--
src/backend/storage/ipc/waiteventset.c | 132 +++++++++++++++++-
src/backend/storage/lmgr/condition_variable.c | 33 ++++-
src/backend/utils/init/miscinit.c | 4 +-
src/include/libpq/libpq.h | 3 +-
src/include/storage/condition_variable.h | 1 +
src/include/storage/waiteventset.h | 7 +-
15 files changed, 197 insertions(+), 48 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index cc8ec24c30e..6ee750e9e21 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -7277,7 +7277,7 @@ postgresForeignAsyncConfigureWait(AsyncRequest *areq)
Assert(pendingAreq == areq);
AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
- NULL, areq);
+ NULL, NULL, areq);
}
/*
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index cf7ddbb01f4..e048328f583 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -57,7 +57,7 @@ ExecAsyncRequest(AsyncRequest *areq)
* for which it wishes to wait. We expect the node-type specific callback to
* make a single call of the following form:
*
- * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, NULL, areq);
*/
void
ExecAsyncConfigureWait(AsyncRequest *areq)
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 85c85569b5e..27895b96b50 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -1045,7 +1045,7 @@ ExecAppendAsyncEventWait(AppendState *node)
Assert(node->as_eventset == NULL);
node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/* Give each waiting subplan a chance to add an event. */
i = -1;
@@ -1081,7 +1081,7 @@ ExecAppendAsyncEventWait(AppendState *node)
* extensions too.
*/
AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
/* Return at most EVENT_BUFFER_SIZE events in one call. */
if (nevents > EVENT_BUFFER_SIZE)
diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c
index 617704bb993..0a406a050d9 100644
--- a/src/backend/libpq/be-secure.c
+++ b/src/backend/libpq/be-secure.c
@@ -217,7 +217,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_READ);
@@ -342,7 +342,7 @@ retry:
Assert(waitfor);
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, waitfor, NULL, NULL);
WaitEventSetWait(FeBeWaitSet, -1 /* no timeout */ , &event, 1,
WAIT_EVENT_CLIENT_WRITE);
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 4a442f22df6..5d3a5c22db0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -177,6 +177,7 @@ pq_init(ClientSocket *client_sock)
Port *port;
int socket_pos PG_USED_FOR_ASSERTS_ONLY;
int latch_pos PG_USED_FOR_ASSERTS_ONLY;
+ int cv_pos PG_USED_FOR_ASSERTS_ONLY;
/* allocate the Port struct and copy the ClientSocket contents to it */
port = palloc0_object(Port);
@@ -307,11 +308,13 @@ pq_init(ClientSocket *client_sock)
FeBeWaitSet = CreateWaitEventSet(NULL, FeBeWaitSetNEvents);
socket_pos = AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE,
- port->sock, NULL, NULL);
+ port->sock, NULL, NULL, NULL);
latch_pos = AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
+ cv_pos = AddWaitEventToSet(FeBeWaitSet, WL_CONDITION_VARIABLE, PGINVALID_SOCKET,
+ NULL, NULL, NULL);
AddWaitEventToSet(FeBeWaitSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
/*
* The event positions match the order we added them, but let's sanity
@@ -319,6 +322,7 @@ pq_init(ClientSocket *client_sock)
*/
Assert(socket_pos == FeBeWaitSetSocketPos);
Assert(latch_pos == FeBeWaitSetLatchPos);
+ Assert(cv_pos == FeBeWaitSetCVPos);
return port;
}
@@ -2063,7 +2067,7 @@ pq_check_connection(void)
* It's OK to modify the socket event filter without restoring, because
* all FeBeWaitSet socket wait sites do the same.
*/
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, WL_SOCKET_CLOSED, NULL, NULL);
retry:
rc = WaitEventSetWait(FeBeWaitSet, 0, events, lengthof(events), 0);
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index ae829747004..28f7c5cd602 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -1655,13 +1655,13 @@ ConfigurePostmasterWaitSet(bool accept_connections)
pm_wait_set = CreateWaitEventSet(NULL,
accept_connections ? (1 + NumListenSockets) : 1);
AddWaitEventToSet(pm_wait_set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch,
- NULL);
+ NULL, NULL);
if (accept_connections)
{
for (int i = 0; i < NumListenSockets; i++)
AddWaitEventToSet(pm_wait_set, WL_SOCKET_ACCEPT, ListenSockets[i],
- NULL, NULL);
+ NULL, NULL, NULL);
}
}
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index 0c2a7bc8578..4db7df8e4a5 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -338,9 +338,9 @@ SysLoggerMain(const void *startup_data, size_t startup_data_len)
* (including the postmaster).
*/
wes = CreateWaitEventSet(NULL, 2);
- AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+ AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL, NULL);
#ifndef WIN32
- AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
+ AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL, NULL);
#endif
/* main worker loop */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bad45adb004..ff73fba15f8 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3979,8 +3979,9 @@ static void
WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
{
WaitEvent event;
+ ConditionVariable *cv = NULL;
- ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL, NULL);
/*
* We use a condition variable to efficiently wake up walsenders in
@@ -4000,9 +4001,6 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* release for every iteration, just to wake up only the waiting
* walsenders. It makes WalSndWakeup() callers' life easy.
*
- * XXX: A desirable future improvement would be to add support for CVs
- * into WaitEventSetWait().
- *
* And, we use separate shared memory CVs for physical and logical
* walsenders for selective wake ups, see WalSndWakeup() for more details.
*
@@ -4011,20 +4009,19 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
* the receipt of the LSN.
*/
if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+ cv = &WalSndCtl->wal_confirm_rcv_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ cv = &WalSndCtl->wal_flush_cv;
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
- ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+ cv = &WalSndCtl->wal_replay_cv;
+
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetCVPos, WL_CONDITION_VARIABLE, NULL, cv);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
{
- ConditionVariableCancelSleep();
proc_exit(1);
}
-
- ConditionVariableCancelSleep();
}
/*
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 7d4f4cf32bb..391b5218f51 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -41,7 +41,7 @@ InitializeLatchWaitSet(void)
/* Set up the WaitEventSet used by WaitLatch(). */
LatchWaitSet = CreateWaitEventSet(NULL, 2);
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_LATCH_SET, PGINVALID_SOCKET,
- MyLatch, NULL);
+ MyLatch, NULL, NULL);
Assert(latch_pos == LatchWaitSetLatchPos);
/*
@@ -51,7 +51,7 @@ InitializeLatchWaitSet(void)
if (IsUnderPostmaster)
{
latch_pos = AddWaitEventToSet(LatchWaitSet, WL_EXIT_ON_PM_DEATH,
- PGINVALID_SOCKET, NULL, NULL);
+ PGINVALID_SOCKET, NULL, NULL, NULL);
Assert(latch_pos == LatchWaitSetPostmasterDeathPos);
}
}
@@ -186,12 +186,12 @@ WaitLatch(Latch *latch, int wakeEvents, long timeout,
*/
if (!(wakeEvents & WL_LATCH_SET))
latch = NULL;
- ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch);
+ ModifyWaitEvent(LatchWaitSet, LatchWaitSetLatchPos, WL_LATCH_SET, latch, NULL);
if (IsUnderPostmaster)
ModifyWaitEvent(LatchWaitSet, LatchWaitSetPostmasterDeathPos,
(wakeEvents & (WL_EXIT_ON_PM_DEATH | WL_POSTMASTER_DEATH)),
- NULL);
+ NULL, NULL);
if (WaitEventSetWait(LatchWaitSet,
(wakeEvents & WL_TIMEOUT) ? timeout : -1,
@@ -235,7 +235,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if (wakeEvents & WL_LATCH_SET)
AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET,
- latch, NULL);
+ latch, NULL, NULL);
/* Postmaster-managed callers must handle postmaster death somehow. */
Assert(!IsUnderPostmaster ||
@@ -244,18 +244,18 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
if ((wakeEvents & WL_POSTMASTER_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if ((wakeEvents & WL_EXIT_ON_PM_DEATH) && IsUnderPostmaster)
AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
- NULL, NULL);
+ NULL, NULL, NULL);
if (wakeEvents & WL_SOCKET_MASK)
{
int ev;
ev = wakeEvents & WL_SOCKET_MASK;
- AddWaitEventToSet(set, ev, sock, NULL, NULL);
+ AddWaitEventToSet(set, ev, sock, NULL, NULL, NULL);
}
rc = WaitEventSetWait(set, timeout, &event, 1, wait_event_info);
diff --git a/src/backend/storage/ipc/waiteventset.c b/src/backend/storage/ipc/waiteventset.c
index 0f228e1e7b8..d9849611966 100644
--- a/src/backend/storage/ipc/waiteventset.c
+++ b/src/backend/storage/ipc/waiteventset.c
@@ -73,6 +73,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/waiteventset.h"
#include "utils/memutils.h"
@@ -137,6 +138,14 @@ struct WaitEventSet
Latch *latch;
int latch_pos;
+ /*
+ * If WL_CONDITION_VARIABLE is specified in any wait event, cv is a
+ * pointer to said condition variable, and cv_pos the offset in the
+ * ->events array.
+ */
+ ConditionVariable *cv;
+ int cv_pos;
+
/*
* WL_EXIT_ON_PM_DEATH is converted to WL_POSTMASTER_DEATH, but this flag
* is set so that we'll exit immediately if postmaster death is detected,
@@ -414,6 +423,7 @@ CreateWaitEventSet(ResourceOwner resowner, int nevents)
#endif
set->latch = NULL;
+ set->cv = NULL;
set->nevents_space = nevents;
set->exit_on_postmaster_death = false;
@@ -501,6 +511,11 @@ FreeWaitEventSet(WaitEventSet *set)
{
/* uses the latch's HANDLE */
}
+ else if (cur_event->events == WL_CONDITION_VARIABLE)
+ {
+ /* uses a dummy HANDLE */
+ CloseHandle(set->handles[cur_event->pos + 1]);
+ }
else if (cur_event->events & WL_POSTMASTER_DEATH)
{
/* uses PostmasterHandle */
@@ -550,6 +565,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
* platforms, this is the same as WL_SOCKET_READABLE)
* - WL_SOCKET_CLOSED: Wait for socket to be closed by remote peer.
* - WL_EXIT_ON_PM_DEATH: Exit immediately if the postmaster dies
+ * - WL_CONDITION_VARIABLE: Wait for a condition variable to be signaled.
*
* Returns the offset in WaitEventSet->events (starting from 0), which can be
* used to modify previously added wait events using ModifyWaitEvent().
@@ -568,7 +584,7 @@ FreeWaitEventSetAfterFork(WaitEventSet *set)
*/
int
AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
- void *user_data)
+ ConditionVariable *cv, void *user_data)
{
WaitEvent *event;
@@ -596,6 +612,18 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
elog(ERROR, "cannot wait on latch without a specified latch");
}
+ /*
+ * When adding a CV event, we allow a NULL cv, because it can be set later
+ * with ModifyWaitEvent.
+ */
+ if (cv)
+ {
+ if (set->cv)
+ elog(ERROR, "cannot wait on more than one condition variable");
+ if ((events & WL_CONDITION_VARIABLE) != WL_CONDITION_VARIABLE)
+ elog(ERROR, "condition variable events only support being set");
+ }
+
/* waiting for socket readiness without a socket indicates a bug */
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
elog(ERROR, "cannot wait on socket event without a socket");
@@ -624,6 +652,16 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
#endif
#endif
}
+ else if (events == WL_CONDITION_VARIABLE)
+ {
+ set->cv = cv;
+ set->cv_pos = event->pos;
+ event->fd = PGINVALID_SOCKET;
+#ifdef WAIT_USE_WIN32
+ WaitEventAdjustWin32(set, event);
+#endif
+ return event->pos;
+ }
else if (events == WL_POSTMASTER_DEATH)
{
#ifndef WIN32
@@ -653,14 +691,15 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
* 'pos' is the id returned by AddWaitEventToSet.
*/
void
-ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
+ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch,
+ ConditionVariable *cv)
{
WaitEvent *event;
#if defined(WAIT_USE_KQUEUE)
int old_events;
#endif
- Assert(pos < set->nevents);
+ Assert(pos >= 0 && pos < set->nevents);
event = &set->events[pos];
#if defined(WAIT_USE_KQUEUE)
@@ -682,6 +721,21 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
return;
}
+ /*
+ * Change the condition variable associated with the event. As a CV doesn't
+ * rely on the kernel, take a fast path and only update the CV
+ * pointer.
+ */
+ if (event->events == WL_CONDITION_VARIABLE)
+ {
+ if (events != WL_CONDITION_VARIABLE)
+ elog(ERROR, "cannot change event type of a condition variable event");
+ if (cv == NULL)
+ elog(ERROR, "cannot set condition variable to NULL");
+ set->cv = cv;
+ return;
+ }
+
/*
* If neither the event mask nor the associated latch changes, return
* early. That's an important optimization for some sockets, where
@@ -991,6 +1045,18 @@ WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
Assert(set->latch != NULL);
*handle = set->latch->event;
}
+ else if (event->events == WL_CONDITION_VARIABLE)
+ {
+ /*
+ * Condition-variable waits are handled in userspace, but
+ * WaitForMultipleObjects() still requires a valid HANDLE in every
+ * slot.
+ */
+ *handle = CreateEvent(NULL, TRUE, FALSE, NULL);
+ if (*handle == NULL)
+ elog(ERROR, "failed to create event for condition variable: error code %lu",
+ GetLastError());
+ }
else if (event->events == WL_POSTMASTER_DEATH)
{
*handle = PostmasterHandle;
@@ -1061,6 +1127,9 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
else
INSTR_TIME_SET_ZERO(start_time);
+ if (set->cv != NULL)
+ ConditionVariablePrepareToSleep(set->cv);
+
pgstat_report_wait_start(wait_event_info);
#ifndef WIN32
@@ -1072,6 +1141,8 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
while (returned_events == 0)
{
int rc;
+ bool cv_signaled = false;
+ bool cv_maybe_signaled = false;
/*
* Check if the latch is set already first. If so, we either exit
@@ -1133,6 +1204,39 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
timeout = 0;
}
+ if (set->cv)
+ cv_signaled = ConditionVariableSignaled(set->cv);
+
+ if (set->cv && !cv_signaled)
+ cv_maybe_signaled = true;
+
+ if (set->cv && cv_signaled)
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ {
+ /* could have been set above */
+ if (set->latch && set->latch->maybe_sleeping)
+ set->latch->maybe_sleeping = false;
+ break; /* output buffer full already */
+ }
+
+ /*
+ * Even though we already have an event, we'll poll just once with
+ * zero timeout to see what non-latch events we can fit into the
+ * output buffer at the same time.
+ */
+ cur_timeout = 0;
+ timeout = 0;
+ }
+
/*
* Wait for events using the readiness primitive chosen at the top of
* this file. If -1 is returned, a timeout has occurred, if 0 we have
@@ -1148,8 +1252,26 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
if (rc == -1)
break; /* timeout occurred */
else
+ {
returned_events += rc;
+ /* Re-check the CV after waiting if it was not signaled before waiting */
+ if (set->cv && cv_maybe_signaled && returned_events < nevents &&
+ ConditionVariableSignaled(set->cv))
+ {
+ occurred_events->fd = PGINVALID_SOCKET;
+ occurred_events->pos = set->cv_pos;
+ occurred_events->user_data =
+ set->events[set->cv_pos].user_data;
+ occurred_events->events = WL_CONDITION_VARIABLE;
+ occurred_events++;
+ returned_events++;
+
+ if (returned_events == nevents)
+ break; /* output buffer full already */
+ }
+ }
+
/* If we're not done, update cur_timeout for next iteration */
if (returned_events == 0 && timeout >= 0)
{
@@ -1166,10 +1288,12 @@ WaitEventSetWait(WaitEventSet *set, long timeout,
pgstat_report_wait_end();
+ if (set->cv != NULL)
+ ConditionVariableCancelSleep();
+
return returned_events;
}
-
#if defined(WAIT_USE_EPOLL)
/*
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index 1f16b3f7475..c04bc420bcf 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -182,13 +182,7 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
* by something other than ConditionVariableSignal; though we don't
* guarantee not to return spuriously, we'll avoid this obvious case.
*/
- SpinLockAcquire(&cv->mutex);
- if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
- {
- done = true;
- proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
- }
- SpinLockRelease(&cv->mutex);
+ done = ConditionVariableSignaled(cv);
/*
* Check for interrupts, and return spuriously if that caused the
@@ -217,6 +211,31 @@ ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
}
}
+/*
+ * Check whether this process was removed from cv's wait list by a CV
+ * signal/broadcast. If so, re-add it to preserve wakeups while the caller
+ * checks the predicate.
+ */
+bool
+ConditionVariableSignaled(ConditionVariable *cv)
+{
+ bool signaled = false;
+
+ /* Ignore probes for CVs we are not currently prepared to sleep on. */
+ if (cv_sleep_target != cv)
+ return false;
+
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProcNumber, cvWaitLink))
+ {
+ signaled = true;
+ proclist_push_tail(&cv->wakeup, MyProcNumber, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+
+ return signaled;
+}
+
/*
* Cancel any pending sleep operation.
*
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 7ffc808073a..4bac31ec0d9 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -222,7 +222,7 @@ SwitchToSharedLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
/*
* Set the shared latch as the local one might have been set. This
@@ -249,7 +249,7 @@ SwitchBackToLocalLatch(void)
if (FeBeWaitSet)
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetLatchPos, WL_LATCH_SET,
- MyLatch);
+ MyLatch, NULL);
SetLatch(MyLatch);
}
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index c9b934d2321..8d13eaf8393 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -65,7 +65,8 @@ extern PGDLLIMPORT WaitEventSet *FeBeWaitSet;
#define FeBeWaitSetSocketPos 0
#define FeBeWaitSetLatchPos 1
-#define FeBeWaitSetNEvents 3
+#define FeBeWaitSetCVPos 2
+#define FeBeWaitSetNEvents 4
extern int ListenServerPort(int family, const char *hostName,
unsigned short portNumber, const char *unixSocketDir,
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
index 14bd6dd55c0..fb7e4bb67f2 100644
--- a/src/include/storage/condition_variable.h
+++ b/src/include/storage/condition_variable.h
@@ -57,6 +57,7 @@ extern void ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info
extern bool ConditionVariableTimedSleep(ConditionVariable *cv, long timeout,
uint32 wait_event_info);
extern bool ConditionVariableCancelSleep(void);
+extern bool ConditionVariableSignaled(ConditionVariable *cv);
/*
* Optionally, ConditionVariablePrepareToSleep can be called before entering
diff --git a/src/include/storage/waiteventset.h b/src/include/storage/waiteventset.h
index 5341267f0a0..f3d569a6aa0 100644
--- a/src/include/storage/waiteventset.h
+++ b/src/include/storage/waiteventset.h
@@ -25,6 +25,7 @@
#ifndef WAITEVENTSET_H
#define WAITEVENTSET_H
+#include "storage/condition_variable.h"
#include "utils/resowner.h"
/*
@@ -50,6 +51,7 @@
/* avoid having to deal with case on platforms not requiring it */
#define WL_SOCKET_ACCEPT WL_SOCKET_READABLE
#endif
+#define WL_CONDITION_VARIABLE (1 << 9)
#define WL_SOCKET_MASK (WL_SOCKET_READABLE | \
WL_SOCKET_WRITEABLE | \
WL_SOCKET_CONNECTED | \
@@ -81,9 +83,10 @@ extern WaitEventSet *CreateWaitEventSet(ResourceOwner resowner, int nevents);
extern void FreeWaitEventSet(WaitEventSet *set);
extern void FreeWaitEventSetAfterFork(WaitEventSet *set);
extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
- struct Latch *latch, void *user_data);
+ struct Latch *latch, ConditionVariable *cv,
+ void *user_data);
extern void ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events,
- struct Latch *latch);
+ struct Latch *latch, ConditionVariable *cv);
extern int WaitEventSetWait(WaitEventSet *set, long timeout,
WaitEvent *occurred_events, int nevents,
uint32 wait_event_info);
--
2.50.1 (Apple Git-155)
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-21 17:06 Yura Sokolov <[email protected]>
parent: Chao Li <[email protected]>
From: Yura Sokolov @ 2026-04-21 17:06 UTC (permalink / raw)
To: Chao Li <[email protected]>; PostgreSQL Hackers <[email protected]>
08.04.2026 11:22, Chao Li wrote:
>> On Apr 8, 2026, at 11:50, Chao Li <[email protected]> wrote:
>>> On Apr 2, 2026, at 15:38, Chao Li <[email protected]> wrote:
>>>> On Mar 31, 2026, at 16:59, Chao Li <[email protected]> wrote:
>>>>> On Mar 31, 2026, at 15:28, Chao Li <[email protected]> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> There is an XXX comment in WalSndWait():
>>>>> ```
>>>>> * XXX: A desirable future improvement would be to add support for CVs
>>>>> * into WaitEventSetWait().
>>>>> ```
>>>>>
>>>>> I have been exploring a possible approach for that. This patch is a PoC that adds ConditionVariable support to WaitEventSet. This v1 is mainly intended to gather feedback on the design, so I have only done some basic testing so far, such as a normal logical replication workflow.
>>>>>
>>>>> I’d like to highlight a few key points about the design:
>>>>>
>>>>> 1. In the current WalSndWait(), although it prepares to sleep on a ConditionVariable, it does not actually check whether the CV has been signaled. In this PoC, I kept that same behavior. However, I tried to make the WaitEventSet support for CVs generic, so that if we want to add actual signal checking in the future, that would be possible.
>>>>>
>>>>> 2. To keep the design generic, this patch introduces a new wait event type, WL_CONDITION_VARIABLE. A WL_CONDITION_VARIABLE event occupies a position in the event array, similar to latch and socket events. When a CV is signaled, the corresponding WL_CONDITION_VARIABLE event is returned in occurred_events.
>>>>>
>>>>> 3. The WaitEventSet APIs AddWaitEventToSet() and ModifyWaitEvent() are extended to support CVs by adding one more parameter “cv" to both APIs. The downside of this approach is that all call sites of these two APIs need to be updated. I also considered adding separate APIs for CVs, such as AddWaitEventToSetForCV() and ModifyWaitEventForCV(), since CVs do not rely on the kernel and it might therefore make sense to decouple them from socket and latch handling. But for v1, I chose the more generic approach. I’d be interested in hearing comments on this part of the design.
>>>>>
>>>>> 4. One important point is that this patch extends the WaitEventSet abstraction, not the underlying kernel wait primitives. A ConditionVariable is still a userspace/shared-memory concept, but with this design it can participate in the same waiting framework as sockets and latches. I think that is useful because it allows mixed waits to be handled through one interface.
>>>>>
>>>>> Here is the v1 patch.
>>>>>
>>>>
>>>> I just noticed that I missed checking in my last edit when switching to the other branch, so attaching an updated v1.
>>>
>>> PFA v2 - fixed a CI failure from contrib/postgres_fdw.
>>
>> Fixed a CI test failure and rebased.
>
> PFA v4 - try to fix a test failure on windows.
Good day, Chao Li.
ConditionVariable works through MyLatch.
And almost always there is a single latch per backend:
MyLatch = PGPROC->procLatch;
(The only exception I found is XLogRecoveryCtl->recoveryWakeupLatch.)
And that is how ConditionVariablePrepareToSleep can be used in WalSndWait
without any check: FeBeWaitSet always waits on MyLatch.
Therefore, I don't quite understand how you propose to wait simultaneously
for WL_LATCH_SET on MyLatch and for WL_CONDITION_VARIABLE.
How will you distinguish which one fired?
It looks to me that WL_LATCH_SET on MyLatch and WL_CONDITION_VARIABLE have
to be mutually exclusive, at least unless the ConditionVariable internals
are changed.
And the check of whether the condition variable was signaled after waiting
has to be placed into WaitEventSetWaitBlock, where all the other such
checks are.
All written above is just my humble opinion.
Excuse me if I'm too strict.
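To make the latch coupling concrete, here is a minimal single-process toy model. It is not PostgreSQL code: ToyLatch, ToyCV, toy_set_latch(), toy_cv_prepare_to_sleep(), toy_cv_signal() and toy_cv_contains() are hypothetical stand-ins. It assumes that signaling a CV dequeues the first waiter and then sets that waiter's latch, which is how ConditionVariableSignal() wakes the target process via SetLatch() on its procLatch.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy stand-ins; NOT the real PostgreSQL Latch / ConditionVariable structs. */
typedef struct ToyLatch
{
    bool is_set;
} ToyLatch;

#define TOY_MAX_PROCS 8

typedef struct ToyCV
{
    int wakeup[TOY_MAX_PROCS];  /* FIFO of waiting proc numbers */
    int nwaiters;
} ToyCV;

static void
toy_set_latch(ToyLatch *latch)
{
    latch->is_set = true;
}

/* Roughly like ConditionVariablePrepareToSleep(): enqueue ourselves. */
static void
toy_cv_prepare_to_sleep(ToyCV *cv, int procno)
{
    assert(cv->nwaiters < TOY_MAX_PROCS);
    cv->wakeup[cv->nwaiters++] = procno;
}

/* Roughly like ConditionVariableSignal(): dequeue first waiter, set its latch. */
static void
toy_cv_signal(ToyCV *cv, ToyLatch *latches)
{
    if (cv->nwaiters == 0)
        return;

    int procno = cv->wakeup[0];

    for (int i = 1; i < cv->nwaiters; i++)
        cv->wakeup[i - 1] = cv->wakeup[i];
    cv->nwaiters--;

    /* The wakeup itself is delivered through the waiter's latch. */
    toy_set_latch(&latches[procno]);
}

/* Wait-list membership: the only state that differs from a plain SetLatch(). */
static bool
toy_cv_contains(const ToyCV *cv, int procno)
{
    for (int i = 0; i < cv->nwaiters; i++)
        if (cv->wakeup[i] == procno)
            return true;
    return false;
}
```

After a CV signal to proc 0 and a plain toy_set_latch() on proc 1, both latches are set, so a wait that only looks at WL_LATCH_SET cannot tell the two wakeups apart; only the wait-list membership reported by toy_cv_contains() differs.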
--
regards
Yura Sokolov aka funny-falcon
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-22 02:58 Xuneng Zhou <[email protected]>
parent: Yura Sokolov <[email protected]>
From: Xuneng Zhou @ 2026-04-22 02:58 UTC (permalink / raw)
To: Yura Sokolov <[email protected]>; +Cc: Chao Li <[email protected]>; PostgreSQL Hackers <[email protected]>
Hi Yura,
On Wed, Apr 22, 2026 at 1:06 AM Yura Sokolov <[email protected]> wrote:
>
> Good day, Chao Li.
>
> ConditionVariable works through MyLatch.
> And almost always there is a single latch per backend:
> MyLatch = PGPROC->procLatch;
>
> (The only exception I found is XLogRecoveryCtl->recoveryWakeupLatch.)
>
> And that is how ConditionVariablePrepareToSleep can be used in WalSndWait
> without any check: FeBeWaitSet always waits on MyLatch.
>
> Therefore, I don't quite understand how you propose to wait simultaneously
> for WL_LATCH_SET on MyLatch and for WL_CONDITION_VARIABLE.
> How will you distinguish which one fired?
>
> It looks to me that WL_LATCH_SET on MyLatch and WL_CONDITION_VARIABLE have
> to be mutually exclusive, at least unless the ConditionVariable internals
> are changed.
>
> And the check of whether the condition variable was signaled after waiting
> has to be placed into WaitEventSetWaitBlock, where all the other such
> checks are.
>
> All written above is just my humble opinion.
> Excuse me if I'm too strict.
>
I think you're correct that ConditionVariableSignal() wakes the target
process by calling SetLatch(&proc->procLatch). So every CV signal also
sets MyLatch. The patch introduces ConditionVariableSignaled() to
distinguish the two by checking CV wait-list membership (not the latch
state itself), so in principle the two sources are distinguishable.
In practice, however, the current patch does not seem to reliably report
WL_CONDITION_VARIABLE.
Inside WaitEventSetWait(), the latch check runs before the new CV
check. When a CV signal fires, both conditions become true
simultaneously: MyLatch is set AND the proc has been removed from the
CV wait list. With nevents=1 (which is what WalSndWait passes), the
latch event fills the one-slot output buffer and the loop breaks
before reaching the CV check:
/* existing latch check — runs first */
if (set->latch && set->latch->is_set)
{
...
returned_events++;
if (returned_events == nevents)
break; /* CV check below is never reached */
}
/* new CV check — unreachable when buffer is full */
if (set->cv)
cv_signaled = ConditionVariableSignaled(set->cv);
So for the converted caller, WL_CONDITION_VARIABLE is not reliably
observable. In the common case where the latch readiness is returned
first, the one-slot buffer is filled by WL_LATCH_SET and the CV check
is skipped.
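The ordering problem can be reproduced with a toy model of the check order. This is a sketch, not the patch's code: ToyWaitState, toy_wait() and the TOY_WL_* values are hypothetical. It assumes, per the description above, that a CV signal makes both the latch check and the CV check true, that the latch is checked first, and that the loop stops once the caller's nevents slots are full. It ignores the fact that the real ConditionVariableSignaled() also re-adds the process to the wait list.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag values for this toy; the patch defines its own WL_* bits. */
#define TOY_WL_LATCH_SET          0x1
#define TOY_WL_CONDITION_VARIABLE 0x2

typedef struct ToyWaitState
{
    bool latch_is_set;  /* set by SetLatch(), including by a CV signal */
    bool cv_signaled;   /* we were removed from the CV's wakeup list */
} ToyWaitState;

/*
 * Mimics the check order described above: latch first, then CV, returning
 * as soon as the output array (nevents slots) is full.  Returns the number
 * of events written to out[].
 */
static int
toy_wait(const ToyWaitState *st, unsigned *out, int nevents)
{
    int returned = 0;

    assert(nevents > 0);

    if (st->latch_is_set)
    {
        out[returned++] = TOY_WL_LATCH_SET;
        if (returned == nevents)
            return returned;    /* buffer full: the CV check never runs */
    }

    if (st->cv_signaled)
        out[returned++] = TOY_WL_CONDITION_VARIABLE;

    return returned;
}
```

With one output slot (as WalSndWait() passes) only TOY_WL_LATCH_SET comes back; with two slots both events are reported. Whether callers should pass a larger buffer is a design question this toy does not settle.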
--
Best,
Xuneng
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-22 13:13 Yura Sokolov <[email protected]>
parent: Xuneng Zhou <[email protected]>
From: Yura Sokolov @ 2026-04-22 13:13 UTC (permalink / raw)
To: Xuneng Zhou <[email protected]>; +Cc: Chao Li <[email protected]>; PostgreSQL Hackers <[email protected]>
22.04.2026 05:58, Xuneng Zhou wrote:
> Hi Yura,
> I think you're correct that ConditionVariableSignal() wakes the target
> process by calling SetLatch(&proc->procLatch). So every CV signal also
> sets MyLatch. The patch introduces ConditionVariableSignaled() to
> distinguish the two by checking CV wait-list membership (not the latch
> state itself), so in principle the two sources are distinguishable.
Nope.
Patch may say if ConditionVariable was signalled.
But it cann't say if MyLatch were set as well without touching
ConditionVariable.
It is not better than current state in term of code clearness and flawlessness.
> In practice, however, the current patch does not seem to reliably report
> WL_CONDITION_VARIABLE.
>
> Inside WaitEventSetWait(), the latch check runs before the new CV
> check. When a CV signal fires, both conditions become true
> simultaneously: MyLatch is set AND the proc has been removed from the
> CV wait list. With nevents=1 (which is what WalSndWait passes), the
> latch event fills the one-slot output buffer and the loop breaks
> before reaching the CV check:
> /* existing latch check — runs first */
> if (set->latch && set->latch->is_set)
> {
> ...
> returned_events++;
> if (returned_events == nevents)
> break; /* CV check below is never reached */
> }
>
> /* new CV check — unreachable when buffer is full */
> if (set->cv)
> cv_signaled = ConditionVariableSignaled(set->cv);
>
> So for the converted caller, WL_CONDITION_VARIABLE is not reliably
> observable. In the common case where the latch readiness is returned
> first, the one-slot buffer is filled by WL_LATCH_SET and the CV check
> is skipped.
Bingo! That is another case: the opposite side of the coin.
That is why I said: "At least unless ConditionVariable internals are changed."
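The "opposite side of the coin" can be illustrated with the same kind of toy model (hypothetical names, not PostgreSQL code). It assumes a latch behaves like a single boolean that records that it was set, but not by whom or how many times. Under that assumption, a wakeup caused by a CV signal alone and one caused by a CV signal plus an unrelated SetLatch() leave the waiter in the same observable state.

```c
#include <assert.h>
#include <stdbool.h>

/* What a waiter can observe after waking up (toy model, not PostgreSQL). */
typedef struct ToyObserved
{
    bool latch_is_set;
    bool cv_signaled;
} ToyObserved;

/*
 * Simulate one wakeup.
 *   cv_signal:      some backend signaled the CV; this also sets our latch.
 *   other_setlatch: unrelated code called SetLatch() on our latch.
 */
static ToyObserved
toy_observe(bool cv_signal, bool other_setlatch)
{
    ToyObserved obs = {false, false};

    if (cv_signal)
    {
        obs.cv_signaled = true;     /* removed from the CV wakeup list */
        obs.latch_is_set = true;    /* the CV signal sets the same latch */
    }
    if (other_setlatch)
        obs.latch_is_set = true;    /* a boolean keeps no count and no source */

    return obs;
}

static bool
toy_observed_equal(ToyObserved a, ToyObserved b)
{
    return a.latch_is_set == b.latch_is_set &&
           a.cv_signaled == b.cv_signaled;
}
```

A latch-only wakeup is still distinguishable from a CV wakeup, but "CV only" and "CV plus an independent SetLatch()" collapse into one state. In that case, reporting WL_LATCH_SET alongside WL_CONDITION_VARIABLE carries no extra information.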
--
regards
Yura Sokolov aka funny-falcon
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-23 08:15 Chao Li <[email protected]>
parent: Yura Sokolov <[email protected]>
From: Chao Li @ 2026-04-23 08:15 UTC (permalink / raw)
To: Yura Sokolov <[email protected]>; +Cc: Xuneng Zhou <[email protected]>; PostgreSQL Hackers <[email protected]>
> On Apr 22, 2026, at 21:13, Yura Sokolov <[email protected]> wrote:
>
> 22.04.2026 05:58, Xuneng Zhou wrote:
>> Hi Yura,
>> I think you're correct that ConditionVariableSignal() wakes the target
>> process by calling SetLatch(&proc->procLatch). So every CV signal also
>> sets MyLatch. The patch introduces ConditionVariableSignaled() to
>> distinguish the two by checking CV wait-list membership (not the latch
>> state itself), so in principle the two sources are distinguishable.
>
> Nope.
> The patch can tell whether the ConditionVariable was signalled.
> But it can't tell whether MyLatch was set as well without touching the
> ConditionVariable.
> It is no better than the current state in terms of code clarity and correctness.
Hi Yura,
Thank you for the review. This patch is marked as “PoC”, so nothing has been finalized yet, and I am open to any comments and suggestions.
In theory, ConditionVariable relies on MyLatch, and in most cases a process has only one MyLatch, so your point that WL_LATCH_SET on MyLatch and WL_CONDITION_VARIABLE should ideally be mutually exclusive makes sense.
However, the current code does not really follow that principle. Looking at the current code in WalSndWait():
```
if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
{
ConditionVariableCancelSleep();
proc_exit(1);
}
ConditionVariableCancelSleep();
```
It calls ConditionVariablePrepareToSleep() on a specific CV, so ideally only ConditionVariableSignal() on that specific CV should wake the wait.
However, the current code actually relies on WaitEventSetWait(FeBeWaitSet) waiting on MyLatch, where FeBeWaitSet includes WL_LATCH_SET. So if someone calls SetLatch(the_proc_latch), the wait will still wake up. The code does not check at all whether the CV was actually signaled.
With this patch, we would be able to verify whether the CV was signaled. The current PoC does not do that yet, because I simply followed the existing behavior, but with the new WaitEventSetWait() support, it would be easy to add such a check. From that perspective, I think the PoC is already better than the current code.
As for your question, “How will you distinguish which one was fired?”: when FeBeWaitSet includes both WL_LATCH_SET and WL_CONDITION_VARIABLE, and MyLatch is used for WL_LATCH_SET, then if the CV is signaled, WL_LATCH_SET may also fire as a side effect. I think that would be okay. Do you have a use case where that would lead to a problem?
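A hypothetical caller-side loop for the check described above, as a sketch only. It assumes a wait primitive that reports WL_CONDITION_VARIABLE whenever the CV was signaled (for example because the caller passes enough output slots), and it models each wakeup as a scripted bit mask. TOY_WL_*, ToyWakeup and toy_wait_for_cv() are made-up names; the comment marks where real backend code would call ResetLatch() and CHECK_FOR_INTERRUPTS().

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical flag values for this toy; not the patch's WL_* bits. */
#define TOY_WL_LATCH_SET          0x1
#define TOY_WL_CONDITION_VARIABLE 0x2

/* One scripted wakeup: the event bits the (toy) wait reports together. */
typedef unsigned ToyWakeup;

/*
 * Keep "waiting" until the CV itself is reported.  A wakeup that carries
 * only TOY_WL_LATCH_SET is counted as latch-only and we wait again.
 * Returns the number of waits needed, or -1 if the script runs out first.
 */
static int
toy_wait_for_cv(const ToyWakeup *script, int nscript, int *latch_only_wakeups)
{
    assert(nscript >= 0);
    *latch_only_wakeups = 0;

    for (int i = 0; i < nscript; i++)
    {
        ToyWakeup w = script[i];

        if (w & TOY_WL_CONDITION_VARIABLE)
            return i + 1;               /* the CV was really signaled */

        if (w & TOY_WL_LATCH_SET)
        {
            /* Real code: ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); */
            (*latch_only_wakeups)++;
        }
    }
    return -1;
}
```

Latch-only wakeups are counted and the loop waits again, so a stray SetLatch() no longer ends the wait by itself, while a wakeup that reports both bits still counts as a CV wakeup.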
Again, this PoC version is still far from the final version. Any discussion is very welcome.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-23 09:52 Yura Sokolov <[email protected]>
parent: Chao Li <[email protected]>
0 siblings, 1 reply; 10+ messages in thread
From: Yura Sokolov @ 2026-04-23 09:52 UTC
To: Chao Li <[email protected]>; +Cc: Xuneng Zhou <[email protected]>; PostgreSQL Hackers <[email protected]>
23.04.2026 11:15, Chao Li wrote:
>
>
>> On Apr 22, 2026, at 21:13, Yura Sokolov <[email protected]> wrote:
>>
>> 22.04.2026 05:58, Xuneng Zhou wrote:
>>> Hi Yura,
>>>
>>> On Wed, Apr 22, 2026 at 1:06 AM Yura Sokolov <[email protected]> wrote:
>>>>
>>>> 08.04.2026 11:22, Chao Li wrote:
>>>>>> On Apr 8, 2026, at 11:50, Chao Li <[email protected]> wrote:
>>>>>>> On Apr 2, 2026, at 15:38, Chao Li <[email protected]> wrote:
>>>>>>>> On Mar 31, 2026, at 16:59, Chao Li <[email protected]> wrote:
>>>>>>>>> On Mar 31, 2026, at 15:28, Chao Li <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> There is an XXX comment in WalSndWait():
>>>>>>>>> ```
>>>>>>>>> * XXX: A desirable future improvement would be to add support for CVs
>>>>>>>>> * into WaitEventSetWait().
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> I have been exploring a possible approach for that. This patch is a PoC that adds ConditionVariable support to WaitEventSet. This v1 is mainly intended to gather feedback on the design, so I have only done some basic testing so far, such as a normal logical replication workflow.
>>>>>>>>>
>>>>>>>>> I’d like to highlight a few key points about the design:
>>>>>>>>>
>>>>>>>>> 1. In the current WalSndWait(), although it prepares to sleep on a ConditionVariable, it does not actually check whether the CV has been signaled. In this PoC, I kept that same behavior. However, I tried to make the WaitEventSet support for CVs generic, so that if we want to add actual signal checking in the future, that would be possible.
>>>>>>>>>
>>>>>>>>> 2. To keep the design generic, this patch introduces a new wait event type, WL_CONDITION_VARIABLE. A WL_CONDITION_VARIABLE event occupies a position in the event array, similar to latch and socket events. When a CV is signaled, the corresponding WL_CONDITION_VARIABLE event is returned in occurred_events.
>>>>>>>>>
>>>>>>>>> 3. The WaitEventSet APIs AddWaitEventToSet() and ModifyWaitEvent() are extended to support CVs by adding one more parameter "cv" to both APIs. The downside of this approach is that all call sites of these two APIs need to be updated. I also considered adding separate APIs for CVs, such as AddWaitEventToSetForCV() and ModifyWaitEventForCV(), since CVs do not rely on the kernel and it might therefore make sense to decouple them from socket and latch handling. But for v1, I chose the more generic approach. I’d be interested in hearing comments on this part of the design.
>>>>>>>>>
>>>>>>>>> 4. One important point is that this patch extends the WaitEventSet abstraction, not the underlying kernel wait primitives. A ConditionVariable is still a userspace/shared-memory concept, but with this design it can participate in the same waiting framework as sockets and latches. I think that is useful because it allows mixed waits to be handled through one interface.
>>>>>>>>>
>>>>>>>>> Here is the v1 patch.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I just noticed that I missed checking in my last edit when switching to the other branch, so attaching an updated v1.
>>>>>>>
>>>>>>> PFA v2 - fixed a CI failure from contrib/postgres_fdw.
>>>>>>
>>>>>> Fixed a CI test failure and rebased.
>>>>>
>>>>> PFA v4 - tries to fix a test failure on Windows.
>>>>
>>>> Good day, Chao Li.
>>>>
>>>> ConditionVariable works through MyLatch.
>>>> And there is almost always a single latch per backend:
>>>> MyLatch = PGPROC->procLatch;
>>>>
>>>> (The only exception I found is XLogRecoveryCtl->recoveryWakeupLatch.)
>>>>
>>>> And that is why ConditionVariablePrepareToSleep() is used in WalSndWait()
>>>> without any check: FeBeWaitSet always waits on MyLatch.
>>>>
>>>> Therefore, I don't clearly understand how you propose to wait
>>>> simultaneously for WL_LATCH_SET on MyLatch and WL_CONDITION_VARIABLE.
>>>> How will you distinguish which one fired?
>>>>
>>>> It looks to me like WL_LATCH_SET on MyLatch and WL_CONDITION_VARIABLE have
>>>> to be mutually exclusive, at least unless the ConditionVariable internals
>>>> are changed.
>>>>
>>>> And the check of whether the condition variable was set, after waiting,
>>>> should be placed in WaitEventSetWaitBlock(), where all the other such
>>>> checks are.
>>>>
>>>> All written above is just my humble opinion.
>>>> Excuse me if I'm too strict.
>>>>
>>>
>>> I think you're correct that ConditionVariableSignal() wakes the target
>>> process by calling SetLatch(&proc->procLatch). So every CV signal also
>>> sets MyLatch. The patch introduces ConditionVariableSignaled() to
>>> distinguish the two by checking CV wait-list membership (not the latch
>>> state itself), so in principle the two sources are distinguishable.
>>
>> Nope.
>> The patch can say whether the ConditionVariable was signalled,
>> but it can't say whether MyLatch was also set by something other
>> than the ConditionVariable.
>> It is no better than the current state in terms of code clarity and correctness.
>
> Hi Yura,
>
> Thank you for the review. This patch is marked as “PoC”, so nothing has been finalized yet, and I am open to any comments and suggestions.
>
> In theory, ConditionVariable relies on MyLatch, and in most cases a process has only one MyLatch, so your point that WL_LATCH_SET on MyLatch and WL_CONDITION_VARIABLE should ideally be mutually exclusive makes sense.
>
> However, the current code does not really follow that principle. Looking at the current code in WalSndWait():
> ```
> if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
>     ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
> else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
>     ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
> else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
>     ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
>
> if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
>     (event.events & WL_POSTMASTER_DEATH))
> {
>     ConditionVariableCancelSleep();
>     proc_exit(1);
> }
>
> ConditionVariableCancelSleep();
> ```
>
> It calls ConditionVariablePrepareToSleep() on a specific CV, so ideally only ConditionVariableSignal() on that specific CV should wake the wait.
>
> However, the current code actually relies on WaitEventSetWait(FeBeWaitSet) waiting on MyLatch, where FeBeWaitSet includes WL_LATCH_SET. So if someone calls SetLatch(the_proc_latch), the wait will still wake up. The code does not check at all which CV was actually signaled.
And it looks like it doesn't need to. It seems to me that the ConditionVariable
is used just as a convenient way to wake up many processes, like a list of latches.
> With this patch, we would be able to verify whether the CV was signaled. The current PoC does not do that yet, because I simply followed the existing behavior, but with the new WaitEventSetWait() support, it would be easy to add such a check.
This check is just your function ConditionVariableSignaled(). That function
is the only valuable addition in your patch.
But still: a ConditionVariable has no value by itself. It is just a way to
notify that some condition MAY have changed. One has to recheck the awaited
condition anyway, because a ConditionVariable can wake up spuriously. If
you read the comments in ConditionVariableBroadcast(), you will see that it
is totally legal to signal "one more backend" if several broadcasts are
performed simultaneously.
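The recheck rule can be illustrated with a toy, single-threaded simulation. The names are hypothetical: `toy_sleep()` stands in for ConditionVariableSleep() and deliberately produces spurious wakeups, and `progress` stands in for whatever shared state the waiter actually cares about. The loop mirrors the predicate-loop usage described in the comments of ConditionVariableSleep(): the wait ends when the condition holds, not when a wakeup arrives.

```c
#include <assert.h>

/*
 * Hypothetical toy world: `progress` models shared state being waited on
 * (for example, how far some position has advanced); `wakeups` counts how
 * many times the waiter was woken.
 */
typedef struct ToyWorld
{
    int         progress;
    int         wakeups;
} ToyWorld;

/*
 * Models one return from a CV sleep. Only every third wakeup coincides with
 * real progress; the other two are spurious.
 */
static void
toy_sleep(ToyWorld *w)
{
    w->wakeups++;
    if (w->wakeups % 3 == 0)
        w->progress++;
}

/* Predicate loop: recheck the real condition after every wakeup. */
static int
toy_wait_for(ToyWorld *w, int target)
{
    while (w->progress < target)
        toy_sleep(w);
    assert(w->progress >= target);  /* the condition, not the wakeup, ends it */
    return w->wakeups;
}
```

With `progress` starting at 0, waiting for a target of 2 takes six wakeups in this model, four of them spurious; a waiter that trusted the first wakeup would have proceeded too early.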
> From that perspective, I think the PoC is already better than the current code.
I disagree, though that is just my personal opinion.
> As for your question, “How will you distinguish which one was fired?”: when FeBeWaitSet includes both WL_LATCH_SET and WL_CONDITION_VARIABLE, and MyLatch is used for WL_LATCH_SET, then if the CV is signaled, WL_LATCH_SET may also fire as a side effect. I think that would be okay. Do you have a use case where that would lead to a problem?
If you detect that the ConditionVariable fired, how will you tell whether
MyLatch was also set separately from the ConditionVariable?
In the current state of your patch, there is no way.
So, WL_LATCH_SET | WL_CONDITION_VARIABLE becomes meaningless.
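The ambiguity can be shown with a toy model. The names are hypothetical: `toy_cv_signaled()` only approximates a wait-list membership check like the one attributed to the patch's ConditionVariableSignaled(), whose real signature is not shown in this thread. Three scenarios are compared: a CV signal alone, a CV signal plus an independent SetLatch(), and an independent SetLatch() alone.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical toy model: one latch per process, plus CV list membership. */
typedef struct ToyProc
{
    bool        latch_set;      /* models MyLatch */
    bool        on_cv_list;     /* models membership in the CV wait list */
} ToyProc;

/* Loosely like ConditionVariableSignal() for one waiter: dequeue + SetLatch. */
static void
toy_cv_signal(ToyProc *p)
{
    assert(p->on_cv_list);      /* only a waiter can be signaled */
    p->on_cv_list = false;
    p->latch_set = true;
}

/* An unrelated SetLatch(MyLatch), e.g. from a signal handler. */
static void
toy_set_latch(ToyProc *p)
{
    p->latch_set = true;
}

/* Toy analogue of a membership-based "was this CV signaled?" check. */
static bool
toy_cv_signaled(const ToyProc *p)
{
    return !p->on_cv_list;
}

/* Everything a woken waiter can observe, packed as bits: 1 = latch, 2 = CV. */
static int
toy_observe(const ToyProc *p)
{
    return (p->latch_set ? 1 : 0) | (toy_cv_signaled(p) ? 2 : 0);
}
```

A latch set without a CV signal is distinguishable (observed value 1), but "CV signal only" and "CV signal plus an independent SetLatch()" both observe 3, because the CV signal itself sets the same latch.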
> Again, this PoC version is still far from the final version. Any discussion is very welcome.
So let's discuss.
I repeat: a ConditionVariable by itself is meaningless. It exists to signal
that some other condition has probably changed.
Therefore, WL_LATCH_SET + ConditionVariableSignaled() is more than enough,
imho.
I still don't see the need for WL_CONDITION_VARIABLE.
And you patched only a single place, which is not representative.
If you find more places where it could be useful, it will become clearer
what the API should look like and what semantics it should implement.
imho. I could be wrong.
--
regards
Yura Sokolov aka funny-falcon
* Re: PoC: Add condition variable support to WaitEventSetWait()
@ 2026-04-23 10:31 Chao Li <[email protected]>
parent: Yura Sokolov <[email protected]>
0 siblings, 0 replies; 10+ messages in thread
From: Chao Li @ 2026-04-23 10:31 UTC
To: Yura Sokolov <[email protected]>; +Cc: Xuneng Zhou <[email protected]>; PostgreSQL Hackers <[email protected]>
> On Apr 23, 2026, at 17:52, Yura Sokolov <[email protected]> wrote:
I don’t have an immediate plan to work on this patch. We can wait for more voices.
Best regards,
--
Chao Li (Evan)
HighGo Software Co., Ltd.
https://www.highgo.com/
Thread overview: 10+ messages
2026-03-31 07:28 PoC: Add condition variable support to WaitEventSetWait() Chao Li <[email protected]>
2026-04-02 07:38 ` Chao Li <[email protected]>
2026-04-08 03:50 ` Chao Li <[email protected]>
2026-04-08 08:22 ` Chao Li <[email protected]>
2026-04-21 17:06 ` Yura Sokolov <[email protected]>
2026-04-22 02:58 ` Xuneng Zhou <[email protected]>
2026-04-22 13:13 ` Yura Sokolov <[email protected]>
2026-04-23 08:15 ` Chao Li <[email protected]>
2026-04-23 09:52 ` Yura Sokolov <[email protected]>
2026-04-23 10:31 ` Chao Li <[email protected]>