From 14e2fcb457ef3e9a3ad004425ec7e334a3619fd7 Mon Sep 17 00:00:00 2001 From: Rafia Sabih Date: Mon, 6 Jan 2025 09:21:18 +0100 Subject: [PATCH] Add a fetch mechanism without cursors This adds a GUC to enable/disable cursor mode in postgres_fdw. The GUC is called postgres_fdw.use_cursor. When it is set (the default), everything works as it does now. However, the current mechanism has a limitation: it is unable to use parallel plans at the local side because of the use of cursors. To overcome this, a user can unset the above-mentioned GUC. In non-cursor mode cursors are not used, and hence parallel plans can be used at the local side. In non-cursor mode fetch_size is used as is. A caveat with the non-cursor mode is that when simultaneous queries are fired at the local side, i.e. more than one cursor would be open at a time, we use a Tuplestore, so there might be some memory-related performance degradation, but only in those cases. --- contrib/postgres_fdw/connection.c | 7 + contrib/postgres_fdw/option.c | 17 +++ contrib/postgres_fdw/postgres_fdw.c | 221 +++++++++++++++++++++------- contrib/postgres_fdw/postgres_fdw.h | 2 + 4 files changed, 193 insertions(+), 54 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 2326f391d34..95e30773a19 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -868,6 +868,13 @@ pgfdw_get_result(PGconn *conn) return libpqsrv_get_result_last(conn, pgfdw_we_get_result); } +PGresult * +pgfdw_get_next_result(PGconn *conn) +{ + return libpqsrv_get_result(conn, pgfdw_we_get_result); +} + + /* * Report an error we got from the remote server.
* diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 232d85354b2..a5d7b747536 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -49,6 +49,7 @@ static PQconninfoOption *libpq_options; * GUC parameters */ char *pgfdw_application_name = NULL; +bool pgfdw_use_cursor = true; /* * Helper functions @@ -585,5 +586,21 @@ _PG_init(void) NULL, NULL); + /* + * postgres_fdw.use_cursor (default true): when false, the remote query is sent without + * DECLARE CURSOR and read back in chunked-rows mode, which is meant to allow parallel + * plans at the local side; in cursor mode parallel plans cannot be used. + */ + DefineCustomBoolVariable("postgres_fdw.use_cursor", + "If set uses the cursor, otherwise fetches without cursor", + NULL, + &pgfdw_use_cursor, + true, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); + MarkGUCPrefixReserved("postgres_fdw"); } diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index cf564341184..06407da60ef 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "executor/execAsync.h" +#include "executor/executor.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -542,6 +543,7 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_i); static int get_batch_size_option(Relation rel); +static bool only_query = true; /* Non-cursor mode only: set to false once a scan sees cursor_number >= 2. NOTE(review): never set back to true in the hunks shown -- confirm. */ /* * Foreign-data wrapper handler function: return a struct with pointers @@ -1544,6 +1546,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* Get private info created by planner functions. */ fsstate->query = strVal(list_nth(fsplan->fdw_private, FdwScanPrivateSelectSql)); + + /* We need to remember that there is already a query running.
*/ + if (fsstate->cursor_number >= 2) + only_query = false; + fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fsplan->fdw_private, @@ -3733,7 +3740,7 @@ create_cursor(ForeignScanState *node) const char **values = fsstate->param_values; PGconn *conn = fsstate->conn; StringInfoData buf; - PGresult *res; + PGresult *res = NULL; /* First, process a pending asynchronous request, if any. */ if (fsstate->conn_state->pendingAreq) @@ -3758,36 +3765,53 @@ create_cursor(ForeignScanState *node) MemoryContextSwitchTo(oldcontext); } - /* Construct the DECLARE CURSOR command */ initStringInfo(&buf); - appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", - fsstate->cursor_number, fsstate->query); - /* - * Notice that we pass NULL for paramTypes, thus forcing the remote server - * to infer types for all parameters. Since we explicitly cast every - * parameter (see deparse.c), the "inference" is trivial and will produce - * the desired result. This allows us to avoid assuming that the remote - * server has the same OIDs we do for the parameters' types. - */ - if (!PQsendQueryParams(conn, buf.data, numParams, - NULL, values, NULL, NULL, 0)) - pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + if (pgfdw_use_cursor) + { + /* Construct the DECLARE CURSOR command */ + appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", + fsstate->cursor_number, fsstate->query); + + /* + * Notice that we pass NULL for paramTypes, thus forcing the remote server + * to infer types for all parameters. Since we explicitly cast every + * parameter (see deparse.c), the "inference" is trivial and will produce + * the desired result. This allows us to avoid assuming that the remote + * server has the same OIDs we do for the parameters' types.
+ */ + if (!PQsendQueryParams(conn, buf.data, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + + /* + * Get the result, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_get_result(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, fsstate->query); + } + else + { + /* Non-cursor mode: send the query text itself; rows are read later via pgfdw_get_next_result(). */ + appendStringInfo(&buf, "%s", fsstate->query); + + if (!PQsendQueryParams(conn, buf.data, numParams, + NULL, values, NULL, NULL, 0)) + pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + + /* Ask libpq for chunked-rows mode, using fetch_size as the chunk size. */ + if (!PQsetChunkedRowsMode(conn, fsstate->fetch_size)) + pgfdw_report_error(ERROR, NULL, conn, false, buf.data); + } - /* - * Get the result, and check for success. - * - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_get_result(conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, fsstate->query); PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ - fsstate->cursor_exists = true; - fsstate->tuples = NULL; + fsstate->cursor_exists = true; // We need this even for non-cursor mode. fsstate->num_tuples = 0; fsstate->next_tuple = 0; fsstate->fetch_ct_2 = 0; @@ -3806,6 +3830,7 @@ fetch_more_data(ForeignScanState *node) PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; + bool already_done = false; /* * We'll store the tuples in the batch_cxt.
First, flush the previous @@ -3820,7 +3845,7 @@ fetch_more_data(ForeignScanState *node) { PGconn *conn = fsstate->conn; int numrows; - int i; + int i = 0; if (fsstate->async_capable) { @@ -3838,7 +3863,7 @@ fetch_more_data(ForeignScanState *node) /* Reset per-connection state */ fsstate->conn_state->pendingAreq = NULL; } - else + if (pgfdw_use_cursor) { char sql[64]; @@ -3851,32 +3876,113 @@ fetch_more_data(ForeignScanState *node) if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, conn, false, fsstate->query); } + else + { + /* Non-cursor mode: create_cursor() already sent the query in chunked-rows mode (PQsetChunkedRowsMode), so just read the next result. NOTE(review): the else before the cursor-mode if was dropped, so the async_capable path also reaches this if/else; and the bare break below has no enclosing loop visible in this hunk -- confirm both. */ + res = pgfdw_get_next_result(conn); - /* Convert the data into HeapTuples */ - numrows = PQntuples(res); - fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - fsstate->num_tuples = numrows; - fsstate->next_tuple = 0; + if (res == NULL) + break; - for (i = 0; i < numrows; i++) + else if (PQresultStatus(res) == PGRES_FATAL_ERROR) + pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query); + else if (PQresultStatus(res) == PGRES_TUPLES_CHUNK) + { + int total = 0; + if (!only_query) + { + /* + * When this is not the only query running, drain the whole result set + * now and buffer it in a tuplestore. + * Chunked-rows mode (PQsetChunkedRowsMode) hands back at most fsstate->fetch_size + * tuples per PGresult, so keep reading until we get a NULL PGresult.
+ */ + Tuplestorestate *tuplestore = tuplestore_begin_heap(false, true, work_mem); + TupleTableSlot *slot = MakeSingleTupleTableSlot(fsstate->tupdesc, &TTSOpsMinimalTuple); + HeapTuple temp_tuple = (HeapTuple) palloc0(sizeof(HeapTuple)); + + i = 0; + for (;;) + { + CHECK_FOR_INTERRUPTS(); + numrows = PQntuples(res); + + /* Convert this chunk into HeapTuples and add them to the tuplestore. NOTE(review): res is reassigned after the loop without a PQclear of the previous chunk in view -- confirm nothing leaks. */ + Assert(IsA(node->ss.ps.plan, ForeignScan)); + for (i = 0; i < numrows; i++) + { + temp_tuple = make_tuple_from_result_row(res, i, + fsstate->rel, + fsstate->attinmeta, + fsstate->retrieved_attrs, + node, + fsstate->temp_cxt); + tuplestore_puttuple(tuplestore, temp_tuple); + total++; + } + res = pgfdw_get_next_result(conn); + + if (res == NULL) + break; + + else if (PQresultStatus(res) == PGRES_TUPLES_OK) + { + while (res!= NULL) + res = pgfdw_get_next_result(conn); + break; + } + else if (PQresultStatus(res) == PGRES_FATAL_ERROR) + pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query); + } + if (total > 0) + { + already_done = true; + numrows = total; + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; + for (i = 0; i < numrows; i++) + { + while (tuplestore_gettupleslot(tuplestore, true, true, slot)) + fsstate->tuples[i++] = ExecFetchSlotHeapTuple(slot, true, NULL); + } + } + /* Everything was drained into the tuplestore above, so this scan is at EOF.
*/ + fsstate->eof_reached = true; + pfree(temp_tuple); + ExecDropSingleTupleTableSlot(slot); + tuplestore_end(tuplestore); + } + } + } + if (!already_done) { - Assert(IsA(node->ss.ps.plan, ForeignScan)); - - fsstate->tuples[i] = - make_tuple_from_result_row(res, i, - fsstate->rel, - fsstate->attinmeta, - fsstate->retrieved_attrs, - node, - fsstate->temp_cxt); + /* Convert the data into HeapTuples */ + numrows = PQntuples(res); + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; + + for (i = 0; i < numrows; i++) + { + Assert(IsA(node->ss.ps.plan, ForeignScan)); + + fsstate->tuples[i] = + make_tuple_from_result_row(res, i, + fsstate->rel, + fsstate->attinmeta, + fsstate->retrieved_attrs, + node, + fsstate->temp_cxt); + } + + /* Must be EOF if we didn't get as many tuples as we asked for. */ + fsstate->eof_reached = (numrows < fsstate->fetch_size); } /* Update fetch_ct_2 */ if (fsstate->fetch_ct_2 < 2) fsstate->fetch_ct_2++; - - /* Must be EOF if we didn't get as many tuples as we asked for. */ - fsstate->eof_reached = (numrows < fsstate->fetch_size); } PG_FINALLY(); { @@ -3955,16 +4061,23 @@ close_cursor(PGconn *conn, unsigned int cursor_number, char sql[64]; PGresult *res; - snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number); + if (pgfdw_use_cursor) + { + snprintf(sql, sizeof(sql), "CLOSE c%u", cursor_number); - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_exec_query(conn, sql, conn_state); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, conn, true, sql); - PQclear(res); + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult.
+ */ + res = pgfdw_exec_query(conn, sql, conn_state); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, conn, true, sql); + PQclear(res); + } + else + { + while (pgfdw_get_result(conn) != NULL) {} + } } /* diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 9e501660d18..8c177ec9946 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -164,6 +164,7 @@ extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern void do_sql_command(PGconn *conn, const char *sql); extern PGresult *pgfdw_get_result(PGconn *conn); +extern PGresult *pgfdw_get_next_result(PGconn *conn); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, @@ -177,6 +178,7 @@ extern List *ExtractExtensionList(const char *extensionsString, bool warnOnMissing); extern char *process_pgfdw_appname(const char *appname); extern char *pgfdw_application_name; +extern bool pgfdw_use_cursor; /* in deparse.c */ extern void classifyConditions(PlannerInfo *root, -- 2.39.5 (Apple Git-154)