From: Rafia Sabih <[email protected]>
To: PostgreSQL Hackers <[email protected]>
Cc: Robert Haas <[email protected]>
Subject: Bypassing cursors in postgres_fdw to enable parallel plans
Date: Mon, 6 Jan 2025 09:52:10 +0100
Message-ID: <CA+FpmFcmO5ctjYgQxSomJC=mCugqPo+51Le2wdxX0kWxjvBBig@mail.gmail.com>
Hello hackers,
At present, in postgres_fdw, a query that would use a parallel plan when
run directly fails to use that parallel plan locally when it is fired from
the remote end, because of the use of cursors. Consider the following example.
Local server,
Table:
create table t ( i int, j int, k text);
insert into t values(generate_series(1,10000), generate_series(1, 10000),
'check_this_out');
Query
select * from t where i > 1000;
Query plan
Gather (cost=0.00..116.08 rows=9000 width=23)
Workers Planned: 2
-> Parallel Seq Scan on t (cost=0.00..116.08 rows=3750 width=23)
Filter: (i > 1000)
Foreign server
create extension postgres_fdw;
CREATE SERVER foreign_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS
(host '127.0.0.1', port '5432', dbname 'postgres');
CREATE USER MAPPING FOR user1 SERVER foreign_server OPTIONS (user 'user1');
Table
CREATE FOREIGN TABLE foreign_table ( i int, j int, k text ) SERVER
foreign_server OPTIONS (schema_name 'public', table_name 't');
Query
select * from foreign_table where i > 1000;
Query plan at the local server
Seq Scan on t (cost=0.00..189.00 rows=9000 width=23)
Filter: (i > 1000)
I used the auto_explain extension to capture the query plans at the local
server, along with the following settings in .conf to force parallel plans
for the purpose of demonstration:
min_parallel_table_scan_size = 0
parallel_tuple_cost = 0
parallel_setup_cost = 0
With the patch:
set postgres_fdw.use_cursor = false;
Query plan at the local server
Gather (cost=0.00..116.08 rows=9000 width=23)
Workers Planned: 2
-> Parallel Seq Scan on t (cost=0.00..116.08 rows=3750 width=23)
Filter: (i > 1000)
To overcome this limitation, I have worked on an idea (suggested by my
colleague Bernd Helmle) of bypassing the cursors. It works as follows.
A new GUC, postgres_fdw.use_cursor, is introduced; when it is unset,
postgres_fdw fetches without a cursor. In non-cursor mode, create_cursor
sends the query as is and calls PQsetChunkedRowsMode, with the chunk size
equal to fetch_size. Then, in fetch_more_data, pgfdw_get_next_result is
used to get each chunk as a PGresult, which is processed in the same
manner as before.
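To make the chunked flow concrete, here is a minimal, self-contained sketch in plain C. It does not call libpq; ToyConn, ToyResult, toy_get_next_result and count_batches are hypothetical names invented for this illustration. It assumes the sequence described for chunked rows mode: results of up to chunk-size rows, then a zero-row end marker, then no further result. Real libpq may return chunks smaller than the limit, which this toy model ignores.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for the result statuses (not the real libpq enum). */
typedef enum { RES_NONE, RES_TUPLES_CHUNK, RES_TUPLES_OK } ResStatus;

/* Toy "connection": one query whose rows are streamed in chunks. */
typedef struct {
    int total_rows;   /* rows the query produces */
    int sent_rows;    /* rows already handed out */
    int chunk_size;   /* plays the role of fetch_size */
    int finished;     /* set once the zero-row end marker was returned */
} ToyConn;

typedef struct {
    ResStatus status;
    int first_row;    /* index of the first row in this chunk */
    int nrows;        /* number of rows in this chunk */
} ToyResult;

/* Models fetching the next result; RES_NONE models a NULL PGresult. */
static ToyResult toy_get_next_result(ToyConn *conn)
{
    ToyResult res = { RES_NONE, 0, 0 };
    int remaining = conn->total_rows - conn->sent_rows;

    assert(conn->chunk_size > 0);
    if (remaining > 0) {
        res.status = RES_TUPLES_CHUNK;
        res.first_row = conn->sent_rows;
        res.nrows = remaining < conn->chunk_size ? remaining : conn->chunk_size;
        conn->sent_rows += res.nrows;
    } else if (!conn->finished) {
        res.status = RES_TUPLES_OK;   /* zero-row end marker */
        conn->finished = 1;
    }
    return res;
}

/* Counts how many non-empty batches a scan would receive. */
static int count_batches(int total_rows, int fetch_size, int *rows_seen)
{
    ToyConn conn = { total_rows, 0, fetch_size, 0 };
    int batches = 0;

    *rows_seen = 0;
    for (;;) {
        ToyResult res = toy_get_next_result(&conn);

        if (res.status != RES_TUPLES_CHUNK)
            break;                    /* end marker or nothing left */
        batches++;
        *rows_seen += res.nrows;
    }
    return batches;
}
```

In this model, each call made from fetch_more_data corresponds to one call of toy_get_next_result, so the scan still sees batches of at most fetch_size rows, as it does with FETCH on a cursor.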
The issue comes with simultaneous queries, which is the case for join
queries where all the tables involved in the join are at the local server.
In that case multiple cursors are open at the same time, and without a
cursor mechanism we have no information or other structure to tell what to
fetch for which query. To handle this, there is a flag only_query, which is
unset in postgresBeginForeignScan as soon as a cursor_number >= 2 has been
assigned. In fetch_more_data, when only_query is unset, we fetch all the
data for the query and store it in a Tuplestore. These tuples are then
transferred to fsstate->tuples and processed as usual.
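The difference between the two paths can be sketched with a small plain-C model. This is only an illustration under simplifying assumptions: rows are plain ints, ToyQuery, ToyScan, pull_rows and toy_fetch_more_data are hypothetical names, and the rows go straight into the scan buffer instead of passing through a Tuplestore first.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical remote query: yields row ids 0..total_rows-1 in order. */
typedef struct {
    int total_rows;
    int next_row;
} ToyQuery;

/* Hypothetical scan state; rows plays the role of fsstate->tuples. */
typedef struct {
    int *rows;
    int  count;
    int  eof_reached;
} ToyScan;

/* Pull at most 'limit' rows from the query into the scan buffer. */
static int pull_rows(ToyQuery *q, ToyScan *scan, int limit)
{
    int n = 0;

    while (n < limit && q->next_row < q->total_rows) {
        scan->rows[scan->count++] = q->next_row++;
        n++;
    }
    return n;
}

/*
 * only_query != 0: fetch a single batch of up to fetch_size rows.
 * only_query == 0: drain every remaining row in one go, so the connection
 * is free for the other query, and mark the scan as finished.
 */
static int toy_fetch_more_data(ToyQuery *q, ToyScan *scan,
                               int fetch_size, int only_query)
{
    int remaining = q->total_rows - q->next_row;
    int want = only_query ? fetch_size : remaining;
    int got;

    assert(fetch_size > 0);

    /* Flush the previous batch and allocate room for the new one. */
    free(scan->rows);
    scan->count = 0;
    scan->rows = malloc(sizeof(int) * (size_t) (want > 0 ? want : 1));
    assert(scan->rows != NULL);

    got = pull_rows(q, scan, want);
    scan->eof_reached = only_query ? (got < fetch_size) : 1;
    return got;
}
```

With 250 rows and a fetch_size of 100, the first path yields batches of 100, 100 and 50 rows, while the second holds all 250 rows at once. That is where the extra memory use for simultaneous queries comes from.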
So yes, there is a performance drawback in the case of simultaneous queries;
however, the ability to use parallel plans is a real advantage for users.
Plus, the new GUC use_cursor lets us keep the current behaviour for
workloads where we lose more than we gain. In short, I feel hopeful that
this could be a good idea and a good time to improve postgres_fdw.
Looking forward to your reviews, comments, etc.
--
Regards,
Rafia Sabih
CYBERTEC PostgreSQL International GmbH
Attachments:
[application/octet-stream] 0001-Add-a-fetch-mechanism-without-cursors.patch (13.7K, 3-0001-Add-a-fetch-mechanism-without-cursors.patch)
From 14e2fcb457ef3e9a3ad004425ec7e334a3619fd7 Mon Sep 17 00:00:00 2001
From: Rafia Sabih <[email protected]>
Date: Mon, 6 Jan 2025 09:21:18 +0100
Subject: [PATCH] Add a fetch mechanism without cursors
This adds a GUC to enable/disable cursor mode in postgres_fdw.
The GUC is called postgres_fdw.use_cursor. When it is set, everything
works as it does now. However, the current mechanism has a limitation:
it is unable to use parallel plans at the local side because of the
use of cursors. A user who wants to overcome this can unset the
abovementioned GUC. In non-cursor mode cursors are not used, and hence
parallel plans can be used at the local side.
In non-cursor mode fetch_size is used as is.
A caveat with the non-cursor mode is that when simultaneous queries are
fired at the local side, i.e. more than one cursor is opened at a time,
then we use a Tuplestore, so there might be some memory-related performance
degradation, but only in those cases.
---
contrib/postgres_fdw/connection.c | 7 +
contrib/postgres_fdw/option.c | 17 +++
contrib/postgres_fdw/postgres_fdw.c | 221 +++++++++++++++++++++-------
contrib/postgres_fdw/postgres_fdw.h | 2 +
4 files changed, 193 insertions(+), 54 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 2326f391d34..95e30773a19 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -868,6 +868,13 @@ pgfdw_get_result(PGconn *conn)
return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
}
+PGresult *
+pgfdw_get_next_result(PGconn *conn)
+{
+ return libpqsrv_get_result(conn, pgfdw_we_get_result);
+}
+
+
/*
* Report an error we got from the remote server.
*
diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c
index 232d85354b2..a5d7b747536 100644
--- a/contrib/postgres_fdw/option.c
+++ b/contrib/postgres_fdw/option.c
@@ -49,6 +49,7 @@ static PQconninfoOption *libpq_options;
* GUC parameters
*/
char *pgfdw_application_name = NULL;
+bool pgfdw_use_cursor = true;
/*
* Helper functions
@@ -585,5 +586,21 @@ _PG_init(void)
NULL,
NULL);
+ /*
+ * If use_cursor is set to false, then the new way of fetching is used, which allows for the
+ * use of parallel plans at the local side. In the cursor mode, parallel plans could not be
+ * used.
+ */
+ DefineCustomBoolVariable("postgres_fdw.use_cursor",
+ "If set uses the cursor, otherwise fetches without cursor",
+ NULL,
+ &pgfdw_use_cursor,
+ true,
+ PGC_USERSET,
+ 0,
+ NULL,
+ NULL,
+ NULL);
+
MarkGUCPrefixReserved("postgres_fdw");
}
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index cf564341184..06407da60ef 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "executor/execAsync.h"
+#include "executor/executor.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -542,6 +543,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo,
const PgFdwRelationInfo *fpinfo_i);
static int get_batch_size_option(Relation rel);
+static bool only_query = true; /* Only to be used in the non cursor mode*/
/*
* Foreign-data wrapper handler function: return a struct with pointers
@@ -1544,6 +1546,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/* Get private info created by planner functions. */
fsstate->query = strVal(list_nth(fsplan->fdw_private,
FdwScanPrivateSelectSql));
+
+ /* We need to remember that there is already a query running. */
+ if (fsstate->cursor_number >= 2)
+ only_query = false;
+
fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
FdwScanPrivateRetrievedAttrs);
fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private,
@@ -3733,7 +3740,7 @@ create_cursor(ForeignScanState *node)
const char **values = fsstate->param_values;
PGconn *conn = fsstate->conn;
StringInfoData buf;
- PGresult *res;
+ PGresult *res = NULL;
/* First, process a pending asynchronous request, if any. */
if (fsstate->conn_state->pendingAreq)
@@ -3758,36 +3765,53 @@ create_cursor(ForeignScanState *node)
MemoryContextSwitchTo(oldcontext);
}
- /* Construct the DECLARE CURSOR command */
initStringInfo(&buf);
- appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
- fsstate->cursor_number, fsstate->query);
- /*
- * Notice that we pass NULL for paramTypes, thus forcing the remote server
- * to infer types for all parameters. Since we explicitly cast every
- * parameter (see deparse.c), the "inference" is trivial and will produce
- * the desired result. This allows us to avoid assuming that the remote
- * server has the same OIDs we do for the parameters' types.
- */
- if (!PQsendQueryParams(conn, buf.data, numParams,
- NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+ if (pgfdw_use_cursor)
+ {
+ /* Construct the DECLARE CURSOR command */
+ appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
+ fsstate->cursor_number, fsstate->query);
+
+ /*
+ * Notice that we pass NULL for paramTypes, thus forcing the remote server
+ * to infer types for all parameters. Since we explicitly cast every
+ * parameter (see deparse.c), the "inference" is trivial and will produce
+ * the desired result. This allows us to avoid assuming that the remote
+ * server has the same OIDs we do for the parameters' types.
+ */
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+ /*
+ * Get the result, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = pgfdw_get_result(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
+ }
+ else
+ {
+ /* Fetch without cursors */
+ appendStringInfo(&buf, "%s", fsstate->query);
+
+ if (!PQsendQueryParams(conn, buf.data, numParams,
+ NULL, values, NULL, NULL, 0))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+
+ /* Call for Chunked rows mode with same size of chunk as the fetch size */
+ if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size))
+ pgfdw_report_error(ERROR, NULL, conn, false, buf.data);
+ }
- /*
- * Get the result, and check for success.
- *
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
- res = pgfdw_get_result(conn);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, true, fsstate->query);
PQclear(res);
/* Mark the cursor as created, and show no tuples have been retrieved */
- fsstate->cursor_exists = true;
- fsstate->tuples = NULL;
+ fsstate->cursor_exists = true; // We need this even for non-cursor mode.
fsstate->num_tuples = 0;
fsstate->next_tuple = 0;
fsstate->fetch_ct_2 = 0;
@@ -3806,6 +3830,7 @@ fetch_more_data(ForeignScanState *node)
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
PGresult *volatile res = NULL;
MemoryContext oldcontext;
+ bool already_done = false;
/*
* We'll store the tuples in the batch_cxt. First, flush the previous
@@ -3820,7 +3845,7 @@ fetch_more_data(ForeignScanState *node)
{
PGconn *conn = fsstate->conn;
int numrows;
- int i;
+ int i = 0;
if (fsstate->async_capable)
{
@@ -3838,7 +3863,7 @@ fetch_more_data(ForeignScanState *node)
/* Reset per-connection state */
fsstate->conn_state->pendingAreq = NULL;
}
- else
+ if (pgfdw_use_cursor)
{
char sql[64];
@@ -3851,32 +3876,113 @@ fetch_more_data(ForeignScanState *node)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
}
+ else
+ {
+ /* Non-cursor mode uses PQsetChunkedRowsMode during create_cursor, so just get the result here. */
+ res = pgfdw_get_next_result(conn);
- /* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ if (res == NULL)
+ break;
- for (i = 0; i < numrows; i++)
+ else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+ else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK)
+ {
+ int total = 0;
+ if (!only_query)
+ {
+ /*
+ * When this is not the only query running, we extract all the tuples
+ * in one go and store them in tuplestore.
+ * Since it is using PQsetChunkedRowsMode, we get only the fsstate->fetch_size
+ * tuples in one run, so keep on executing till we get NULL in PGresult.
+ */
+ Tuplestorestate *tuplestore = tuplestore_begin_heap(false, true, work_mem);
+ TupleTableSlot *slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple);
+ HeapTuple temp_tuple = (HeapTuple) palloc0(sizeof(HeapTuple));
+
+ i = 0;
+ for (;;)
+ {
+ CHECK_FOR_INTERRUPTS();
+ numrows = PQntuples(res);
+
+ /* Convert the data into HeapTuples */
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
+ for (i = 0; i < numrows; i++)
+ {
+ temp_tuple = make_tuple_from_result_row(res, i,
+ fsstate->rel,
+ fsstate->attinmeta,
+ fsstate->retrieved_attrs,
+ node,
+ fsstate->temp_cxt);
+ tuplestore_puttuple(tuplestore, temp_tuple);
+ total++;
+ }
+ res = pgfdw_get_next_result(conn);
+
+ if (res == NULL)
+ break;
+
+ else if (PQresultStatus(res) == PGRES_TUPLES_OK)
+ {
+ while (res!= NULL)
+ res = pgfdw_get_next_result(conn);
+ break;
+ }
+ else if (PQresultStatus(res) == PGRES_FATAL_ERROR)
+ pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+ }
+ if (total > 0)
+ {
+ already_done = true;
+ numrows = total;
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
+ for (i = 0; i < numrows; i++)
+ {
+ while (tuplestore_gettupleslot(tuplestore, true, true, slot))
+ fsstate->tuples[i++] = ExecFetchSlotHeapTuple(slot, true, NULL);
+ }
+ }
+ /* EOF is reached because we have stored all the tuples in the tuplestore. */
+ fsstate->eof_reached = true;
+ pfree(temp_tuple);
+ ExecDropSingleTupleTableSlot(slot);
+ tuplestore_end(tuplestore);
+ }
+ }
+ }
+ if (!already_done)
{
- Assert(IsA(node->ss.ps.plan, ForeignScan));
-
- fsstate->tuples[i] =
- make_tuple_from_result_row(res, i,
- fsstate->rel,
- fsstate->attinmeta,
- fsstate->retrieved_attrs,
- node,
- fsstate->temp_cxt);
+ /* Convert the data into HeapTuples */
+ numrows = PQntuples(res);
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
+
+ for (i = 0; i < numrows; i++)
+ {
+ Assert(IsA(node->ss.ps.plan, ForeignScan));
+
+ fsstate->tuples[i] =
+ make_tuple_from_result_row(res, i,
+ fsstate->rel,
+ fsstate->attinmeta,
+ fsstate->retrieved_attrs,
+ node,
+ fsstate->temp_cxt);
+ }
+
+ /* Must be EOF if we didn't get as many tuples as we asked for. */
+ fsstate->eof_reached = (numrows < fsstate->fetch_size);
}
/* Update fetch_ct_2 */
if (fsstate->fetch_ct_2 < 2)
fsstate->fetch_ct_2++;
-
- /* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
}
PG_FINALLY();
{
@@ -3955,16 +4061,23 @@ close_cursor(PGconn *conn, unsigned int cursor_number,
char sql[64];
PGresult *res;
- snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
+ if (pgfdw_use_cursor)
+ {
+ snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number);
- /*
- * We don't use a PG_TRY block here, so be careful not to throw error
- * without releasing the PGresult.
- */
- res = pgfdw_exec_query(conn, sql, conn_state);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, conn, true, sql);
- PQclear(res);
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = pgfdw_exec_query(conn, sql, conn_state);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, conn, true, sql);
+ PQclear(res);
+ }
+ else
+ {
+ while (pgfdw_get_result(conn) != NULL) {}
+ }
}
/*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 9e501660d18..8c177ec9946 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -164,6 +164,7 @@ extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern void do_sql_command(PGconn *conn, const char *sql);
extern PGresult *pgfdw_get_result(PGconn *conn);
+extern PGresult *pgfdw_get_next_result(PGconn *conn);
extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
PgFdwConnState *state);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
@@ -177,6 +178,7 @@ extern List *ExtractExtensionList(const char *extensionsString,
bool warnOnMissing);
extern char *process_pgfdw_appname(const char *appname);
extern char *pgfdw_application_name;
+extern bool pgfdw_use_cursor;
/* in deparse.c */
extern void classifyConditions(PlannerInfo *root,
--
2.39.5 (Apple Git-154)