* Re: Bypassing cursors in postgres_fdw to enable parallel plans
@ 2026-01-26 11:44 Rafia Sabih <[email protected]>
From: Rafia Sabih @ 2026-01-26 11:44 UTC
To: Robert Haas <[email protected]>; +Cc: KENAN YILMAZ <[email protected]>; Andy Fan <[email protected]>; PostgreSQL Hackers <[email protected]>
On Wed, 10 Dec 2025 at 18:44, Robert Haas <[email protected]> wrote:
>
>
> Overall, I think the direction of the patch set has some promise, but
> I think it needs a lot of cleanup: removal of unnecessary code, proper
> formatting, moving variables to inner scopes, explanatory comments,
> good names for variables and functions and structure members, removal
> of unnecessary files from the patch, cleanup of the regression test
> coverage so that it doesn't add more bloat than necessary, proper
> choice of data structures, and so on. Right now, the good things that
> you've done here are being hidden by these sorts of mechanical issues.
> That's not just an issue for me as a reviewer: I suspect it's also
> blocking you, as the patch author, from finding places where the code
> could be made better. Being able to find such opportunities for
> improvement and act on them is what will get this patch from
> "interesting proof of concept" to "potentially committable patch".
>
> --
> Robert Haas
> EDB: http://www.enterprisedb.com
>
Thanks, Robert, for your time and attention to this patch.
Based on these review comments and an off-list discussion about the design
of the patch, I have reworked it significantly.
In this version, a tuplestore and a flag are added to PgFdwScanState. On a
cursor switch, that is, when execution moves to a different query before
the current one is fully fetched, this tuplestore is filled with the
remaining tuples of the current query, and the flag is set to indicate
that the tuplestore is ready to be read. To remember the last query that
was executing, a pointer to its PgFdwScanState is kept in conn_state. We
only need to remember the most recent query, and once its tuples are
fetched we can forget that pointer, because its fsstate then holds the
remaining tuples in the tuplestore.
As a result, this version of the patch is simpler than before.
The attached patch also adds a few test cases covering the different
scenarios in non-cursor mode.
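As a rough illustration of the switch described above, here is a toy model in plain C. It is only a sketch and not code from the patch: the names Scan, Conn, scan_init, scan_next and drain_last_query are hypothetical, integer rows stand in for tuples, and a fixed-size array stands in for the tuplestore. It only mirrors the idea of remembering the last query per connection and draining it when a different scan needs the connection.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define MAX_ROWS 16

/* Hypothetical stand-in for one foreign scan and its remote result stream. */
typedef struct Scan
{
	int		next_remote;		/* next row the "remote" would send */
	int		remote_end;			/* one past the last remote row */
	int		saved[MAX_ROWS];	/* stand-in for the tuplestore */
	int		nsaved;				/* rows held in saved[] */
	int		saved_pos;			/* next row to read from saved[] */
	bool	tuples_ready;		/* saved[] holds the rest of the result */
} Scan;

/* Hypothetical stand-in for the per-connection state. */
typedef struct Conn
{
	Scan   *last_query;			/* scan still streaming, or NULL */
} Conn;

static void
scan_init(Scan *s, int start, int end)
{
	s->next_remote = start;
	s->remote_end = end;
	s->nsaved = 0;
	s->saved_pos = 0;
	s->tuples_ready = false;
}

/* Save the unread rows of the in-flight scan, then forget it. */
static void
drain_last_query(Conn *c)
{
	Scan   *s = c->last_query;

	if (s == NULL)
		return;
	while (s->next_remote < s->remote_end)
	{
		assert(s->nsaved < MAX_ROWS);
		s->saved[s->nsaved++] = s->next_remote++;
	}
	s->tuples_ready = true;
	c->last_query = NULL;
}

/* Return the next row of scan s in *row, or false at end of data. */
static bool
scan_next(Conn *c, Scan *s, int *row)
{
	if (s->tuples_ready)
	{
		/* The rest of this result was saved earlier; read it back. */
		if (s->saved_pos >= s->nsaved)
			return false;
		*row = s->saved[s->saved_pos++];
		return true;
	}
	if (s->next_remote >= s->remote_end)
	{
		if (c->last_query == s)
			c->last_query = NULL;
		return false;
	}
	if (c->last_query != s)
	{
		/* A different scan owns the connection: drain it first. */
		drain_last_query(c);
		c->last_query = s;
	}
	*row = s->next_remote++;
	return true;
}
```

In this model, interleaving calls to scan_next() for two scans on one Conn drains the first scan into its buffer exactly once; afterwards that scan is served from the buffer while the second scan streams directly.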
--
Regards,
Rafia Sabih
CYBERTEC PostgreSQL International GmbH
Attachments:
[application/octet-stream] v5-0001-Fetch-without-cursors.patch (55.2K, 3-v5-0001-Fetch-without-cursors.patch)
From 496c0e2c156a6349b6d0f58e5c389998be7b8a31 Mon Sep 17 00:00:00 2001
From: Rafia Sabih <[email protected]>
Date: Mon, 26 Jan 2026 12:25:08 +0100
Subject: [PATCH] Fetch without cursors
This implements a new fetch mechanism for postgres_fdw which does not use cursors.
The motivation behind this work is that cursors cannot use parallel query, even at
the local side. Since this new fetch mode doesn't use cursors, parallel query can
now be used, and hence improve the performance of postgres_fdw.
The new mechanism works as follows. Where the old method would start a query and
create a cursor to fetch its tuples, this mode just starts processing the tuples of
the query without creating any cursor. When tuples from a different query are needed,
at the point where the old method would create a new cursor, the new mode fetches all
the remaining tuples of the first query and saves them in a tuplestore. The next query
is then processed as is. Whenever we later need the tuples of the first query, we read
them from the associated tuplestore where they were saved earlier. This way the
tuplestore, instead of a cursor, keeps track of the tuples still required.
This new mode is controlled by a new GUC called postgres_fdw.use_cursor.
When it is set (the default), everything works as it did before this patch, i.e. with
cursors. Setting it to off enables the new mode.
At the moment, this does not support async mode. When async mode is used and
use_cursor is set to false, it switches to true.
Original idea: Bernd Helmle
Key suggestions: Robert Haas
---
contrib/postgres_fdw/connection.c | 11 +
.../postgres_fdw/expected/postgres_fdw.out | 181 +++++++
contrib/postgres_fdw/option.c | 19 +
contrib/postgres_fdw/postgres_fdw.c | 443 ++++++++++++++----
contrib/postgres_fdw/postgres_fdw.h | 8 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 31 +-
6 files changed, 612 insertions(+), 81 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 953c2e0ab82..0d0aa044ed0 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -966,6 +966,17 @@ pgfdw_get_result(PGconn *conn)
return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
}
+/*
+ * Wrap libpqsrv_get_result(), adding wait event.
+ * Used in case of non-cursor mode.
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_get_next_result(PGconn *conn)
+{
+ return libpqsrv_get_result(conn, pgfdw_we_get_result);
+}
+
/*
* Report an error we got from the remote server.
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 48e3185b227..e9d9deead8b 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -418,6 +418,15 @@ SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970 | 0 | 0 | foo
(1 row)
+--Test in non-cursor mode to cover process_query_params path
+SET postgres_fdw.use_cursor TO off;
+SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+ c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+------+----+-------+------------------------------+--------------------------+----+------------+-----
+ 1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970 | 0 | 0 | foo
+(1 row)
+
+RESET postgres_fdw.use_cursor;
-- used in CTE
WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1;
c1 | c2 | c3 | c4
@@ -884,6 +893,122 @@ WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
(100 rows)
+-- Test in non-cursor mode for rescan path
+SET postgres_fdw.use_cursor TO off;
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+-----+----+-------+------------------------------+--------------------------+----+------------+-----+-----+----+-------+------------------------------+--------------------------+----+------------+-----
+ 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 26 | 6 | 00026 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 26 | 6 | 00026 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 36 | 6 | 00036 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 36 | 6 | 00036 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 46 | 6 | 00046 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 46 | 6 | 00046 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 56 | 6 | 00056 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 56 | 6 | 00056 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 66 | 6 | 00066 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 66 | 6 | 00066 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 76 | 6 | 00076 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 76 | 6 | 00076 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 86 | 6 | 00086 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 86 | 6 | 00086 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 96 | 6 | 00096 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 96 | 6 | 00096 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 116 | 6 | 00116 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 116 | 6 | 00116 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 126 | 6 | 00126 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 126 | 6 | 00126 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 136 | 6 | 00136 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 136 | 6 | 00136 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 146 | 6 | 00146 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 146 | 6 | 00146 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 156 | 6 | 00156 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 156 | 6 | 00156 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 166 | 6 | 00166 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 166 | 6 | 00166 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 176 | 6 | 00176 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 176 | 6 | 00176 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 186 | 6 | 00186 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 186 | 6 | 00186 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 196 | 6 | 00196 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 196 | 6 | 00196 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 206 | 6 | 00206 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 206 | 6 | 00206 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 216 | 6 | 00216 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 216 | 6 | 00216 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 226 | 6 | 00226 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 226 | 6 | 00226 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 236 | 6 | 00236 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 236 | 6 | 00236 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 246 | 6 | 00246 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 246 | 6 | 00246 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 256 | 6 | 00256 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 256 | 6 | 00256 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 266 | 6 | 00266 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 266 | 6 | 00266 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 276 | 6 | 00276 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 276 | 6 | 00276 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 286 | 6 | 00286 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 286 | 6 | 00286 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 296 | 6 | 00296 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 296 | 6 | 00296 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 306 | 6 | 00306 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 306 | 6 | 00306 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 316 | 6 | 00316 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 316 | 6 | 00316 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 326 | 6 | 00326 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 326 | 6 | 00326 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 336 | 6 | 00336 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 336 | 6 | 00336 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 346 | 6 | 00346 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 346 | 6 | 00346 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 356 | 6 | 00356 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 356 | 6 | 00356 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 366 | 6 | 00366 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 366 | 6 | 00366 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 376 | 6 | 00376 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 376 | 6 | 00376 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 386 | 6 | 00386 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 386 | 6 | 00386 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 396 | 6 | 00396 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 396 | 6 | 00396 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 406 | 6 | 00406 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 406 | 6 | 00406 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 416 | 6 | 00416 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 416 | 6 | 00416 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 426 | 6 | 00426 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 426 | 6 | 00426 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 436 | 6 | 00436 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 436 | 6 | 00436 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 446 | 6 | 00446 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 446 | 6 | 00446 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 456 | 6 | 00456 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 456 | 6 | 00456 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 466 | 6 | 00466 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 466 | 6 | 00466 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 476 | 6 | 00476 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 476 | 6 | 00476 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 486 | 6 | 00486 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 486 | 6 | 00486 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 496 | 6 | 00496 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 496 | 6 | 00496 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 506 | 6 | 00506 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 506 | 6 | 00506 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 516 | 6 | 00516 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 516 | 6 | 00516 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 526 | 6 | 00526 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 526 | 6 | 00526 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 536 | 6 | 00536 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 536 | 6 | 00536 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 546 | 6 | 00546 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 546 | 6 | 00546 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 556 | 6 | 00556 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 556 | 6 | 00556 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 566 | 6 | 00566 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 566 | 6 | 00566 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 576 | 6 | 00576 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 576 | 6 | 00576 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 586 | 6 | 00586 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 586 | 6 | 00586 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 596 | 6 | 00596 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 596 | 6 | 00596 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 606 | 6 | 00606 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 606 | 6 | 00606 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 616 | 6 | 00616 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 616 | 6 | 00616 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 626 | 6 | 00626 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 626 | 6 | 00626 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 636 | 6 | 00636 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 636 | 6 | 00636 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 646 | 6 | 00646 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 646 | 6 | 00646 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 656 | 6 | 00656 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 656 | 6 | 00656 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 666 | 6 | 00666 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 666 | 6 | 00666 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 676 | 6 | 00676 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 676 | 6 | 00676 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 686 | 6 | 00686 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 686 | 6 | 00686 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 696 | 6 | 00696 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 696 | 6 | 00696 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 706 | 6 | 00706 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 706 | 6 | 00706 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 716 | 6 | 00716 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 716 | 6 | 00716 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 726 | 6 | 00726 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 726 | 6 | 00726 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 736 | 6 | 00736 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 736 | 6 | 00736 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 746 | 6 | 00746 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 746 | 6 | 00746 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 756 | 6 | 00756 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 756 | 6 | 00756 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 766 | 6 | 00766 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 766 | 6 | 00766 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 776 | 6 | 00776 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 776 | 6 | 00776 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 786 | 6 | 00786 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 786 | 6 | 00786 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 796 | 6 | 00796 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 796 | 6 | 00796 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 806 | 6 | 00806 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 806 | 6 | 00806 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 816 | 6 | 00816 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 816 | 6 | 00816 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 826 | 6 | 00826 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 826 | 6 | 00826 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 836 | 6 | 00836 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 836 | 6 | 00836 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 846 | 6 | 00846 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 846 | 6 | 00846 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 856 | 6 | 00856 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 856 | 6 | 00856 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 866 | 6 | 00866 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 866 | 6 | 00866 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 876 | 6 | 00876 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 876 | 6 | 00876 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 886 | 6 | 00886 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 886 | 6 | 00886 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 896 | 6 | 00896 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 896 | 6 | 00896 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 906 | 6 | 00906 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 906 | 6 | 00906 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 916 | 6 | 00916 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 916 | 6 | 00916 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 926 | 6 | 00926 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 926 | 6 | 00926 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 936 | 6 | 00936 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 936 | 6 | 00936 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 946 | 6 | 00946 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 946 | 6 | 00946 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 956 | 6 | 00956 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 956 | 6 | 00956 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 966 | 6 | 00966 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 966 | 6 | 00966 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 976 | 6 | 00976 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 976 | 6 | 00976 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 986 | 6 | 00986 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 986 | 6 | 00986 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+(100 rows)
+
+-- Test for non cursor mode covering rescans and three active cursors
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ count
+-------
+ 100
+(1 row)
+
+RESET postgres_fdw.use_cursor;
-- bug before 9.3.5 due to sloppy handling of remote-estimate parameters
SELECT * FROM ft1 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft2 WHERE c1 < 5));
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
@@ -2153,6 +2278,24 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2
119
(10 rows)
+--Test in non-cursor mode to cover the patch for two simultaneous active cursors
+SET postgres_fdw.use_cursor TO off;
+SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+ c1
+-----
+ 110
+ 111
+ 112
+ 113
+ 114
+ 115
+ 116
+ 117
+ 118
+ 119
+(10 rows)
+
+RESET postgres_fdw.use_cursor;
-- CROSS JOIN can be pushed down
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
@@ -2204,6 +2347,14 @@ SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t
----+----
(0 rows)
+--Test in non-cursor mode to cover the case with multiple cursors but only one active cursor at a time
+SET postgres_fdw.use_cursor TO off;
+SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+ c1 | c1
+----+----
+(0 rows)
+
+RESET postgres_fdw.use_cursor;
-- unsafe join conditions (c8 has a UDT), not pushed down. Practically a CROSS
-- JOIN since c8 in both tables has same value.
EXPLAIN (VERBOSE, COSTS OFF)
@@ -11506,6 +11657,36 @@ SELECT * FROM result_tbl ORDER BY a;
(20 rows)
DELETE FROM result_tbl;
+-- Test in non-cursor mode, it doesn't support async execution
+SET postgres_fdw.use_cursor TO off;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1000 | 0 | 0000
+ 1100 | 100 | 0100
+ 1200 | 200 | 0200
+ 1300 | 300 | 0300
+ 1400 | 400 | 0400
+ 1500 | 500 | 0500
+ 1600 | 600 | 0600
+ 1700 | 700 | 0700
+ 1800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2000 | 0 | 0000
+ 2100 | 100 | 0100
+ 2200 | 200 | 0200
+ 2300 | 300 | 0300
+ 2400 | 400 | 0400
+ 2500 | 500 | 0500
+ 2600 | 600 | 0600
+ 2700 | 700 | 0700
+ 2800 | 800 | 0800
+ 2900 | 900 | 0900
+(20 rows)
+
+DELETE FROM result_tbl;
+RESET postgres_fdw.use_cursor;
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
QUERY PLAN
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 04788b7e8b3..bb06bba8fd5 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -44,6 +44,7 @@ static PgFdwOption *postgres_fdw_options;
* GUC parameters
*/
char *pgfdw_application_name = NULL;
+bool pgfdw_use_cursor = true;
/*
* Helper functions
@@ -586,5 +587,23 @@ _PG_init(void)
NULL,
NULL);
+ /*
+ * If use_cursor is set to false, the new way of fetching is used. In this
+ * mode, cursors are not used. Instead, if execution switches to a different
+ * query before the current one is fully fetched, the remaining tuples are
+ * saved in a tuplestore, and later calls read from that tuplestore rather
+ * than fetching from a cursor.
+ */
+ DefineCustomBoolVariable("postgres_fdw.use_cursor",
+ "If set, uses a cursor; otherwise fetches without a cursor",
+ NULL,
+ &pgfdw_use_cursor,
+ true,
+ PGC_USERSET,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+
MarkGUCPrefixReserved("postgres_fdw");
}
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 5e178c21b39..8e0eb048aec 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -22,6 +22,7 @@
#include "commands/explain_format.h"
#include "commands/explain_state.h"
#include "executor/execAsync.h"
+#include "executor/executor.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -174,6 +175,14 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+ /* To be used only in non-cursor mode */
+ Tuplestorestate *tuplestore; /* Tuplestore to save the tuples of the
+ * query for later fetch. */
+ TupleTableSlot *slot; /* Slot to be used when reading the tuple from
+ * the tuplestore */
+ bool tuples_ready; /* To indicate when tuplestore is ready to be
+ * read. */
+ int total_tuples; /* total tuples in the tuplestore. */
} PgFdwScanState;
/*
@@ -451,7 +460,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void fetch_more_data(ForeignScanState *node, bool use_tuplestore);
static void close_cursor(PGconn *conn, unsigned int cursor_number,
PgFdwConnState *conn_state);
static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -516,7 +525,8 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
AttInMetadata *attinmeta,
List *retrieved_attrs,
ForeignScanState *fsstate,
- MemoryContext temp_context);
+ MemoryContext temp_context,
+ TupleDesc last_tupdesc);
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
@@ -546,6 +556,8 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+/* Only required for non-cursor mode */
+static void fill_tuplestore(ForeignScanState *node);
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -1593,6 +1605,57 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/* Set the async-capable flag */
fsstate->async_capable = node->ss.ps.async_capable;
+ /* Initially, there is no last_query */
+ fsstate->conn_state->last_query = NULL;
+}
+
+/*
+ * This routine fetches all the tuples of a query and saves them in a tuplestore.
+ * This is required when the result of a query is not completely fetched but
+ * control switches to a different query.
+ * fetch_more_data() is called from here, hence we need the complete ForeignScanState.
+ */
+static void
+fill_tuplestore(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *last_fsstate = (PgFdwScanState *) fsstate->conn_state->last_query;
+ MemoryContext oldcontext;
+ ExprContext *econtext = node->ss.ps.ps_ExprContext;
+ PGconn *conn = fsstate->conn;
+ const char **values = last_fsstate->param_values;
+ int numParams = last_fsstate->numParams;
+
+ /*
+ * Construct array of query parameter values in text format, as done in
+ * create_cursor
+ */
+ if (numParams > 0)
+ {
+ oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+ process_query_params(econtext,
+ last_fsstate->param_flinfo,
+ last_fsstate->param_exprs,
+ values);
+ MemoryContextSwitchTo(oldcontext);
+ }
+ if (conn->asyncStatus == PGASYNC_IDLE)
+ {
+ /* If the connection is not active then set up */
+ if (!PQsendQueryParams(conn, last_fsstate->query, last_fsstate->numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(NULL, conn, last_fsstate->query);
+
+ if (!PQsetChunkedRowsMode(conn, last_fsstate->fetch_size))
+ pgfdw_report_error(NULL, conn, last_fsstate->query);
+ }
+ fetch_more_data(node, true);
+
+ /*
+ * Remove the last_query since it is completely fetched, so no need to
+ * remember it now.
+ */
+ fsstate->conn_state->last_query = NULL;
}
/*
@@ -1625,7 +1688,8 @@ postgresIterateForeignScan(ForeignScanState *node)
return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
- fetch_more_data(node);
+ fetch_more_data(node, false);
+
/* If we didn't get any tuples, must be end of data. */
if (fsstate->next_tuple >= fsstate->num_tuples)
return ExecClearTuple(slot);
@@ -1651,6 +1715,7 @@ postgresReScanForeignScan(ForeignScanState *node)
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
char sql[64];
PGresult *res;
+ bool close_cursor = false;
/* If we haven't created the cursor yet, nothing to do. */
if (!fsstate->cursor_exists)
@@ -1666,7 +1731,7 @@ postgresReScanForeignScan(ForeignScanState *node)
if (fsstate->async_capable &&
fsstate->conn_state->pendingAreq &&
fsstate->conn_state->pendingAreq->requestee == (PlanState *) node)
- fetch_more_data(node);
+ fetch_more_data(node, false);
/*
* If any internal parameters affecting this node have changed, we'd
@@ -1680,19 +1745,26 @@ postgresReScanForeignScan(ForeignScanState *node)
if (node->ss.ps.chgParam != NULL)
{
fsstate->cursor_exists = false;
- snprintf(sql, sizeof(sql), "CLOSE c%u",
- fsstate->cursor_number);
+ if (pgfdw_use_cursor)
+ snprintf(sql, sizeof(sql), "CLOSE c%u",
+ fsstate->cursor_number);
+ else
+ close_cursor = true;
}
else if (fsstate->fetch_ct_2 > 1)
{
if (PQserverVersion(fsstate->conn) < 150000)
+ /* TODO: Handle it in non-cursor mode as well */
snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
fsstate->cursor_number);
else
{
fsstate->cursor_exists = false;
- snprintf(sql, sizeof(sql), "CLOSE c%u",
- fsstate->cursor_number);
+ if (pgfdw_use_cursor)
+ snprintf(sql, sizeof(sql), "CLOSE c%u",
+ fsstate->cursor_number);
+ else
+ close_cursor = true;
}
}
else
@@ -1701,18 +1773,32 @@ postgresReScanForeignScan(ForeignScanState *node)
fsstate->next_tuple = 0;
return;
}
-
- res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(res, fsstate->conn, sql);
- PQclear(res);
-
- /* Now force a fresh FETCH. */
- fsstate->tuples = NULL;
- fsstate->num_tuples = 0;
- fsstate->next_tuple = 0;
- fsstate->fetch_ct_2 = 0;
- fsstate->eof_reached = false;
+ if (pgfdw_use_cursor)
+ {
+ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(res, fsstate->conn, sql);
+ PQclear(res);
+ /* Now force a fresh FETCH. */
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
+ }
+ else if (!pgfdw_use_cursor && close_cursor)
+ {
+ res = pgfdw_get_result(fsstate->conn);
+ while (res != NULL)
+ res = pgfdw_get_result(fsstate->conn);
+ PQclear(res);
+ /* Now force a fresh FETCH. */
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
+ }
}
/*
@@ -3755,29 +3841,67 @@ create_cursor(ForeignScanState *node)
MemoryContextSwitchTo(oldcontext);
}
- /* Construct the DECLARE CURSOR command */
- initStringInfo(&buf);
- appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
- fsstate->cursor_number, fsstate->query);
+ if (pgfdw_use_cursor)
+ {
+ /* Construct the DECLARE CURSOR command */
+ initStringInfo(&buf);
+ appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
+ fsstate->cursor_number, fsstate->query);
- /*
- * Notice that we pass NULL for paramTypes, thus forcing the remote server
- * to infer types for all parameters. Since we explicitly cast every
- * parameter (see deparse.c), the "inference" is trivial and will produce
- * the desired result. This allows us to avoid assuming that the remote
- * server has the same OIDs we do for the parameters' types.
- */
- if (!PQsendQueryParams(conn, buf.data, numParams,
- NULL, values, NULL, NULL, 0))
- pgfdw_report_error(NULL, conn, buf.data);
+ /*
+ * Notice that we pass NULL for paramTypes, thus forcing the remote
+ * server to infer types for all parameters. Since we explicitly cast
+ * every parameter (see deparse.c), the "inference" is trivial and
+ * will produce the desired result. This allows us to avoid assuming
+ * that the remote server has the same OIDs we do for the parameters'
+ * types.
+ */
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(NULL, conn, buf.data);
- /*
- * Get the result, and check for success.
- */
- res = pgfdw_get_result(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(res, conn, fsstate->query);
- PQclear(res);
+ /*
+ * Get the result, and check for success.
+ */
+ res = pgfdw_get_result(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(res, conn, fsstate->query);
+ PQclear(res);
+
+ /* Clean up */
+ pfree(buf.data);
+ }
+ else
+ {
+ /*
+ * Finish fetching tuples of the last query. Do this only when there
+ * is a different query than the current one. In the case of rescan,
+ * create_cursor is called simultaneously for the same query so to
+ * avoid calling fill_tuplestore in such cases, check if the queries
+ * are different and tuplestore is not already filled for this query.
+ */
+ if (fsstate->conn_state->last_query &&
+ fsstate != fsstate->conn_state->last_query &&
+ !fsstate->conn_state->last_query->tuples_ready)
+ fill_tuplestore(node);
+
+ /*
+ * Remember the current query as the last one, in case control
+ * switches to another query before it is completely fetched.
+ */
+ fsstate->conn_state->last_query = fsstate;
+
+ if (!PQsendQueryParams(conn, fsstate->query, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(NULL, conn, fsstate->query);
+
+ /*
+ * Request chunked rows mode, with the chunk size equal to the fetch
+ * size.
+ */
+ if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size))
+ pgfdw_report_error(NULL, conn, fsstate->query);
+ }
/* Mark the cursor as created, and show no tuples have been retrieved */
fsstate->cursor_exists = true;
@@ -3786,30 +3910,29 @@ create_cursor(ForeignScanState *node)
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
fsstate->eof_reached = false;
-
- /* Clean up */
- pfree(buf.data);
}
/*
* Fetch some more rows from the node's cursor.
*/
static void
-fetch_more_data(ForeignScanState *node)
+fetch_more_data(ForeignScanState *node, bool use_tuplestore)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *last_fsstate = fsstate->conn_state->last_query;
PGconn *conn = fsstate->conn;
PGresult *res;
- int numrows;
- int i;
MemoryContext oldcontext;
+ int numrows,
+ i = 0,
+ total_tuples = 0;
+ bool already_done = false;
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
* batch.
*/
fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
if (fsstate->async_capable)
@@ -3828,7 +3951,7 @@ fetch_more_data(ForeignScanState *node)
/* Reset per-connection state */
fsstate->conn_state->pendingAreq = NULL;
}
- else
+ else if (pgfdw_use_cursor)
{
char sql[64];
@@ -3841,32 +3964,177 @@ fetch_more_data(ForeignScanState *node)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(res, conn, fsstate->query);
}
+ else
+ {
+ /*
+ * In non-cursor mode, there are three options for the further
+ * processing: 1. If the tuplestore is already filled, retrieve the
+ * tuples from there. 2. Fetch the tuples till the end of the query
+ * and store them in tuplestore. 3. Perform a normal fetch and process
+ * the tuples.
+ */
+ if (fsstate->tuplestore && fsstate->tuples_ready)
+ {
+ /* Retrieve the tuples from the tuplestore instead of actual fetch */
+ numrows = fsstate->total_tuples;
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
- /* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ while (tuplestore_gettupleslot(fsstate->tuplestore, true, true, fsstate->slot))
+ {
+ fsstate->tuples[i++] = ExecFetchSlotHeapTuple(fsstate->slot, true, NULL);
+ ExecClearTuple(fsstate->slot);
+ }
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
+ already_done = true;
+ fsstate->eof_reached = true;
+ fsstate->tuples_ready = false;
+
+ /* Clean up */
+ tuplestore_end(fsstate->tuplestore);
+ ExecDropSingleTupleTableSlot(fsstate->slot);
+ fsstate->slot = NULL;
+ fsstate->tuplestore = NULL;
+ return;
+ }
+ else
+ {
+ /*
+ * Non-cursor mode uses PQsetChunkedRowsMode during create_cursor,
+ * so just get the result here.
+ */
+ res = pgfdw_get_next_result(conn);
+ if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(res, conn, fsstate->query);
- for (i = 0; i < numrows; i++)
+ else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ /*
+ * This signifies query is completed and there are no more
+ * tuples left.
+ */
+ if (use_tuplestore)
+ {
+ /*
+ * If we are here to store the tuples in tuplestore then
+ * this signals we have already fetched all the tuples for
+ * this query, so nothing to do. Just set the right flags
+ */
+ already_done = true;
+ last_fsstate->tuples_ready = true;
+ last_fsstate->eof_reached = true;
+ }
+
+ /* There are no more tuples to fetch */
+ while (res != NULL)
+ res = pgfdw_get_result(conn);
+ }
+ else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK)
+ {
+ if (use_tuplestore)
+ {
+ /*
+ * This is to fetch all the tuples of the query in
+ * last_fsstate and save them in its tuplestore.
+ */
+
+ /*
+ * We should never be here without a valid last_fsstate in
+ * the scan state.
+ */
+ Assert(last_fsstate != NULL);
+ last_fsstate->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+
+ for (;;)
+ {
+ /*
+ * Since it is using PQsetChunkedRowsMode, we only get
+ * the fsstate->fetch_size tuples in one run, so keep
+ * on executing till we get NULL in PGresult i.e. all
+ * the tuples are retrieved.
+ */
+ CHECK_FOR_INTERRUPTS();
+ numrows = PQntuples(res);
+ total_tuples += numrows;
+
+ /* Convert the data into HeapTuples */
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
+ for (i = 0; i < numrows; i++)
+ {
+ HeapTuple temp_tuple;
+
+ temp_tuple = make_tuple_from_result_row(res, i,
+ last_fsstate->rel,
+ last_fsstate->attinmeta,
+ last_fsstate->retrieved_attrs,
+ node,
+ last_fsstate->temp_cxt,
+ last_fsstate->tupdesc);
+ tuplestore_puttuple(last_fsstate->tuplestore, temp_tuple);
+ heap_freetuple(temp_tuple);
+ }
+
+ res = pgfdw_get_next_result(conn);
+ if (res == NULL)
+ break;
+ else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(res, conn, last_fsstate->query);
+ else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ /* This means all the tuples are retrieved. */
+ numrows = PQntuples(res);
+ /* If there is nothing to fetch */
+ if (numrows == 0)
+ res = pgfdw_get_result(conn);
+ }
+ }
+
+ /*
+ * EOF is reached because we are storing all tuples to the
+ * tuplestore.
+ */
+ already_done = true;
+ last_fsstate->tuples_ready = true;
+ last_fsstate->total_tuples = total_tuples;
+ }
+ }
+ }
+ }
+ if (!already_done)
{
- Assert(IsA(node->ss.ps.plan, ForeignScan));
+ /*
+ * To fetch tuples for the query in fsstate used in both cursor and
+ * non-cursor mode.
+ */
+ /* Convert the data into HeapTuples */
+ numrows = PQntuples(res);
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
- fsstate->tuples[i] =
- make_tuple_from_result_row(res, i,
- fsstate->rel,
- fsstate->attinmeta,
- fsstate->retrieved_attrs,
- node,
- fsstate->temp_cxt);
+ for (i = 0; i < numrows; i++)
+ {
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+ fsstate->tuples[i] =
+ make_tuple_from_result_row(res, i,
+ fsstate->rel,
+ fsstate->attinmeta,
+ fsstate->retrieved_attrs,
+ node,
+ fsstate->temp_cxt,
+ NULL);
+ }
}
/* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
+ if (!use_tuplestore && fsstate->fetch_ct_2 < 2)
fsstate->fetch_ct_2++;
/* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
+ if (!use_tuplestore)
+ fsstate->eof_reached = (numrows < fsstate->fetch_size);
PQclear(res);
@@ -3941,11 +4209,16 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
char sql[64];
PGresult *res;
- snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
- res = pgfdw_exec_query(conn, sql, conn_state);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(res, conn, sql);
- PQclear(res);
+ if (pgfdw_use_cursor)
+ {
+ snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
+ res = pgfdw_exec_query(conn, sql, conn_state);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(res, conn, sql);
+ PQclear(res);
+ }
+ else
+ while (pgfdw_get_result(conn) != NULL);
}
/*
@@ -4329,7 +4602,7 @@ store_returning_result(PgFdwModifyState *fmstate,
fmstate->attinmeta,
fmstate->retrieved_attrs,
NULL,
- fmstate->temp_cxt);
+ fmstate->temp_cxt, NULL);
/*
* The returning slot will not necessarily be suitable to store heaptuples
@@ -4608,7 +4881,7 @@ get_returning_data(ForeignScanState *node)
dmstate->attinmeta,
dmstate->retrieved_attrs,
node,
- dmstate->temp_cxt);
+ dmstate->temp_cxt, NULL);
ExecStoreHeapTuple(newtup, slot, false);
/* Get the updated/deleted tuple. */
if (dmstate->rel)
@@ -5239,7 +5512,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
for (;;)
{
int numrows;
- int i;
+ int i = 0;
/* Allow users to cancel long query */
CHECK_FOR_INTERRUPTS();
@@ -5360,7 +5633,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
astate->attinmeta,
astate->retrieved_attrs,
NULL,
- astate->temp_cxt);
+ astate->temp_cxt, NULL);
MemoryContextSwitchTo(oldcontext);
}
@@ -7314,7 +7587,7 @@ postgresForeignAsyncNotify(AsyncRequest *areq)
if (!PQconsumeInput(fsstate->conn))
pgfdw_report_error(NULL, fsstate->conn, fsstate->query);
- fetch_more_data(node);
+ fetch_more_data(node, false);
produce_tuple_asynchronously(areq, true);
}
@@ -7400,6 +7673,13 @@ fetch_more_data_begin(AsyncRequest *areq)
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
char sql[64];
+ /*
+ * Force cursor mode here: asynchronous execution is not supported in
+ * non-cursor mode.
+ */
+ if (!pgfdw_use_cursor)
+ pgfdw_use_cursor = true;
+
Assert(!fsstate->conn_state->pendingAreq);
/* Create the cursor synchronously. */
@@ -7432,7 +7712,7 @@ process_pending_request(AsyncRequest *areq)
/* The request should be currently in-process */
Assert(fsstate->conn_state->pendingAreq == areq);
- fetch_more_data(node);
+ fetch_more_data(node, false);
/*
* If we didn't get any tuples, must be end of data; complete the request
@@ -7494,7 +7774,8 @@ make_tuple_from_result_row(PGresult *res,
AttInMetadata *attinmeta,
List *retrieved_attrs,
ForeignScanState *fsstate,
- MemoryContext temp_context)
+ MemoryContext temp_context,
+ TupleDesc last_tupdesc)
{
HeapTuple tuple;
TupleDesc tupdesc;
@@ -7518,9 +7799,13 @@ make_tuple_from_result_row(PGresult *res,
/*
* Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
- * provided, otherwise look to the scan node's ScanTupleSlot.
+ * provided, otherwise look to the scan node's ScanTupleSlot. In case of
+ * non-cursor mode, use the tupledesc that is already provided, because
+ * getting from the current fsstate would be wrong in this case.
*/
- if (rel)
+ if (last_tupdesc)
+ tupdesc = last_tupdesc;
+ else if (rel)
tupdesc = RelationGetDescr(rel);
else
{
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index e69735298d7..6a257252647 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,9 +16,11 @@
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq/libpq-be-fe.h"
+#include "libpq-int.h"
#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
+#include "funcapi.h"
/*
* FDW-specific planner information kept in RelOptInfo.fdw_private for a
@@ -131,12 +133,16 @@ typedef struct PgFdwRelationInfo
int relation_index;
} PgFdwRelationInfo;
+typedef struct PgFdwScanState PgFdwScanState;
+
/*
* Extra control information relating to a connection.
*/
typedef struct PgFdwConnState
{
AsyncRequest *pendingAreq; /* pending async request */
+ PgFdwScanState *last_query; /* last query executed, required for
+ * non-cursor mode */
} PgFdwConnState;
/*
@@ -164,6 +170,7 @@ extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void do_sql_command(PGconn *conn, const char *sql);
extern PGresult *pgfdw_get_result(PGconn *conn);
+extern PGresult *pgfdw_get_next_result(PGconn *conn);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state);
pg_noreturn extern void pgfdw_report_error(PGresult *res, PGconn *conn,
@@ -179,6 +186,7 @@ extern List *ExtractExtensionList(const char *extensionsString,
bool warnOnMissing);
extern char *process_pgfdw_appname(const char *appname);
extern char *pgfdw_application_name;
+extern bool pgfdw_use_cursor;
/* in deparse.c */
extern void classifyConditions(PlannerInfo *root,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 9a8f9e28135..f15c5ae2145 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -3,11 +3,9 @@
-- ===================================================================
CREATE EXTENSION postgres_fdw;
-
SELECT current_database() AS current_database,
current_setting('port') AS current_port
\gset
-
CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw;
CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (dbname :'current_database', port :'current_port');
@@ -277,6 +275,10 @@ SELECT COUNT(*) FROM ft1 t1;
SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1;
-- subquery+MAX
SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+--Test in non-cursor mode to cover process_query_params path
+SET postgres_fdw.use_cursor TO off;
+SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+RESET postgres_fdw.use_cursor;
-- used in CTE
WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1;
-- fixed values
@@ -356,6 +358,15 @@ EXPLAIN (VERBOSE, COSTS OFF)
WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
SELECT * FROM ft2 a, ft2 b
WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+
+-- Test in non-cursor mode for rescan path
+SET postgres_fdw.use_cursor TO off;
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+-- Test for non cursor mode covering rescans and three active cursors
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+RESET postgres_fdw.use_cursor;
+
-- bug before 9.3.5 due to sloppy handling of remote-estimate parameters
SELECT * FROM ft1 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft2 WHERE c1 < 5));
SELECT * FROM ft2 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft1 WHERE c1 < 5));
@@ -646,6 +657,11 @@ SELECT t1.c1 FROM ft1 t1 WHERE EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c1)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+
+--Test in non-cursor mode to cover the patch for two simultaneous active cursors
+SET postgres_fdw.use_cursor TO off;
+SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+RESET postgres_fdw.use_cursor;
-- CROSS JOIN can be pushed down
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
@@ -654,6 +670,10 @@ SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 1
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+--Test in non-cursor mode to cover the case with multiple cursors but only one active cursor at a time
+SET postgres_fdw.use_cursor TO off;
+SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+RESET postgres_fdw.use_cursor;
-- unsafe join conditions (c8 has a UDT), not pushed down. Practically a CROSS
-- JOIN since c8 in both tables has same value.
EXPLAIN (VERBOSE, COSTS OFF)
@@ -3907,6 +3927,13 @@ INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
SELECT * FROM result_tbl ORDER BY a;
DELETE FROM result_tbl;
+-- Test in non-cursor mode, it doesn't support async execution
+SET postgres_fdw.use_cursor TO off;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+RESET postgres_fdw.use_cursor;
+
EXPLAIN (VERBOSE, COSTS OFF)
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
--
2.39.5 (Apple Git-154)
* Re: Bypassing cursors in postgres_fdw to enable parallel plans
@ 2026-05-20 10:40 Rafia Sabih <[email protected]>
parent: Rafia Sabih <[email protected]>
0 siblings, 0 replies; 2+ messages in thread
From: Rafia Sabih @ 2026-05-20 10:40 UTC (permalink / raw)
To: Robert Haas <[email protected]>; +Cc: KENAN YILMAZ <[email protected]>; Andy Fan <[email protected]>; PostgreSQL Hackers <[email protected]>
On Tue, 12 May 2026 at 14:56, Robert Haas <[email protected]> wrote:
> On Mon, May 11, 2026 at 7:03 AM Rafia Sabih <[email protected]> wrote:
> > Thanks Robert for the detailed review and some great suggestions for
> > improving this patch. I went through the comments and the patch and here is
> > the revised version. To summarize the changes done in this patch,
> > - as suggested, split it into two patches now, the first patch changes
> > the cursor_exists flag to scan_in_progress, the second patch is the one
> > with the rest of the changes to implement this fetch mechanism
> > - changed the GUC to a server/table-level option called streaming_fetch.
> > Still a boolean type. Open to better naming suggestions.
> > - refactored postgresReScanForeignScan for more readable code
> > Following the suggestions above, now, it handles the case for the
> > backward cursor separately and exits the function straight from there. In
> > case of close cursor, it checks if cursor or cursor-free mode is used, and
> > executes different actions as per the mode. In cursor-free mode we end_scan
> > and tuplestore. In the third case when we start scanning the tuples already
> > fetched, things remain the same in both modes.
> > - added the changes to show the option in explain verbose output
> > - Added a lot more test cases to check the option with other options and
> > also to cover more code-paths, particularly rescans and ending the query in
> > between, error cases etc.
> >
> > There is one problem that remains here, the use of ExprContext. We need
> > this when we are fetching the tuples for the active scan, but since this is
> > only available in node, we can not have the one which was there at the time
> > when active_scan was the fsstate, so at the moment it is using the econtext
> > from the node. This doesn't seem correct to me, but to have the correct
> > ExprContext we need the node, but it also doesn't seem totally right to
> > have the pointer to node in ScanState for this scenario. Please let me know
> > what could be a good way to handle this.
>
> The problem here is that save_to_tuplestore() has gotten itself into
> the business of resending the query when a node is rescanned and we
> haven't yet buffered the results anywhere. But it doesn't need to do
> that in the first place. If the query needs to be resent, it's not
> currently active on the connection, and then there's no need to do
> anything to free up the connection, so save_to_tuplestore() doesn't
> need to be called in the first place. The reason why you're having
> this problem is that postgresReScanForeignScan calls end_scan but
> end_scan does not clear the active_scan pointer. So then
> save_to_tuplestore() gets called if something else tries to use the
> connection, and to make that work, you did this.
>
> But the right solution is to make sure that the active_scan pointer is
> only non-NULL when there are actually results already on the wire that
> need to be drained. If you do that, then save_to_tuplestore() won't
> get called if rescan has already read all of the pending results off
> the wire, and then you can delete the code that resends the query, and
> then you won't need an econtext here in the first place.
>
Thanks, Robert, for the quick follow-up on this. I found this to be true and
modified the patch accordingly.
Please find the updated patches attached. Nothing has changed in the first
patch; it is included here only for the sake of completeness.
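
For anyone following along, the invariant discussed above (active_scan is
non-NULL only while results are still on the wire) can be illustrated with a
small toy model. This is only a sketch under stated assumptions and not the
code from the patch: ToyConn, ToyScan and all toy_* functions are hypothetical
names, rows are plain integers, and a fixed-size array stands in for the
tuplestore.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_MAX_ROWS 16

/* Hypothetical stand-in for a scan's state (not PgFdwScanState). */
typedef struct ToyScan
{
	int			next_row;		/* next row still pending "on the wire" */
	int			total_rows;		/* rows the remote query produces */
	int			buffered[TOY_MAX_ROWS]; /* stand-in for the tuplestore */
	int			nbuffered;		/* rows saved in the buffer */
	int			replay_pos;		/* next buffered row to hand back */
} ToyScan;

/* Hypothetical stand-in for per-connection state (not PgFdwConnState). */
typedef struct ToyConn
{
	ToyScan    *active_scan;	/* non-NULL only while rows are on the wire */
} ToyConn;

/* Drain the active scan's pending rows into its buffer, freeing the wire. */
static void
toy_save_to_buffer(ToyConn *conn)
{
	ToyScan    *scan = conn->active_scan;

	if (scan == NULL)
		return;					/* nothing on the wire: nothing to drain */
	while (scan->next_row < scan->total_rows)
		scan->buffered[scan->nbuffered++] = scan->next_row++;
	conn->active_scan = NULL;
}

/* Start (or restart) a scan; another scan's pending rows are saved first. */
static void
toy_begin_scan(ToyConn *conn, ToyScan *scan, int total_rows)
{
	assert(total_rows <= TOY_MAX_ROWS);
	if (conn->active_scan != NULL && conn->active_scan != scan)
		toy_save_to_buffer(conn);
	scan->next_row = 0;
	scan->total_rows = total_rows;
	scan->nbuffered = 0;
	scan->replay_pos = 0;
	conn->active_scan = scan;
}

/* End a scan early: discard its pending rows and clear the pointer. */
static void
toy_end_scan(ToyConn *conn, ToyScan *scan)
{
	if (conn->active_scan == scan)
	{
		scan->next_row = scan->total_rows;	/* rows read and thrown away */
		conn->active_scan = NULL;
	}
}

/* Return the next row: buffered rows first, then the wire; -1 at the end. */
static int
toy_next_row(ToyConn *conn, ToyScan *scan)
{
	if (scan->replay_pos < scan->nbuffered)
		return scan->buffered[scan->replay_pos++];
	if (conn->active_scan != scan)
		return -1;				/* nothing buffered and nothing on the wire */
	if (scan->next_row < scan->total_rows)
	{
		int			row = scan->next_row++;

		/* Last row consumed: the wire is idle, so clear the pointer. */
		if (scan->next_row == scan->total_rows)
			conn->active_scan = NULL;
		return row;
	}
	conn->active_scan = NULL;
	return -1;
}
```

In this model, toy_end_scan always clears the pointer, so a later
toy_begin_scan on the same connection has nothing to drain and never needs to
resend a query. Starting a second scan while the first still has pending rows
moves those rows into the first scan's buffer, and toy_next_row replays them
when execution returns to that scan.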
--
Regards,
Rafia Sabih
CYBERTEC PostgreSQL International GmbH
Attachments:
[application/octet-stream] v10-0002-postgres_fdw-Add-streaming_fetch-option-for-curs.patch (122.2K, 3-v10-0002-postgres_fdw-Add-streaming_fetch-option-for-curs.patch)
From 8dd70c513961113b1f0c2deaf8927828a2f62334 Mon Sep 17 00:00:00 2001
From: Rafia Sabih <[email protected]>
Date: Wed, 20 May 2026 12:35:22 +0200
Subject: [PATCH v10 2/2] postgres_fdw: Add streaming_fetch option for
cursor-free fetching
postgres_fdw uses cursors to fetch tuples from the remote server
incrementally. While this works correctly, cursors prevent the remote
side from using parallel query execution, which can significantly limit
performance for large scans.
This adds a new boolean option streaming_fetch, available at both the
server and table level, that gives an alternate fetching mechanism.
Because no cursor is created on the remote side, the remote query is free
to use parallel execution.
When a second scan begins on the same connection while another is still
in progress, the remaining tuples of the active scan are drained into a
tuplestore and replayed when execution returns to that scan. This
preserves correct results across nested or interleaved scans without
requiring an additional connection.
streaming_fetch defaults to false, preserving existing behavior.
Asynchronous execution is not supported in this mode and is disabled
automatically when streaming_fetch is enabled.
Original idea: Bernd Helmle
Key suggestions: Robert Haas
---
contrib/postgres_fdw/connection.c | 11 +
.../postgres_fdw/expected/postgres_fdw.out | 986 +++++++++++++++++-
contrib/postgres_fdw/option.c | 4 +
contrib/postgres_fdw/postgres_fdw.c | 439 +++++++-
contrib/postgres_fdw/postgres_fdw.h | 6 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 410 ++++++++
doc/src/sgml/postgres-fdw.sgml | 15 +
7 files changed, 1824 insertions(+), 47 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 3d2a8d0519d..543bb31cfc6 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -1082,6 +1082,17 @@ pgfdw_get_result(PGconn *conn)
return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
}
+/*
+ * Wrap libpqsrv_get_result(), adding wait event.
+ * Used in case of streaming_fetch mode.
+ * Caller is responsible for the error handling on the result.
+ */
+PGresult *
+pgfdw_get_next_result(PGconn *conn)
+{
+ return libpqsrv_get_result(conn, pgfdw_we_get_result);
+}
+
/*
* Report an error we got from the remote server.
*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index aaffcf31271..216c7305964 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -277,6 +277,363 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
-- ===================================================================
+-- test streaming_fetch option
+-- ===================================================================
+CREATE SERVER fetch_stream_srv
+ FOREIGN DATA WRAPPER postgres_fdw
+ OPTIONS (dbname :'current_database', port :'current_port');
+CREATE USER MAPPING FOR CURRENT_USER SERVER fetch_stream_srv;
+CREATE TABLE local_tbl (id int, val text);
+INSERT INTO local_tbl VALUES (1, 'a'), (2, 'b'), (3, 'c');
+-- 1. streaming_fetch set at SERVER level only
+-- 1a. Set streaming_fetch = true on the server
+ALTER SERVER fetch_stream_srv OPTIONS (ADD streaming_fetch 'true');
+CREATE FOREIGN TABLE ft_server (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl');
+-- Verify option appears in foreign server
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+ srvname | option_name | option_value
+------------------+-----------------+--------------
+ fetch_stream_srv | streaming_fetch | true
+(1 row)
+
+-- Verify option is NOT present at table level (inherits from server)
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+ ftrelid | ftoptions
+-----------+-------------------------------------------
+ ft_server | {schema_name=public,table_name=local_tbl}
+(1 row)
+
+-- 1b. Switch server-level to streaming_fetch = false
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'false');
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+ srvname | option_name | option_value
+------------------+-----------------+--------------
+ fetch_stream_srv | streaming_fetch | false
+(1 row)
+
+-- Verify option is NOT present at table level (inherits from server)
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+ ftrelid | ftoptions
+-----------+-------------------------------------------
+ ft_server | {schema_name=public,table_name=local_tbl}
+(1 row)
+
+-- 2. streaming_fetch set at TABLE level only (no server-level option)
+ALTER SERVER fetch_stream_srv OPTIONS (DROP streaming_fetch);
+-- 2a. streaming_fetch = true at table level
+ALTER FOREIGN TABLE ft_server OPTIONS (ADD streaming_fetch 'true');
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+ ftrelid | ftoptions
+-----------+----------------------------------------------------------------
+ ft_server | {schema_name=public,table_name=local_tbl,streaming_fetch=true}
+(1 row)
+
+SELECT * FROM ft_server ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+-- 2b. streaming_fetch = false at table level
+ALTER FOREIGN TABLE ft_server OPTIONS (SET streaming_fetch 'false');
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+ ftrelid | ftoptions
+-----------+-----------------------------------------------------------------
+ ft_server | {schema_name=public,table_name=local_tbl,streaming_fetch=false}
+(1 row)
+
+SELECT * FROM ft_server ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+-- 3. TABLE-level value overrides SERVER-level value
+-- 3a. Server = true, Table = false
+ALTER SERVER fetch_stream_srv OPTIONS (ADD streaming_fetch 'true');
+-- Server shows true
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+ srvname | option_name | option_value
+------------------+-----------------+--------------
+ fetch_stream_srv | streaming_fetch | true
+(1 row)
+
+-- Table shows its own override: false
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+ ftrelid | ftoptions
+-----------+-----------------------------------------------------------------
+ ft_server | {schema_name=public,table_name=local_tbl,streaming_fetch=false}
+(1 row)
+
+-- Query must succeed and use table-level setting (false)
+SELECT * FROM ft_server ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+-- 3b. Server = false, Table = true → effective value is true
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft_server OPTIONS (SET streaming_fetch 'true');
+-- Server shows false
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+ srvname | option_name | option_value
+------------------+-----------------+--------------
+ fetch_stream_srv | streaming_fetch | false
+(1 row)
+
+-- Table shows its own override: true
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+ ftrelid | ftoptions
+-----------+----------------------------------------------------------------
+ ft_server | {schema_name=public,table_name=local_tbl,streaming_fetch=true}
+(1 row)
+
+-- Query must succeed and use table-level setting (true)
+SELECT * FROM ft_server ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+-- 4. Negative tests: invalid values must be rejected
+-- streaming_fetch is boolean; non-boolean value must ERROR
+\set VERBOSITY terse
+CREATE FOREIGN TABLE ft_invalid (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl', streaming_fetch 'yes'); -- ERROR
+ERROR: streaming_fetch requires a Boolean value
+CREATE FOREIGN TABLE ft_invalid (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl', streaming_fetch '1'); -- ERROR
+ERROR: streaming_fetch requires a Boolean value
+CREATE FOREIGN TABLE ft_invalid (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl', streaming_fetch ''); -- ERROR
+ERROR: streaming_fetch requires a Boolean value
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'notabool'); -- ERROR
+ERROR: streaming_fetch requires a Boolean value
+\set VERBOSITY default
+-- 5. ALTER FOREIGN TABLE: add, change, and drop streaming_fetch
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'true');
+CREATE FOREIGN TABLE ft_alter_test (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl');
+-- No table-level option yet
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+ ftrelid | ftoptions
+---------------+-------------------------------------------
+ ft_alter_test | {schema_name=public,table_name=local_tbl}
+(1 row)
+
+-- ADD table-level option
+ALTER FOREIGN TABLE ft_alter_test OPTIONS (ADD streaming_fetch 'false');
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+ ftrelid | ftoptions
+---------------+-----------------------------------------------------------------
+ ft_alter_test | {schema_name=public,table_name=local_tbl,streaming_fetch=false}
+(1 row)
+
+SELECT * FROM ft_alter_test ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+-- SET (change) table-level option
+ALTER FOREIGN TABLE ft_alter_test OPTIONS (SET streaming_fetch 'true');
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+ ftrelid | ftoptions
+---------------+----------------------------------------------------------------
+ ft_alter_test | {schema_name=public,table_name=local_tbl,streaming_fetch=true}
+(1 row)
+
+SELECT * FROM ft_alter_test ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+-- DROP table-level option (falls back to server-level)
+ALTER FOREIGN TABLE ft_alter_test OPTIONS (DROP streaming_fetch);
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+ ftrelid | ftoptions
+---------------+-------------------------------------------
+ ft_alter_test | {schema_name=public,table_name=local_tbl}
+(1 row)
+
+SELECT * FROM ft_alter_test ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+DROP FOREIGN TABLE ft_alter_test;
+-- 6. streaming_fetch with non-default fetch_size values
+-- Use a 12-row table so chunk boundaries are distinct and predictable:
+-- fetch_size=1 gives 12 single-row chunks, fetch_size=5 gives chunks
+-- of 5+5+2, and fetch_size=1000 puts all rows in a single chunk.
+CREATE TABLE local_tbl_large (id int, val text);
+INSERT INTO local_tbl_large SELECT id, 'val' || id FROM generate_series(1, 12) id;
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'true');
+-- fetch_size = 1: every row is its own libpq chunk; exercises the path
+-- where pgfdw_get_next_result is called once per row.
+CREATE FOREIGN TABLE ft_fetchsize (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl_large', fetch_size '1');
+SELECT count(*) FROM ft_fetchsize;
+ count
+-------
+ 12
+(1 row)
+
+DROP FOREIGN TABLE ft_fetchsize;
+-- fetch_size = 5: three chunks with a partial last chunk (5+5+2);
+-- the final chunk is smaller than fetch_size.
+CREATE FOREIGN TABLE ft_fetchsize (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl_large', fetch_size '5');
+SELECT count(*) FROM ft_fetchsize;
+ count
+-------
+ 12
+(1 row)
+
+SELECT * FROM ft_fetchsize ORDER BY id;
+ id | val
+----+-------
+ 1 | val1
+ 2 | val2
+ 3 | val3
+ 4 | val4
+ 5 | val5
+ 6 | val6
+ 7 | val7
+ 8 | val8
+ 9 | val9
+ 10 | val10
+ 11 | val11
+ 12 | val12
+(12 rows)
+
+DROP FOREIGN TABLE ft_fetchsize;
+-- fetch_size exceeds the table row count: all rows arrive in one chunk
+-- followed immediately by the final empty PGRES_TUPLES_OK result.
+CREATE FOREIGN TABLE ft_fetchsize (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl_large', fetch_size '1000');
+SELECT count(*) FROM ft_fetchsize;
+ count
+-------
+ 12
+(1 row)
+
+DROP FOREIGN TABLE ft_fetchsize;
+DROP TABLE local_tbl_large;
+-- 7. streaming_fetch combined with use_remote_estimate
+-- use_remote_estimate issues a remote EXPLAIN to size the scan at plan
+-- time; streaming_fetch must not interfere with that EXPLAIN call.
+ALTER SERVER fetch_stream_srv OPTIONS (ADD use_remote_estimate 'true');
+CREATE FOREIGN TABLE ft_remote_est (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl');
+-- Verify both options are active at the server level.
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch', 'use_remote_estimate')
+ORDER BY option_name;
+ srvname | option_name | option_value
+------------------+---------------------+--------------
+ fetch_stream_srv | streaming_fetch | true
+ fetch_stream_srv | use_remote_estimate | true
+(2 rows)
+
+-- Both options active: use_remote_estimate sizes the scan remotely via
+-- EXPLAIN, then streaming_fetch fetches rows without a cursor.
+SELECT * FROM ft_remote_est ORDER BY id;
+ id | val
+----+-----
+ 1 | a
+ 2 | b
+ 3 | c
+(3 rows)
+
+ALTER SERVER fetch_stream_srv OPTIONS (DROP use_remote_estimate);
+-- 9. streaming_fetch: check that EXPLAIN (VERBOSE) reports the option
+EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft_remote_est ORDER BY id;
+ QUERY PLAN
+-------------------------------------------------------------------------------
+ Foreign Scan on public.ft_remote_est
+ Output: id, val
+ Remote SQL: SELECT id, val FROM public.local_tbl ORDER BY id ASC NULLS LAST
+ streaming_fetch: true
+(4 rows)
+
+-- Cleanup
+DROP FOREIGN TABLE ft_remote_est;
+DROP USER MAPPING FOR CURRENT_USER SERVER fetch_stream_srv;
+DROP SERVER fetch_stream_srv CASCADE;
+NOTICE: drop cascades to foreign table ft_server
+DROP TABLE local_tbl;
+-- ===================================================================
-- test subscription
-- ===================================================================
CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
@@ -448,6 +805,89 @@ SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970 | 0 | 0 | foo
(1 row)
+-- Test in streaming_fetch mode to cover process_query_params path
+-- with only one table using streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (streaming_fetch 'true');
+SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+ c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+------+----+-------+------------------------------+--------------------------+----+------------+-----
+ 1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970 | 0 | 0 | foo
+(1 row)
+
+-- Test join with only one table using streaming_fetch at a time
+SELECT t1.c1, t2."C 1" FROM ft2 t1 JOIN "S 1"."T 1" t2 ON (t1.c1 = t2."C 1") OFFSET 100 LIMIT 10;
+ c1 | C 1
+-----+-----
+ 101 | 101
+ 102 | 102
+ 103 | 103
+ 104 | 104
+ 105 | 105
+ 106 | 106
+ 107 | 107
+ 108 | 108
+ 109 | 109
+ 110 | 110
+(10 rows)
+
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft2 OPTIONS (streaming_fetch 'true');
+SELECT t1.c1, t2."C 1" FROM ft2 t1 JOIN "S 1"."T 1" t2 ON (t1.c1 = t2."C 1") OFFSET 100 LIMIT 10;
+ c1 | C 1
+-----+-----
+ 101 | 101
+ 102 | 102
+ 103 | 103
+ 104 | 104
+ 105 | 105
+ 106 | 106
+ 107 | 107
+ 108 | 108
+ 109 | 109
+ 110 | 110
+(10 rows)
+
+-- Same subquery test with both tables using streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+ c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+------+----+-------+------------------------------+--------------------------+----+------------+-----
+ 1000 | 0 | 01000 | Thu Jan 01 00:00:00 1970 PST | Thu Jan 01 00:00:00 1970 | 0 | 0 | foo
+(1 row)
+
+-- Test join with both tables using streaming_fetch
+SELECT t1.c1, t2."C 1" FROM ft2 t1 JOIN "S 1"."T 1" t2 ON (t1.c1 = t2."C 1") OFFSET 100 LIMIT 10;
+ c1 | C 1
+-----+-----
+ 101 | 101
+ 102 | 102
+ 103 | 103
+ 104 | 104
+ 105 | 105
+ 106 | 106
+ 107 | 107
+ 108 | 108
+ 109 | 109
+ 110 | 110
+(10 rows)
+
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+-- streaming_fetch: verify correct results when parallel-friendly settings
+-- are active locally. With no cursor on the remote side, the remote
+-- planner is free to choose a parallel plan; results must match exactly.
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+SET max_parallel_workers_per_gather = 2;
+SET min_parallel_table_scan_size = 0;
+SELECT count(*) FROM ft1;
+ count
+-------
+ 1000
+(1 row)
+
+RESET max_parallel_workers_per_gather;
+RESET min_parallel_table_scan_size;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
-- used in CTE
WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1;
c1 | c2 | c3 | c4
@@ -784,16 +1224,265 @@ EXPLAIN (VERBOSE, COSTS OFF)
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (("C 1" = $1::integer))
(8 rows)
-SELECT * FROM "S 1"."T 1" a, ft2 b WHERE a."C 1" = 47 AND b.c1 = a.c2;
- C 1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
------+----+-------+------------------------------+--------------------------+----+------------+-----+----+----+-------+------------------------------+--------------------------+----+------------+-----
- 47 | 7 | 00047 | Tue Feb 17 00:00:00 1970 PST | Tue Feb 17 00:00:00 1970 | 7 | 7 | foo | 7 | 7 | 00007 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970 | 7 | 7 | foo
+SELECT * FROM "S 1"."T 1" a, ft2 b WHERE a."C 1" = 47 AND b.c1 = a.c2;
+ C 1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+-----+----+-------+------------------------------+--------------------------+----+------------+-----+----+----+-------+------------------------------+--------------------------+----+------------+-----
+ 47 | 7 | 00047 | Tue Feb 17 00:00:00 1970 PST | Tue Feb 17 00:00:00 1970 | 7 | 7 | foo | 7 | 7 | 00007 | Thu Jan 08 00:00:00 1970 PST | Thu Jan 08 00:00:00 1970 | 7 | 7 | foo
+(1 row)
+
+-- check both safe and unsafe join conditions
+EXPLAIN (VERBOSE, COSTS OFF)
+ SELECT * FROM ft2 a, ft2 b
+ WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------
+ Nested Loop
+ Output: a.c1, a.c2, a.c3, a.c4, a.c5, a.c6, a.c7, a.c8, b.c1, b.c2, b.c3, b.c4, b.c5, b.c6, b.c7, b.c8
+ -> Foreign Scan on public.ft2 a
+ Output: a.c1, a.c2, a.c3, a.c4, a.c5, a.c6, a.c7, a.c8
+ Filter: (a.c8 = 'foo'::user_enum)
+ Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((c2 = 6))
+ -> Foreign Scan on public.ft2 b
+ Output: b.c1, b.c2, b.c3, b.c4, b.c5, b.c6, b.c7, b.c8
+ Filter: ((b.c7)::text = upper((a.c7)::text))
+ Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (($1::integer = "C 1"))
+(10 rows)
+
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+-----+----+-------+------------------------------+--------------------------+----+------------+-----+-----+----+-------+------------------------------+--------------------------+----+------------+-----
+ 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 26 | 6 | 00026 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 26 | 6 | 00026 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 36 | 6 | 00036 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 36 | 6 | 00036 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 46 | 6 | 00046 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 46 | 6 | 00046 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 56 | 6 | 00056 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 56 | 6 | 00056 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 66 | 6 | 00066 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 66 | 6 | 00066 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 76 | 6 | 00076 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 76 | 6 | 00076 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 86 | 6 | 00086 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 86 | 6 | 00086 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 96 | 6 | 00096 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 96 | 6 | 00096 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 116 | 6 | 00116 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 116 | 6 | 00116 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 126 | 6 | 00126 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 126 | 6 | 00126 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 136 | 6 | 00136 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 136 | 6 | 00136 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 146 | 6 | 00146 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 146 | 6 | 00146 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 156 | 6 | 00156 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 156 | 6 | 00156 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 166 | 6 | 00166 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 166 | 6 | 00166 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 176 | 6 | 00176 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 176 | 6 | 00176 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 186 | 6 | 00186 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 186 | 6 | 00186 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 196 | 6 | 00196 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 196 | 6 | 00196 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 206 | 6 | 00206 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 206 | 6 | 00206 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 216 | 6 | 00216 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 216 | 6 | 00216 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 226 | 6 | 00226 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 226 | 6 | 00226 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 236 | 6 | 00236 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 236 | 6 | 00236 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 246 | 6 | 00246 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 246 | 6 | 00246 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 256 | 6 | 00256 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 256 | 6 | 00256 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 266 | 6 | 00266 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 266 | 6 | 00266 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 276 | 6 | 00276 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 276 | 6 | 00276 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 286 | 6 | 00286 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 286 | 6 | 00286 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 296 | 6 | 00296 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 296 | 6 | 00296 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 306 | 6 | 00306 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 306 | 6 | 00306 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 316 | 6 | 00316 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 316 | 6 | 00316 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 326 | 6 | 00326 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 326 | 6 | 00326 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 336 | 6 | 00336 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 336 | 6 | 00336 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 346 | 6 | 00346 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 346 | 6 | 00346 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 356 | 6 | 00356 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 356 | 6 | 00356 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 366 | 6 | 00366 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 366 | 6 | 00366 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 376 | 6 | 00376 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 376 | 6 | 00376 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 386 | 6 | 00386 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 386 | 6 | 00386 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 396 | 6 | 00396 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 396 | 6 | 00396 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 406 | 6 | 00406 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 406 | 6 | 00406 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 416 | 6 | 00416 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 416 | 6 | 00416 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 426 | 6 | 00426 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 426 | 6 | 00426 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 436 | 6 | 00436 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 436 | 6 | 00436 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 446 | 6 | 00446 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 446 | 6 | 00446 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 456 | 6 | 00456 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 456 | 6 | 00456 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 466 | 6 | 00466 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 466 | 6 | 00466 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 476 | 6 | 00476 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 476 | 6 | 00476 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 486 | 6 | 00486 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 486 | 6 | 00486 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 496 | 6 | 00496 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 496 | 6 | 00496 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 506 | 6 | 00506 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 506 | 6 | 00506 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 516 | 6 | 00516 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 516 | 6 | 00516 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 526 | 6 | 00526 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 526 | 6 | 00526 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 536 | 6 | 00536 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 536 | 6 | 00536 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 546 | 6 | 00546 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 546 | 6 | 00546 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 556 | 6 | 00556 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 556 | 6 | 00556 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 566 | 6 | 00566 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 566 | 6 | 00566 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 576 | 6 | 00576 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 576 | 6 | 00576 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 586 | 6 | 00586 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 586 | 6 | 00586 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 596 | 6 | 00596 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 596 | 6 | 00596 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 606 | 6 | 00606 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 606 | 6 | 00606 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 616 | 6 | 00616 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 616 | 6 | 00616 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 626 | 6 | 00626 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 626 | 6 | 00626 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 636 | 6 | 00636 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 636 | 6 | 00636 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 646 | 6 | 00646 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 646 | 6 | 00646 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 656 | 6 | 00656 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 656 | 6 | 00656 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 666 | 6 | 00666 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 666 | 6 | 00666 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 676 | 6 | 00676 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 676 | 6 | 00676 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 686 | 6 | 00686 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 686 | 6 | 00686 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 696 | 6 | 00696 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 696 | 6 | 00696 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 706 | 6 | 00706 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 706 | 6 | 00706 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 716 | 6 | 00716 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 716 | 6 | 00716 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 726 | 6 | 00726 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 726 | 6 | 00726 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 736 | 6 | 00736 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 736 | 6 | 00736 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 746 | 6 | 00746 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 746 | 6 | 00746 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 756 | 6 | 00756 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 756 | 6 | 00756 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 766 | 6 | 00766 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 766 | 6 | 00766 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 776 | 6 | 00776 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 776 | 6 | 00776 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 786 | 6 | 00786 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 786 | 6 | 00786 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 796 | 6 | 00796 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 796 | 6 | 00796 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 806 | 6 | 00806 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 806 | 6 | 00806 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 816 | 6 | 00816 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 816 | 6 | 00816 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 826 | 6 | 00826 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 826 | 6 | 00826 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 836 | 6 | 00836 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 836 | 6 | 00836 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 846 | 6 | 00846 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 846 | 6 | 00846 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 856 | 6 | 00856 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 856 | 6 | 00856 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 866 | 6 | 00866 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 866 | 6 | 00866 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 876 | 6 | 00876 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 876 | 6 | 00876 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 886 | 6 | 00886 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 886 | 6 | 00886 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 896 | 6 | 00896 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 896 | 6 | 00896 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 906 | 6 | 00906 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 906 | 6 | 00906 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 916 | 6 | 00916 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 916 | 6 | 00916 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 926 | 6 | 00926 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 926 | 6 | 00926 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 936 | 6 | 00936 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 936 | 6 | 00936 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 946 | 6 | 00946 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 946 | 6 | 00946 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 956 | 6 | 00956 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 956 | 6 | 00956 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 966 | 6 | 00966 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 966 | 6 | 00966 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 976 | 6 | 00976 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 976 | 6 | 00976 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 986 | 6 | 00986 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 986 | 6 | 00986 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+(100 rows)
+
+-- Test in streaming_fetch mode for rescan path
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
+-----+----+-------+------------------------------+--------------------------+----+------------+-----+-----+----+-------+------------------------------+--------------------------+----+------------+-----
+ 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 6 | 6 | 00006 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 16 | 6 | 00016 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 26 | 6 | 00026 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 26 | 6 | 00026 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 36 | 6 | 00036 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 36 | 6 | 00036 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 46 | 6 | 00046 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 46 | 6 | 00046 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 56 | 6 | 00056 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 56 | 6 | 00056 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 66 | 6 | 00066 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 66 | 6 | 00066 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 76 | 6 | 00076 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 76 | 6 | 00076 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 86 | 6 | 00086 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 86 | 6 | 00086 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 96 | 6 | 00096 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 96 | 6 | 00096 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 106 | 6 | 00106 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 116 | 6 | 00116 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 116 | 6 | 00116 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 126 | 6 | 00126 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 126 | 6 | 00126 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 136 | 6 | 00136 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 136 | 6 | 00136 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 146 | 6 | 00146 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 146 | 6 | 00146 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 156 | 6 | 00156 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 156 | 6 | 00156 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 166 | 6 | 00166 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 166 | 6 | 00166 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 176 | 6 | 00176 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 176 | 6 | 00176 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 186 | 6 | 00186 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 186 | 6 | 00186 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 196 | 6 | 00196 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 196 | 6 | 00196 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 206 | 6 | 00206 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 206 | 6 | 00206 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 216 | 6 | 00216 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 216 | 6 | 00216 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 226 | 6 | 00226 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 226 | 6 | 00226 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 236 | 6 | 00236 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 236 | 6 | 00236 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 246 | 6 | 00246 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 246 | 6 | 00246 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 256 | 6 | 00256 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 256 | 6 | 00256 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 266 | 6 | 00266 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 266 | 6 | 00266 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 276 | 6 | 00276 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 276 | 6 | 00276 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 286 | 6 | 00286 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 286 | 6 | 00286 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 296 | 6 | 00296 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 296 | 6 | 00296 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 306 | 6 | 00306 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 306 | 6 | 00306 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 316 | 6 | 00316 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 316 | 6 | 00316 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 326 | 6 | 00326 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 326 | 6 | 00326 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 336 | 6 | 00336 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 336 | 6 | 00336 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 346 | 6 | 00346 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 346 | 6 | 00346 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 356 | 6 | 00356 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 356 | 6 | 00356 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 366 | 6 | 00366 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 366 | 6 | 00366 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 376 | 6 | 00376 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 376 | 6 | 00376 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 386 | 6 | 00386 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 386 | 6 | 00386 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 396 | 6 | 00396 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 396 | 6 | 00396 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 406 | 6 | 00406 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 406 | 6 | 00406 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 416 | 6 | 00416 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 416 | 6 | 00416 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 426 | 6 | 00426 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 426 | 6 | 00426 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 436 | 6 | 00436 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 436 | 6 | 00436 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 446 | 6 | 00446 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 446 | 6 | 00446 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 456 | 6 | 00456 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 456 | 6 | 00456 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 466 | 6 | 00466 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 466 | 6 | 00466 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 476 | 6 | 00476 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 476 | 6 | 00476 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 486 | 6 | 00486 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 486 | 6 | 00486 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 496 | 6 | 00496 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 496 | 6 | 00496 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 506 | 6 | 00506 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 506 | 6 | 00506 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 516 | 6 | 00516 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 516 | 6 | 00516 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 526 | 6 | 00526 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 526 | 6 | 00526 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 536 | 6 | 00536 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 536 | 6 | 00536 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 546 | 6 | 00546 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 546 | 6 | 00546 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 556 | 6 | 00556 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 556 | 6 | 00556 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 566 | 6 | 00566 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 566 | 6 | 00566 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 576 | 6 | 00576 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 576 | 6 | 00576 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 586 | 6 | 00586 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 586 | 6 | 00586 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 596 | 6 | 00596 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 596 | 6 | 00596 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 606 | 6 | 00606 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 606 | 6 | 00606 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 616 | 6 | 00616 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 616 | 6 | 00616 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 626 | 6 | 00626 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 626 | 6 | 00626 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 636 | 6 | 00636 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 636 | 6 | 00636 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 646 | 6 | 00646 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 646 | 6 | 00646 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 656 | 6 | 00656 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 656 | 6 | 00656 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 666 | 6 | 00666 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 666 | 6 | 00666 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 676 | 6 | 00676 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 676 | 6 | 00676 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 686 | 6 | 00686 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 686 | 6 | 00686 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 696 | 6 | 00696 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 696 | 6 | 00696 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 706 | 6 | 00706 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 706 | 6 | 00706 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 716 | 6 | 00716 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 716 | 6 | 00716 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 726 | 6 | 00726 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 726 | 6 | 00726 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 736 | 6 | 00736 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 736 | 6 | 00736 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 746 | 6 | 00746 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 746 | 6 | 00746 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 756 | 6 | 00756 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 756 | 6 | 00756 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 766 | 6 | 00766 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 766 | 6 | 00766 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 776 | 6 | 00776 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 776 | 6 | 00776 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 786 | 6 | 00786 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 786 | 6 | 00786 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 796 | 6 | 00796 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 796 | 6 | 00796 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 806 | 6 | 00806 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 806 | 6 | 00806 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 816 | 6 | 00816 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 816 | 6 | 00816 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 826 | 6 | 00826 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 826 | 6 | 00826 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 836 | 6 | 00836 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 836 | 6 | 00836 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 846 | 6 | 00846 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 846 | 6 | 00846 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 856 | 6 | 00856 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 856 | 6 | 00856 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 866 | 6 | 00866 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 866 | 6 | 00866 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 876 | 6 | 00876 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 876 | 6 | 00876 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 886 | 6 | 00886 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 886 | 6 | 00886 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 896 | 6 | 00896 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 896 | 6 | 00896 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+ 906 | 6 | 00906 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo | 906 | 6 | 00906 | Wed Jan 07 00:00:00 1970 PST | Wed Jan 07 00:00:00 1970 | 6 | 6 | foo
+ 916 | 6 | 00916 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo | 916 | 6 | 00916 | Sat Jan 17 00:00:00 1970 PST | Sat Jan 17 00:00:00 1970 | 6 | 6 | foo
+ 926 | 6 | 00926 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo | 926 | 6 | 00926 | Tue Jan 27 00:00:00 1970 PST | Tue Jan 27 00:00:00 1970 | 6 | 6 | foo
+ 936 | 6 | 00936 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo | 936 | 6 | 00936 | Fri Feb 06 00:00:00 1970 PST | Fri Feb 06 00:00:00 1970 | 6 | 6 | foo
+ 946 | 6 | 00946 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo | 946 | 6 | 00946 | Mon Feb 16 00:00:00 1970 PST | Mon Feb 16 00:00:00 1970 | 6 | 6 | foo
+ 956 | 6 | 00956 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo | 956 | 6 | 00956 | Thu Feb 26 00:00:00 1970 PST | Thu Feb 26 00:00:00 1970 | 6 | 6 | foo
+ 966 | 6 | 00966 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo | 966 | 6 | 00966 | Sun Mar 08 00:00:00 1970 PST | Sun Mar 08 00:00:00 1970 | 6 | 6 | foo
+ 976 | 6 | 00976 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo | 976 | 6 | 00976 | Wed Mar 18 00:00:00 1970 PST | Wed Mar 18 00:00:00 1970 | 6 | 6 | foo
+ 986 | 6 | 00986 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo | 986 | 6 | 00986 | Sat Mar 28 00:00:00 1970 PST | Sat Mar 28 00:00:00 1970 | 6 | 6 | foo
+ 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
+(100 rows)
+
+-- Test for streaming_fetch covering rescans and three active cursors
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1 AND a.c8 = 'foo'
+AND b.c7 = upper(a.c7);
+ count
+-------
+ 100
(1 row)
--- check both safe and unsafe join conditions
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+-- Test in streaming_fetch mode for interleaved scans.
+-- The non-shippable condition a.c8 = 'foo' (user_enum is not a built-in type)
+-- prevents full join pushdown, so the planner issues two separate FDW scans
+-- that share the same loopback connection. When the inner scan's init_scan
+-- fires it calls save_to_tuplestore to drain the outer scan's unread rows.
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+-- Show the plan: ft2 must appear as two independent ForeignScan nodes, not
+-- a single pushed-down remote join.
EXPLAIN (VERBOSE, COSTS OFF)
- SELECT * FROM ft2 a, ft2 b
- WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
QUERY PLAN
-------------------------------------------------------------------------------------------------------------
Nested Loop
@@ -802,12 +1491,16 @@ EXPLAIN (VERBOSE, COSTS OFF)
Output: a.c1, a.c2, a.c3, a.c4, a.c5, a.c6, a.c7, a.c8
Filter: (a.c8 = 'foo'::user_enum)
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE ((c2 = 6))
+ streaming_fetch: true
-> Foreign Scan on public.ft2 b
Output: b.c1, b.c2, b.c3, b.c4, b.c5, b.c6, b.c7, b.c8
Filter: ((b.c7)::text = upper((a.c7)::text))
Remote SQL: SELECT "C 1", c2, c3, c4, c5, c6, c7, c8 FROM "S 1"."T 1" WHERE (($1::integer = "C 1"))
-(10 rows)
+ streaming_fetch: true
+(12 rows)
+-- Verify results with cursor path.
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
SELECT * FROM ft2 a, ft2 b
WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8 | c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
@@ -914,6 +1607,54 @@ WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo | 996 | 6 | 00996 | Tue Apr 07 00:00:00 1970 PST | Tue Apr 07 00:00:00 1970 | 6 | 6 | foo
(100 rows)
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+-- Three-way self-join to test streaming_fetch
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1
+AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------
+ Aggregate
+ Output: count(*)
+ -> Nested Loop
+ -> Nested Loop
+ Output: a.c1, b.c1
+ -> Foreign Scan on public.ft2 a
+ Output: a.c1, a.c7
+ Filter: (a.c8 = 'foo'::user_enum)
+ Remote SQL: SELECT "C 1", c7, c8 FROM "S 1"."T 1" WHERE ((c2 = 6))
+ streaming_fetch: true
+ -> Foreign Scan on public.ft2 b
+ Output: b.c1, b.c7
+ Filter: ((b.c7)::text = upper((a.c7)::text))
+ Remote SQL: SELECT "C 1", c7 FROM "S 1"."T 1" WHERE (($1::integer = "C 1"))
+ streaming_fetch: true
+ -> Foreign Scan on public.ft2 c
+ Output: c.c1
+ Remote SQL: SELECT "C 1" FROM "S 1"."T 1" WHERE (($1::integer = "C 1"))
+ streaming_fetch: true
+(19 rows)
+
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1
+AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ count
+-------
+ 100
+(1 row)
+
+-- output matches in cursor mode
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1
+AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ count
+-------
+ 100
+(1 row)
+
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
-- bug before 9.3.5 due to sloppy handling of remote-estimate parameters
SELECT * FROM ft1 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft2 WHERE c1 < 5));
c1 | c2 | c3 | c4 | c5 | c6 | c7 | c8
@@ -2183,6 +2924,43 @@ SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2
119
(10 rows)
+-- Test in streaming_fetch mode to cover the path for two simultaneously active cursors
+-- with only one table using streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+ c1
+-----
+ 110
+ 111
+ 112
+ 113
+ 114
+ 115
+ 116
+ 117
+ 118
+ 119
+(10 rows)
+
+-- with both tables using streaming_fetch
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+ c1
+-----
+ 110
+ 111
+ 112
+ 113
+ 114
+ 115
+ 116
+ 117
+ 118
+ 119
+(10 rows)
+
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
-- CROSS JOIN can be pushed down
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
@@ -2234,6 +3012,16 @@ SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t
----+----
(0 rows)
+-- Test in streaming_fetch mode to cover the case with multiple cursors but only one active cursor at a time
+ALTER FOREIGN TABLE ft5 OPTIONS (streaming_fetch 'true');
+ALTER FOREIGN TABLE ft6 OPTIONS (streaming_fetch 'true');
+SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+ c1 | c1
+----+----
+(0 rows)
+
+ALTER FOREIGN TABLE ft5 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft6 OPTIONS (SET streaming_fetch 'false');
-- unsafe join conditions (c8 has a UDT), not pushed down. Practically a CROSS
-- JOIN since c8 in both tables has same value.
EXPLAIN (VERBOSE, COSTS OFF)
@@ -2970,6 +3758,72 @@ select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (ran
100 | 49600 | 496.0000000000000000 | 1 | 991 | 0 | 49600
(1 row)
+-- Test with limit and streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2 limit 1;
+ count | sum | avg | min | max | stddev | sum2
+-------+-------+----------------------+-----+-----+--------+-------
+ 100 | 49600 | 496.0000000000000000 | 1 | 991 | 0 | 49600
+(1 row)
+
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+-- Test LIMIT stopping before all tuples are consumed.
+-- The WHERE clause references c8 (a user-defined type that cannot be
+-- pushed to the remote), preventing LIMIT pushdown. The remote
+-- therefore streams all rows, and end_scan must discard the in-flight
+-- data when the local executor stops early.
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+-- LIMIT 5 with default fetch_size=100: stops well within the first
+-- chunk; end_scan discards ~995 rows still in flight on the connection.
+SELECT c1 FROM ft1 WHERE c8 = 'foo' ORDER BY c1 LIMIT 5;
+ c1
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+(5 rows)
+
+-- Verify the connection is still usable after the early stop.
+SELECT count(*) FROM ft1;
+ count
+-------
+ 1000
+(1 row)
+
+-- fetch_size=10, LIMIT=15: consumes one full chunk (rows 1-10) plus 5
+-- rows from a second chunk (rows 11-15); end_scan then discards the
+-- remainder of that chunk and all subsequent in-flight chunks.
+ALTER FOREIGN TABLE ft1 OPTIONS (fetch_size '10');
+SELECT c1 FROM ft1 WHERE c8 = 'foo' ORDER BY c1 LIMIT 15;
+ c1
+----
+ 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+(15 rows)
+
+SELECT count(*) FROM ft1;
+ count
+-------
+ 1000
+(1 row)
+
+ALTER FOREIGN TABLE ft1 OPTIONS (DROP fetch_size);
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
-- Aggregate is not pushed down as aggregation contains random()
explain (verbose, costs off)
select sum(c1 * (random() <= 1)::int) as sum, avg(c1) from ft1;
@@ -10840,6 +11694,48 @@ SELECT 1 FROM ft1 LIMIT 1; -- should fail
ERROR: 08006
\set VERBOSITY default
COMMIT;
+-- ===================================================================
+-- streaming_fetch: error recovery when the remote backend terminates
+-- ===================================================================
+-- Enable streaming_fetch so init_scan is exercised on reconnect.
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+-- Establish a fresh remote connection.
+SELECT 1 FROM ft1 LIMIT 1;
+ ?column?
+----------
+ 1
+(1 row)
+
+-- Terminate the remote backend and wait for the termination to complete.
+DO $$ BEGIN
+PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
+ WHERE application_name = 'fdw_retry_check';
+END $$;
+-- After the connection is broken, a streaming_fetch query should detect
+-- the broken connection, reestablish it, and succeed.
+BEGIN;
+SELECT c1 FROM ft1 ORDER BY c1 LIMIT 3;
+ c1
+----
+ 1
+ 3
+ 4
+(3 rows)
+
+-- Inside a subtransaction the broken connection must not be silently
+-- retried; the query should fail.
+DO $$ BEGIN
+PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
+ WHERE application_name = 'fdw_retry_check';
+END $$;
+SAVEPOINT s2;
+-- The text of the error might vary across platforms, so only show SQLSTATE.
+\set VERBOSITY sqlstate
+SELECT 1 FROM ft1 LIMIT 1; -- should fail
+ERROR: 08006
+\set VERBOSITY default
+COMMIT;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
-- =============================================================================
-- test connection invalidation cases and postgres_fdw_get_connections function
-- =============================================================================
@@ -12003,6 +12899,80 @@ SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c
ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+-- Test async_capable with streaming_fetch
+-- No streaming_fetch at server, this should give Async Foreign Scan for async_p1 and async_p2
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+ QUERY PLAN
+----------------------------------------------------------
+ Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+(9 rows)
+
+-- streaming_fetch = false at loopback server, this should still give Async Foreign Scan for async_p1 and async_p2
+ALTER SERVER loopback OPTIONS (streaming_fetch 'false');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+ QUERY PLAN
+----------------------------------------------------------
+ Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+(9 rows)
+
+-- streaming_fetch = false at loopback server but true for async_p1, this should give Foreign Scan for async_p1
+ALTER FOREIGN TABLE async_p1 OPTIONS (ADD streaming_fetch 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+ QUERY PLAN
+----------------------------------------------------------
+ Append
+ -> Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ streaming_fetch: true
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+(10 rows)
+
+-- streaming_fetch = true at loopback2 server, this should give Foreign Scan for async_p2 also
+ALTER SERVER loopback2 OPTIONS (streaming_fetch 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+ QUERY PLAN
+----------------------------------------------------------
+ Append
+ -> Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ streaming_fetch: true
+ -> Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ streaming_fetch: true
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+(11 rows)
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP streaming_fetch);
+ALTER SERVER loopback OPTIONS (DROP streaming_fetch);
+ALTER SERVER loopback2 OPTIONS (DROP streaming_fetch);
DROP TABLE local_tbl;
DROP INDEX base_tbl1_idx;
DROP INDEX base_tbl2_idx;
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 3944aedbacc..020c278cc35 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -121,6 +121,7 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
strcmp(def->defname, "parallel_commit") == 0 ||
strcmp(def->defname, "parallel_abort") == 0 ||
strcmp(def->defname, "keep_connections") == 0 ||
+ strcmp(def->defname, "streaming_fetch") == 0 ||
strcmp(def->defname, "restore_stats") == 0)
{
/* these accept only boolean values */
@@ -261,6 +262,9 @@ InitPgFdwOptions(void)
/* fetch_size is available on both server and table */
{"fetch_size", ForeignServerRelationId, false},
{"fetch_size", ForeignTableRelationId, false},
+ /* streaming_fetch is available on both server and table */
+ {"streaming_fetch", ForeignServerRelationId, false},
+ {"streaming_fetch", ForeignTableRelationId, false},
/* batch_size is available on both server and table */
{"batch_size", ForeignServerRelationId, false},
{"batch_size", ForeignTableRelationId, false},
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 350dc19f29a..c01a54b90fa 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -52,6 +52,7 @@
#include "utils/rel.h"
#include "utils/sampling.h"
#include "utils/selfuncs.h"
+#include "utils/tuplestore.h"
PG_MODULE_MAGIC_EXT(
.name = "postgres_fdw",
@@ -82,6 +83,7 @@ enum FdwScanPrivateIndex
FdwScanPrivateRetrievedAttrs,
/* Integer representing the desired fetch_size */
FdwScanPrivateFetchSize,
+ FdwScanPrivateStreamingFetch,
/*
* String describing join i.e. names of relations being joined and types
@@ -153,7 +155,7 @@ typedef struct PgFdwScanState
/* for remote query execution */
PGconn *conn; /* connection for the scan */
- PgFdwConnState *conn_state; /* extra per-connection state */
+ PgFdwConnState *conn_state;
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool scan_in_progress; /* is there a scan in progress? */
int numParams; /* number of parameters passed to query */
@@ -178,6 +180,12 @@ typedef struct PgFdwScanState
MemoryContext temp_cxt; /* context for per-tuple temporary data */
int fetch_size; /* number of tuples per fetch */
+ /* Required for the streaming_fetch mode */
+ bool streaming_fetch; /* set if the scan is using
+ * streaming_fetch mode */
+ Tuplestorestate *tuplestore; /* Tuplestore to save the tuples of the
+ * query for later fetch. */
+ bool rescan; /* identify when rescan is required */
} PgFdwScanState;
/*
@@ -730,7 +738,8 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
AttInMetadata *attinmeta,
List *retrieved_attrs,
ForeignScanState *fsstate,
- MemoryContext temp_context);
+ MemoryContext temp_context,
+ TupleDesc last_tupdesc);
static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
@@ -760,6 +769,13 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+/* Only required for streaming_fetch mode */
+static void set_streaming_fetch(DefElem *def, PgFdwRelationInfo *fpinfo);
+static void fetch_from_tuplestore(ForeignScanState *node);
+static void save_to_tuplestore(ForeignScanState *node);
+static void init_scan(ForeignScanState *node);
+static void end_scan(PGconn *conn);
+static bool is_active_scan(PgFdwScanState *fsstate);
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -869,9 +885,12 @@ postgresGetForeignRelSize(PlannerInfo *root,
fpinfo->shippable_extensions = NIL;
fpinfo->fetch_size = 100;
fpinfo->async_capable = false;
+ fpinfo->streaming_fetch = false;
apply_server_options(fpinfo);
apply_table_options(fpinfo);
+ if (fpinfo->streaming_fetch)
+ fpinfo->async_capable = false;
/*
* If the table or the server is configured to use remote estimates,
@@ -1631,9 +1650,9 @@ postgresGetForeignPlan(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match order in enum FdwScanPrivateIndex.
*/
- fdw_private = list_make3(makeString(sql.data),
+ fdw_private = list_make4(makeString(sql.data),
retrieved_attrs,
- makeInteger(fpinfo->fetch_size));
+ makeInteger(fpinfo->fetch_size), makeBoolean(fpinfo->streaming_fetch));
if (IS_JOIN_REL(foreignrel) || IS_UPPER_REL(foreignrel))
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name));
@@ -1767,6 +1786,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
FdwScanPrivateRetrievedAttrs);
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
FdwScanPrivateFetchSize));
+ fsstate->streaming_fetch = boolVal(list_nth(fsplan->fdw_private,
+ FdwScanPrivateStreamingFetch));
/* Create contexts for batches of tuples and per-tuple temp workspace. */
fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
@@ -1808,6 +1829,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/* Set the async-capable flag */
fsstate->async_capable = node->ss.ps.async_capable;
+ fsstate->tuplestore = NULL;
+ fsstate->rescan = false;
}
/*
@@ -1828,7 +1851,12 @@ postgresIterateForeignScan(ForeignScanState *node)
* first call after Begin or ReScan.
*/
if (!fsstate->scan_in_progress)
- create_cursor(node);
+ {
+ if (fsstate->streaming_fetch)
+ init_scan(node);
+ else
+ create_cursor(node);
+ }
/*
* Get some more tuples, if we've run out.
@@ -1865,7 +1893,8 @@ postgresReScanForeignScan(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
char sql[64];
- PGresult *res;
+ PGresult *res = NULL;
+ bool reinitialize_scan = false;
/* If no scan is in progress, nothing to do. */
if (!fsstate->scan_in_progress)
@@ -1894,20 +1923,31 @@ postgresReScanForeignScan(ForeignScanState *node)
*/
if (node->ss.ps.chgParam != NULL)
{
- fsstate->scan_in_progress = false;
- snprintf(sql, sizeof(sql), "CLOSE c%u",
- fsstate->cursor_number);
+ reinitialize_scan = true;
}
else if (fsstate->fetch_ct_2 > 1)
{
if (PQserverVersion(fsstate->conn) < 150000)
+ {
snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
fsstate->cursor_number);
+ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(res, fsstate->conn, sql);
+
+ PQclear(res);
+
+ /* Now force a fresh FETCH. */
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
+ return;
+ }
else
{
- fsstate->scan_in_progress = false;
- snprintf(sql, sizeof(sql), "CLOSE c%u",
- fsstate->cursor_number);
+ reinitialize_scan = true;
}
}
else
@@ -1916,18 +1956,36 @@ postgresReScanForeignScan(ForeignScanState *node)
fsstate->next_tuple = 0;
return;
}
-
- res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(res, fsstate->conn, sql);
- PQclear(res);
-
- /* Now force a fresh FETCH. */
- fsstate->tuples = NULL;
- fsstate->num_tuples = 0;
- fsstate->next_tuple = 0;
- fsstate->fetch_ct_2 = 0;
- fsstate->eof_reached = false;
+ if (reinitialize_scan)
+ {
+ if (fsstate->streaming_fetch)
+ {
+ fsstate->rescan = true;
+ if (is_active_scan(fsstate))
+ end_scan(fsstate->conn);
+ if (fsstate->tuplestore)
+ {
+ tuplestore_end(fsstate->tuplestore);
+ fsstate->tuplestore = NULL;
+ }
+ }
+ else
+ {
+ snprintf(sql, sizeof(sql), "CLOSE c%u",
+ fsstate->cursor_number);
+ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(res, fsstate->conn, sql);
+ PQclear(res);
+ }
+ /* Now force a fresh FETCH. */
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
+ fsstate->scan_in_progress = false;
+ }
}
/*
@@ -1945,9 +2003,26 @@ postgresEndForeignScan(ForeignScanState *node)
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->scan_in_progress)
- close_cursor(fsstate->conn, fsstate->cursor_number,
- fsstate->conn_state);
+ {
+ if (fsstate->streaming_fetch)
+ {
+ /* Remove the pointer from conn_state since ending this scan. */
+ if (is_active_scan(fsstate))
+ fsstate->conn_state->active_scan = NULL;
+ if (fsstate->tuplestore)
+ tuplestore_end(fsstate->tuplestore);
+ /*
+ * tuplestore resides in batch_cxt, so we couldn't reset it
+ * earlier in streaming fetch mode.
+ */
+ MemoryContextReset(fsstate->batch_cxt);
+ end_scan(fsstate->conn);
+ }
+ else
+ close_cursor(fsstate->conn, fsstate->cursor_number,
+ fsstate->conn_state);
+ }
/* Release remote connection */
ReleaseConnection(fsstate->conn);
fsstate->conn = NULL;
@@ -3138,9 +3213,13 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
if (es->verbose)
{
char *sql;
+ bool stream_fetch;
sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
+ stream_fetch = boolVal(list_nth(fdw_private, FdwScanPrivateStreamingFetch));
ExplainPropertyText("Remote SQL", sql, es);
+ if (stream_fetch)
+ ExplainPropertyBool("streaming_fetch", stream_fetch, es);
}
}
@@ -3951,6 +4030,15 @@ create_cursor(ForeignScanState *node)
if (fsstate->conn_state->pendingAreq)
process_pending_request(fsstate->conn_state->pendingAreq);
+ /*
+ * If another scan on this connection is active in streaming_fetch mode,
+ * save its remaining tuples to a tuplestore before creating this cursor.
+ */
+ if (fsstate->conn_state->active_scan &&
+ fsstate->conn_state->active_scan->streaming_fetch &&
+ fsstate != fsstate->conn_state->active_scan)
+ save_to_tuplestore(node);
+
/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
@@ -4006,6 +4094,70 @@ create_cursor(ForeignScanState *node)
pfree(buf.data);
}
+static void
+init_scan(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ ExprContext *econtext = node->ss.ps.ps_ExprContext;
+ int numParams = fsstate->numParams;
+ const char **values = fsstate->param_values;
+ PGconn *conn = fsstate->conn;
+
+ /*
+ * Construct array of query parameter values in text format. We do the
+ * conversions in the short-lived per-tuple context, so as not to cause a
+ * memory leak over repeated scans.
+ */
+ if (numParams > 0)
+ {
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+
+ process_query_params(econtext,
+ fsstate->param_flinfo,
+ fsstate->param_exprs,
+ values);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+
+ /*
+ * Finish fetching the tuples of the previously active scan on this
+ * connection. Do this only when that scan differs from the current one.
+ * On rescan, init_scan is called again for the same scan, so compare the
+ * scan states to avoid calling save_to_tuplestore when the connection's
+ * active scan is the current scan itself.
+ */
+ if (fsstate->conn_state->active_scan &&
+ fsstate != fsstate->conn_state->active_scan)
+ save_to_tuplestore(node);
+
+ /*
+ * Remember the current scan as the active one, so that its remaining tuples
+ * can be saved when control switches to another scan.
+ */
+ fsstate->conn_state->active_scan = fsstate;
+
+ if (!PQsendQueryParams(conn, fsstate->query, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(NULL, conn, fsstate->query);
+
+ /*
+ * Use chunked rows mode, with the chunk size equal to the fetch size.
+ */
+ if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size))
+ pgfdw_report_error(NULL, conn, fsstate->query);
+
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
+ fsstate->scan_in_progress = true;
+ fsstate->rescan = false;
+}
+
/*
* Fetch some more rows from the node's cursor.
*/
@@ -4015,16 +4167,18 @@ fetch_more_data(ForeignScanState *node)
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
PGconn *conn = fsstate->conn;
PGresult *res;
- int numrows;
- int i;
+ int numrows = 0,
+ i = 0;
MemoryContext oldcontext;
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
- * batch.
+ * batch. In streaming_fetch mode the tuplestore is created in batch_cxt,
+ * so the context must not be reset here.
*/
fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
+ if (!fsstate->streaming_fetch)
+ MemoryContextReset(fsstate->batch_cxt);
oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
if (fsstate->async_capable)
@@ -4043,7 +4197,7 @@ fetch_more_data(ForeignScanState *node)
/* Reset per-connection state */
fsstate->conn_state->pendingAreq = NULL;
}
- else
+ else if (!fsstate->streaming_fetch)
{
char sql[64];
@@ -4056,7 +4210,30 @@ fetch_more_data(ForeignScanState *node)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(res, conn, fsstate->query);
}
+ else
+ {
+ /*
+ * In streaming_fetch mode, the remaining tuples may already have been
+ * saved to the tuplestore; if so, read them from there. Otherwise, the
+ * tuples are processed exactly as in cursor mode.
+ */
+ if (fsstate->tuplestore)
+ {
+ /* Reading tuples from tuplestore */
+ fetch_from_tuplestore(node);
+ MemoryContextSwitchTo(oldcontext);
+ return;
+ }
+
+ /*
+ * streaming_fetch mode calls PQsetChunkedRowsMode during init_scan, so
+ * just get the next result here.
+ */
+ res = pgfdw_get_next_result(conn);
+ if (!res || PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(res, conn, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
@@ -4073,7 +4250,8 @@ fetch_more_data(ForeignScanState *node)
fsstate->attinmeta,
fsstate->retrieved_attrs,
node,
- fsstate->temp_cxt);
+ fsstate->temp_cxt,
+ NULL);
}
/* Update fetch_ct_2 */
@@ -4088,6 +4266,150 @@ fetch_more_data(ForeignScanState *node)
MemoryContextSwitchTo(oldcontext);
}
+static bool
+is_active_scan(PgFdwScanState *fsstate)
+{
+ if (fsstate->streaming_fetch && fsstate->conn_state->active_scan &&
+ fsstate == fsstate->conn_state->active_scan)
+ return true;
+ return false;
+}
+
+/*
+ * This is used in streaming_fetch mode only to fetch the tuples from the tuplestore.
+ */
+static void
+fetch_from_tuplestore(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ TupleTableSlot *slot;
+ int numrows = 0,
+ i = 0;
+
+ /* Retrieve the tuples from the tuplestore instead of actual fetch */
+ numrows = tuplestore_tuple_count(fsstate->tuplestore);
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
+
+ while (tuplestore_gettupleslot(fsstate->tuplestore, true, true, slot))
+ {
+ fsstate->tuples[i++] = ExecFetchSlotHeapTuple(slot, true, NULL);
+ ExecClearTuple(slot);
+ }
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
+ fsstate->eof_reached = true;
+
+ /* Clean up */
+ tuplestore_end(fsstate->tuplestore);
+ ExecDropSingleTupleTableSlot(slot);
+ fsstate->tuplestore = NULL;
+}
+
+/*
+ * In streaming_fetch mode only, save the active scan's remaining tuples to its tuplestore.
+ */
+static void
+save_to_tuplestore(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *active_fsstate = fsstate->conn_state->active_scan;
+ PGconn *conn = fsstate->conn;
+ PGresult *res;
+ int numrows = 0,
+ i = 0;
+ MemoryContext oldcontext;
+ /*
+ * After a rescan, no query is in flight on the connection. Resend the
+ * active scan's query and switch to chunked rows mode before fetching
+ * its tuples.
+ */
+ if (active_fsstate->rescan)
+ {
+ if (!PQsendQueryParams(conn, active_fsstate->query, active_fsstate->numParams,
+ NULL, active_fsstate->param_values, NULL, NULL, 0))
+ pgfdw_report_error(NULL, conn, active_fsstate->query);
+
+ if (!PQsetChunkedRowsMode(conn, active_fsstate->fetch_size))
+ pgfdw_report_error(NULL, conn, active_fsstate->query);
+ active_fsstate->rescan = false;
+ }
+ oldcontext = MemoryContextSwitchTo(active_fsstate->batch_cxt);
+
+ res = pgfdw_get_next_result(conn);
+
+ if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(res, conn, active_fsstate->query);
+ else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ /*
+ * Nothing left to do: all tuples have already been fetched. We only
+ * learn that from this call, so set eof_reached for the scan here.
+ */
+ active_fsstate->eof_reached = true;
+ Assert(PQntuples(res) == 0);
+ PQclear(res);
+ }
+ else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK)
+ {
+ active_fsstate->tuplestore = tuplestore_begin_heap(true, false, work_mem);
+
+ for (;;)
+ {
+ /*
+ * With PQsetChunkedRowsMode, each result holds at most fetch_size
+ * tuples, so keep fetching until we get a NULL result or the final
+ * empty PGRES_TUPLES_OK, i.e. until all the tuples are retrieved.
+ */
+ CHECK_FOR_INTERRUPTS();
+ numrows = PQntuples(res);
+
+ /* Convert the data into HeapTuples */
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
+ for (i = 0; i < numrows; i++)
+ {
+ HeapTuple temp_tuple;
+
+ temp_tuple = make_tuple_from_result_row(res, i,
+ active_fsstate->rel,
+ active_fsstate->attinmeta,
+ active_fsstate->retrieved_attrs,
+ node,
+ active_fsstate->temp_cxt,
+ active_fsstate->tupdesc);
+ tuplestore_puttuple(active_fsstate->tuplestore, temp_tuple);
+ heap_freetuple(temp_tuple);
+ }
+ PQclear(res);
+ res = pgfdw_get_next_result(conn);
+ if (res == NULL)
+ break;
+ if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(res, conn, active_fsstate->query);
+ else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ Assert(PQntuples(res) == 0);
+ PQclear(res);
+ break;
+ }
+ }
+ }
+
+ /*
+ * There are no more tuples to fetch; make one final call so that the
+ * connection reaches its terminal state.
+ */
+ res = pgfdw_get_next_result(conn);
+ Assert(res == NULL);
+
+ /*
+ * The active scan has been fetched completely, so there is no need to
+ * remember it any longer.
+ */
+ fsstate->conn_state->active_scan = NULL;
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Force assorted GUC parameters to settings that ensure that we'll output
* data values in a form that is unambiguous to the remote server.
@@ -4163,6 +4485,26 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
PQclear(res);
}
+/*
+ * In streaming_fetch mode, end the scan by consuming its pending result.
+ */
+static void
+end_scan(PGconn *conn)
+{
+ PGresult *res;
+
+ res = pgfdw_get_result(conn);
+
+ /*
+ * Query complete, nothing to do.
+ */
+ if (res == NULL)
+ return;
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(res, conn, "scan not ended properly in streaming_fetch mode");
+ PQclear(res);
+}
+
/*
* create_foreign_modify
* Construct an execution state of a foreign insert/update/delete
@@ -4544,7 +4886,7 @@ store_returning_result(PgFdwModifyState *fmstate,
fmstate->attinmeta,
fmstate->retrieved_attrs,
NULL,
- fmstate->temp_cxt);
+ fmstate->temp_cxt, NULL);
/*
* The returning slot will not necessarily be suitable to store heaptuples
@@ -4823,7 +5165,7 @@ get_returning_data(ForeignScanState *node)
dmstate->attinmeta,
dmstate->retrieved_attrs,
node,
- dmstate->temp_cxt);
+ dmstate->temp_cxt, NULL);
ExecStoreHeapTuple(newtup, slot, false);
/* Get the updated/deleted tuple. */
if (dmstate->rel)
@@ -5455,7 +5797,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
for (;;)
{
int numrows;
- int i;
+ int i = 0;
/* Allow users to cancel long query */
CHECK_FOR_INTERRUPTS();
@@ -5576,7 +5918,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
astate->attinmeta,
astate->retrieved_attrs,
NULL,
- astate->temp_cxt);
+ astate->temp_cxt, NULL);
MemoryContextSwitchTo(oldcontext);
}
@@ -7092,6 +7434,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
+ else if (strcmp(def->defname, "streaming_fetch") == 0)
+ set_streaming_fetch(def, fpinfo);
}
}
@@ -7115,9 +7459,20 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
(void) parse_int(defGetString(def), &fpinfo->fetch_size, 0, NULL);
else if (strcmp(def->defname, "async_capable") == 0)
fpinfo->async_capable = defGetBoolean(def);
+ else if (strcmp(def->defname, "streaming_fetch") == 0)
+ set_streaming_fetch(def, fpinfo);
}
}
+static void
+set_streaming_fetch(DefElem *def, PgFdwRelationInfo *fpinfo)
+{
+ fpinfo->streaming_fetch = defGetBoolean(def);
+ if (fpinfo->streaming_fetch && PQlibVersion() < 170000)
+ ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("streaming_fetch requires libpq version 17 or later"));
+}
+
/*
* Merge FDW options from input relations into a new set of options for a join
* or an upper rel.
@@ -7150,6 +7505,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
fpinfo->async_capable = fpinfo_o->async_capable;
+ fpinfo->streaming_fetch = fpinfo_o->streaming_fetch;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
@@ -8423,7 +8779,8 @@ make_tuple_from_result_row(PGresult *res,
AttInMetadata *attinmeta,
List *retrieved_attrs,
ForeignScanState *fsstate,
- MemoryContext temp_context)
+ MemoryContext temp_context,
+ TupleDesc last_tupdesc)
{
HeapTuple tuple;
TupleDesc tupdesc;
@@ -8447,9 +8804,13 @@ make_tuple_from_result_row(PGresult *res,
/*
* Get the tuple descriptor for the row. Use the rel's tupdesc if rel is
- * provided, otherwise look to the scan node's ScanTupleSlot.
+ * provided, otherwise look to the scan node's ScanTupleSlot. In
+ * streaming_fetch mode, use the tuple descriptor passed by the caller,
+ * because the one from the current fsstate would belong to another scan.
*/
- if (rel)
+ if (last_tupdesc)
+ tupdesc = last_tupdesc;
+ else if (rel)
tupdesc = RelationGetDescr(rel);
else
{
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index a2bb1ff352c..c2dfbe9d321 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -88,6 +88,7 @@ typedef struct PgFdwRelationInfo
UserMapping *user; /* only set in use_remote_estimate mode */
int fetch_size; /* fetch size for this remote table */
+ bool streaming_fetch; /* true if cursor-free fetch is enabled */
/*
* Name of the relation, for use while EXPLAINing ForeignScan. It is used
@@ -131,12 +132,16 @@ typedef struct PgFdwRelationInfo
int relation_index;
} PgFdwRelationInfo;
+typedef struct PgFdwScanState PgFdwScanState;
+
/*
* Extra control information relating to a connection.
*/
typedef struct PgFdwConnState
{
AsyncRequest *pendingAreq; /* pending async request */
+ PgFdwScanState *active_scan; /* last scan whose query was sent; used only
+ * in streaming_fetch mode */
} PgFdwConnState;
/*
@@ -164,6 +169,7 @@ extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void do_sql_command(PGconn *conn, const char *sql);
extern PGresult *pgfdw_get_result(PGconn *conn);
+extern PGresult *pgfdw_get_next_result(PGconn *conn);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state);
pg_noreturn extern void pgfdw_report_error(PGresult *res, PGconn *conn,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 267d3c1a7e7..e9f705e978f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -267,6 +267,243 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
ANALYZE ft1;
ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
+-- ===================================================================
+-- test streaming_fetch option
+-- ===================================================================
+CREATE SERVER fetch_stream_srv
+ FOREIGN DATA WRAPPER postgres_fdw
+ OPTIONS (dbname :'current_database', port :'current_port');
+CREATE USER MAPPING FOR CURRENT_USER SERVER fetch_stream_srv;
+
+CREATE TABLE local_tbl (id int, val text);
+INSERT INTO local_tbl VALUES (1, 'a'), (2, 'b'), (3, 'c');
+-- 1. streaming_fetch set at SERVER level only
+
+-- 1a. Set streaming_fetch = true on the server
+ALTER SERVER fetch_stream_srv OPTIONS (ADD streaming_fetch 'true');
+
+CREATE FOREIGN TABLE ft_server (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl');
+
+-- Verify option appears in foreign server
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+
+-- Verify option is NOT present at table level (inherits from server)
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+
+-- 1b. Switch server-level to streaming_fetch = false
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'false');
+
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+
+-- Verify option is NOT present at table level (inherits from server)
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+
+-- 2. streaming_fetch set at TABLE level only (no server-level option)
+ALTER SERVER fetch_stream_srv OPTIONS (DROP streaming_fetch);
+
+-- 2a. streaming_fetch = true at table level
+ALTER FOREIGN TABLE ft_server OPTIONS (ADD streaming_fetch 'true');
+
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+
+SELECT * FROM ft_server ORDER BY id;
+
+-- 2b. streaming_fetch = false at table level
+ALTER FOREIGN TABLE ft_server OPTIONS (SET streaming_fetch 'false');
+
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+
+SELECT * FROM ft_server ORDER BY id;
+
+-- 3. TABLE-level value overrides SERVER-level value
+
+-- 3a. Server = true, Table = false
+ALTER SERVER fetch_stream_srv OPTIONS (ADD streaming_fetch 'true');
+
+-- Server shows true
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+
+-- Table shows its own override: false
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+
+-- Query must succeed and use table-level setting (false)
+SELECT * FROM ft_server ORDER BY id;
+
+-- 3b. Server = false, Table = true → effective value is true
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'false');
+
+ALTER FOREIGN TABLE ft_server OPTIONS (SET streaming_fetch 'true');
+
+-- Server shows false
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch')
+ORDER BY srvname, option_name;
+
+-- Table shows its own override: true
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_server'::regclass;
+
+-- Query must succeed and use table-level setting (true)
+SELECT * FROM ft_server ORDER BY id;
+
+-- 4. Negative tests: invalid values must be rejected
+
+-- streaming_fetch is boolean; non-boolean value must ERROR
+\set VERBOSITY terse
+
+CREATE FOREIGN TABLE ft_invalid (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl', streaming_fetch 'yes'); -- ERROR
+
+CREATE FOREIGN TABLE ft_invalid (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl', streaming_fetch '1'); -- ERROR
+
+CREATE FOREIGN TABLE ft_invalid (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl', streaming_fetch ''); -- ERROR
+
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'notabool'); -- ERROR
+
+\set VERBOSITY default
+
+-- 5. ALTER FOREIGN TABLE: add, change, and drop streaming_fetch
+
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'true');
+
+CREATE FOREIGN TABLE ft_alter_test (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl');
+
+-- No table-level option yet
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+
+-- ADD table-level option
+ALTER FOREIGN TABLE ft_alter_test OPTIONS (ADD streaming_fetch 'false');
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+
+SELECT * FROM ft_alter_test ORDER BY id;
+
+-- SET (change) table-level option
+ALTER FOREIGN TABLE ft_alter_test OPTIONS (SET streaming_fetch 'true');
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+
+SELECT * FROM ft_alter_test ORDER BY id;
+
+-- DROP table-level option (falls back to server-level)
+ALTER FOREIGN TABLE ft_alter_test OPTIONS (DROP streaming_fetch);
+SELECT ftrelid::regclass, ftoptions
+FROM pg_foreign_table
+WHERE ftrelid = 'ft_alter_test'::regclass;
+
+SELECT * FROM ft_alter_test ORDER BY id;
+
+DROP FOREIGN TABLE ft_alter_test;
+
+-- 6. streaming_fetch with non-default fetch_size values
+-- Use a 12-row table so chunk boundaries are distinct and predictable:
+-- fetch_size=1 gives 12 single-row chunks, fetch_size=5 gives chunks
+-- of 5+5+2, and fetch_size=1000 puts all rows in a single chunk.
+CREATE TABLE local_tbl_large (id int, val text);
+INSERT INTO local_tbl_large SELECT id, 'val' || id FROM generate_series(1, 12) id;
+
+ALTER SERVER fetch_stream_srv OPTIONS (SET streaming_fetch 'true');
+
+-- fetch_size = 1: every row is its own libpq chunk; exercises the path
+-- where pgfdw_get_next_result is called once per row.
+CREATE FOREIGN TABLE ft_fetchsize (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl_large', fetch_size '1');
+SELECT count(*) FROM ft_fetchsize;
+DROP FOREIGN TABLE ft_fetchsize;
+
+-- fetch_size = 5: three chunks with a partial last chunk (5+5+2);
+-- the final chunk is smaller than fetch_size.
+CREATE FOREIGN TABLE ft_fetchsize (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl_large', fetch_size '5');
+SELECT count(*) FROM ft_fetchsize;
+SELECT * FROM ft_fetchsize ORDER BY id;
+DROP FOREIGN TABLE ft_fetchsize;
+
+-- fetch_size exceeds the table row count: all rows arrive in one chunk
+-- followed immediately by the final empty PGRES_TUPLES_OK result.
+CREATE FOREIGN TABLE ft_fetchsize (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl_large', fetch_size '1000');
+SELECT count(*) FROM ft_fetchsize;
+DROP FOREIGN TABLE ft_fetchsize;
+
+DROP TABLE local_tbl_large;
+
+-- 7. streaming_fetch combined with use_remote_estimate
+-- use_remote_estimate issues a remote EXPLAIN to size the scan at plan
+-- time; streaming_fetch must not interfere with that EXPLAIN call.
+ALTER SERVER fetch_stream_srv OPTIONS (ADD use_remote_estimate 'true');
+
+CREATE FOREIGN TABLE ft_remote_est (id int, val text)
+ SERVER fetch_stream_srv
+ OPTIONS (schema_name 'public', table_name 'local_tbl');
+
+-- Verify both options are active at the server level.
+SELECT srvname, option_name, option_value
+FROM pg_foreign_server,
+ LATERAL pg_options_to_table(srvoptions)
+WHERE srvname = 'fetch_stream_srv'
+ AND option_name IN ('streaming_fetch', 'use_remote_estimate')
+ORDER BY option_name;
+
+-- Both options active: use_remote_estimate sizes the scan remotely via
+-- EXPLAIN, then streaming_fetch fetches rows without a cursor.
+SELECT * FROM ft_remote_est ORDER BY id;
+
+ALTER SERVER fetch_stream_srv OPTIONS (DROP use_remote_estimate);
+
+-- 8. streaming_fetch: check the EXPLAIN (VERBOSE) output
+EXPLAIN (VERBOSE, COSTS OFF) SELECT * FROM ft_remote_est ORDER BY id;
+
+-- Cleanup
+DROP FOREIGN TABLE ft_remote_est;
+DROP USER MAPPING FOR CURRENT_USER SERVER fetch_stream_srv;
+DROP SERVER fetch_stream_srv CASCADE;
+DROP TABLE local_tbl;
-- ===================================================================
-- test subscription
-- ===================================================================
@@ -307,6 +544,32 @@ SELECT COUNT(*) FROM ft1 t1;
SELECT * FROM ft1 t1 WHERE t1.c3 IN (SELECT c3 FROM ft2 t2 WHERE c1 <= 10) ORDER BY c1;
-- subquery+MAX
SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+-- Test in streaming_fetch mode to cover process_query_params path
+-- with only one table using streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (streaming_fetch 'true');
+SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+-- Test join with only one table using streaming_fetch at a time
+SELECT t1.c1, t2."C 1" FROM ft2 t1 JOIN "S 1"."T 1" t2 ON (t1.c1 = t2."C 1") OFFSET 100 LIMIT 10;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft2 OPTIONS (streaming_fetch 'true');
+SELECT t1.c1, t2."C 1" FROM ft2 t1 JOIN "S 1"."T 1" t2 ON (t1.c1 = t2."C 1") OFFSET 100 LIMIT 10;
+-- with both the tables using streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+SELECT * FROM ft1 t1 WHERE t1.c3 = (SELECT MAX(c3) FROM ft2 t2) ORDER BY c1;
+-- Test join with both the tables using streaming_fetch
+SELECT t1.c1, t2."C 1" FROM ft2 t1 JOIN "S 1"."T 1" t2 ON (t1.c1 = t2."C 1") OFFSET 100 LIMIT 10;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+-- streaming_fetch: verify correct results when parallel-friendly settings
+-- are active locally. With no cursor on the remote side, the remote
+-- planner is free to choose a parallel plan; results must match exactly.
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+SET max_parallel_workers_per_gather = 2;
+SET min_parallel_table_scan_size = 0;
+SELECT count(*) FROM ft1;
+RESET max_parallel_workers_per_gather;
+RESET min_parallel_table_scan_size;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
-- used in CTE
WITH t1 AS (SELECT * FROM ft1 WHERE c1 <= 10) SELECT t2.c1, t2.c2, t2.c3, t2.c4 FROM t1, ft2 t2 WHERE t1.c1 = t2.c1 ORDER BY t1.c1;
-- fixed values
@@ -386,6 +649,54 @@ EXPLAIN (VERBOSE, COSTS OFF)
WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
SELECT * FROM ft2 a, ft2 b
WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+
+-- Test in streaming_fetch mode for rescan path
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+-- Test for streaming_fetch covering rescans and three active cursors
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1 AND a.c8 = 'foo'
+AND b.c7 = upper(a.c7);
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+
+-- Test in streaming_fetch mode for interleaved scans.
+-- The non-shippable condition a.c8 = 'foo' (user_enum is not a built-in type)
+-- prevents full join pushdown, so the planner issues two separate FDW scans
+-- that share the same loopback connection. When the inner scan's init_scan
+-- fires it calls save_to_tuplestore to drain the outer scan's unread rows.
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+
+-- Show the plan: ft2 must appear as two independent ForeignScan nodes, not
+-- a single pushed-down remote join.
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+
+-- Verify results with cursor path.
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+SELECT * FROM ft2 a, ft2 b
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+
+-- Three-way self-join to test streaming_fetch
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1
+AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1
+AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+
+-- output matches in cursor mode
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+SELECT count(*) FROM ft2 a, ft2 b, ft2 c
+WHERE a.c2 = 6 AND b.c1 = a.c1 AND c.c1 = b.c1
+AND a.c8 = 'foo' AND b.c7 = upper(a.c7);
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
+
+
-- bug before 9.3.5 due to sloppy handling of remote-estimate parameters
SELECT * FROM ft1 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft2 WHERE c1 < 5));
SELECT * FROM ft2 WHERE c1 = ANY (ARRAY(SELECT c1 FROM ft1 WHERE c1 < 5));
@@ -676,6 +987,16 @@ SELECT t1.c1 FROM ft1 t1 WHERE EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c1)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+
+-- Test in streaming_fetch mode to cover the path for two simultaneously active cursors
+-- with only one table using streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+-- with both the tables using streaming_fetch
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'true');
+SELECT t1.c1 FROM ft1 t1 WHERE NOT EXISTS (SELECT 1 FROM ft2 t2 WHERE t1.c1 = t2.c2) ORDER BY t1.c1 OFFSET 100 LIMIT 10;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft2 OPTIONS (SET streaming_fetch 'false');
-- CROSS JOIN can be pushed down
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
@@ -684,6 +1005,12 @@ SELECT t1.c1, t2.c1 FROM ft1 t1 CROSS JOIN ft2 t2 ORDER BY t1.c1, t2.c1 OFFSET 1
EXPLAIN (VERBOSE, COSTS OFF)
SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+-- Test in streaming_fetch mode to cover the case with multiple cursors but only one active cursor at a time
+ALTER FOREIGN TABLE ft5 OPTIONS (streaming_fetch 'true');
+ALTER FOREIGN TABLE ft6 OPTIONS (streaming_fetch 'true');
+SELECT t1.c1, t2.c1 FROM ft5 t1 JOIN ft6 t2 ON (t1.c1 = t2.c1) ORDER BY t1.c1, t2.c1 OFFSET 100 LIMIT 10;
+ALTER FOREIGN TABLE ft5 OPTIONS (SET streaming_fetch 'false');
+ALTER FOREIGN TABLE ft6 OPTIONS (SET streaming_fetch 'false');
-- unsafe join conditions (c8 has a UDT), not pushed down. Practically a CROSS
-- JOIN since c8 in both tables has same value.
EXPLAIN (VERBOSE, COSTS OFF)
@@ -831,6 +1158,31 @@ explain (verbose, costs off)
select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2 limit 1;
select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2 limit 1;
+-- Test with limit and streaming_fetch
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+select count(c6), sum(c1), avg(c1), min(c2), max(c1), stddev(c2), sum(c1) * (random() <= 1)::int as sum2 from ft1 where c2 < 5 group by c2 order by 1, 2 limit 1;
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+
+-- Test LIMIT stopping before all tuples are consumed.
+-- The WHERE clause references c8 (a user-defined type that cannot be
+-- pushed to the remote), preventing LIMIT pushdown. The remote
+-- therefore streams all rows, and end_scan must discard the in-flight
+-- data when the local executor stops early.
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+-- LIMIT 5 with default fetch_size=100: stops well within the first
+-- chunk; end_scan discards ~995 rows still in flight on the connection.
+SELECT c1 FROM ft1 WHERE c8 = 'foo' ORDER BY c1 LIMIT 5;
+-- Verify the connection is still usable after the early stop.
+SELECT count(*) FROM ft1;
+-- fetch_size=10, LIMIT=15: consumes one full chunk (rows 1-10) plus 5
+-- rows from a second chunk (rows 11-15); end_scan then discards the
+-- remainder of that chunk and all subsequent in-flight chunks.
+ALTER FOREIGN TABLE ft1 OPTIONS (fetch_size '10');
+SELECT c1 FROM ft1 WHERE c8 = 'foo' ORDER BY c1 LIMIT 15;
+SELECT count(*) FROM ft1;
+ALTER FOREIGN TABLE ft1 OPTIONS (DROP fetch_size);
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+
-- Aggregate is not pushed down as aggregation contains random()
explain (verbose, costs off)
select sum(c1 * (random() <= 1)::int) as sum, avg(c1) from ft1;
@@ -3542,6 +3894,41 @@ SELECT 1 FROM ft1 LIMIT 1; -- should fail
\set VERBOSITY default
COMMIT;
+-- ===================================================================
+-- streaming_fetch: error recovery when the remote backend terminates
+-- ===================================================================
+-- Enable streaming_fetch so init_scan is exercised on reconnect.
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'true');
+
+-- Establish a fresh remote connection.
+SELECT 1 FROM ft1 LIMIT 1;
+
+-- Terminate the remote backend and wait for the termination to complete.
+DO $$ BEGIN
+PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
+ WHERE application_name = 'fdw_retry_check';
+END $$;
+
+-- After the connection is broken, a streaming_fetch query should detect
+-- the broken connection, reestablish it, and succeed.
+BEGIN;
+SELECT c1 FROM ft1 ORDER BY c1 LIMIT 3;
+
+-- Inside a subtransaction the broken connection must not be silently
+-- retried; the query should fail.
+DO $$ BEGIN
+PERFORM pg_terminate_backend(pid, 180000) FROM pg_stat_activity
+ WHERE application_name = 'fdw_retry_check';
+END $$;
+SAVEPOINT s2;
+-- The text of the error might vary across platforms, so only show SQLSTATE.
+\set VERBOSITY sqlstate
+SELECT 1 FROM ft1 LIMIT 1; -- should fail
+\set VERBOSITY default
+COMMIT;
+
+ALTER FOREIGN TABLE ft1 OPTIONS (SET streaming_fetch 'false');
+
-- =============================================================================
-- test connection invalidation cases and postgres_fdw_get_connections function
-- =============================================================================
@@ -4100,6 +4487,29 @@ SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c
ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+-- Test async_capable with streaming_fetch
+-- No streaming_fetch at server, this should give Async Foreign Scan for async_p1 and async_p2
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+
+-- streaming_fetch = false at loopback server, this should still give Async Foreign Scan for async_p1 and async_p2
+ALTER SERVER loopback OPTIONS (streaming_fetch 'false');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+
+-- streaming_fetch = false at loopback server but true for async_p1, this should give Foreign Scan for async_p1
+ALTER FOREIGN TABLE async_p1 OPTIONS (ADD streaming_fetch 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+
+-- streaming_fetch = true at loopback2 server, this should give Foreign Scan for async_p2 also
+ALTER SERVER loopback2 OPTIONS (streaming_fetch 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt;
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP streaming_fetch);
+ALTER SERVER loopback OPTIONS (DROP streaming_fetch);
+ALTER SERVER loopback2 OPTIONS (DROP streaming_fetch);
DROP TABLE local_tbl;
DROP INDEX base_tbl1_idx;
DROP INDEX base_tbl2_idx;
diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml
index b81f33732fb..84630da8212 100644
--- a/doc/src/sgml/postgres-fdw.sgml
+++ b/doc/src/sgml/postgres-fdw.sgml
@@ -476,6 +476,21 @@ OPTIONS (ADD password_required 'false');
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>streaming_fetch</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether to fetch rows from the remote server without
+ using a cursor. When enabled, the remote server is free to use
+ parallel query plans, which can significantly improve performance
+ for large scans. When a second scan begins on the same connection
+ while one is already in progress, the remaining rows of the first scan
+ are buffered locally (using <varname>work_mem</varname>) and replayed on demand.
+ Asynchronous execution is not supported in this mode and is disabled
+ automatically. The default is <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
</variablelist>
</sect3>
--
2.39.5 (Apple Git-154)
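The fetch_size regression tests in the patch above rely on simple chunk arithmetic: 12 rows with fetch_size=5 arrive as chunks of 5+5+2. A minimal sketch of that arithmetic follows. The helper names chunk_count and chunk_rows are hypothetical and do not appear in the patch; the sketch counts only non-empty chunks and ignores the final empty PGRES_TUPLES_OK result mentioned in the test comments.

```c
#include <assert.h>

/*
 * Hypothetical helper (not part of the patch): number of non-empty
 * chunks needed to deliver total_rows rows, fetch_size rows at a time.
 */
static int
chunk_count(int total_rows, int fetch_size)
{
	assert(total_rows >= 0 && fetch_size > 0);
	/* Ceiling division, done in long long to avoid int overflow. */
	return (int) (((long long) total_rows + fetch_size - 1) / fetch_size);
}

/*
 * Hypothetical helper (not part of the patch): number of rows in the
 * chunk with the given 0-based index, or 0 once all rows are delivered.
 */
static int
chunk_rows(int total_rows, int fetch_size, int chunk_index)
{
	long long	start;
	long long	remaining;

	assert(total_rows >= 0 && fetch_size > 0 && chunk_index >= 0);

	start = (long long) chunk_index * fetch_size;
	if (start >= total_rows)
		return 0;

	remaining = total_rows - start;
	return remaining < fetch_size ? (int) remaining : fetch_size;
}
```

For the 12-row table used in the tests, chunk_count(12, 5) is 3 and chunk_rows(12, 5, 2) is 2, which is the partial last chunk the fetch_size = 5 case is meant to exercise.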
[application/octet-stream] v10-0001-postgres_fdw-Rename-cursor_exists-flag-to-scan_i.patch (4.0K, 4-v10-0001-postgres_fdw-Rename-cursor_exists-flag-to-scan_i.patch)
From e30dfe2e0b5062427ae647cdc63a5db38acf2d64 Mon Sep 17 00:00:00 2001
From: Rafia Sabih <[email protected]>
Date: Tue, 5 May 2026 12:49:36 +0200
Subject: [PATCH v10 1/2] postgres_fdw: Rename cursor_exists flag to
scan_in_progress
The cursor_exists flag in PgFdwScanState was named specifically for the
cursor-based tuple-fetching path. A follow-up patch introduces a
cursor-free fetching mechanism, making the old name misleading in that
context. Rename it to scan_in_progress to reflect its actual purpose:
tracking whether a scan has been started, regardless of the fetching
method in use.
---
contrib/postgres_fdw/postgres_fdw.c | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index c42cb690c7b..350dc19f29a 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -155,7 +155,7 @@ typedef struct PgFdwScanState
PGconn *conn; /* connection for the scan */
PgFdwConnState *conn_state; /* extra per-connection state */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
- bool cursor_exists; /* have we created the cursor? */
+ bool scan_in_progress; /* is there a scan in progress? */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
@@ -1758,7 +1758,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
- fsstate->cursor_exists = false;
+ fsstate->scan_in_progress = false;
/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private,
@@ -1827,7 +1827,7 @@ postgresIterateForeignScan(ForeignScanState *node)
* already created the cursor before we get here, even if this is the
* first call after Begin or ReScan.
*/
- if (!fsstate->cursor_exists)
+ if (!fsstate->scan_in_progress)
create_cursor(node);
/*
@@ -1867,8 +1867,8 @@ postgresReScanForeignScan(ForeignScanState *node)
char sql[64];
PGresult *res;
- /* If we haven't created the cursor yet, nothing to do. */
- if (!fsstate->cursor_exists)
+ /* If no scan is in progress, nothing to do. */
+ if (!fsstate->scan_in_progress)
return;
/*
@@ -1894,7 +1894,7 @@ postgresReScanForeignScan(ForeignScanState *node)
*/
if (node->ss.ps.chgParam != NULL)
{
- fsstate->cursor_exists = false;
+ fsstate->scan_in_progress = false;
snprintf(sql, sizeof(sql), "CLOSE c%u",
fsstate->cursor_number);
}
@@ -1905,7 +1905,7 @@ postgresReScanForeignScan(ForeignScanState *node)
fsstate->cursor_number);
else
{
- fsstate->cursor_exists = false;
+ fsstate->scan_in_progress = false;
snprintf(sql, sizeof(sql), "CLOSE c%u",
fsstate->cursor_number);
}
@@ -1944,7 +1944,7 @@ postgresEndForeignScan(ForeignScanState *node)
return;
/* Close the cursor if open, to prevent accumulation of cursors */
- if (fsstate->cursor_exists)
+ if (fsstate->scan_in_progress)
close_cursor(fsstate->conn, fsstate->cursor_number,
fsstate->conn_state);
@@ -3994,8 +3994,8 @@ create_cursor(ForeignScanState *node)
pgfdw_report_error(res, conn, fsstate->query);
PQclear(res);
- /* Mark the cursor as created, and show no tuples have been retrieved */
- fsstate->cursor_exists = true;
+ /* Mark the scan as started, and show no tuples have been retrieved */
+ fsstate->scan_in_progress = true;
fsstate->tuples = NULL;
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
@@ -8332,7 +8332,7 @@ fetch_more_data_begin(AsyncRequest *areq)
Assert(!fsstate->conn_state->pendingAreq);
/* Create the cursor synchronously. */
- if (!fsstate->cursor_exists)
+ if (!fsstate->scan_in_progress)
create_cursor(node);
/* We will send this query, but not wait for the response. */
--
2.39.5 (Apple Git-154)
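The streaming_fetch documentation in the main patch says that when a second scan begins on a connection that already has a scan in progress, the remaining rows of the first scan are buffered locally and replayed on demand. The toy model below sketches that behaviour only. It is not the patch's code: ToyConn, ToyScan and the toy_* functions are hypothetical names, rows are plain integers, and a malloc'd array stands in for the tuplestore.

```c
#include <assert.h>
#include <stdlib.h>

/* Toy scan: rows are the integers next_remote..last_remote. */
typedef struct ToyScan
{
	int			next_remote;	/* next row still "on the wire" */
	int			last_remote;	/* last row the remote will send */
	int		   *saved;			/* rows drained into local storage */
	int			nsaved;			/* number of saved rows */
	int			next_saved;		/* next saved row to replay */
} ToyScan;

/* Toy connection: only one scan can own the row stream at a time. */
typedef struct ToyConn
{
	ToyScan    *active;
} ToyConn;

/* Move every unread remote row of the scan into its local buffer. */
static void
toy_drain(ToyScan *scan)
{
	int			n = scan->last_remote - scan->next_remote + 1;

	if (n <= 0)
		return;
	scan->saved = malloc((size_t) n * sizeof(int));
	assert(scan->saved != NULL);
	for (int i = 0; i < n; i++)
		scan->saved[i] = scan->next_remote++;
	scan->nsaved = n;
	scan->next_saved = 0;
}

/* Start a scan; any scan already using the connection is drained first. */
static void
toy_begin(ToyConn *conn, ToyScan *scan, int first, int last)
{
	if (conn->active != NULL)
		toy_drain(conn->active);
	scan->next_remote = first;
	scan->last_remote = last;
	scan->saved = NULL;
	scan->nsaved = 0;
	scan->next_saved = 0;
	conn->active = scan;
}

/* Return the next row of the scan, or -1 when it is exhausted. */
static int
toy_next(ToyConn *conn, ToyScan *scan)
{
	if (scan->next_saved < scan->nsaved)
		return scan->saved[scan->next_saved++];	/* replay buffered row */
	if (conn->active == scan && scan->next_remote <= scan->last_remote)
		return scan->next_remote++;	/* read directly from the stream */
	return -1;
}

/* Release the local buffer of a finished scan. */
static void
toy_end(ToyScan *scan)
{
	free(scan->saved);
	scan->saved = NULL;
	scan->nsaved = 0;
	scan->next_saved = 0;
}
```

In this model, starting a second scan after the first has returned two of five rows leaves three rows in the first scan's buffer, and the two scans can then be read in any interleaving. The real patch has to handle rescans, errors and memory limits that this sketch leaves out.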
end of thread, other threads:[~2026-05-20 10:40 UTC | newest]
Thread overview: 2+ messages
2026-01-26 11:44 Re: Bypassing cursors in postgres_fdw to enable parallel plans Rafia Sabih <[email protected]>
2026-05-20 10:40 ` Rafia Sabih <[email protected]>