From: Jacob Champion <[email protected]>
To: Thomas Munro <[email protected]>
To: Dagfinn Ilmari Mannsåker <[email protected]>
Cc: PostgreSQL Hackers <[email protected]>
Cc: Daniel Gustafsson <[email protected]>
Cc: Peter Eisentraut <[email protected]>
Subject: Re: [PATCH] OAuth: fix performance bug with stuck multiplexer events
Date: Thu, 7 Aug 2025 13:04:17 -0700
Message-ID: <CAOYmi+=2GhnPkFZSR5ne0CzPGq7tPZrOFUiuPu2OdEnoGt=o_g@mail.gmail.com>
In-Reply-To: <CAOYmi+=FVbELVYRq=kUri4EOjanyxNGjd2F1H-kMY9h2y=bVGg@mail.gmail.com>
References: <CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com>
	<CAOYmi+mRhhzGUvrcdickepAnsdaGbqhNcboNe4-YvgtkSzGNbQ@mail.gmail.com>
	<CA+hUKGLyS-GK_rkENaVbFVTa4VJ+reJxWtt+q4gmgNUXhptfYA@mail.gmail.com>
	<CAOYmi+k1q3feeMxfbaJA=+hx+XnOFQA0z2JU+0igA7fTUZTmoA@mail.gmail.com>
	<CA+hUKG+H1gwDh96jn5jB6Q3HyXrSC9x2y=uQJAthT8NLs6GN_Q@mail.gmail.com>
	<CAOYmi+n1xRNCDnwZzigXVk8V=+sr7ZzuGpJ0tAyozX-zQT19Gg@mail.gmail.com>
	<CAOYmi+mvk8Y7btYJhBzOGiNTY3cCpYZKjhA4-TP2Lkb=zOr4oQ@mail.gmail.com>
	<CA+hUKGKV5aM1K3gc_kAMtUq-BkD5AugLUCDVa-RMnbfFypALwQ@mail.gmail.com>
	<CAOYmi+=FVbELVYRq=kUri4EOjanyxNGjd2F1H-kMY9h2y=bVGg@mail.gmail.com>

On Thu, Aug 7, 2025 at 11:11 AM Jacob Champion
<[email protected]> wrote:
> Thank you so much for the reviews!

Here is v4, with the feedback from both of you. 0001-0004 are planned
for backport; 0005 is slated for master only. Thanks again for the
reviews!

--Jacob

1:  c5cdccfe374 ! 1:  a515435d3b4 oauth: Remove stale events from the kqueue multiplexer
    @@ Commit message
         after drive_request(), before we return control to the client to wait.
     
         Suggested-by: Thomas Munro <[email protected]>
    +    Co-authored-by: Thomas Munro <[email protected]>
    +    Reviewed-by: Thomas Munro <[email protected]>
    +    Backpatch-through: 18
    +    Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
     
      ## src/interfaces/libpq-oauth/oauth-curl.c ##
     @@ src/interfaces/libpq-oauth/oauth-curl.c: register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
      #endif
      }
      
    -+/*-------
    ++/*
     + * If there is no work to do on any of the descriptors in the multiplexer, then
     + * this function must ensure that the multiplexer is not readable.
     + *
    -+ * As a motivating example, consider the following sequence of events:
    -+ * 1. libcurl tries to write data to the send buffer, but it fills up.
    -+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
    -+ *    client to wait.
    -+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
    -+ *    and the client wakes up and calls back into the flow.
    -+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
    -+ *    The socket is no longer writable.
    -+ *
    -+ * At this point, an epoll-based mux no longer signals readiness, so nothing
    -+ * further needs to be done. But a kqueue-based mux will continue to signal
    -+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
    -+ * or the old socket-writable event is read from the queue. Since Curl isn't
    -+ * guaranteed to do the former, we must do the latter here.
    ++ * Unlike epoll descriptors, kqueue descriptors only transition from readable to
    ++ * unreadable when kevent() is called and finds nothing, after removing
    ++ * level-triggered conditions that have gone away. We therefore need a dummy
    ++ * kevent() call after operations might have been performed on the monitored
    ++ * sockets or timer_fd. Any event returned is ignored here, but it also remains
    ++ * queued (being level-triggered) and leaves the descriptor readable. This is a
    ++ * no-op for epoll descriptors.
     + */
     +static bool
     +comb_multiplexer(struct async_ctx *actx)
    @@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn)
     +					 * Make sure that stale events don't cause us to come back
     +					 * early. (Currently, this can occur only with kqueue.) If
     +					 * this is forgotten, the multiplexer can get stuck in a
    -+					 * signalled state and we'll burn CPU cycles pointlessly.
    ++					 * signaled state and we'll burn CPU cycles pointlessly.
     +					 */
     +					if (!comb_multiplexer(actx))
     +						goto error_return;
2:  7725e0c173b ! 2:  a34be19f17f oauth: Ensure unused socket registrations are removed
    @@ Commit message
         number of events added, the number of events pulled off the queue, and
         the lengths of the kevent arrays.
     
    +    Reviewed-by: Thomas Munro <[email protected]>
    +    Backpatch-through: 18
    +    Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
    +
      ## src/interfaces/libpq-oauth/oauth-curl.c ##
     @@ src/interfaces/libpq-oauth/oauth-curl.c: register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
      
3:  6ccf7a5d156 ! 3:  7408778d579 oauth: Remove expired timers from the multiplexer
    @@ Commit message
         the timer was known to be set, but both implementations now use the
         kqueue logic.
     
    +    Reviewed-by: Thomas Munro <[email protected]>
    +    Backpatch-through: 18
    +    Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
    +
      ## src/interfaces/libpq-oauth/oauth-curl.c ##
     @@ src/interfaces/libpq-oauth/oauth-curl.c: set_timer(struct async_ctx *actx, long timeout)
      
4:  2be993b8f07 ! 4:  8241255e84c oauth: Track total call count during a client flow
    @@ Commit message
         future work to add TLS support to the oauth_validator test server should
         strengthen it as well.
     
    +    Backpatch-through: 18
    +    Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
    +
      ## src/interfaces/libpq-oauth/oauth-curl.c ##
     @@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
      	bool		user_prompted;	/* have we already sent the authz prompt? */
5:  50257bf32eb ! 5:  337124064f3 oauth: Add unit tests for multiplexer handling
    @@ Commit message
         suite for the socket and timer handling code. This is all based on TAP
         and driven by our existing Test::More infrastructure.
     
    +    Reviewed-by: Dagfinn Ilmari Mannsåker <[email protected]>
    +    Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
    +
      ## src/interfaces/libpq-oauth/Makefile ##
     @@ src/interfaces/libpq-oauth/Makefile: uninstall:
      	rm -f '$(DESTDIR)$(libdir)/$(stlib)'
    @@ src/interfaces/libpq-oauth/t/001_oauth.pl (new)
     +my $err = $builder->failure_output;
     +
     +IPC::Run::run ['oauth_tests'],
    -+  '>', IPC::Run::new_chunker, sub { print {$out} $_[0] },
    -+  '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] }
    ++  '>', IPC::Run::new_chunker, sub { $out->print($_[0]) },
    ++  '2>', IPC::Run::new_chunker, sub { $err->print($_[0]) }
     +  or die "oauth_tests returned $?";
     
      ## src/interfaces/libpq-oauth/test-oauth-curl.c (new) ##


Attachments:

  [text/plain] since-v3.diff.txt (5.7K, 2-since-v3.diff.txt)

  [application/octet-stream] v4-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch (4.4K, 3-v4-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch)
From a515435d3b4e02d20563a75e3dfa4177d333ca4c Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v4 1/5] oauth: Remove stale events from the kqueue multiplexer

If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().

In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.

Implement comb_multiplexer() to call kevent() and unstick any stale
events that would cause unnecessary callbacks. This is called right
after drive_request(), before we return control to the client to wait.

Suggested-by: Thomas Munro <[email protected]>
Co-authored-by: Thomas Munro <[email protected]>
Reviewed-by: Thomas Munro <[email protected]>
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
 src/interfaces/libpq-oauth/oauth-curl.c | 67 ++++++++++++++++++++++---
 1 file changed, 61 insertions(+), 6 deletions(-)

diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..433135cb86d 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1376,6 +1376,53 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 #endif
 }
 
+/*
+ * If there is no work to do on any of the descriptors in the multiplexer, then
+ * this function must ensure that the multiplexer is not readable.
+ *
+ * Unlike epoll descriptors, kqueue descriptors only transition from readable to
+ * unreadable when kevent() is called and finds nothing, after removing
+ * level-triggered conditions that have gone away. We therefore need a dummy
+ * kevent() call after operations might have been performed on the monitored
+ * sockets or timer_fd. Any event returned is ignored here, but it also remains
+ * queued (being level-triggered) and leaves the descriptor readable. This is a
+ * no-op for epoll descriptors.
+ */
+static bool
+comb_multiplexer(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+	/* The epoll implementation doesn't hold onto stale events. */
+	return true;
+#elif defined(HAVE_SYS_EVENT_H)
+	struct timespec timeout = {0};
+	struct kevent ev;
+
+	/*
+	 * Try to read a single pending event. We can actually ignore the result:
+	 * either we found an event to process, in which case the multiplexer is
+	 * correctly readable for that event at minimum, and it doesn't matter if
+	 * there are any stale events; or we didn't find any, in which case the
+	 * kernel will have discarded any stale events as it traveled to the end
+	 * of the queue.
+	 *
+	 * Note that this depends on our registrations being level-triggered --
+	 * even the timer, so we use a chained kqueue for that instead of an
+	 * EVFILT_TIMER on the top-level mux. If we used edge-triggered events,
+	 * this call would improperly discard them.
+	 */
+	if (kevent(actx->mux, NULL, 0, &ev, 1, &timeout) < 0)
+	{
+		actx_error(actx, "could not comb kqueue: %m");
+		return false;
+	}
+
+	return true;
+#else
+#error comb_multiplexer is not implemented on this platform
+#endif
+}
+
 /*
  * Enables or disables the timer in the multiplexer set. The timeout value is
  * in milliseconds (negative values disable the timer).
@@ -2755,13 +2802,21 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
 
 					if (status == PGRES_POLLING_FAILED)
 						goto error_return;
-					else if (status != PGRES_POLLING_OK)
-					{
-						/* not done yet */
-						return status;
-					}
+					else if (status == PGRES_POLLING_OK)
+						break;	/* done! */
 
-					break;
+					/*
+					 * This request is still running.
+					 *
+					 * Make sure that stale events don't cause us to come back
+					 * early. (Currently, this can occur only with kqueue.) If
+					 * this is forgotten, the multiplexer can get stuck in a
+					 * signaled state and we'll burn CPU cycles pointlessly.
+					 */
+					if (!comb_multiplexer(actx))
+						goto error_return;
+
+					return status;
 				}
 
 			case OAUTH_STEP_WAIT_INTERVAL:
-- 
2.34.1
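
The comment on comb_multiplexer() above says that epoll descriptors need no
combing because they do not hold onto stale events. A minimal sketch of that
claim, assuming Linux and using a pipe in place of a Curl socket, might look
like the following. The helper names (fd_is_readable,
demo_epoll_tracks_readiness) are hypothetical and are not part of the patch.

```c
#include <poll.h>
#include <stdbool.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns true if fd is readable right now (zero timeout). */
static bool
fd_is_readable(int fd)
{
	struct pollfd pfd = {.fd = fd, .events = POLLIN};

	return poll(&pfd, 1, 0) == 1 && (pfd.revents & POLLIN) != 0;
}

/*
 * Hypothetical demo: register the read end of a pipe with a fresh epoll set,
 * make the pipe readable, then drain it again. Returns 0 if the epoll
 * descriptor followed the pipe's state at every step, -1 otherwise.
 */
static int
demo_epoll_tracks_readiness(void)
{
	int			pipefd[2];
	int			mux;
	struct epoll_event ev = {.events = EPOLLIN};
	char		byte = 'x';
	int			result = -1;

	if (pipe(pipefd) < 0)
		return -1;

	mux = epoll_create1(EPOLL_CLOEXEC);
	if (mux < 0)
		goto close_pipe;

	ev.data.fd = pipefd[0];
	if (epoll_ctl(mux, EPOLL_CTL_ADD, pipefd[0], &ev) < 0)
		goto close_all;

	/* Nothing has been written yet, so the mux should not be readable. */
	if (fd_is_readable(mux))
		goto close_all;

	/* Make the pipe readable; the mux should become readable too. */
	if (write(pipefd[1], &byte, 1) != 1 || !fd_is_readable(mux))
		goto close_all;

	/* Drain the pipe. No epoll_wait() call is made to "unstick" the mux. */
	if (read(pipefd[0], &byte, 1) != 1 || fd_is_readable(mux))
		goto close_all;

	result = 0;

close_all:
	close(mux);
close_pipe:
	close(pipefd[0]);
	close(pipefd[1]);
	return result;
}
```

On a kqueue platform the last check is the one that would be expected to fail
without an intervening kevent() call, which is the gap comb_multiplexer()
fills.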



  [application/octet-stream] v4-0002-oauth-Ensure-unused-socket-registrations-are-remo.patch (3.2K, 4-v4-0002-oauth-Ensure-unused-socket-registrations-are-remo.patch)
From a34be19f17fcf7a15d64fc241c73ead993ba89e6 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 6 Aug 2025 15:18:08 -0700
Subject: [PATCH v4 2/5] oauth: Ensure unused socket registrations are removed

If Curl needs to switch the direction of a socket's registration (e.g.
from CURL_POLL_IN to CURL_POLL_OUT), it expects the old registration to
be discarded. For epoll, this happened via EPOLL_CTL_MOD, but for
kqueue, the old registration would remain if it was not explicitly
removed by Curl.

Explicitly remove the opposite-direction event during registrations. (If
that event doesn't exist, we'll just get an ENOENT, which will be
ignored by the same code that handles CURL_POLL_REMOVE.) A few
assertions are also added to strengthen the relationship between the
number of events added, the number of events pulled off the queue, and
the lengths of the kevent arrays.

Reviewed-by: Thomas Munro <[email protected]>
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
 src/interfaces/libpq-oauth/oauth-curl.c | 22 ++++++++++++++--------
 1 file changed, 14 insertions(+), 8 deletions(-)

diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 433135cb86d..97c33299a79 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1291,22 +1291,31 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 
 	return 0;
 #elif defined(HAVE_SYS_EVENT_H)
-	struct kevent ev[2] = {0};
+	struct kevent ev[2];
 	struct kevent ev_out[2];
 	struct timespec timeout = {0};
 	int			nev = 0;
 	int			res;
 
+	/*
+	 * We don't know which of the events is currently registered, perhaps
+	 * both, so we always try to remove unneeded events. This means we need to
+	 * tolerate ENOENT below.
+	 */
 	switch (what)
 	{
 		case CURL_POLL_IN:
 			EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0);
 			nev++;
+			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+			nev++;
 			break;
 
 		case CURL_POLL_OUT:
 			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0);
 			nev++;
+			EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
+			nev++;
 			break;
 
 		case CURL_POLL_INOUT:
@@ -1317,12 +1326,6 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 			break;
 
 		case CURL_POLL_REMOVE:
-
-			/*
-			 * We don't know which of these is currently registered, perhaps
-			 * both, so we try to remove both.  This means we need to tolerate
-			 * ENOENT below.
-			 */
 			EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0);
 			nev++;
 			EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0);
@@ -1334,7 +1337,10 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 			return -1;
 	}
 
-	res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout);
+	Assert(nev <= lengthof(ev));
+	Assert(nev <= lengthof(ev_out));
+
+	res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout);
 	if (res < 0)
 	{
 		actx_error(actx, "could not modify kqueue: %m");
-- 
2.34.1
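
The commit message above notes that for epoll, a direction switch already
discards the old registration via EPOLL_CTL_MOD. A rough illustration,
assuming Linux and a Unix-domain socketpair standing in for a Curl socket, is
sketched below. The names mux_is_readable and demo_epoll_direction_switch are
made up for this example.

```c
#include <poll.h>
#include <stdbool.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Returns true if the epoll descriptor has at least one ready event. */
static bool
mux_is_readable(int mux)
{
	struct pollfd pfd = {.fd = mux, .events = POLLIN};

	return poll(&pfd, 1, 0) == 1 && (pfd.revents & POLLIN) != 0;
}

/*
 * Hypothetical demo: register a socket for writability, then switch the
 * registration to readability. Returns 0 if the old (writable) interest was
 * replaced rather than kept alongside the new one, -1 otherwise.
 */
static int
demo_epoll_direction_switch(void)
{
	int			sv[2];
	int			mux;
	struct epoll_event ev = {0};
	int			result = -1;

	if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) < 0)
		return -1;

	mux = epoll_create1(EPOLL_CLOEXEC);
	if (mux < 0)
		goto close_sockets;

	/* Analogous to CURL_POLL_OUT: a fresh socket is immediately writable. */
	ev.events = EPOLLOUT;
	ev.data.fd = sv[0];
	if (epoll_ctl(mux, EPOLL_CTL_ADD, sv[0], &ev) < 0 || !mux_is_readable(mux))
		goto close_all;

	/*
	 * Analogous to switching to CURL_POLL_IN: EPOLL_CTL_MOD replaces the
	 * whole interest mask, and no data is pending, so the mux goes quiet.
	 */
	ev.events = EPOLLIN;
	if (epoll_ctl(mux, EPOLL_CTL_MOD, sv[0], &ev) < 0 || mux_is_readable(mux))
		goto close_all;

	result = 0;

close_all:
	close(mux);
close_sockets:
	close(sv[0]);
	close(sv[1]);
	return result;
}
```

With kqueue, EVFILT_READ and EVFILT_WRITE are separate registrations, which is
why the patch pairs each EV_ADD with an explicit EV_DELETE for the opposite
filter.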



  [application/octet-stream] v4-0003-oauth-Remove-expired-timers-from-the-multiplexer.patch (5.7K, 5-v4-0003-oauth-Remove-expired-timers-from-the-multiplexer.patch)
From 7408778d57995eba1c4d82acf96497f5cd48f82c Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 5 Mar 2025 13:16:48 -0800
Subject: [PATCH v4 3/5] oauth: Remove expired timers from the multiplexer

In a case similar to the previous commit, an expired timer can remain
permanently readable if Curl does not remove the timeout itself. Since
that removal isn't guaranteed to happen in real-world situations,
implement drain_timer_events() to reset the timer before calling into
drive_request().

Moving to drain_timer_events() happens to fix a logic bug in the
previous caller of timer_expired(), which treated an error condition as
if the timer were expired instead of bailing out.

The previous implementation of timer_expired() gave differing results
for epoll and kqueue if the timer was reset. (For epoll, a reset timer
was considered to be expired, and for kqueue it was not.) This didn't
previously cause problems, since timer_expired() was only called while
the timer was known to be set, but both implementations now use the
kqueue logic.

Reviewed-by: Thomas Munro <[email protected]>
Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
 src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++---------
 1 file changed, 68 insertions(+), 40 deletions(-)

diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index 97c33299a79..aa5d2bfd96c 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1536,40 +1536,20 @@ set_timer(struct async_ctx *actx, long timeout)
 
 /*
  * Returns 1 if the timeout in the multiplexer set has expired since the last
- * call to set_timer(), 0 if the timer is still running, or -1 (with an
- * actx_error() report) if the timer cannot be queried.
+ * call to set_timer(), 0 if the timer is either still running or disarmed, or
+ * -1 (with an actx_error() report) if the timer cannot be queried.
  */
 static int
 timer_expired(struct async_ctx *actx)
 {
-#if defined(HAVE_SYS_EPOLL_H)
-	struct itimerspec spec = {0};
-
-	if (timerfd_gettime(actx->timerfd, &spec) < 0)
-	{
-		actx_error(actx, "getting timerfd value: %m");
-		return -1;
-	}
-
-	/*
-	 * This implementation assumes we're using single-shot timers. If you
-	 * change to using intervals, you'll need to reimplement this function
-	 * too, possibly with the read() or select() interfaces for timerfd.
-	 */
-	Assert(spec.it_interval.tv_sec == 0
-		   && spec.it_interval.tv_nsec == 0);
-
-	/* If the remaining time to expiration is zero, we're done. */
-	return (spec.it_value.tv_sec == 0
-			&& spec.it_value.tv_nsec == 0);
-#elif defined(HAVE_SYS_EVENT_H)
+#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H)
 	int			res;
 
-	/* Is the timer queue ready? */
+	/* Is the timer ready? */
 	res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0);
 	if (res < 0)
 	{
-		actx_error(actx, "checking kqueue for timeout: %m");
+		actx_error(actx, "checking timer expiration: %m");
 		return -1;
 	}
 
@@ -1601,6 +1581,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx)
 	return 0;
 }
 
+/*
+ * Removes any expired-timer event from the multiplexer. If was_expired is not
+ * NULL, it is set to whether the timer had expired at the time of the call.
+ */
+static bool
+drain_timer_events(struct async_ctx *actx, bool *was_expired)
+{
+	int			res;
+
+	res = timer_expired(actx);
+	if (res < 0)
+		return false;
+
+	if (res > 0)
+	{
+		/*
+		 * Timer is expired. We could drain the event manually from the
+		 * timerfd, but it's easier to simply disable it; that keeps the
+		 * platform-specific code in set_timer().
+		 */
+		if (!set_timer(actx, -1))
+			return false;
+	}
+
+	if (was_expired)
+		*was_expired = (res > 0);
+
+	return true;
+}
+
 /*
  * Prints Curl request debugging information to stderr.
  *
@@ -2804,6 +2814,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
 				{
 					PostgresPollingStatusType status;
 
+					/*
+					 * Clear any expired timeout before calling back into
+					 * Curl. Curl is not guaranteed to do this for us, because
+					 * its API expects us to use single-shot (i.e.
+					 * edge-triggered) timeouts, and ours are level-triggered
+					 * via the mux.
+					 *
+					 * This can't be combined with the comb_multiplexer() call
+					 * below: we might accidentally clear a short timeout that
+					 * was both set and expired during the call to
+					 * drive_request().
+					 */
+					if (!drain_timer_events(actx, NULL))
+						goto error_return;
+
+					/* Move the request forward. */
 					status = drive_request(actx);
 
 					if (status == PGRES_POLLING_FAILED)
@@ -2826,24 +2852,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
 				}
 
 			case OAUTH_STEP_WAIT_INTERVAL:
-
-				/*
-				 * The client application is supposed to wait until our timer
-				 * expires before calling PQconnectPoll() again, but that
-				 * might not happen. To avoid sending a token request early,
-				 * check the timer before continuing.
-				 */
-				if (!timer_expired(actx))
 				{
-					set_conn_altsock(conn, actx->timerfd);
-					return PGRES_POLLING_READING;
-				}
+					bool		expired;
 
-				/* Disable the expired timer. */
-				if (!set_timer(actx, -1))
-					goto error_return;
+					/*
+					 * The client application is supposed to wait until our
+					 * timer expires before calling PQconnectPoll() again, but
+					 * that might not happen. To avoid sending a token request
+					 * early, check the timer before continuing.
+					 */
+					if (!drain_timer_events(actx, &expired))
+						goto error_return;
 
-				break;
+					if (!expired)
+					{
+						set_conn_altsock(conn, actx->timerfd);
+						return PGRES_POLLING_READING;
+					}
+
+					break;
+				}
 		}
 
 		/*
-- 
2.34.1



  [application/octet-stream] v4-0004-oauth-Track-total-call-count-during-a-client-flow.patch (4.8K, 6-v4-0004-oauth-Track-total-call-count-during-a-client-flow.patch)
From 8241255e84c20000446c8fb2786999b9a0c1dd5c Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Fri, 6 Jun 2025 15:22:41 -0700
Subject: [PATCH v4 4/5] oauth: Track total call count during a client flow

Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
calls into the flow as part of debug mode, and print the total when the
flow finishes.

A new test makes sure the total count is less than 100. (We expect
something on the order of 10.) This isn't foolproof, but it is able to
catch several regressions in the logic of the prior two commits, and
future work to add TLS support to the oauth_validator test server should
strengthen it as well.

Backpatch-through: 18
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
 src/interfaces/libpq-oauth/oauth-curl.c       | 22 +++++++++++++
 .../modules/oauth_validator/t/001_server.pl   | 31 ++++++++++++++++++-
 2 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index aa5d2bfd96c..aa50b00d053 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -278,6 +278,7 @@ struct async_ctx
 	bool		user_prompted;	/* have we already sent the authz prompt? */
 	bool		used_basic_auth;	/* did we send a client secret? */
 	bool		debugging;		/* can we give unsafe developer assistance? */
+	int			dbg_num_calls;	/* (debug mode) how many times were we called? */
 };
 
 /*
@@ -3021,6 +3022,8 @@ PostgresPollingStatusType
 pg_fe_run_oauth_flow(PGconn *conn)
 {
 	PostgresPollingStatusType result;
+	fe_oauth_state *state = conn_sasl_state(conn);
+	struct async_ctx *actx;
 #ifndef WIN32
 	sigset_t	osigset;
 	bool		sigpipe_pending;
@@ -3049,6 +3052,25 @@ pg_fe_run_oauth_flow(PGconn *conn)
 
 	result = pg_fe_run_oauth_flow_impl(conn);
 
+	/*
+	 * To assist with finding bugs in comb_multiplexer() and
+	 * drain_timer_events(), when we're in debug mode, track the total number
+	 * of calls to this function and print that at the end of the flow.
+	 *
+	 * Be careful that state->async_ctx could be NULL if early initialization
+	 * fails during the first call.
+	 */
+	actx = state->async_ctx;
+	Assert(actx || result == PGRES_POLLING_FAILED);
+
+	if (actx && actx->debugging)
+	{
+		actx->dbg_num_calls++;
+		if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED)
+			fprintf(stderr, "[libpq] total number of polls: %d\n",
+					actx->dbg_num_calls);
+	}
+
 #ifndef WIN32
 	if (masked)
 	{
diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl
index 41672ebd5c6..c0dafb8be76 100644
--- a/src/test/modules/oauth_validator/t/001_server.pl
+++ b/src/test/modules/oauth_validator/t/001_server.pl
@@ -418,6 +418,35 @@ $node->connect_fails(
 	  qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/
 );
 
+# Count the number of calls to the internal flow when multiple retries are
+# triggered. The exact number depends on many things -- the TCP stack, the
+# version of Curl in use, random chance -- but a ridiculously high number
+# suggests something is wrong with our ability to clear multiplexer events after
+# they're no longer applicable.
+my ($ret, $stdout, $stderr) = $node->psql(
+	'postgres',
+	"SELECT 'connected for call count'",
+	extra_params => ['-w'],
+	connstr => connstr(stage => 'token', retries => 2),
+	on_error_stop => 0);
+
+is($ret, 0, "call count connection succeeds");
+like(
+	$stderr,
+	qr@Visit https://example\.com/ and enter the code: postgresuser@,
+	"call count: stderr matches");
+
+my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/;
+if (like($stderr, $count_pattern, "call count: count is printed"))
+{
+	# For reference, a typical flow with two retries might take between 5 and
+	# 15 calls to the client implementation. And while this will probably
+	# continue to change across OSes and Curl updates, we're likely in trouble
+	# if we see hundreds or thousands of calls.
+	$stderr =~ $count_pattern;
+	cmp_ok($1, '<', 100, "call count is reasonably small");
+}
+
 # Stress test: make sure our builtin flow operates correctly even if the client
 # application isn't respecting PGRES_POLLING_READING/WRITING signals returned
 # from PQconnectPoll().
@@ -428,7 +457,7 @@ my @cmd = (
 	connstr(stage => 'all', retries => 1, interval => 1));
 
 note "running '" . join("' '", @cmd) . "'";
-my ($stdout, $stderr) = run_command(\@cmd);
+($stdout, $stderr) = run_command(\@cmd);
 
 like($stdout, qr/connection succeeded/, "stress-async: stdout matches");
 unlike(
-- 
2.34.1
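
The call-count check above guards against a client that is woken up over and over by an event that is never cleared. A minimal sketch of that symptom follows, assuming plain poll(2) on a pipe rather than the epoll/kqueue multiplexer in oauth-curl.c. The names count_ready_polls() and stuck_event_demo() are hypothetical and do not appear in the patch.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Returns how many of `attempts` zero-timeout polls report fd as readable. */
static int
count_ready_polls(int fd, int attempts)
{
	int			ready = 0;

	assert(attempts >= 0);

	for (int i = 0; i < attempts; i++)
	{
		struct pollfd pfd = {.fd = fd, .events = POLLIN};

		if (poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN))
			ready++;
	}

	return ready;
}

/*
 * Hypothetical demo. Puts one byte into a pipe, optionally drains it, and
 * then counts how many polls of the read side report readiness. Returns -1
 * if the setup fails.
 */
static int
stuck_event_demo(int drain, int attempts)
{
	int			pipefd[2];
	char		c = 'x';
	int			ready = -1;

	if (pipe(pipefd) != 0)
		return -1;

	if (write(pipefd[1], &c, 1) == 1 &&
		(!drain || read(pipefd[0], &c, 1) == 1))
		ready = count_ready_polls(pipefd[0], attempts);

	close(pipefd[0]);
	close(pipefd[1]);

	return ready;
}
```

With the byte left unread, every poll reports readiness, so a caller that trusts the signal would spin. Once the byte is drained, none do. The real flow has the extra complication that stale events can remain queued in the multiplexer itself, which this sketch does not model.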



  [application/octet-stream] v4-0005-oauth-Add-unit-tests-for-multiplexer-handling.patch (19.0K, 7-v4-0005-oauth-Add-unit-tests-for-multiplexer-handling.patch)
From 337124064f37eccfb5d2f9ba27edbf70e945aeb2 Mon Sep 17 00:00:00 2001
From: Jacob Champion <[email protected]>
Date: Wed, 5 Mar 2025 15:04:34 -0800
Subject: [PATCH v4 5/5] oauth: Add unit tests for multiplexer handling
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

To better record the internal behaviors of oauth-curl.c, add a unit test
suite for the socket and timer handling code. This is all based on TAP
and driven by our existing Test::More infrastructure.

Reviewed-by: Dagfinn Ilmari Mannsåker <[email protected]>
Discussion: https://postgr.es/m/CAOYmi+nDZxJHaWj9_jRSyf8uMToCADAmOfJEggsKW-kY7aUwHA@mail.gmail.com
---
 src/interfaces/libpq-oauth/Makefile          |  14 +
 src/interfaces/libpq-oauth/meson.build       |  35 ++
 src/interfaces/libpq-oauth/t/001_oauth.pl    |  24 +
 src/interfaces/libpq-oauth/test-oauth-curl.c | 527 +++++++++++++++++++
 4 files changed, 600 insertions(+)
 create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl
 create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c

diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile
index 682f17413b3..e73573694b9 100644
--- a/src/interfaces/libpq-oauth/Makefile
+++ b/src/interfaces/libpq-oauth/Makefile
@@ -79,5 +79,19 @@ uninstall:
 	rm -f '$(DESTDIR)$(libdir)/$(stlib)'
 	rm -f '$(DESTDIR)$(libdir)/$(shlib)'
 
+.PHONY: all-tests
+all-tests: oauth_tests$(X)
+
+oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq
+	$(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@
+
+check: all-tests
+	$(prove_check)
+
+installcheck: all-tests
+	$(prove_installcheck)
+
 clean distclean: clean-lib
 	rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB)
+	rm -f test-oauth-curl.o oauth_tests$(X)
+	rm -rf tmp_check
diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build
index df064c59a40..505e1671b86 100644
--- a/src/interfaces/libpq-oauth/meson.build
+++ b/src/interfaces/libpq-oauth/meson.build
@@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name,
   link_args: export_fmt.format(export_file.full_path()),
   kwargs: default_lib_args,
 )
+
+libpq_oauth_test_deps = []
+
+oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources
+
+if host_system == 'windows'
+  oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [
+    '--NAME', 'oauth_tests',
+    '--FILEDESC', 'OAuth unit test program',])
+endif
+
+libpq_oauth_test_deps += executable('oauth_tests',
+  oauth_test_sources,
+  dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps],
+  kwargs: default_bin_args + {
+    'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args,
+    'c_pch': pch_postgres_fe_h,
+    'include_directories': [libpq_inc, postgres_inc],
+    'install': false,
+  }
+)
+
+testprep_targets += libpq_oauth_test_deps
+
+tests += {
+  'name': 'libpq-oauth',
+  'sd': meson.current_source_dir(),
+  'bd': meson.current_build_dir(),
+  'tap': {
+    'tests': [
+      't/001_oauth.pl',
+    ],
+    'deps': libpq_oauth_test_deps,
+  },
+}
diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl
new file mode 100644
index 00000000000..5af6c860768
--- /dev/null
+++ b/src/interfaces/libpq-oauth/t/001_oauth.pl
@@ -0,0 +1,24 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+use strict;
+use warnings FATAL => 'all';
+
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Defer entirely to the oauth_tests executable. stdout/err is routed through
+# Test::More so that our logging infrastructure can handle it correctly. Using
+# IPC::Run::new_chunker seems to help interleave the two streams a little better
+# than without.
+#
+# TODO: prove can also deal with native executables itself, which we could
+# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup
+# calls Perl directly, which would require more code to work around... and
+# there's still the matter of logging.
+my $builder = Test::More->builder;
+my $out = $builder->output;
+my $err = $builder->failure_output;
+
+IPC::Run::run ['oauth_tests'],
+  '>', IPC::Run::new_chunker, sub { $out->print($_[0]) },
+  '2>', IPC::Run::new_chunker, sub { $err->print($_[0]) }
+  or die "oauth_tests returned $?";
diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c
new file mode 100644
index 00000000000..8263aff2f4a
--- /dev/null
+++ b/src/interfaces/libpq-oauth/test-oauth-curl.c
@@ -0,0 +1,527 @@
+/*
+ * test-oauth-curl.c
+ *
+ * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets
+ * the tests reference static functions and other internals.
+ *
+ * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap
+ * must-succeed code as part of test setup.
+ *
+ * Copyright (c) 2025, PostgreSQL Global Development Group
+ */
+
+#include "oauth-curl.c"
+
+#include <fcntl.h>
+
+#ifdef USE_ASSERT_CHECKING
+
+/*
+ * TAP Helpers
+ */
+
+static int	num_tests = 0;
+
+/*
+ * Reports ok/not ok to the TAP stream on stdout.
+ */
+#define ok(OK, TEST) \
+	ok_impl(OK, TEST, #OK, __FILE__, __LINE__)
+
+static bool
+ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line)
+{
+	printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test);
+
+	if (!ok)
+	{
+		printf("# at %s:%d:\n", file, line);
+		printf("#   expression is false: %s\n", teststr);
+	}
+
+	return ok;
+}
+
+/*
+ * Like ok(this == that), but with more diagnostics on failure.
+ *
+ * Only works on ints, but luckily that's all we need here. Note that the much
+ * simpler-looking macro implementation
+ *
+ *     is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT)
+ *
+ * suffers from multiple evaluation of the macro arguments...
+ */
+#define is(THIS, THAT, TEST) \
+	do { \
+		int this_ = (THIS), \
+			that_ = (THAT); \
+		is_diag( \
+			ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \
+			this_, #THIS, that_, #THAT \
+		); \
+	} while (0)
+
+static bool
+is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr)
+{
+	if (!ok)
+		printf("#   %s = %d; %s = %d\n", thisstr, this, thatstr, that);
+
+	return ok;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Creates a partially-initialized async_ctx for the purposes of testing. Free
+ * with free_test_actx().
+ */
+static struct async_ctx *
+init_test_actx(void)
+{
+	struct async_ctx *actx;
+
+	actx = calloc(1, sizeof(*actx));
+	Assert(actx);
+
+	actx->mux = PGINVALID_SOCKET;
+	actx->timerfd = -1;
+	actx->debugging = true;
+
+	initPQExpBuffer(&actx->errbuf);
+
+	Assert(setup_multiplexer(actx));
+
+	return actx;
+}
+
+static void
+free_test_actx(struct async_ctx *actx)
+{
+	termPQExpBuffer(&actx->errbuf);
+
+	if (actx->mux != PGINVALID_SOCKET)
+		close(actx->mux);
+	if (actx->timerfd >= 0)
+		close(actx->timerfd);
+
+	free(actx);
+}
+
+static char dummy_buf[4 * 1024];	/* for fill_pipe/drain_pipe */
+
+/*
+ * Writes to the write side of a pipe until it won't take any more data. Returns
+ * the amount written.
+ */
+static ssize_t
+fill_pipe(int fd)
+{
+	int			mode;
+	ssize_t		written = 0;
+
+	/* Don't block. */
+	Assert((mode = fcntl(fd, F_GETFL)) != -1);
+	Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0);
+
+	while (true)
+	{
+		ssize_t		w;
+
+		w = write(fd, dummy_buf, sizeof(dummy_buf));
+		if (w < 0)
+		{
+			if (errno != EAGAIN && errno != EWOULDBLOCK)
+			{
+				perror("write to pipe");
+				written = -1;
+			}
+			break;
+		}
+
+		written += w;
+	}
+
+	/* Reset the file status flags. */
+	Assert(fcntl(fd, F_SETFL, mode) == 0);
+
+	return written;
+}
+
+/*
+ * Drains the requested amount of data from the read side of a pipe.
+ */
+static bool
+drain_pipe(int fd, ssize_t n)
+{
+	Assert(n > 0);
+
+	while (n)
+	{
+		size_t		to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf);
+		ssize_t		drained;
+
+		drained = read(fd, dummy_buf, to_read);
+		if (drained < 0)
+		{
+			perror("read from pipe");
+			return false;
+		}
+
+		n -= drained;
+	}
+
+	return true;
+}
+
+/*
+ * Tests whether the multiplexer is marked ready by the deadline. This is a
+ * macro so that file/line information makes sense during failures.
+ *
+ * NB: our current multiplexer implementations (epoll/kqueue) are *readable*
+ * when the underlying libcurl sockets are *writable*. This behavior is pinned
+ * here to record that expectation; PGRES_POLLING_READING is hardcoded
+ * throughout the flow and would need to be changed if a new multiplexer does
+ * something different.
+ */
+#define mux_is_ready(MUX, DEADLINE, TEST) \
+	do { \
+		int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \
+		Assert(res_ != -1); \
+		ok(res_ > 0, "multiplexer is ready " TEST); \
+	} while (0)
+
+/*
+ * The opposite of mux_is_ready().
+ */
+#define mux_is_not_ready(MUX, TEST) \
+	do { \
+		int res_ = PQsocketPoll(MUX, 1, 0, 0); \
+		Assert(res_ != -1); \
+		is(res_, 0, "multiplexer is not ready " TEST); \
+	} while (0)
+
+/*
+ * Test Suites
+ */
+
+/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */
+static pg_usec_time_t timeout_us = 180 * 1000 * 1000;
+
+static void
+test_set_timer(void)
+{
+	struct async_ctx *actx = init_test_actx();
+	const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+
+	printf("# test_set_timer\n");
+
+	/* A zero-duration timer should result in a near-immediate ready signal. */
+	Assert(set_timer(actx, 0));
+	mux_is_ready(actx->mux, deadline, "when timer expires");
+	is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires");
+
+	/* Resetting the timer far in the future should unset the ready signal. */
+	Assert(set_timer(actx, INT_MAX));
+	mux_is_not_ready(actx->mux, "when timer is reset to the future");
+	is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer");
+
+	/* Setting another zero-duration timer should override the previous one. */
+	Assert(set_timer(actx, 0));
+	mux_is_ready(actx->mux, deadline, "when timer is re-expired");
+	is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired");
+
+	/* And disabling that timer should once again unset the ready signal. */
+	Assert(set_timer(actx, -1));
+	mux_is_not_ready(actx->mux, "when timer is unset");
+	is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset");
+
+	{
+		bool		expired;
+
+		/* Make sure drain_timer_events() functions correctly as well. */
+		Assert(set_timer(actx, 0));
+		mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)");
+
+		Assert(drain_timer_events(actx, &expired));
+		mux_is_not_ready(actx->mux, "when timer is drained after expiring");
+		is(expired, 1, "drain_timer_events() reports expiration");
+		is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained");
+
+		/* A second drain should do nothing. */
+		Assert(drain_timer_events(actx, &expired));
+		mux_is_not_ready(actx->mux, "when timer is drained a second time");
+		is(expired, 0, "drain_timer_events() reports no expiration");
+		is(timer_expired(actx), 0, "timer_expired() still returns 0");
+	}
+
+	free_test_actx(actx);
+}
+
+static void
+test_register_socket(void)
+{
+	struct async_ctx *actx = init_test_actx();
+	int			pipefd[2];
+	int			rfd,
+				wfd;
+	bool		bidirectional;
+
+	/* Create a local pipe for communication. */
+	Assert(pipe(pipefd) == 0);
+	rfd = pipefd[0];
+	wfd = pipefd[1];
+
+	/*
+	 * Some platforms (FreeBSD) implement bidirectional pipes, affecting the
+	 * behavior of some of these tests. Store that knowledge for later.
+	 */
+	bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0;
+
+	/*
+	 * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for
+	 * read/write operations, respectively, and once using CURL_POLL_INOUT for
+	 * both sides.
+	 */
+	for (int inout = 0; inout < 2; inout++)
+	{
+		const int	in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN;
+		const int	out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT;
+		const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us;
+		size_t		bidi_pipe_size = 0; /* silence compiler warnings */
+
+		printf("# test_register_socket %s\n", inout ? "(INOUT)" : "");
+
+		/*
+		 * At the start of the test, the read side should be blocked and the
+		 * write side should be open. (There's a mistake at the end of this
+		 * loop otherwise.)
+		 */
+		Assert(PQsocketPoll(rfd, 1, 0, 0) == 0);
+		Assert(PQsocketPoll(wfd, 0, 1, 0) > 0);
+
+		/*
+		 * For bidirectional systems, emulate unidirectional behavior here by
+		 * filling up the "read side" of the pipe.
+		 */
+		if (bidirectional)
+			Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+		/* Listen on the read side. The multiplexer shouldn't be ready yet. */
+		Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+		mux_is_not_ready(actx->mux, "when fd is not readable");
+
+		/* Writing to the pipe should result in a read-ready multiplexer. */
+		Assert(write(wfd, "x", 1) == 1);
+		mux_is_ready(actx->mux, deadline, "when fd is readable");
+
+		/*
+		 * Update the registration to wait on write events instead. The
+		 * multiplexer should be unset.
+		 */
+		Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0);
+		mux_is_not_ready(actx->mux, "when waiting for writes on readable fd");
+
+		/* Re-register for read events. */
+		Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+		mux_is_ready(actx->mux, deadline, "when waiting for reads again");
+
+		/* Stop listening. The multiplexer should be unset. */
+		Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+		mux_is_not_ready(actx->mux, "when readable fd is removed");
+
+		/* Listen again. */
+		Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+		mux_is_ready(actx->mux, deadline, "when readable fd is re-added");
+
+		/*
+		 * Draining the pipe should unset the multiplexer again, once the old
+		 * event is cleared.
+		 */
+		Assert(drain_pipe(rfd, 1));
+		Assert(comb_multiplexer(actx));
+		mux_is_not_ready(actx->mux, "when fd is drained");
+
+		/* Undo any unidirectional emulation. */
+		if (bidirectional)
+			Assert(drain_pipe(wfd, bidi_pipe_size));
+
+		/* Listen on the write side. An empty buffer should be writable. */
+		Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+		Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+		mux_is_ready(actx->mux, deadline, "when fd is writable");
+
+		/* As above, wait on read events instead. */
+		Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0);
+		mux_is_not_ready(actx->mux, "when waiting for reads on writable fd");
+
+		/* Re-register for write events. */
+		Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0);
+		mux_is_ready(actx->mux, deadline, "when waiting for writes again");
+
+		{
+			ssize_t		written;
+
+			/*
+			 * Fill the pipe. Once the old writable event is cleared, the mux
+			 * should not be ready.
+			 */
+			Assert((written = fill_pipe(wfd)) > 0);
+			printf("# pipe buffer is full at %zd bytes\n", written);
+
+			Assert(comb_multiplexer(actx));
+			mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+			/* Drain the pipe again. */
+			Assert(drain_pipe(rfd, written));
+			mux_is_ready(actx->mux, deadline, "when fd buffer is drained");
+		}
+
+		/* Stop listening. */
+		Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+		mux_is_not_ready(actx->mux, "when fd is removed");
+
+		/* Make sure an expired timer doesn't interfere with event draining. */
+		{
+			bool		expired;
+
+			/* Make the rfd appear unidirectional if necessary. */
+			if (bidirectional)
+				Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+
+			/* Set the timer and wait for it to expire. */
+			Assert(set_timer(actx, 0));
+			Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0);
+			is(timer_expired(actx), 1, "timer is expired");
+
+			/* Register for read events and make the fd readable. */
+			Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+			Assert(write(wfd, "x", 1) == 1);
+			mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired");
+
+			/*
+			 * Draining the pipe should unset the multiplexer again, once the
+			 * old event is drained and the timer is reset.
+			 *
+			 * Order matters, since comb_multiplexer() doesn't have to remove
+			 * stale events when active events exist. Follow the call sequence
+			 * used in the code: drain the timer expiration, drain the pipe,
+			 * then clear the stale events.
+			 */
+			Assert(drain_timer_events(actx, &expired));
+			Assert(drain_pipe(rfd, 1));
+			Assert(comb_multiplexer(actx));
+
+			is(expired, 1, "drain_timer_events() reports expiration");
+			is(timer_expired(actx), 0, "timer is no longer expired");
+			mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
+			/* Stop listening. */
+			Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+			/* Undo any unidirectional emulation. */
+			if (bidirectional)
+				Assert(drain_pipe(wfd, bidi_pipe_size));
+		}
+
+		/* Ensure comb_multiplexer() can handle multiple stale events. */
+		{
+			int			rfd2,
+						wfd2;
+
+			/* Create a second local pipe. */
+			Assert(pipe(pipefd) == 0);
+			rfd2 = pipefd[0];
+			wfd2 = pipefd[1];
+
+			/* Make both rfds appear unidirectional if necessary. */
+			if (bidirectional)
+			{
+				Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
+				Assert(fill_pipe(rfd2) == bidi_pipe_size);
+			}
+
+			/* Register for read events on both fds, and make them readable. */
+			Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
+			Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
+
+			Assert(write(wfd, "x", 1) == 1);
+			Assert(write(wfd2, "x", 1) == 1);
+
+			mux_is_ready(actx->mux, deadline, "when two fds are readable");
+
+			/*
+			 * Drain both fds. comb_multiplexer() should then ensure that the
+			 * mux is no longer readable.
+			 */
+			Assert(drain_pipe(rfd, 1));
+			Assert(drain_pipe(rfd2, 1));
+			Assert(comb_multiplexer(actx));
+			mux_is_not_ready(actx->mux, "when two fds are drained");
+
+			/* Stop listening. */
+			Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
+			Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
+
+			/* Undo any unidirectional emulation. */
+			if (bidirectional)
+			{
+				Assert(drain_pipe(wfd, bidi_pipe_size));
+				Assert(drain_pipe(wfd2, bidi_pipe_size));
+			}
+
+			close(rfd2);
+			close(wfd2);
+		}
+	}
+
+	close(rfd);
+	close(wfd);
+	free_test_actx(actx);
+}
+
+int
+main(int argc, char *argv[])
+{
+	const char *timeout;
+
+	/* Grab the default timeout. */
+	timeout = getenv("PG_TEST_TIMEOUT_DEFAULT");
+	if (timeout)
+	{
+		int			timeout_s = atoi(timeout);
+
+		if (timeout_s > 0)
+			timeout_us = (pg_usec_time_t) timeout_s * 1000 * 1000;
+	}
+
+	/*
+	 * Set up line buffering for our output, to let stderr interleave in the
+	 * log files.
+	 */
+	setvbuf(stdout, NULL, PG_IOLBF, 0);
+
+	test_set_timer();
+	test_register_socket();
+
+	printf("1..%d\n", num_tests);
+	return 0;
+}
+
+#else							/* !USE_ASSERT_CHECKING */
+
+/*
+ * Skip the test suite when we don't have assertions.
+ */
+int
+main(int argc, char *argv[])
+{
+	printf("1..0 # skip: cassert is not enabled\n");
+
+	return 0;
+}
+
+#endif							/* USE_ASSERT_CHECKING */
-- 
2.34.1
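
The comment above is() in test-oauth-curl.c notes that a simpler macro would evaluate its arguments more than once. A minimal standalone sketch of that hazard follows, assuming an operand with a side effect. The names next_value(), report(), IS_NAIVE, IS_SAFE, and check_evaluation_counts() are hypothetical and are not part of the patch.

```c
#include <assert.h>
#include <stdio.h>

static int	eval_count = 0;

/* Hypothetical operand with a side effect: counts its own evaluations. */
static int
next_value(void)
{
	return ++eval_count;
}

/* Prints a TAP-style diagnostic on failure and passes the result through. */
static int
report(int ok, int got, int expected)
{
	if (!ok)
		printf("#   got %d, expected %d\n", got, expected);

	return ok;
}

/* Naive form: THIS and THAT each appear twice, so each is evaluated twice. */
#define IS_NAIVE(THIS, THAT) \
	report((THIS) == (THAT), (THIS), (THAT))

/* Safe form: copy each argument into a local first, as is() does. */
#define IS_SAFE(THIS, THAT) \
	do { \
		int this_ = (THIS), \
			that_ = (THAT); \
		(void) report(this_ == that_, this_, that_); \
	} while (0)

/* Returns the number of times the operand ran under the naive macro. */
static int
evals_with_naive(void)
{
	eval_count = 0;
	(void) IS_NAIVE(next_value(), 1);
	return eval_count;
}

/* Returns the number of times the operand ran under the safe macro. */
static int
evals_with_safe(void)
{
	eval_count = 0;
	IS_SAFE(next_value(), 1);
	return eval_count;
}

/* Sanity check tying the two together. */
static void
check_evaluation_counts(void)
{
	assert(evals_with_naive() == 2);
	assert(evals_with_safe() == 1);
}
```

The naive form may also print a misleading diagnostic, since the value it compares and the value it reports come from different evaluations. Copying into locals avoids both problems, at the cost of restricting the macro to a single type (int here, as in the patch).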


