From: Jacob Champion <[email protected]>
To: PostgreSQL Hackers <[email protected]>
Cc: Daniel Gustafsson <[email protected]>
Cc: Thomas Munro <[email protected]>
Cc: Peter Eisentraut <[email protected]>
Subject: Re: [PATCH] OAuth: fix performance bug with stuck multiplexer events
Date: Mon, 28 Jul 2025 13:52:42 -0700
Message-ID: <CAOYmi+mRhhzGUvrcdickepAnsdaGbqhNcboNe4-YvgtkSzGNbQ@mail.gmail.com>
In-Reply-To: <CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com>
References: <CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com>
Hi all,
On Thu, Jun 26, 2025 at 4:33 PM Jacob Champion
<[email protected]> wrote:
> My plan, if this code seems reasonable, is to backport 0001-0003, but
> keep the larger 0004 on HEAD only until it has proven to be stable.
> It's a big new suite and I want to make sure it's not flapping on some
> buildfarm animal. Eventually I'll backport that too.
Any thoughts on the approach? Too big/too scary/too BSD-specific?
A small bit of self-review: a comment I wrote in the tests suggested
that the choice of readable/writable events was up to the multiplexer
implementation, but it *must* choose readable, due to the hardcoded
use of PGRES_POLLING_READING throughout the current code. Updated in
v2.
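To illustrate the readable-versus-writable point, here is a minimal sketch, assuming Linux and epoll only (a kqueue version would need a BSD or macOS). The helper name mux_revents_for_writable_fd() is hypothetical and not part of the patchset. It watches a writable descriptor for EPOLLOUT and then asks poll() what the epoll descriptor itself reports:

```c
#include <assert.h>             /* for callers that assert on the result */
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>

/*
 * Hypothetical helper (not part of the patch): watches the write end of an
 * empty pipe for EPOLLOUT, then asks poll() what the epoll descriptor itself
 * reports. Returns the revents bitmask, or -1 on failure.
 */
int
mux_revents_for_writable_fd(void)
{
    int         pipefd[2];
    int         mux;
    int         revents = -1;
    struct epoll_event ev = {0};
    struct pollfd pfd = {0};

    if (pipe(pipefd) < 0)
        return -1;

    mux = epoll_create1(0);
    if (mux >= 0)
    {
        /* An empty pipe is immediately writable. */
        ev.events = EPOLLOUT;
        ev.data.fd = pipefd[1];

        if (epoll_ctl(mux, EPOLL_CTL_ADD, pipefd[1], &ev) == 0)
        {
            /* Ask about both directions on the mux itself. */
            pfd.fd = mux;
            pfd.events = POLLIN | POLLOUT;

            if (poll(&pfd, 1, 0) >= 0)
                revents = pfd.revents;
        }

        close(mux);
    }

    close(pipefd[0]);
    close(pipefd[1]);

    return revents;
}
```

The returned mask should have POLLIN set and POLLOUT clear: the mux signals "something is pending" as readability, whatever direction the underlying socket was waiting for. That is why a client told to wait with PGRES_POLLING_READING still wakes up for socket-writable events.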
Thanks,
--Jacob
1: ddb7875bb58 = 1: 379c12b5d26 oauth: Remove stale events from the kqueue multiplexer
2: a871ce498ea = 2: f30317d7265 oauth: Remove expired timers from the multiplexer
3: 5033b6d51c1 = 3: d243d28964d oauth: Track total call count during a client flow
4: f4a640f7995 ! 4: ca6fd237653 oauth: Add unit tests for multiplexer handling
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
-+ * here to record that expectation, but it's not a required part of the API. If
-+ * you've added a new implementation that doesn't have that behavior, feel free
-+ * to modify this test.
++ * here to record that expectation; PGRES_POLLING_READING is hardcoded
++ * throughout the flow and would need to be changed if a new multiplexer does
++ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
Attachments:
[text/plain] since-v1.diff.txt (1.0K, 2-since-v1.diff.txt)
[application/octet-stream] v2-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch (10.2K, 3-v2-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch)
From 379c12b5d26c93eaf104bed128f58a7a99b1e0b4 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v2 1/4] oauth: Remove stale events from the kqueue multiplexer
If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().
In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.
Implement drain_socket_events() to call kevent() and unstick any stale
events. This is called right after drive_request(), before we return
control to the client to wait. To make sure we've taken a look at the
entire queue, register_socket() now tracks the number of outstanding
registrations.
Suggested-by: Thomas Munro <[email protected]>
---
src/interfaces/libpq-oauth/oauth-curl.c | 218 ++++++++++++++++++------
1 file changed, 166 insertions(+), 52 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..8430356cfb5 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,10 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+
+#if defined(HAVE_SYS_EVENT_H)
+ int nevents; /* how many events are we waiting on? */
+#endif
};
/*
@@ -1291,41 +1295,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return 0;
#elif defined(HAVE_SYS_EVENT_H)
- struct kevent ev[2] = {0};
+ struct kevent ev[2];
struct kevent ev_out[2];
struct timespec timeout = {0};
- int nev = 0;
+ int nev;
int res;
+ /*
+ * First, any existing registrations for this socket need to be removed,
+ * both to track the outstanding number of events, and to ensure that
+ * we're not woken up for things that Curl no longer cares about.
+ *
+ * ENOENT is okay, but we have to track how many we get, so use
+ * EV_RECEIPT.
+ */
+ nev = 0;
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ nev++;
+
+ Assert(nev <= lengthof(ev));
+ Assert(nev <= lengthof(ev_out));
+
+ res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
+ if (res < 0)
+ {
+ actx_error(actx, "could not delete from kqueue: %m");
+ return -1;
+ }
+
+ /*
+ * We can't use the simple errno version of kevent, because we need to
+ * skip over ENOENT while still allowing a second change to be processed.
+ * So we need a longer-form error checking loop.
+ */
+ for (int i = 0; i < res; ++i)
+ {
+ /*
+ * EV_RECEIPT should guarantee one EV_ERROR result for every change,
+ * whether successful or not. Failed entries contain a non-zero errno
+ * in the data field.
+ */
+ Assert(ev_out[i].flags & EV_ERROR);
+
+ errno = ev_out[i].data;
+ if (!errno)
+ {
+ /* Successfully removed; update the event count. */
+ Assert(actx->nevents > 0);
+ actx->nevents--;
+ }
+ else if (errno != ENOENT)
+ {
+ actx_error(actx, "could not delete from kqueue: %m");
+ return -1;
+ }
+ }
+
+ /* If we're only removing registrations, we're done. */
+ if (what == CURL_POLL_REMOVE)
+ return 0;
+
+ /*
+ * Now add the new filters. This is more straightforward than deletion.
+ *
+ * Combining this kevent() call with the one above seems like it should be
+ * theoretically possible, but beware that not all BSDs keep the original
+ * event flags when using EV_RECEIPT, so it's tricky to figure out which
+ * operations succeeded. For now we keep the deletions and the additions
+ * separate.
+ */
+ nev = 0;
+
switch (what)
{
case CURL_POLL_IN:
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
nev++;
break;
case CURL_POLL_OUT:
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
nev++;
break;
case CURL_POLL_INOUT:
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
- nev++;
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
- nev++;
- break;
-
- case CURL_POLL_REMOVE:
-
- /*
- * We don't know which of these is currently registered, perhaps
- * both, so we try to remove both. This means we need to tolerate
- * ENOENT below.
- */
- EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0);
nev++;
- EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+ EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0);
nev++;
break;
@@ -1334,45 +1392,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
return -1;
}
- res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+ Assert(nev <= lengthof(ev));
+
+ res = kevent(actx->mux, ev, nev, NULL, 0, NULL);
if (res < 0)
{
actx_error(actx, "could not modify kqueue: %m");
return -1;
}
+ /* Update the event count, and we're done. */
+ actx->nevents += nev;
+
+ return 0;
+#else
+#error register_socket is not implemented on this platform
+#endif
+}
+
+/*-------
+ * Drains any stale level-triggered events out of the multiplexer. This is
+ * necessary only if the mux implementation requires it.
+ *
+ * As an example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ * client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ * and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ * The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+drain_socket_events(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+ /* The epoll implementation doesn't need to drain pending events. */
+ return true;
+#elif defined(HAVE_SYS_EVENT_H)
+ struct timespec timeout = {0};
+ struct kevent *drain;
+ int drain_len;
+
/*
- * We can't use the simple errno version of kevent, because we need to
- * skip over ENOENT while still allowing a second change to be processed.
- * So we need a longer-form error checking loop.
+ * Drain the events in one call, rather than looping. (We could maybe call
+ * kevent() drain_len times, instead of allocating space for the maximum
+ * number of events, but that relies on the events being in FIFO order to
+ * avoid starvation. The kqueue man pages don't seem to make any
+ * guarantees about that.)
+ *
+ * register_socket() keeps actx->nevents updated with the number of
+ * outstanding event filters. We don't track the registration of the
+ * timer; we just assume one could be registered here.
*/
- for (int i = 0; i < res; ++i)
+ drain_len = actx->nevents + 1;
+
+ drain = malloc(sizeof(*drain) * drain_len);
+ if (!drain)
{
- /*
- * EV_RECEIPT should guarantee one EV_ERROR result for every change,
- * whether successful or not. Failed entries contain a non-zero errno
- * in the data field.
- */
- Assert(ev_out[i].flags & EV_ERROR);
+ actx_error(actx, "out of memory");
+ return false;
+ }
- errno = ev_out[i].data;
- if (errno && errno != ENOENT)
- {
- switch (what)
- {
- case CURL_POLL_REMOVE:
- actx_error(actx, "could not delete from kqueue: %m");
- break;
- default:
- actx_error(actx, "could not add to kqueue: %m");
- }
- return -1;
- }
+ /*
+ * Discard all pending events. Since our registrations are level-triggered
+ * (even the timer, since we use a chained kqueue for that instead of an
+ * EVFILT_TIMER on the top-level mux!), any events that we still need will
+ * remain signalled, and the stale ones will be swept away.
+ */
+ if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0)
+ {
+ actx_error(actx, "could not drain kqueue: %m");
+ free(drain);
+ return false;
}
- return 0;
+ free(drain);
+ return true;
#else
-#error register_socket is not implemented on this platform
+#error drain_socket_events is not implemented on this platform
#endif
}
@@ -1441,7 +1545,8 @@ set_timer(struct async_ctx *actx, long timeout)
* macOS.)
*
* If there was no previous timer set, the kevent calls will result in
- * ENOENT, which is fine.
+ * ENOENT, which is fine. (We don't track actx->nevents for this case;
+ * instead, drain_socket_events() just assumes a timer could be set.)
*/
EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0);
if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT)
@@ -2755,13 +2860,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
if (status == PGRES_POLLING_FAILED)
goto error_return;
- else if (status != PGRES_POLLING_OK)
- {
- /* not done yet */
- return status;
- }
+ else if (status == PGRES_POLLING_OK)
+ break; /* done! */
+
+ /*
+ * This request is still running.
+ *
+ * Drain any stale socket events from the mux before we
+ * ask the client to poll. (Currently, this can occur only
+ * with kqueue.) If this is forgotten, the multiplexer can
+ * get stuck in a signalled state and we'll burn CPU
+ * cycles pointlessly.
+ */
+ if (!drain_socket_events(actx))
+ goto error_return;
- break;
+ return status;
}
case OAUTH_STEP_WAIT_INTERVAL:
--
2.34.1
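The drain_socket_events() comment in the patch above notes that an epoll-based mux stops signalling readiness on its own once the socket is no longer writable. The following is a minimal sketch of that epoll half only, assuming Linux and using a pipe in place of a libcurl socket. The names readable_now() and epoll_stale_event_demo() are hypothetical and do not appear in the patch. It cannot show the kqueue behaviour, which needs a BSD or macOS to observe.

```c
#include <assert.h>             /* for callers that assert on the result */
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/epoll.h>
#include <unistd.h>

static char scratch[4096];

/* Returns 1 if fd is readable right now, 0 if it is not, -1 on error. */
int
readable_now(int fd)
{
    struct pollfd pfd = {0};

    pfd.fd = fd;
    pfd.events = POLLIN;
    if (poll(&pfd, 1, 0) < 0)
        return -1;
    return (pfd.revents & POLLIN) ? 1 : 0;
}

/*
 * Records whether the mux is ready at three points: states[0] with an empty
 * (writable) pipe, states[1] once the pipe is full, and states[2] after the
 * pipe has been drained again. Returns 0 on success, -1 on failure.
 */
int
epoll_stale_event_demo(int states[3])
{
    int         pipefd[2];
    int         mux = -1;
    int         flags;
    int         rc = -1;
    long        written = 0;
    struct epoll_event ev = {0};

    if (pipe(pipefd) < 0)
        return -1;

    /* Make the write end nonblocking so that filling it cannot hang. */
    flags = fcntl(pipefd[1], F_GETFL);
    if (flags < 0 || fcntl(pipefd[1], F_SETFL, flags | O_NONBLOCK) < 0)
        goto done;

    mux = epoll_create1(0);
    if (mux < 0)
        goto done;

    ev.events = EPOLLOUT;
    ev.data.fd = pipefd[1];
    if (epoll_ctl(mux, EPOLL_CTL_ADD, pipefd[1], &ev) < 0)
        goto done;

    states[0] = readable_now(mux);

    /* Fill the pipe until the kernel refuses more data. */
    for (;;)
    {
        ssize_t     w = write(pipefd[1], scratch, sizeof(scratch));

        if (w < 0)
        {
            if (errno != EAGAIN && errno != EWOULDBLOCK)
                goto done;
            break;
        }
        written += w;
    }

    /* No explicit drain call happens here; we only ask the mux again. */
    states[1] = readable_now(mux);

    /* Read everything back so that the write end is writable again. */
    while (written > 0)
    {
        size_t      want = written < (long) sizeof(scratch)
            ? (size_t) written : sizeof(scratch);
        ssize_t     r = read(pipefd[0], scratch, want);

        if (r <= 0)
            goto done;
        written -= r;
    }

    states[2] = readable_now(mux);
    rc = 0;

done:
    if (mux >= 0)
        close(mux);
    close(pipefd[0]);
    close(pipefd[1]);
    return rc;
}
```

On Linux the three recorded states should come out as ready, not ready, ready. With kqueue, per the commit message, the middle state would stay "ready" until the registration is removed or the stale event is read out with kevent(), which is the gap drain_socket_events() closes.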
[application/octet-stream] v2-0002-oauth-Remove-expired-timers-from-the-multiplexer.patch (5.6K, 4-v2-0002-oauth-Remove-expired-timers-from-the-multiplexer.patch)
From f30317d7265bf463b0c0f6c3b92097e021761c95 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH v2 2/4] oauth: Remove expired timers from the multiplexer
In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().
Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.
The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.
---
src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 8430356cfb5..78ba3399495 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1588,40 +1588,20 @@ set_timer(struct async_ctx *actx, long timeout)
/*
* Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
*/
static int
timer_expired(struct async_ctx *actx)
{
-#if defined(HAVE_SYS_EPOLL_H)
- struct itimerspec spec = {0};
-
- if (timerfd_gettime(actx->timerfd, &spec) < 0)
- {
- actx_error(actx, "getting timerfd value: %m");
- return -1;
- }
-
- /*
- * This implementation assumes we're using single-shot timers. If you
- * change to using intervals, you'll need to reimplement this function
- * too, possibly with the read() or select() interfaces for timerfd.
- */
- Assert(spec.it_interval.tv_sec == 0
- && spec.it_interval.tv_nsec == 0);
-
- /* If the remaining time to expiration is zero, we're done. */
- return (spec.it_value.tv_sec == 0
- && spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
int res;
- /* Is the timer queue ready? */
+ /* Is the timer ready? */
res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
if (res < 0)
{
- actx_error(actx, "checking kqueue for timeout: %m");
+ actx_error(actx, "checking timer expiration: %m");
return -1;
}
@@ -1653,6 +1633,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
return 0;
}
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it will contain whether or not the timer was expired at time of call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+ int res;
+
+ res = timer_expired(actx);
+ if (res < 0)
+ return false;
+
+ if (res > 0)
+ {
+ /*
+ * Timer is expired. We could drain the event manually from the
+ * timerfd, but it's easier to simply disable it; that keeps the
+ * platform-specific code in set_timer().
+ */
+ if (!set_timer(actx, -1))
+ return false;
+ }
+
+ if (was_expired)
+ *was_expired = (res > 0);
+
+ return true;
+}
+
/*
* Prints Curl request debugging information to stderr.
*
@@ -2856,6 +2866,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
{
PostgresPollingStatusType status;
+ /*
+ * Clear any expired timeout before calling back into
+ * Curl. Curl is not guaranteed to do this for us, because
+ * its API expects us to use single-shot (i.e.
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
+ * This can't be combined with the drain_socket_events()
+ * call below: we might accidentally clear a short timeout
+ * that was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
+ goto error_return;
+
+ /* Move the request forward. */
status = drive_request(actx);
if (status == PGRES_POLLING_FAILED)
@@ -2879,24 +2905,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
}
case OAUTH_STEP_WAIT_INTERVAL:
-
- /*
- * The client application is supposed to wait until our timer
- * expires before calling PQconnectPoll() again, but that
- * might not happen. To avoid sending a token request early,
- * check the timer before continuing.
- */
- if (!timer_expired(actx))
{
- set_conn_altsock(conn, actx->timerfd);
- return PGRES_POLLING_READING;
- }
+ bool expired;
- /* Disable the expired timer. */
- if (!set_timer(actx, -1))
- goto error_return;
+ /*
+ * The client application is supposed to wait until our
+ * timer expires before calling PQconnectPoll() again, but
+ * that might not happen. To avoid sending a token request
+ * early, check the timer before continuing.
+ */
+ if (!drain_timer_events(actx, &expired))
+ goto error_return;
- break;
+ if (!expired)
+ {
+ set_conn_altsock(conn, actx->timerfd);
+ return PGRES_POLLING_READING;
+ }
+
+ break;
+ }
}
/*
--
2.34.1
[application/octet-stream] v2-0003-oauth-Track-total-call-count-during-a-client-flow.patch (4.8K, 5-v2-0003-oauth-Track-total-call-count-during-a-client-flow.patch)
From d243d28964dd7afb28abfaf470dc7612e7235cb9 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH v2 3/4] oauth: Track total call count during a client flow
Tracking down the bugs that led to the addition of drain_socket_events()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.
A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.
---
src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++
.../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 78ba3399495..68303106a5d 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
bool user_prompted; /* have we already sent the authz prompt? */
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
#if defined(HAVE_SYS_EVENT_H)
int nevents; /* how many events are we waiting on? */
@@ -3074,6 +3075,8 @@ PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
PostgresPollingStatusType result;
+ fe_oauth_state *state = conn_sasl_state(conn);
+ struct async_ctx *actx;
#ifndef WIN32
sigset_t osigset;
bool sigpipe_pending;
@@ -3102,6 +3105,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
+ * To assist with finding bugs in drain_socket_events() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ * Be careful that state->async_ctx could be NULL if early initialization
+ * fails during the first call.
+ */
+ actx = state->async_ctx;
+ Assert(actx || result == PGRES_POLLING_FAILED);
+
+ if (actx && actx->debugging)
+ {
+ actx->dbg_num_calls++;
+ if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+ fprintf(stderr, "[libpq] total number of polls: %d\n",
+ actx->dbg_num_calls);
+ }
+
#ifndef WIN32
if (masked)
{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
);
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+ 'postgres',
+ "SELECT 'connected for call count'",
+ extra_params => ['-w'],
+ connstr => connstr(stage => 'token', retries => 2),
+ on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+ $stderr,
+ qr@Visit https://example\.com/ and enter the code: postgresuser@,
+ "call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+ # For reference, a typical flow with two retries might take between 5 and 15
+ # calls to the client implementation. And while this will probably continue
+ # to change across OSes and Curl updates, we're likely in trouble if we see
+ # hundreds or thousands of calls.
+ $stderr =~ $count_pattern;
+ cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
# Stress test: make sure our builtin flow operates correctly even if the client
# application isn't respecting PGRES_POLLING_READING/WRITING signals returned
# from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
connstr(stage => 'all', retries => 1, interval => 1));
note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
unlike(
--
2.34.1
[application/octet-stream] v2-0004-oauth-Add-unit-tests-for-multiplexer-handling.patch (17.2K, 6-v2-0004-oauth-Add-unit-tests-for-multiplexer-handling.patch)
From ca6fd237653d8de9038b8f81a2c08c882a7e5f51 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v2 4/4] oauth: Add unit tests for multiplexer handling
To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.
---
src/interfaces/libpq-oauth/Makefile | 14 +
src/interfaces/libpq-oauth/meson.build | 35 ++
src/interfaces/libpq-oauth/t/001_oauth.pl | 24 +
src/interfaces/libpq-oauth/test-oauth-curl.c | 474 +++++++++++++++++++
4 files changed, 547 insertions(+)
create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c
diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 270fc0cf2d9..9da8e4b7143 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
rm -f '$(DESTDIR)$(libdir)/$(stlib)'
rm -f '$(DESTDIR)$(libdir)/$(shlib)'
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+ $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+ $(prove_check)
+
+installcheck: all-tests
+ $(prove_installcheck)
+
clean distclean: clean-lib
rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+ rm -f test-oauth-curl.o oauth_tests$(X)
+ rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
link_args: export_fmt.format(export_file.full_path()),
kwargs: default_lib_args,
)
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+ oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+ '--NAME', 'oauth_tests',
+ '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+ oauth_test_sources,
+ dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+ kwargs: default_bin_args + {
+ 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+ 'c_pch': pch_postgres_fe_h,
+ 'include_directories': [libpq_inc, postgres_inc],
+ 'install': false,
+ }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+ 'name': 'libpq-oauth',
+ 'sd': meson.current_source_dir(),
+ 'bd': meson.current_build_dir(),
+ 'tap': {
+ 'tests': [
+ 't/001_oauth.pl',
+ ],
+ 'deps': libpq_oauth_test_deps,
+ },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..e769856c2c9
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+ '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
+ '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
+ or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..1a03b0fc552
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,474 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+ ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+ printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+ if (!ok)
+ {
+ printf("# at %s:%d:\n", file, line);
+ printf("# expression is false: %s\n", teststr);
+ }
+
+ return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+ do { \
+ int this_ = (THIS), \
+ that_ = (THAT); \
+ is_diag( \
+ ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+ this_, #THIS, that_, #THAT \
+ ); \
+ } while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+ if (!ok)
+ printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+ return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+ struct async_ctx *actx;
+
+ actx = calloc(1, sizeof(*actx));
+ Assert(actx);
+
+ actx->mux = PGINVALID_SOCKET;
+ actx->timerfd = -1;
+ actx->debugging = true;
+
+ initPQExpBuffer(&actx->errbuf);
+
+ Assert(setup_multiplexer(actx));
+
+ return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+ termPQExpBuffer(&actx->errbuf);
+
+ if (actx->mux != PGINVALID_SOCKET)
+ close(actx->mux);
+ if (actx->timerfd >= 0)
+ close(actx->timerfd);
+
+ free(actx);
+}
+
+static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+ int mode;
+ ssize_t written = 0;
+
+ /* Don't block. */
+ Assert((mode = fcntl(fd, F_GETFL)) != -1);
+ Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+ while (true)
+ {
+ ssize_t w;
+
+ w = write(fd, dummy_buf, sizeof(dummy_buf));
+ if (w < 0)
+ {
+ if (errno != EAGAIN && errno != EWOULDBLOCK)
+ {
+ perror("write to pipe");
+ written = -1;
+ }
+ break;
+ }
+
+ written += w;
+ }
+
+ /* Restore the original file status flags. */
+ Assert(fcntl(fd, F_SETFL, mode) == 0);
+
+ return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+ Assert(n > 0);
+
+ while (n)
+ {
+ size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+ ssize_t drained;
+
+ drained = read(fd, dummy_buf, to_read);
+ if (drained < 0)
+ {
+ perror("read from pipe");
+ return false;
+ }
+
+ n -= drained;
+ }
+
+ return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+ Assert(res_ != -1); \
+ ok(res_ > 0, "multiplexer is ready " TEST); \
+ } while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+ do { \
+ int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+ Assert(res_ != -1); \
+ is(res_, 0, "multiplexer is not ready " TEST); \
+ } while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+ printf("# test_set_timer\n");
+
+ /* A zero-duration timer should result in a near-immediate ready signal. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer expires");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+ /* Resetting the timer far in the future should unset the ready signal. */
+ Assert(set_timer(actx, INT_MAX));
+ mux_is_not_ready(actx->mux, "when timer is reset to the future");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+ /* Setting another zero-duration timer should override the previous one. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+ is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+ /* And disabling that timer should once again unset the ready signal. */
+ Assert(set_timer(actx, -1));
+ mux_is_not_ready(actx->mux, "when timer is unset");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+ {
+ bool expired;
+
+ /* Make sure drain_timer_events() functions correctly as well. */
+ Assert(set_timer(actx, 0));
+ mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+ /* A second drain should do nothing. */
+ Assert(drain_timer_events(actx, &expired));
+ mux_is_not_ready(actx->mux, "when timer is drained a second time");
+ is(expired, 0, "drain_timer_events() reports no expiration");
+ is(timer_expired(actx), 0, "timer_expired() still returns 0");
+ }
+
+ free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+ struct async_ctx *actx = init_test_actx();
+ int pipefd[2];
+ int rfd,
+ wfd;
+ bool bidirectional;
+
+ /* Create a local pipe for communication. */
+ Assert(pipe(pipefd) == 0);
+ rfd = pipefd[0];
+ wfd = pipefd[1];
+
+ /*
+ * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+ * behavior of some of these tests. Store that knowledge for later.
+ */
+ bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+ /*
+ * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+ * read/write operations, respectively, and once using CURL_POLL_INOUT for
+ * both sides.
+ */
+ for (int inout = 0; inout < 2; inout++)
+ {
+ const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+ const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+ const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+ size_t bidi_pipe_size = 0; /* silence compiler warnings */
+
+ printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+ /*
+ * At the start of the test, the read side should be blocked and the
+ * write side should be open. (There's a mistake at the end of this
+ * loop otherwise.)
+ */
+ Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+ Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+ /*
+ * For bidirectional systems, emulate unidirectional behavior here by
+ * filling up the "read side" of the pipe.
+ */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Listen on the read side. The multiplexer shouldn't be ready yet. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is not readable");
+
+ /* Writing to the pipe should result in a read-ready multiplexer. */
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+ /*
+ * Update the registration to wait on write events instead. The
+ * multiplexer should be unset.
+ */
+ Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+ /* Re-register for read events. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+ /* Stop listening. The multiplexer should be unset. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+ /* Listen again. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
+ * event is drained.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_socket_events(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+
+ /* Listen on the write side. An empty buffer should be writable. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+ /* As above, wait on read events instead. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+ /* Re-register for write events. */
+ Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+ mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+ {
+ ssize_t written;
+
+ /*
+ * Fill the pipe. Once the old writable event is drained, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
+ Assert(drain_socket_events(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
+ Assert(drain_pipe(rfd, written));
+ mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+ }
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+ mux_is_not_ready(actx->mux, "when fd is removed");
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+ /* Set the timer and wait for it to expire. */
+ Assert(set_timer(actx, 0));
+ Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+ is(timer_expired(actx), 1, "timer is expired");
+
+ /* Register for read events and make the fd readable. */
+ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+ Assert(write(wfd, "x", 1) == 1);
+ mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
+ * Order matters to avoid false negatives. First drain the socket,
+ * then unset the timer. We're trying to catch the case where the
+ * pending timer expiration event takes the place of one of the
+ * socket events we're attempting to drain.
+ */
+ Assert(drain_pipe(rfd, 1));
+ Assert(drain_socket_events(actx));
+ Assert(set_timer(actx, -1));
+
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+ /* Stop listening. */
+ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+ /* Undo any unidirectional emulation. */
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
+ }
+
+ close(rfd);
+ close(wfd);
+ free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+ const char *timeout;
+
+ /* Grab the default timeout. */
+ timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+ if (timeout)
+ {
+ int timeout_s = atoi(timeout);
+
+ if (timeout_s > 0)
+ timeout_us = (pg_usec_time_t) timeout_s * 1000 * 1000;
+ }
+
+ /*
+ * Set up line buffering for our output, to let stderr interleave in the
+ * log files.
+ */
+ setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+ test_set_timer();
+ test_register_socket();
+
+ printf("1..%d\n", num_tests);
+ return 0;
+}
+
+#else /* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+ printf("1..0 # skip: cassert is not enabled\n");
+
+ return 0;
+}
+
+#endif /* USE_ASSERT_CHECKING */
--
2.34.1
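
For anyone who wants to try the pipe helpers outside the libpq tree, here is a
minimal standalone sketch of the fill/drain technique used by the tests above.
It is not part of the patch. The names fill_pipe_sketch, drain_pipe_sketch and
scratch are hypothetical, and the sketch assumes a POSIX system. It pairs
F_GETFL with F_SETFL when restoring the flags, retries on EINTR, and treats
EOF as a failure so that the drain loop always terminates.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/types.h>
#include <unistd.h>

/* Scratch buffer shared by both helpers (hypothetical name). */
static char scratch[4 * 1024];

/*
 * Writes to fd without blocking until the pipe is full. Returns the number
 * of bytes written, or -1 on error. The original file status flags are
 * restored before returning.
 */
static ssize_t
fill_pipe_sketch(int fd)
{
	int			mode = fcntl(fd, F_GETFL);
	ssize_t		written = 0;

	if (mode == -1 || fcntl(fd, F_SETFL, mode | O_NONBLOCK) == -1)
		return -1;

	for (;;)
	{
		ssize_t		w = write(fd, scratch, sizeof(scratch));

		if (w < 0)
		{
			if (errno == EINTR)
				continue;		/* interrupted; try again */
			if (errno != EAGAIN && errno != EWOULDBLOCK)
				written = -1;	/* a real error, not just a full pipe */
			break;
		}

		written += w;
	}

	/* F_GETFL pairs with F_SETFL; F_SETFD would leave O_NONBLOCK set. */
	if (fcntl(fd, F_SETFL, mode) == -1)
		return -1;

	return written;
}

/*
 * Reads exactly n bytes from fd. Returns false on error or unexpected EOF.
 */
static bool
drain_pipe_sketch(int fd, ssize_t n)
{
	assert(n > 0);

	while (n > 0)
	{
		size_t		want = ((size_t) n < sizeof(scratch)) ? (size_t) n : sizeof(scratch);
		ssize_t		got = read(fd, scratch, want);

		if (got < 0)
		{
			if (errno == EINTR)
				continue;		/* interrupted; try again */
			return false;
		}
		if (got == 0)
			return false;		/* write side closed before n bytes arrived */

		n -= got;
	}

	return true;
}
```

Calling fill_pipe_sketch() on the write end of a fresh pipe() and then passing
its return value to drain_pipe_sketch() on the read end should leave the pipe
empty again, with the write end's status flags unchanged.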