From 395bd8eb74de4b9a96044f56e77174fea0afd071 Mon Sep 17 00:00:00 2001
From: Laurenz Albe <laurenz.albe@cybertec.at>
Date: Wed, 13 Nov 2024 15:08:17 +0100
Subject: [PATCH 16/21] allow parallel execution queries with session variables

---
 doc/src/sgml/parallel.sgml                    |   6 -
 src/backend/commands/session_variable.c       |  23 +++
 src/backend/executor/execMain.c               |  20 ++-
 src/backend/executor/execParallel.c           | 147 +++++++++++++++++-
 src/backend/optimizer/util/clauses.c          |  18 +--
 src/backend/tcop/pquery.c                     |   3 +
 src/include/commands/session_variable.h       |   1 +
 src/include/executor/execdesc.h               |   4 +
 src/include/nodes/execnodes.h                 |   1 +
 .../regress/expected/session_variables.out    |  12 +-
 10 files changed, 206 insertions(+), 29 deletions(-)

diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml
index 683dede6adc..1ce9abf86f5 100644
--- a/doc/src/sgml/parallel.sgml
+++ b/doc/src/sgml/parallel.sgml
@@ -515,12 +515,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         Plan nodes that reference a correlated <literal>SubPlan</literal>.
       </para>
     </listitem>
-
-    <listitem>
-      <para>
-        Plan nodes that use a session variable.
-      </para>
-    </listitem>
   </itemizedlist>
 
  <sect2 id="parallel-labeling">
diff --git a/src/backend/commands/session_variable.c b/src/backend/commands/session_variable.c
index 95dd7445c15..bb6abb792be 100644
--- a/src/backend/commands/session_variable.c
+++ b/src/backend/commands/session_variable.c
@@ -991,6 +991,29 @@ GetSessionVariable(Oid varid, bool *isNull)
 	return copy_session_variable_value(svar, isNull);
 }
 
+/*
+ * Returns a copy of the value of the session variable (in the current memory
+ * context) plus typid of the session variable. The caller is responsible for
+ * permission checks.
+ */
+Datum
+GetSessionVariableWithTypeid(Oid varid, bool *isNull, Oid *typid)
+{
+	SVariable	svar;
+
+	svar = get_session_variable(varid);
+
+	*typid = svar->typid;
+
+	/*
+	 * Although "svar" is freshly validated in this point, svar->is_valid can
+	 * be false, if an invalidation message was processed during the domain check.
+	 * But the variable and all its dependencies are locked now, so we don't need
+	 * to repeat the validation.
+	 */
+	return copy_session_variable_value(svar, isNull);
+}
+
 /*
  * Assign the result of the evaluated expression to the session variable
  */
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 67e0b9ba2ac..1feba422bfb 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -220,7 +220,19 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 	 *    be changed inside query execution time, and then a reference to
 	 *    previously returned value can be corrupted).
 	 */
-	if (queryDesc->plannedstmt->sessionVariables)
+	if (queryDesc->num_session_variables > 0)
+	{
+		/*
+		 * When a parallel query needs to access query parameters (including
+		 * related session variables), then related session variables are
+		 * restored (deserialized) in queryDesc already. So just push pointer
+		 * of this array to executor's estate.
+		 */
+		Assert(IsParallelWorker());
+		estate->es_session_variables = queryDesc->session_variables;
+		estate->es_num_session_variables = queryDesc->num_session_variables;
+	}
+	else if (queryDesc->plannedstmt->sessionVariables)
 	{
 		int			nSessionVariables;
 		int			i = 0;
@@ -259,9 +271,9 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
 								   get_session_variable_name(varid));
 			}
 
-			estate->es_session_variables[i].value =
-				GetSessionVariable(varid,
-								   &estate->es_session_variables[i].isnull);
+			estate->es_session_variables[i].value = GetSessionVariableWithTypeid(varid,
+												   &estate->es_session_variables[i].isnull,
+												   &estate->es_session_variables[i].typid);
 
 			i++;
 		}
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 1bedb808368..aee66beac91 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -12,8 +12,9 @@
  * workers and ensuring that their state generally matches that of the
  * leader; see src/backend/access/transam/README.parallel for details.
  * However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer/WAL usage info, and
- * the actual plan to be passed down to the worker.
+ * any ParamListInfo associated with the query, buffer/WAL usage info,
+ * session variables buffer, and the actual plan to be passed down to
+ * the worker.
  *
  * IDENTIFICATION
  *	  src/backend/executor/execParallel.c
@@ -64,6 +65,7 @@
 #define PARALLEL_KEY_QUERY_TEXT		UINT64CONST(0xE000000000000008)
 #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
 #define PARALLEL_KEY_WAL_USAGE			UINT64CONST(0xE00000000000000A)
+#define PARALLEL_KEY_SESSION_VARIABLES	UINT64CONST(0xE00000000000000B)
 
 #define PARALLEL_TUPLE_QUEUE_SIZE		65536
 
@@ -138,6 +140,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 /* Helper function that runs in the parallel worker. */
 static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc);
 
+/* Helper functions that can pass values of session variables */
+static Size EstimateSessionVariables(EState *estate);
+static void SerializeSessionVariables(EState *estate, char **start_address);
+static SessionVariableValue *RestoreSessionVariables(char **start_address,
+													 int *num_session_variables);
+
 /*
  * Create a serialized representation of the plan to be sent to each worker.
  */
@@ -598,6 +606,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	char	   *pstmt_data;
 	char	   *pstmt_space;
 	char	   *paramlistinfo_space;
+	char	   *session_variables_space;
 	BufferUsage *bufusage_space;
 	WalUsage   *walusage_space;
 	SharedExecutorInstrumentation *instrumentation = NULL;
@@ -607,6 +616,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	int			instrumentation_len = 0;
 	int			jit_instrumentation_len = 0;
 	int			instrument_offset = 0;
+	int			session_variables_len = 0;
 	Size		dsa_minsize = dsa_minimum_size();
 	char	   *query_string;
 	int			query_len;
@@ -662,6 +672,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len);
 	shm_toc_estimate_keys(&pcxt->estimator, 1);
 
+	/* Estimate space for serialized session variables. */
+	session_variables_len = EstimateSessionVariables(estate);
+	shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+
 	/*
 	 * Estimate space for BufferUsage.
 	 *
@@ -763,6 +778,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space);
 	SerializeParamList(estate->es_param_list_info, &paramlistinfo_space);
 
+	/* Store serialized session variables. */
+	session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len);
+	shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space);
+	SerializeSessionVariables(estate, &session_variables_space);
+
 	/* Allocate space for each worker's BufferUsage; no need to initialize. */
 	bufusage_space = shm_toc_allocate(pcxt->toc,
 									  mul_size(sizeof(BufferUsage), pcxt->nworkers));
@@ -1420,6 +1440,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	SharedJitInstrumentation *jit_instrumentation;
 	int			instrument_options = 0;
 	void	   *area_space;
+	char	   *sessionvariable_space;
 	dsa_area   *area;
 	ParallelWorkerContext pwcxt;
 
@@ -1445,6 +1466,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false);
 	area = dsa_attach_in_place(area_space, seg);
 
+	/* Reconstruct session variables. */
+	sessionvariable_space = shm_toc_lookup(toc,
+										   PARALLEL_KEY_SESSION_VARIABLES,
+										   false);
+	queryDesc->session_variables =
+		RestoreSessionVariables(&sessionvariable_space,
+								&queryDesc->num_session_variables);
+
 	/* Start up the executor */
 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
 	if (!ExecutorStart(queryDesc, fpes->eflags))
@@ -1513,3 +1542,117 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	FreeQueryDesc(queryDesc);
 	receiver->rDestroy(receiver);
 }
+
+/*
+ * Estimate the amount of space required to serialize a session variable.
+ */
+static Size
+EstimateSessionVariables(EState *estate)
+{
+	int			i;
+	Size		sz = sizeof(int);
+
+	if (estate->es_session_variables == NULL)
+		return sz;
+
+	for (i = 0; i < estate->es_num_session_variables; i++)
+	{
+		SessionVariableValue *svarval;
+		Oid			typeOid;
+		int16		typLen;
+		bool		typByVal;
+
+		svarval = &estate->es_session_variables[i];
+
+		typeOid = svarval->typid;
+
+		sz = add_size(sz, sizeof(Oid)); /* space for type OID */
+
+		/* space for datum/isnull */
+		Assert(OidIsValid(typeOid));
+		get_typlenbyval(typeOid, &typLen, &typByVal);
+
+		sz = add_size(sz,
+					  datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen));
+	}
+
+	return sz;
+}
+
+/*
+ * Serialize a session variables buffer into caller-provided storage.
+ *
+ * We write the number of parameters first, as a 4-byte integer, and then
+ * write details for each parameter in turn.  The details for each parameter
+ * consist of a 4-byte type OID, and then the datum as serialized by
+ * datumSerialize().  The caller is responsible for ensuring that there is
+ * enough storage to store the number of bytes that will be written; use
+ * EstimateSessionVariables to find out how many will be needed.
+ * *start_address is updated to point to the byte immediately following those
+ * written.
+ *
+ * RestoreSessionVariables can be used to recreate a session variable buffer
+ * based on the serialized representation;
+ */
+static void
+SerializeSessionVariables(EState *estate, char **start_address)
+{
+	int			nparams;
+	int			i;
+
+	/* Write number of parameters. */
+	nparams = estate->es_num_session_variables;
+	memcpy(*start_address, &nparams, sizeof(int));
+	*start_address += sizeof(int);
+
+	/* Write each parameter in turn. */
+	for (i = 0; i < nparams; i++)
+	{
+		SessionVariableValue *svarval;
+		Oid			typeOid;
+		int16		typLen;
+		bool		typByVal;
+
+		svarval = &estate->es_session_variables[i];
+		typeOid = svarval->typid;
+
+		/* Write type OID. */
+		memcpy(*start_address, &typeOid, sizeof(Oid));
+		*start_address += sizeof(Oid);
+
+		Assert(OidIsValid(typeOid));
+		get_typlenbyval(typeOid, &typLen, &typByVal);
+
+		datumSerialize(svarval->value, svarval->isnull, typByVal, typLen,
+					   start_address);
+	}
+}
+
+static SessionVariableValue *
+RestoreSessionVariables(char **start_address, int *num_session_variables)
+{
+	SessionVariableValue *session_variables;
+	int			i;
+	int			nparams;
+
+	memcpy(&nparams, *start_address, sizeof(int));
+	*start_address += sizeof(int);
+
+	*num_session_variables = nparams;
+	session_variables = (SessionVariableValue *)
+		palloc(nparams * sizeof(SessionVariableValue));
+
+	for (i = 0; i < nparams; i++)
+	{
+		SessionVariableValue *svarval = &session_variables[i];
+
+		/* Read type OID. */
+		memcpy(&svarval->typid, *start_address, sizeof(Oid));
+		*start_address += sizeof(Oid);
+
+		/* Read datum/isnull. */
+		svarval->value = datumRestore(start_address, &svarval->isnull);
+	}
+
+	return session_variables;
+}
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index f60828f862e..175390c6c28 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -923,25 +923,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 
 	/*
 	 * We can't pass Params to workers at the moment either, so they are also
-	 * parallel-restricted, unless they are PARAM_EXTERN Params or are
-	 * PARAM_EXEC Params listed in safe_param_ids, meaning they could be
-	 * either generated within workers or can be computed by the leader and
-	 * then their value can be passed to workers.
+	 * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE
+	 * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they
+	 * could be either generated within workers or can be computed by the
+	 * leader and then their value can be passed to workers.
 	 */
 	else if (IsA(node, Param))
 	{
 		Param	   *param = (Param *) node;
 
-		if (param->paramkind == PARAM_EXTERN)
+		if (param->paramkind == PARAM_EXTERN ||
+			param->paramkind == PARAM_VARIABLE)
 			return false;
 
-		/* we don't support passing session variables to workers */
-		if (param->paramkind == PARAM_VARIABLE)
-		{
-			if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
-				return true;
-		}
-
 		if (param->paramkind != PARAM_EXEC ||
 			!list_member_int(context->safe_param_ids, param->paramid))
 		{
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index dea24453a6c..dce885190c2 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -92,6 +92,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt,
 	qd->queryEnv = queryEnv;
 	qd->instrument_options = instrument_options;	/* instrumentation wanted? */
 
+	qd->num_session_variables = 0;
+	qd->session_variables = NULL;
+
 	/* null these fields until set by ExecutorStart */
 	qd->tupDesc = NULL;
 	qd->estate = NULL;
diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h
index 4492bee6d69..f66b6c0b864 100644
--- a/src/include/commands/session_variable.h
+++ b/src/include/commands/session_variable.h
@@ -29,6 +29,7 @@ extern void AtEOSubXact_SessionVariables(bool isCommit, SubTransactionId mySubid
 
 extern void SetSessionVariable(Oid varid, Datum value, bool isNull);
 extern Datum GetSessionVariable(Oid varid, bool *isNull);
+extern Datum GetSessionVariableWithTypeid(Oid varid, bool *isNull, Oid *typid);
 
 extern void ExecuteLetStmt(ParseState *pstate, LetStmt *stmt, ParamListInfo params,
 						   QueryEnvironment *queryEnv, QueryCompletion *qc);
diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h
index ba53305ad42..41eeb1c83c9 100644
--- a/src/include/executor/execdesc.h
+++ b/src/include/executor/execdesc.h
@@ -52,6 +52,10 @@ typedef struct QueryDesc
 	/* This field is set by ExecutePlan */
 	bool		already_executed;	/* true if previously executed */
 
+	/* reference to session variables buffer */
+	int			num_session_variables;
+	SessionVariableValue *session_variables;
+
 	/* This is always set NULL by the core system, but plugins can change it */
 	struct Instrumentation *totaltime;	/* total time spent in ExecutorRun */
 } QueryDesc;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 1b8ad640922..5632f77aee7 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -642,6 +642,7 @@ typedef struct AsyncRequest
  */
 typedef struct SessionVariableValue
 {
+	Oid			typid;
 	bool		isnull;
 	Datum		value;
 } SessionVariableValue;
diff --git a/src/test/regress/expected/session_variables.out b/src/test/regress/expected/session_variables.out
index 0ed2d5fbb01..d0658116683 100644
--- a/src/test/regress/expected/session_variables.out
+++ b/src/test/regress/expected/session_variables.out
@@ -1284,12 +1284,14 @@ SELECT count(*) FROM svar_test WHERE a%10 = zero;
 
 -- parallel execution is not supported yet
 EXPLAIN (COSTS OFF) SELECT count(*) FROM svar_test WHERE a%10 = zero;
-            QUERY PLAN             
------------------------------------
+                 QUERY PLAN                 
+--------------------------------------------
  Aggregate
-   ->  Seq Scan on svar_test
-         Filter: ((a % 10) = zero)
-(3 rows)
+   ->  Gather
+         Workers Planned: 2
+         ->  Parallel Seq Scan on svar_test
+               Filter: ((a % 10) = zero)
+(5 rows)
 
 LET zero = (SELECT count(*) FROM svar_test);
 -- result should be 1000
-- 
2.48.1

