public inbox for [email protected]  
help / color / mirror / Atom feed
From: Jeff Davis <[email protected]>
To: Ashutosh Bapat <[email protected]>
Cc: Bharath Rupireddy <[email protected]>
Cc: Joe Conway <[email protected]>
Cc: [email protected]
Subject: Re: [17] CREATE SUBSCRIPTION ... SERVER
Date: Tue, 23 Jan 2024 17:45:07 -0800
Message-ID: <[email protected]> (raw)
In-Reply-To: <CAExHW5vDPBB_PShBmSj=v=Jf1K=vSmWdPZTzCJsLpqDgqek+sA@mail.gmail.com>
References: <[email protected]>
	<CAExHW5vv-78ixJs3arsuZ+12A4AyqehJnDPhejp0m4FAKG6JZA@mail.gmail.com>
	<[email protected]>
	<CAExHW5sAo4Gv3AwuxQNGSQ2Z45B8_MZEb-SG-B8D-hLMdrquAg@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CAExHW5unvpDv6yMSmqurHP7Du1PqoJFWVxeK-4YNm5EnoNJiSQ@mail.gmail.com>
	<[email protected]>
	<CAExHW5sQVJeCgCGMkum_k4Z4VPaK5GkdAjHW-d+F3MngBZ9qBA@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<[email protected]>
	<CALj2ACXDua2Az15Kj3OZFaRm49G8-faemiEEv8t9GNCcsxv8Hw@mail.gmail.com>
	<[email protected]>
	<CAExHW5v5poohyfNRVQp3yjVE56ej0dWvVMbysSSVtg21aHT3uQ@mail.gmail.com>
	<[email protected]>
	<CAExHW5vF+cp9efQj=-W+Rhbr9f9ZZqt3XFnP0NB5jKsm=C=5tw@mail.gmail.com>
	<[email protected]>
	<[email protected]>
	<CAExHW5uCzS-VeSYQHTHxFSdQik-f_O892xmzrzm2fuO+ro+otA@mail.gmail.com>
	<[email protected]>
	<CAExHW5vDPBB_PShBmSj=v=Jf1K=vSmWdPZTzCJsLpqDgqek+sA@mail.gmail.com>

On Tue, 2024-01-23 at 15:21 +0530, Ashutosh Bapat wrote:
> I am with the prefix. The changes it causes make review difficult. If
> you can separate those changes into a patch that will help.

I ended up just removing the dummy FDW. Real users are likely to want
to use postgres_fdw, and if not, it's easy enough to issue a CREATE
FOREIGN DATA WRAPPER. Or I can bring it back if desired.

Updated patch set (patches are renumbered):

  * removed dummy FDW and test churn
  * made a new pg_connection_validator function which leaves
postgresql_fdw_validator in place. (I didn't document the new function
-- should I?)
  * included your tests improvements
  * removed dependency from the subscription to the user mapping -- we
don't depend on the user mapping for foreign tables, so we shouldn't
depend on them here. Of course a change to a user mapping still
invalidates the subscription worker and it will restart.
  * general cleanup

Overall it's simpler and hopefully easier to review. The patch to
introduce the pg_create_connection role could use some more discussion,
but I believe 0001 and 0002 are nearly ready.

Regards,
	Jeff Davis



Attachments:

  [text/x-patch] v9-0001-Add-SQL-function-pg_conninfo_from_server.patch (26.2K, 2-v9-0001-Add-SQL-function-pg_conninfo_from_server.patch)
  download | inline diff:
From ba021281fe7910fa197888b299281acbfda30c36 Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Tue, 23 Jan 2024 11:11:21 -0800
Subject: [PATCH v9 1/3] Add SQL function pg_conninfo_from_server().

Retrieves valid Postgres connection string from a foreign server. Any
foreign server may be used, though it's expected to provide valid
libpq connection options. Invalid or unrecognized options will be
ignored.

Extends walreceiver API to return available libpq options.

In preparation for CREATE SUBSCRIPTION ... SERVER.

Discussion: https://postgr.es/m/[email protected]
---
 .../postgres_fdw/expected/postgres_fdw.out    |  14 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   6 +
 doc/src/sgml/func.sgml                        |  19 ++
 src/backend/foreign/foreign.c                 | 255 +++++++++++++++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  48 ++++
 src/include/catalog/pg_proc.dat               |   8 +
 src/include/foreign/foreign.h                 |   2 +
 src/include/replication/walreceiver.h         |  20 ++
 src/test/regress/expected/foreign_data.out    |  46 ++++
 src/test/regress/sql/foreign_data.sql         |  40 +++
 10 files changed, 449 insertions(+), 9 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index b5a38aeb21..8a7a15cc51 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -24,6 +24,13 @@ CREATE USER MAPPING FOR public SERVER testserver1
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
 CREATE USER MAPPING FOR public SERVER loopback3;
+-- test pg_conninfo_from_server()
+SELECT pg_conninfo_from_server('testserver1', CURRENT_USER, false);
+      pg_conninfo_from_server      
+-----------------------------------
+ user = 'value' password = 'value'
+(1 row)
+
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -196,6 +203,13 @@ ALTER USER MAPPING FOR public SERVER testserver1
 -- permitted to check validation.
 ALTER USER MAPPING FOR public SERVER testserver1
 	OPTIONS (ADD sslkey 'value', ADD sslcert 'value');
+-- check pg_conninfo_from_server() after ALTERs
+SELECT pg_conninfo_from_server('testserver1', CURRENT_USER, false);
+                                                                                                                                                                                                                                   pg_conninfo_from_server                                                                                                                                                                                                                                    
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ service = 'value' connect_timeout = 'value' dbname = 'value' host = 'value' hostaddr = 'value' port = 'value' application_name = 'value' keepalives = 'value' keepalives_idle = 'value' keepalives_interval = 'value' tcp_user_timeout = 'value' sslcompression = 'value' sslmode = 'value' sslcert = 'value' sslkey = 'value' sslrootcert = 'value' sslcrl = 'value' krbsrvname = 'value' gsslib = 'value' gssdelegation = 'value' sslpassword = 'dummy' sslkey = 'value' sslcert = 'value'
+(1 row)
+
 ALTER FOREIGN TABLE ft1 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index f410c3db4e..0d8478120d 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -28,6 +28,9 @@ CREATE USER MAPPING FOR CURRENT_USER SERVER loopback;
 CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2;
 CREATE USER MAPPING FOR public SERVER loopback3;
 
+-- test pg_conninfo_from_server()
+SELECT pg_conninfo_from_server('testserver1', CURRENT_USER, false);
+
 -- ===================================================================
 -- create objects used through FDW loopback server
 -- ===================================================================
@@ -213,6 +216,9 @@ ALTER USER MAPPING FOR public SERVER testserver1
 ALTER USER MAPPING FOR public SERVER testserver1
 	OPTIONS (ADD sslkey 'value', ADD sslcert 'value');
 
+-- check pg_conninfo_from_server() after ALTERs
+SELECT pg_conninfo_from_server('testserver1', CURRENT_USER, false);
+
 ALTER FOREIGN TABLE ft1 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1');
 ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 210c7c0b02..79e1792eae 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -27985,6 +27985,25 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
        </para></entry>
       </row>
 
+      <row>
+       <entry role="func_table_entry"><para role="func_signature">
+        <indexterm>
+         <primary>pg_conninfo_from_server</primary>
+        </indexterm>
+        <function>pg_conninfo_from_server</function> ( <parameter>servername</parameter> <type>text</type>, <parameter>username</parameter> <type>text</type>, <parameter>append_overrides</parameter> <type>boolean</type> )
+        <returnvalue>text</returnvalue>
+       </para>
+       <para>
+        Returns connection string generated from the foreign server and user
+        mapping associated with the given
+        <replaceable>servername</replaceable> and
+        <replaceable>username</replaceable>. If
+        <replaceable>append_overrides</replaceable> is
+        <literal>true</literal>, it appends override parameters necessary for
+        making outbound connections.
+       </para></entry>
+      </row>
+
       <row>
        <entry id="pg-logical-emit-message" role="func_table_entry"><para role="func_signature">
         <indexterm>
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index 02e1898131..b4635d6eba 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -18,11 +18,15 @@
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_foreign_table.h"
 #include "catalog/pg_user_mapping.h"
+#include "commands/defrem.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
 #include "funcapi.h"
 #include "lib/stringinfo.h"
+#include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -190,6 +194,146 @@ GetForeignServerByName(const char *srvname, bool missing_ok)
 }
 
 
+/*
+ * Values in connection strings must be enclosed in single quotes. Single
+ * quotes and backslashes must be escaped with backslash. NB: these rules are
+ * different from the rules for escaping a SQL literal.
+ */
+static void
+appendEscapedValue(StringInfo str, const char *val)
+{
+	appendStringInfoChar(str, '\'');
+	for (int i = 0; val[i] != '\0'; i++)
+	{
+		if (val[i] == '\\' || val[i] == '\'')
+			appendStringInfoChar(str, '\\');
+		appendStringInfoChar(str, val[i]);
+	}
+	appendStringInfoChar(str, '\'');
+}
+
+
+/*
+ * Check if the provided option is one of libpq conninfo options.
+ * context is the Oid of the catalog the option came from, or 0 if we
+ * don't care.
+ */
+static bool
+is_libpq_conninfo_option(const char *option, Oid context)
+{
+	const ConnectionOption *opt;
+
+	/* skip options that must be overridden */
+	if (strcmp(option, "client_encoding") == 0)
+		return false;
+
+	for (opt = walrcv_conninfo_options(); opt->optname; opt++)
+	{
+		if (strcmp(opt->optname, option) == 0)
+		{
+			if (opt->isdebug)
+				return false;
+
+			if (opt->issecret || strcmp(opt->optname, "user") == 0)
+				return (context == UserMappingRelationId);
+
+			return (context == ForeignServerRelationId);
+		}
+	}
+	return false;
+}
+
+
+/*
+ * Helper for ForeignServerConnectionString().
+ *
+ * Transform a List of DefElem into a connection string.
+ */
+static char *
+options_to_conninfo(List *options, bool append_overrides)
+{
+	StringInfoData	 str;
+	ListCell		*lc;
+	char			*sep = "";
+
+	initStringInfo(&str);
+	foreach(lc, options)
+	{
+		DefElem *d = (DefElem *) lfirst(lc);
+		char *name = d->defname;
+		char *value;
+
+		/* ignore unknown options */
+		if (!is_libpq_conninfo_option(name, ForeignServerRelationId) &&
+			!is_libpq_conninfo_option(name, UserMappingRelationId))
+			continue;
+
+		value = defGetString(d);
+
+		appendStringInfo(&str, "%s%s = ", sep, name);
+		appendEscapedValue(&str, value);
+		sep = " ";
+	}
+
+	/* override client_encoding */
+	if (append_overrides)
+	{
+		appendStringInfo(&str, "%sclient_encoding = ", sep);
+		appendEscapedValue(&str, GetDatabaseEncodingName());
+		sep = " ";
+	}
+
+	return str.data;
+}
+
+
+/*
+ * Given a user ID and server ID, return a postgres connection string suitable
+ * to pass to libpq.
+ */
+char *
+ForeignServerConnectionString(Oid userid, Oid serverid, bool append_overrides)
+{
+	static MemoryContext	 tmpcontext = NULL;
+	ForeignServer			*server;
+	UserMapping				*um;
+	List					*options;
+	char					*conninfo;
+	MemoryContext			 oldcontext;
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+
+	/*
+	 * Use a temporary context rather than trying to track individual
+	 * allocations in GetForeignServer() and GetUserMapping().
+	 */
+	if (tmpcontext == NULL)
+		tmpcontext = AllocSetContextCreate(TopMemoryContext,
+										   "temp context for building connection string",
+										   ALLOCSET_DEFAULT_SIZES);
+
+	oldcontext = MemoryContextSwitchTo(tmpcontext);
+
+	server = GetForeignServer(serverid);
+	um = GetUserMapping(userid, serverid);
+
+	/* user mapping options override server options */
+	options = list_concat(server->options, um->options);
+
+	conninfo = options_to_conninfo(options, append_overrides);
+
+	MemoryContextSwitchTo(oldcontext);
+
+	/* copy only conninfo into the current context */
+	conninfo = pstrdup(conninfo);
+
+	MemoryContextReset(tmpcontext);
+
+	return conninfo;
+}
+
+
 /*
  * GetUserMapping - look up the user mapping.
  *
@@ -549,10 +693,103 @@ pg_options_to_table(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * pg_conninfo_from_server
+ *
+ * Extract connection string from the given foreign server.
+ */
+Datum
+pg_conninfo_from_server(PG_FUNCTION_ARGS)
+{
+	char *server_name = text_to_cstring(PG_GETARG_TEXT_P(0));
+	char *user_name = text_to_cstring(PG_GETARG_TEXT_P(1));
+	bool  append_overrides = PG_GETARG_BOOL(2);
+	Oid serverid = get_foreign_server_oid(server_name, false);
+	Oid userid = get_role_oid_or_public(user_name);
+	AclResult aclresult;
+	char *conninfo;
+
+	/* if the specified userid is not PUBLIC, check SET ROLE privileges */
+	if (userid != ACL_ID_PUBLIC)
+		check_can_set_role(GetUserId(), userid);
+
+	/* ACL check on foreign server */
+	aclresult = object_aclcheck(ForeignServerRelationId, serverid,
+								GetUserId(), ACL_USAGE);
+	if (aclresult != ACLCHECK_OK)
+		aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server_name);
+
+	conninfo = ForeignServerConnectionString(userid, serverid,
+											 append_overrides);
+
+	PG_RETURN_TEXT_P(cstring_to_text(conninfo));
+}
+
+
+/*
+ * Validate the generic option given to SERVER or USER MAPPING.
+ * Raise an ERROR if the option or its value is considered invalid.
+ *
+ * Valid server options are all libpq conninfo options except
+ * user and password -- these may only appear in USER MAPPING options.
+ */
+Datum
+pg_connection_validator(PG_FUNCTION_ARGS)
+{
+	List	   *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
+	Oid			catalog = PG_GETARG_OID(1);
+
+	ListCell   *cell;
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+
+	foreach(cell, options_list)
+	{
+		DefElem    *def = lfirst(cell);
+
+		if (!is_libpq_conninfo_option(def->defname, catalog))
+		{
+			const ConnectionOption *opt;
+			const char *closest_match;
+			ClosestMatchState match_state;
+			bool		has_valid_options = false;
+
+			/*
+			 * Unknown option specified, complain about it. Provide a hint
+			 * with a valid option that looks similar, if there is one.
+			 */
+			initClosestMatch(&match_state, def->defname, 4);
+			for (opt = walrcv_conninfo_options(); opt->optname; opt++)
+			{
+				if (is_libpq_conninfo_option(opt->optname, catalog))
+				{
+					has_valid_options = true;
+					updateClosestMatch(&match_state, opt->optname);
+				}
+			}
+
+			closest_match = getClosestMatch(&match_state);
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("invalid option \"%s\"", def->defname),
+					 has_valid_options ? closest_match ?
+					 errhint("Perhaps you meant the option \"%s\".",
+							 closest_match) : 0 :
+					 errhint("There are no valid options in this context.")));
+
+			PG_RETURN_BOOL(false);
+		}
+	}
+
+	PG_RETURN_BOOL(true);
+}
+
+
 /*
  * Describes the valid options for postgresql FDW, server, and user mapping.
  */
-struct ConnectionOption
+struct TestConnectionOption
 {
 	const char *optname;
 	Oid			optcontext;		/* Oid of catalog in which option may appear */
@@ -563,7 +800,7 @@ struct ConnectionOption
  *
  * The list is small - don't bother with bsearch if it stays so.
  */
-static const struct ConnectionOption libpq_conninfo_options[] = {
+static const struct TestConnectionOption test_conninfo_options[] = {
 	{"authtype", ForeignServerRelationId},
 	{"service", ForeignServerRelationId},
 	{"user", UserMappingRelationId},
@@ -584,16 +821,16 @@ static const struct ConnectionOption libpq_conninfo_options[] = {
 
 
 /*
- * Check if the provided option is one of libpq conninfo options.
+ * Check if the provided option is one of the test conninfo options.
  * context is the Oid of the catalog the option came from, or 0 if we
  * don't care.
  */
 static bool
-is_conninfo_option(const char *option, Oid context)
+is_test_conninfo_option(const char *option, Oid context)
 {
-	const struct ConnectionOption *opt;
+	const struct TestConnectionOption *opt;
 
-	for (opt = libpq_conninfo_options; opt->optname; opt++)
+	for (opt = test_conninfo_options; opt->optname; opt++)
 		if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
 			return true;
 	return false;
@@ -624,9 +861,9 @@ postgresql_fdw_validator(PG_FUNCTION_ARGS)
 	{
 		DefElem    *def = lfirst(cell);
 
-		if (!is_conninfo_option(def->defname, catalog))
+		if (!is_test_conninfo_option(def->defname, catalog))
 		{
-			const struct ConnectionOption *opt;
+			const struct TestConnectionOption *opt;
 			const char *closest_match;
 			ClosestMatchState match_state;
 			bool		has_valid_options = false;
@@ -636,7 +873,7 @@ postgresql_fdw_validator(PG_FUNCTION_ARGS)
 			 * with a valid option that looks similar, if there is one.
 			 */
 			initClosestMatch(&match_state, def->defname, 4);
-			for (opt = libpq_conninfo_options; opt->optname; opt++)
+			for (opt = test_conninfo_options; opt->optname; opt++)
 			{
 				if (catalog == opt->optcontext)
 				{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 77669074e8..a1845e6dfa 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,6 +52,7 @@ static WalReceiverConn *libpqrcv_connect(const char *conninfo,
 										 const char *appname, char **err);
 static void libpqrcv_check_conninfo(const char *conninfo,
 									bool must_use_password);
+static const ConnectionOption *libpqrcv_conninfo_options(void);
 static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
 static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
@@ -85,6 +86,7 @@ static void libpqrcv_disconnect(WalReceiverConn *conn);
 static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_connect = libpqrcv_connect,
 	.walrcv_check_conninfo = libpqrcv_check_conninfo,
+	.walrcv_conninfo_options = libpqrcv_conninfo_options,
 	.walrcv_get_conninfo = libpqrcv_get_conninfo,
 	.walrcv_get_senderinfo = libpqrcv_get_senderinfo,
 	.walrcv_identify_system = libpqrcv_identify_system,
@@ -337,6 +339,52 @@ libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
 	PQconninfoFree(opts);
 }
 
+static const ConnectionOption *
+libpqrcv_conninfo_options(void)
+{
+	static ConnectionOption	*connection_options = NULL;
+
+	if (connection_options == NULL)
+	{
+		PQconninfoOption	*conndefaults	= PQconndefaults();
+		PQconninfoOption	*lopt;
+		ConnectionOption	*tmp_options	= NULL;
+		ConnectionOption	*popt;
+		size_t				 options_size	= 0;
+		int					 num_libpq_opts	= 0;
+
+		for (lopt = conndefaults; lopt->keyword; lopt++)
+			num_libpq_opts++;
+
+		/* leave room for all-zero entry at the end */
+		options_size = sizeof(ConnectionOption) * (num_libpq_opts + 1);
+		tmp_options = MemoryContextAllocZero(TopMemoryContext, options_size);
+
+		popt = tmp_options;
+		for (lopt = conndefaults; lopt->keyword; lopt++)
+		{
+			if (strchr(lopt->dispchar, '*'))
+				popt->issecret = true;
+			else if (strchr(lopt->dispchar, 'D'))
+				popt->isdebug = true;
+
+			popt->optname = MemoryContextStrdup(TopMemoryContext,
+												lopt->keyword);
+			popt++;
+		}
+
+		/* last entry is all zero */
+		Assert(popt->optname == NULL);
+
+		PQconninfoFree(conndefaults);
+
+		/* if everything succeeded, set static variable */
+		connection_options = tmp_options;
+	}
+
+	return connection_options;
+}
+
 /*
  * Return a user-displayable conninfo string.  Any security-sensitive fields
  * are obfuscated.
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index ad74e07dbb..5890d22dd9 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -7516,6 +7516,14 @@
   proname => 'postgresql_fdw_validator', prorettype => 'bool',
   proargtypes => '_text oid', prosrc => 'postgresql_fdw_validator' },
 
+{ oid => '6015', descr => '(internal)',
+  proname => 'pg_connection_validator', prorettype => 'bool',
+  proargtypes => '_text oid', prosrc => 'pg_connection_validator' },
+
+{ oid => '6123', descr => 'extract connection string from the given foreign server',
+  proname => 'pg_conninfo_from_server', prorettype => 'text',
+  proargtypes => 'text text bool', prosrc => 'pg_conninfo_from_server' },
+
 { oid => '2290', descr => 'I/O',
   proname => 'record_in', provolatile => 's', prorettype => 'record',
   proargtypes => 'cstring oid int4', prosrc => 'record_in' },
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index 82b8153100..b5b9b97f4d 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -69,6 +69,8 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
 											 bool missing_ok);
+extern char *ForeignServerConnectionString(Oid userid, Oid serverid,
+										   bool append_overrides);
 extern UserMapping *GetUserMapping(Oid userid, Oid serverid);
 extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid);
 extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid,
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0899891cdb..a2ecbf825a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -223,6 +223,16 @@ typedef struct WalRcvExecResult
 	TupleDesc	tupledesc;
 } WalRcvExecResult;
 
+/*
+ * Describes the valid options for postgresql FDW, server, and user mapping.
+ */
+typedef struct ConnectionOption
+{
+	const char *optname;
+	bool		issecret;		/* is option for a password? */
+	bool		isdebug;		/* is option a debug option? */
+} ConnectionOption;
+
 /* WAL receiver - libpqwalreceiver hooks */
 
 /*
@@ -250,6 +260,13 @@ typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
 typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
 										  bool must_use_password);
 
+/*
+ * walrcv_conninfo_options_fn
+ *
+ * Return a pointer to a static array of the available options from libpq.
+ */
+typedef const struct ConnectionOption *(*walrcv_conninfo_options_fn) (void);
+
 /*
  * walrcv_get_conninfo_fn
  *
@@ -389,6 +406,7 @@ typedef struct WalReceiverFunctionsType
 {
 	walrcv_connect_fn walrcv_connect;
 	walrcv_check_conninfo_fn walrcv_check_conninfo;
+	walrcv_conninfo_options_fn walrcv_conninfo_options;
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
@@ -410,6 +428,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err)
 #define walrcv_check_conninfo(conninfo, must_use_password) \
 	WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
+#define walrcv_conninfo_options() \
+	WalReceiverFunctions->walrcv_conninfo_options()
 #define walrcv_get_conninfo(conn) \
 	WalReceiverFunctions->walrcv_get_conninfo(conn)
 #define walrcv_get_senderinfo(conn, sender_host, sender_port) \
diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out
index 1dfe23cc1e..0211531f32 100644
--- a/src/test/regress/expected/foreign_data.out
+++ b/src/test/regress/expected/foreign_data.out
@@ -688,6 +688,52 @@ DROP SERVER s7;
  t1     | regress_test_role
 (8 rows)
 
+--
+-- test pg_conninfo_from_server().
+--
+-- use test validator function (not all libpq options supported)
+CREATE FOREIGN DATA WRAPPER regress_connection_fdw
+  VALIDATOR pg_connection_validator;
+\set VERBOSITY terse
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', client_encoding 'LATIN1'); -- fail
+ERROR:  invalid option "client_encoding"
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', nonsense 'asdf'); -- fail
+ERROR:  invalid option "nonsense"
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', password 'secret'); -- fail
+ERROR:  invalid option "password"
+\set VERBOSITY default
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (hsot 'thehost'); -- fail - misspelling
+ERROR:  invalid option "hsot"
+HINT:  Perhaps you meant the option "host".
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', port '5432');
+CREATE USER MAPPING FOR regress_test_role SERVER connection_server
+  OPTIONS (user 'role', password 'secret', host 'otherhost'); -- fail
+ERROR:  invalid option "host"
+CREATE USER MAPPING FOR regress_test_role SERVER connection_server
+  OPTIONS (user 'role', password 'secret');
+CREATE USER MAPPING FOR PUBLIC SERVER connection_server
+  OPTIONS (user 'publicuser', password $pwd$'\"$# secret'$pwd$);
+SELECT pg_conninfo_from_server('connection_server', 'regress_test_role', false);
+                     pg_conninfo_from_server                      
+------------------------------------------------------------------
+ host = 'thehost' port = '5432' user = 'role' password = 'secret'
+(1 row)
+
+SELECT pg_conninfo_from_server('connection_server', 'regress_test_role2', false);
+                             pg_conninfo_from_server                              
+----------------------------------------------------------------------------------
+ host = 'thehost' port = '5432' user = 'publicuser' password = '\'\\"$# secret\''
+(1 row)
+
+DROP USER MAPPING FOR regress_test_role SERVER connection_server;
+DROP USER MAPPING FOR PUBLIC SERVER connection_server;
+DROP SERVER connection_server;
+DROP FOREIGN DATA WRAPPER regress_connection_fdw;
 -- CREATE FOREIGN TABLE
 CREATE SCHEMA foreign_schema;
 CREATE SERVER s0 FOREIGN DATA WRAPPER dummy;
diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql
index eefb860adc..a8e2edfeee 100644
--- a/src/test/regress/sql/foreign_data.sql
+++ b/src/test/regress/sql/foreign_data.sql
@@ -291,6 +291,46 @@ RESET ROLE;
 DROP SERVER s7;
 \deu
 
+--
+-- test pg_conninfo_from_server().
+--
+
+-- use test validator function (not all libpq options supported)
+CREATE FOREIGN DATA WRAPPER regress_connection_fdw
+  VALIDATOR pg_connection_validator;
+
+\set VERBOSITY terse
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', client_encoding 'LATIN1'); -- fail
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', nonsense 'asdf'); -- fail
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', password 'secret'); -- fail
+\set VERBOSITY default
+
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (hsot 'thehost'); -- fail - misspelling
+
+CREATE SERVER connection_server FOREIGN DATA WRAPPER regress_connection_fdw
+  OPTIONS (host 'thehost', port '5432');
+
+CREATE USER MAPPING FOR regress_test_role SERVER connection_server
+  OPTIONS (user 'role', password 'secret', host 'otherhost'); -- fail
+
+CREATE USER MAPPING FOR regress_test_role SERVER connection_server
+  OPTIONS (user 'role', password 'secret');
+CREATE USER MAPPING FOR PUBLIC SERVER connection_server
+  OPTIONS (user 'publicuser', password $pwd$'\"$# secret'$pwd$);
+
+SELECT pg_conninfo_from_server('connection_server', 'regress_test_role', false);
+
+SELECT pg_conninfo_from_server('connection_server', 'regress_test_role2', false);
+
+DROP USER MAPPING FOR regress_test_role SERVER connection_server;
+DROP USER MAPPING FOR PUBLIC SERVER connection_server;
+DROP SERVER connection_server;
+DROP FOREIGN DATA WRAPPER regress_connection_fdw;
+
 -- CREATE FOREIGN TABLE
 CREATE SCHEMA foreign_schema;
 CREATE SERVER s0 FOREIGN DATA WRAPPER dummy;
-- 
2.34.1



  [text/x-patch] v9-0002-CREATE-SUSBCRIPTION-.-SERVER.patch (49.1K, 3-v9-0002-CREATE-SUSBCRIPTION-.-SERVER.patch)
  download | inline diff:
From 5d677ca7654f083280b2634d941e09258fa99c78 Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Tue, 2 Jan 2024 13:42:48 -0800
Subject: [PATCH v9 2/3] CREATE SUSBCRIPTION ... SERVER.

Allow specifying a foreign server for CREATE SUBSCRIPTION, rather than
a raw connection string with CONNECTION.

Using a foreign server as a layer of indirection improves management
of multiple subscriptions to the same server. It also provides
integration with user mappings in case different subscriptions have
different owners or a subscription changes owners.

Discussion: https://postgr.es/m/[email protected]
Reviewed-by: Ashutosh Bapat
---
 contrib/postgres_fdw/Makefile                 |   2 +
 .../postgres_fdw/expected/postgres_fdw.out    |   8 +
 contrib/postgres_fdw/meson.build              |   5 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   7 +
 contrib/postgres_fdw/t/010_subscription.pl    |  71 ++++++++
 doc/src/sgml/ref/alter_subscription.sgml      |  18 +-
 doc/src/sgml/ref/create_subscription.sgml     |  11 +-
 src/backend/catalog/pg_subscription.c         |  39 +++-
 src/backend/commands/subscriptioncmds.c       | 168 ++++++++++++++++--
 src/backend/foreign/foreign.c                 |  25 +++
 src/backend/parser/gram.y                     |  20 +++
 src/backend/replication/logical/worker.c      |  16 +-
 src/bin/pg_dump/pg_dump.c                     |  27 ++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/tab-complete.c                   |   2 +-
 src/include/catalog/pg_subscription.h         |   7 +-
 src/include/foreign/foreign.h                 |   1 +
 src/include/nodes/parsenodes.h                |   3 +
 src/test/regress/expected/foreign_data.out    |  14 ++
 src/test/regress/expected/subscription.out    |  53 ++++++
 src/test/regress/sql/foreign_data.sql         |  18 ++
 src/test/regress/sql/subscription.sql         |  58 ++++++
 src/test/subscription/t/001_rep_changes.pl    |  60 +++++++
 23 files changed, 601 insertions(+), 33 deletions(-)
 create mode 100644 contrib/postgres_fdw/t/010_subscription.pl

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index c1b0cad453..c3498ea6b4 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -18,6 +18,8 @@ DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
 
 REGRESS = postgres_fdw
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 8a7a15cc51..ecd0230738 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -270,6 +270,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 -- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+-- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
 CREATE PUBLICATION testpub_ftbl FOR TABLE ft1;  -- should fail
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 2b86d8a6ee..cf7071dbf8 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -39,4 +39,9 @@ tests += {
     ],
     'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
   },
+  'tap': {
+    'tests': [
+      't/010_subscription.pl',
+    ],
+  },
 }
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 0d8478120d..1c9c12703f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -254,6 +254,13 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 
+-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+
 -- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
new file mode 100644
index 0000000000..a39e8fdbba
--- /dev/null
+++ b/contrib/postgres_fdw/t/010_subscription.pl
@@ -0,0 +1,71 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins AS SELECT a, a + 1 as b FROM generate_series(1,1002) AS a");
+
+# Replicate the changes without columns
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_no_col default VALUES");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int, b int)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
+
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE FOREIGN TABLE f_tab_ins (a int, b int) SERVER tap_server OPTIONS(table_name 'tab_ins')"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT a, a + 1 FROM generate_series(1003,1050) a");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM (SELECT f.b = l.b as match FROM tab_ins l, f_tab_ins f WHERE l.a = f.a) WHERE match");
+is($result, qq(1050), 'check initial data was copied to subscriber');
+
+done_testing();
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6d36ff0dc9..6d219145a9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-altersubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-altersubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index c7ace922f9..24538baf98 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      A foreign server to use for the connection.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c516c25ac7..5a2eaa803d 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -20,12 +20,15 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -40,7 +43,7 @@ static List *textarray_to_stringlist(ArrayType *textarray);
  * Fetch the subscription from the syscache.
  */
 Subscription *
-GetSubscription(Oid subid, bool missing_ok)
+GetSubscription(Oid subid, bool missing_ok, bool aclcheck)
 {
 	HeapTuple	tup;
 	Subscription *sub;
@@ -75,10 +78,36 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->runasowner = subform->subrunasowner;
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-								   tup,
-								   Anum_pg_subscription_subconninfo);
-	sub->conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(subform->subserver))
+	{
+		AclResult	aclresult;
+
+		/* recheck ACL if requested */
+		if (aclcheck)
+		{
+			aclresult = object_aclcheck(ForeignServerRelationId,
+										subform->subserver,
+										subform->subowner, ACL_USAGE);
+
+			if (aclresult != ACLCHECK_OK)
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+								GetUserNameFromId(subform->subowner, false),
+								ForeignServerName(subform->subserver))));
+		}
+
+		sub->conninfo = ForeignServerConnectionString(subform->subowner,
+													  subform->subserver,
+													  true);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+									   tup,
+									   Anum_pg_subscription_subconninfo);
+		sub->conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 75e6cd8ae3..983b5d17fe 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -25,14 +25,17 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -574,6 +577,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
+	Oid			serverid;
 	char	   *conninfo;
 	char		originname[NAMEDATALEN];
 	List	   *publications;
@@ -666,15 +670,40 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.synchronous_commit == NULL)
 		opts.synchronous_commit = "off";
 
-	conninfo = stmt->conninfo;
-	publications = stmt->publication;
-
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	if (stmt->servername)
+	{
+		ForeignServer	*server;
+
+		Assert(!stmt->conninfo);
+		conninfo = NULL;
+
+		server = GetForeignServerByName(stmt->servername, false);
+		aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+		/* make sure a user mapping exists */
+		GetUserMapping(owner, server->serverid);
+
+		serverid = server->serverid;
+		conninfo = ForeignServerConnectionString(owner, serverid, true);
+	}
+	else
+	{
+		Assert(stmt->conninfo);
+
+		serverid = InvalidOid;
+		conninfo = stmt->conninfo;
+	}
+
 	/* Check the connection info string. */
 	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
 
+	publications = stmt->publication;
+
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -697,8 +726,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
-	values[Anum_pg_subscription_subconninfo - 1] =
-		CStringGetTextDatum(conninfo);
+	values[Anum_pg_subscription_subserver - 1] = serverid;
+	if (!OidIsValid(serverid))
+		values[Anum_pg_subscription_subconninfo - 1] =
+			CStringGetTextDatum(conninfo);
+	else
+		nulls[Anum_pg_subscription_subconninfo - 1] = true;
 	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
 			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -719,6 +752,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+	if (stmt->servername)
+	{
+		ObjectAddress referenced;
+		Assert(OidIsValid(serverid));
+
+		ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -835,8 +879,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
 	return myself;
@@ -1104,7 +1146,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SUBSCRIPTION,
 					   stmt->subname);
 
-	sub = GetSubscription(subid, false);
+	/*
+	 * Skip ACL checks on the subscription's foreign server, if any. If
+	 * changing the server (or replacing it with a raw connection), then the
+	 * old one will be removed anyway. If changing something unrelated,
+	 * there's no need to do an additional ACL check here; that will be done
+	 * by the subscription worker anyway.
+	 */
+	sub = GetSubscription(subid, false, false);
 
 	/*
 	 * Don't allow non-superuser modification of a subscription with
@@ -1124,6 +1173,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
 	switch (stmt->kind)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1244,7 +1295,80 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SERVER:
+			{
+				ForeignServer	*new_server;
+				ObjectAddress	 referenced;
+				AclResult		 aclresult;
+				char			*conninfo;
+
+				/*
+				 * Remove what was there before, either another foreign server
+				 * or a connection string.
+				 */
+				if (form->subserver)
+				{
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   ForeignServerRelationId, form->subserver);
+				}
+				else
+				{
+					nulls[Anum_pg_subscription_subconninfo - 1] = true;
+					replaces[Anum_pg_subscription_subconninfo - 1] = true;
+				}
+
+				/*
+				 * Find the new server and user mapping. Check ACL of server
+				 * based on current user ID, but find the user mapping based
+				 * on the subscription owner.
+				 */
+				new_server = GetForeignServerByName(stmt->servername, false);
+				aclresult = object_aclcheck(ForeignServerRelationId,
+											new_server->serverid,
+											form->subowner, ACL_USAGE);
+				if (aclresult != ACLCHECK_OK)
+					ereport(ERROR,
+							(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+							 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+									GetUserNameFromId(form->subowner, false),
+									ForeignServerName(new_server->serverid))));
+
+				/* make sure a user mapping exists */
+				GetUserMapping(form->subowner, new_server->serverid);
+
+				conninfo = ForeignServerConnectionString(form->subowner,
+														 new_server->serverid,
+														 true);
+
+				/* Load the library providing us libpq calls. */
+				load_file("libpqwalreceiver", false);
+				/* Check the connection info string. */
+				walrcv_check_conninfo(conninfo,
+									  sub->passwordrequired && !sub->ownersuperuser);
+
+				values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+
+				ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+				update_tuple = true;
+			}
+			break;
+
 		case ALTER_SUBSCRIPTION_CONNECTION:
+			/* remove reference to foreign server and dependencies, if present */
+			if (form->subserver)
+			{
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   ForeignServerRelationId, form->subserver);
+
+				values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+			}
+
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
@@ -1455,8 +1579,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
 	/* Wake up related replication workers to handle this change quickly. */
@@ -1541,9 +1663,28 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	subname = pstrdup(NameStr(*DatumGetName(datum)));
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-								   Anum_pg_subscription_subconninfo);
-	conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(form->subserver))
+	{
+		AclResult aclresult;
+
+		aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
+									form->subowner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("subscription owner \"%s\" does not have permission on foreign server \"%s\"",
+							GetUserNameFromId(form->subowner, false),
+							ForeignServerName(form->subserver))));
+
+		conninfo = ForeignServerConnectionString(form->subowner,
+												 form->subserver, true);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+									   Anum_pg_subscription_subconninfo);
+		conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1644,6 +1785,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 
 	/* Clean up dependencies */
+	deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index b4635d6eba..db2cf6780d 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -179,6 +179,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
 }
 
 
+/*
+ * ForeignServerName - get name of foreign server.
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+	Form_pg_foreign_server serverform;
+	char *servername;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
+
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+	serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
+
+	servername = pstrdup(NameStr(serverform->srvname));
+
+	ReleaseSysCache(tp);
+
+	return servername;
+}
+
+
 /*
  * GetForeignServerByName - look up the foreign server definition by name.
  */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 3460fea56b..c27e0b8b5d 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10654,6 +10654,16 @@ CreateSubscriptionStmt:
 					n->options = $8;
 					$$ = (Node *) n;
 				}
+			| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+				{
+					CreateSubscriptionStmt *n =
+						makeNode(CreateSubscriptionStmt);
+					n->subname = $3;
+					n->servername = $5;
+					n->publication = $7;
+					n->options = $8;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
@@ -10683,6 +10693,16 @@ AlterSubscriptionStmt:
 					n->conninfo = $5;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name SERVER name
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_SERVER;
+					n->subname = $3;
+					n->servername = $5;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 9b598caf3c..0ade3150bf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3897,7 +3897,7 @@ maybe_reread_subscription(void)
 	/* Ensure allocations in permanent context. */
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 
-	newsub = GetSubscription(MyLogicalRepWorker->subid, true);
+	newsub = GetSubscription(MyLogicalRepWorker->subid, true, true);
 
 	/*
 	 * Exit if the subscription was removed. This normally should not happen
@@ -4003,7 +4003,9 @@ maybe_reread_subscription(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4602,7 +4604,7 @@ InitializeLogRepWorker(void)
 	StartTransactionCommand();
 	oldctx = MemoryContextSwitchTo(ApplyContext);
 
-	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true);
+	MySubscription = GetSubscription(MyLogicalRepWorker->subid, true, true);
 	if (!MySubscription)
 	{
 		ereport(LOG,
@@ -4639,6 +4641,14 @@ InitializeLogRepWorker(void)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+								  subscription_change_cb,
+								  (Datum) 0);
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(USERMAPPINGOID,
+								  subscription_change_cb,
+								  (Datum) 0);
 
 	CacheRegisterSyscacheCallback(AUTHOID,
 								  subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index bc20a025ce..5312008a82 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4634,6 +4634,7 @@ getSubscriptions(Archive *fout)
 	int			i_subdisableonerr;
 	int			i_subpasswordrequired;
 	int			i_subrunasowner;
+	int			i_subservername;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4705,10 +4706,12 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_ORIGIN_ANY);
 
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
-		appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
+		appendPQExpBufferStr(query, " fs.srvname AS subservername,\n"
+							 " o.remote_lsn AS suboriginremotelsn,\n"
 							 " s.subenabled\n");
 	else
-		appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
+		appendPQExpBufferStr(query, " NULL AS subservername,\n"
+							 " NULL AS suboriginremotelsn,\n"
 							 " false AS subenabled\n");
 
 	appendPQExpBufferStr(query,
@@ -4716,6 +4719,8 @@ getSubscriptions(Archive *fout)
 
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query,
+							 "LEFT JOIN pg_catalog.pg_foreign_server fs \n"
+							 "    ON fs.oid = s.subserver \n"
 							 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
 							 "    ON o.external_id = 'pg_' || s.oid::text \n");
 
@@ -4741,6 +4746,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
+	i_subservername = PQfnumber(res, "subservername");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4760,7 +4766,10 @@ getSubscriptions(Archive *fout)
 		AssignDumpId(&subinfo[i].dobj);
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
-
+		if (PQgetisnull(res, i, i_subservername))
+			subinfo[i].subservername = NULL;
+		else
+			subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
 		subinfo[i].subbinary =
 			pg_strdup(PQgetvalue(res, i, i_subbinary));
 		subinfo[i].substream =
@@ -4986,9 +4995,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
 					  qsubname);
 
-	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
 					  qsubname);
-	appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	if (subinfo->subservername)
+	{
+		appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+	}
+	else
+	{
+		appendPQExpBuffer(query, "CONNECTION ");
+		appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	}
 
 	/* Build list of quoted publications and append them to query. */
 	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index f0772d2157..849950e470 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *subpasswordrequired;
 	char	   *subrunasowner;
+	char	   *subservername;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index ada711d02f..616c90c48b 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3327,7 +3327,7 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE SUBSCRIPTION */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-		COMPLETE_WITH("CONNECTION");
+		COMPLETE_WITH("SERVER", "CONNECTION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
 		COMPLETE_WITH("PUBLICATION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index ab206bad7d..01141febb5 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,9 +93,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subrunasowner;	/* True if replication should execute as the
 								 * subscription owner */
 
+	Oid			subserver;		/* Set if connecting with server */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
-	text		subconninfo BKI_FORCE_NOT_NULL;
+	text		subconninfo;	/* Set if connecting with connection string */
 
 	/* Slot name on publisher */
 	NameData	subslotname BKI_FORCE_NULL;
@@ -165,7 +167,8 @@ typedef struct Subscription
  */
 #define LOGICALREP_STREAM_PARALLEL 'p'
 
-extern Subscription *GetSubscription(Oid subid, bool missing_ok);
+extern Subscription *GetSubscription(Oid subid, bool missing_ok,
+									 bool aclcheck);
 extern void FreeSubscription(Subscription *sub);
 extern void DisableSubscription(Oid subid);
 
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index b5b9b97f4d..a2f04ce9af 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -65,6 +65,7 @@ typedef struct ForeignTable
 
 
 extern ForeignServer *GetForeignServer(Oid serverid);
+extern char *ForeignServerName(Oid serverid);
 extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b3181f34ae..6d6b242cec 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4041,6 +4041,7 @@ typedef struct CreateSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
@@ -4049,6 +4050,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
 	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SERVER,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4063,6 +4065,7 @@ typedef struct AlterSubscriptionStmt
 	NodeTag		type;
 	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out
index 0211531f32..30aa23a8ff 100644
--- a/src/test/regress/expected/foreign_data.out
+++ b/src/test/regress/expected/foreign_data.out
@@ -733,6 +733,20 @@ SELECT pg_conninfo_from_server('connection_server', 'regress_test_role2', false)
 DROP USER MAPPING FOR regress_test_role SERVER connection_server;
 DROP USER MAPPING FOR PUBLIC SERVER connection_server;
 DROP SERVER connection_server;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER regress_connection_fdw;   -- ERROR: no permissions on FDW
+ERROR:  permission denied for foreign-data wrapper regress_connection_fdw
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER regress_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER regress_connection_fdw;
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+ERROR:  schema "bar" does not exist
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+RESET ROLE;
+REVOKE USAGE ON FOREIGN DATA WRAPPER regress_connection_fdw FROM regress_test_role;
 DROP FOREIGN DATA WRAPPER regress_connection_fdw;
 -- CREATE FOREIGN TABLE
 CREATE SCHEMA foreign_schema;
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b15eddbff3..b0a1a3cc26 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -144,6 +144,59 @@ ERROR:  could not connect to the publisher: invalid port number: "-1"
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
+RESET SESSION AUTHORIZATION;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_testsub6;
+-- test using a server object instead of connection string
+RESET SESSION AUTHORIZATION;
+CREATE FOREIGN DATA WRAPPER regress_connection_fdw
+  VALIDATOR pg_connection_validator;
+CREATE SERVER regress_testserver1 FOREIGN DATA WRAPPER regress_connection_fdw;
+CREATE SERVER regress_testserver2 FOREIGN DATA WRAPPER regress_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1
+  OPTIONS (password 'secret');
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver2 TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver1 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false); -- fails
+ERROR:  permission denied for foreign server regress_testserver1
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver2 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+RESET SESSION AUTHORIZATION;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver1; -- fails
+ERROR:  subscription owner "regress_subscription_user3" does not have permission on foreign server "regress_testserver1"
+GRANT USAGE ON FOREIGN SERVER regress_testserver1 TO regress_subscription_user3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver1;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2;
+DROP SERVER regress_testserver2;
+-- test an FDW with no validator
+CREATE FOREIGN DATA WRAPPER regress_fdw;
+CREATE SERVER regress_testserver3 FOREIGN DATA WRAPPER regress_fdw
+  OPTIONS (abc 'xyz');
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver3
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver3 TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver1;
+RESET SESSION AUTHORIZATION;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver3;
+DROP SERVER regress_testserver3;
+DROP FOREIGN DATA WRAPPER regress_fdw;
+DROP SUBSCRIPTION regress_testsub6;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1;
+DROP SERVER regress_testserver1;
+DROP FOREIGN DATA WRAPPER regress_connection_fdw;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
 \dRs+
                                                                                                            List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit |          Conninfo           | Skip LSN 
diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql
index a8e2edfeee..7956705217 100644
--- a/src/test/regress/sql/foreign_data.sql
+++ b/src/test/regress/sql/foreign_data.sql
@@ -329,6 +329,24 @@ SELECT pg_conninfo_from_server('connection_server', 'regress_test_role2', false)
 DROP USER MAPPING FOR regress_test_role SERVER connection_server;
 DROP USER MAPPING FOR PUBLIC SERVER connection_server;
 DROP SERVER connection_server;
+
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER regress_connection_fdw;   -- ERROR: no permissions on FDW
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER regress_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+
+CREATE SERVER t3 FOREIGN DATA WRAPPER regress_connection_fdw;
+
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+
+RESET ROLE;
+REVOKE USAGE ON FOREIGN DATA WRAPPER regress_connection_fdw FROM regress_test_role;
+
 DROP FOREIGN DATA WRAPPER regress_connection_fdw;
 
 -- CREATE FOREIGN TABLE
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 444e563ff3..4d44f141b7 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -88,6 +88,64 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
 -- fail - invalid connection string during ALTER
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
+RESET SESSION AUTHORIZATION;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_testsub6;
+
+-- test using a server object instead of connection string
+
+RESET SESSION AUTHORIZATION;
+CREATE FOREIGN DATA WRAPPER regress_connection_fdw
+  VALIDATOR pg_connection_validator;
+CREATE SERVER regress_testserver1 FOREIGN DATA WRAPPER regress_connection_fdw;
+CREATE SERVER regress_testserver2 FOREIGN DATA WRAPPER regress_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1
+  OPTIONS (password 'secret');
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver2 TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver1 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false); -- fails
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver2 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+RESET SESSION AUTHORIZATION;
+
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver1; -- fails
+GRANT USAGE ON FOREIGN SERVER regress_testserver1 TO regress_subscription_user3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver1;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2;
+DROP SERVER regress_testserver2;
+
+-- test an FDW with no validator
+CREATE FOREIGN DATA WRAPPER regress_fdw;
+CREATE SERVER regress_testserver3 FOREIGN DATA WRAPPER regress_fdw
+  OPTIONS (abc 'xyz');
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver3
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver3 TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver1;
+
+RESET SESSION AUTHORIZATION;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver3;
+DROP SERVER regress_testserver3;
+DROP FOREIGN DATA WRAPPER regress_fdw;
+
+DROP SUBSCRIPTION regress_testsub6;
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1;
+DROP SERVER regress_testserver1;
+DROP FOREIGN DATA WRAPPER regress_connection_fdw;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
+
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 9ccebd890a..8653423d08 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -27,6 +27,8 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins2 AS SELECT generate_series(1,1002) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
@@ -65,6 +67,7 @@ $node_publisher->safe_psql('postgres',
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins2 (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
 $node_subscriber->safe_psql('postgres',
@@ -110,6 +113,25 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
 );
 
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE FOREIGN DATA WRAPPER test_connection_fdw VALIDATOR pg_connection_validator"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER test_connection_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_sub2_server"
+);
+
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_simple_pub FOR TABLE tab_ins2");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub2 SERVER tap_sub2_server PUBLICATION tap_simple_pub WITH (password_required=false)"
+);
+
 # Wait for initial table sync to finish
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
@@ -121,11 +143,22 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins2");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
 $node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1,50)");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 SERVER tap_sub2_server");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rep SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
@@ -158,6 +191,10 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_ins");
 is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
@@ -449,10 +486,27 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after changing PUBLICATION";
 
+# test that changes to a foreign server subscription cause the worker
+# to restart
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SERVER tap_sub2_server OPTIONS (sslmode 'disable')"
+);
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply to restart after changing PUBLICATION";
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1001,1100)");
+
 # Restart the publisher and check the state of the subscriber which
 # should be in a streaming state after catching up.
 $node_publisher->stop('fast');
@@ -465,6 +519,11 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(1152|1|1100),
 	'check replicated inserts after subscription publication change');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1152|1|1100),
+	'check replicated inserts after subscription publication change');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1),
@@ -533,6 +592,7 @@ $node_publisher->poll_query_until('postgres',
 
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
-- 
2.34.1



  [text/x-patch] v9-0003-Introduce-pg_create_connection-predefined-role.patch (31.4K, 4-v9-0003-Introduce-pg_create_connection-predefined-role.patch)
  download | inline diff:
From bc3cbaac821d10dc33f2b64843a83c1af13ecbe2 Mon Sep 17 00:00:00 2001
From: Jeff Davis <[email protected]>
Date: Tue, 2 Jan 2024 13:13:54 -0800
Subject: [PATCH v9 3/3] Introduce pg_create_connection predefined role.

In addition to pg_create_subscription, membership in this role is
necessary to create a subscription with a connection string (CREATE
SUBSCRIPTION ... CONNECTION '...'). The pg_create_subscription role is
a member of pg_create_connection, so by default pg_create_subscription
has the same capability as before.

An administrator may revoke pg_create_connection from
pg_create_subscription, which will enable the privileges to be
separated. That is, permit CREATE SUBSCRIPTION ... SERVER, but not
permit CREATE SUBSCRIPTION ... CONNECTION.

Discussion: https://postgr.es/m/[email protected]
---
 .../postgres_fdw/expected/postgres_fdw.out    |  2 +-
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  2 +-
 contrib/postgres_fdw/t/010_subscription.pl    |  2 +-
 doc/src/sgml/ref/alter_server.sgml            | 14 ++++++
 doc/src/sgml/ref/alter_subscription.sgml      |  4 +-
 doc/src/sgml/ref/create_server.sgml           | 14 ++++++
 doc/src/sgml/ref/create_subscription.sgml     |  4 +-
 doc/src/sgml/user-manag.sgml                  | 12 ++++-
 src/backend/catalog/system_functions.sql      |  2 +
 src/backend/commands/foreigncmds.c            | 31 ++++++++++++
 src/backend/commands/subscriptioncmds.c       | 31 ++++++++++--
 src/backend/foreign/foreign.c                 |  1 +
 src/backend/parser/gram.y                     | 30 ++++++++++--
 src/include/catalog/pg_authid.dat             |  5 ++
 src/include/catalog/pg_foreign_server.h       |  1 +
 src/include/foreign/foreign.h                 |  1 +
 src/include/nodes/parsenodes.h                |  3 ++
 src/test/regress/expected/subscription.out    | 47 +++++++++++++++++--
 src/test/regress/sql/subscription.sql         | 47 +++++++++++++++++--
 src/test/subscription/t/001_rep_changes.pl    |  2 +-
 20 files changed, 234 insertions(+), 21 deletions(-)

diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index ecd0230738..eec57c0aa6 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -2,7 +2,7 @@
 -- create FDW objects
 -- ===================================================================
 CREATE EXTENSION postgres_fdw;
-CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw;
+CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw FOR SUBSCRIPTION;
 DO $d$
     BEGIN
         EXECUTE $$CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 1c9c12703f..c35e974a94 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -4,7 +4,7 @@
 
 CREATE EXTENSION postgres_fdw;
 
-CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw;
+CREATE SERVER testserver1 FOREIGN DATA WRAPPER postgres_fdw FOR SUBSCRIPTION;
 DO $d$
     BEGIN
         EXECUTE $$CREATE SERVER loopback FOREIGN DATA WRAPPER postgres_fdw
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
index a39e8fdbba..3ae2b6da4a 100644
--- a/contrib/postgres_fdw/t/010_subscription.pl
+++ b/contrib/postgres_fdw/t/010_subscription.pl
@@ -38,7 +38,7 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab
 my $publisher_host = $node_publisher->host;
 my $publisher_port = $node_publisher->port;
 $node_subscriber->safe_psql('postgres',
-	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw FOR SUBSCRIPTION OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
 );
 
 $node_subscriber->safe_psql('postgres',
diff --git a/doc/src/sgml/ref/alter_server.sgml b/doc/src/sgml/ref/alter_server.sgml
index 467bf85589..1a4227e548 100644
--- a/doc/src/sgml/ref/alter_server.sgml
+++ b/doc/src/sgml/ref/alter_server.sgml
@@ -22,6 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 ALTER SERVER <replaceable class="parameter">name</replaceable> [ VERSION '<replaceable class="parameter">new_version</replaceable>' ]
+    [ { FOR | NO } SUBSCRIPTION ]
     [ OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] ) ]
 ALTER SERVER <replaceable class="parameter">name</replaceable> OWNER TO { <replaceable>new_owner</replaceable> | CURRENT_ROLE | CURRENT_USER | SESSION_USER }
 ALTER SERVER <replaceable class="parameter">name</replaceable> RENAME TO <replaceable>new_name</replaceable>
@@ -70,6 +71,19 @@ ALTER SERVER <replaceable class="parameter">name</replaceable> RENAME TO <replac
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>{ FOR | NO } SUBSCRIPTION</literal></term>
+    <listitem>
+     <para>
+      This clause specifies whether the foreign server may be used for a
+      subscription (see <xref linkend="sql-createsubscription"/>). The default
+      is <literal>NO SUBSCRIPTION</literal>. Only members of the role
+      <literal>pg_create_connection</literal> may specify <literal>FOR
+      SUBSCRIPTION</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>OPTIONS ( [ ADD | SET | DROP ] <replaceable class="parameter">option</replaceable> ['<replaceable class="parameter">value</replaceable>'] [, ... ] )</literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6d219145a9..513f54c4b4 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -101,7 +101,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
      <para>
       This clause replaces the foreign server or connection string originally
       set by <xref linkend="sql-createsubscription"/> with the foreign server
-      <replaceable>servername</replaceable>.
+      <replaceable>servername</replaceable>. The foreign server must have been
+      created with <literal>FOR SUBSCRIPTION</literal> (see <xref
+      linkend="sql-createserver"/>).
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_server.sgml b/doc/src/sgml/ref/create_server.sgml
index 05f4019453..913cebabf2 100644
--- a/doc/src/sgml/ref/create_server.sgml
+++ b/doc/src/sgml/ref/create_server.sgml
@@ -23,6 +23,7 @@ PostgreSQL documentation
 <synopsis>
 CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</replaceable> [ TYPE '<replaceable class="parameter">server_type</replaceable>' ] [ VERSION '<replaceable class="parameter">server_version</replaceable>' ]
     FOREIGN DATA WRAPPER <replaceable class="parameter">fdw_name</replaceable>
+    [ { FOR | NO } SUBSCRIPTION ]
     [ OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] ) ]
 </synopsis>
  </refsynopsisdiv>
@@ -104,6 +105,19 @@ CREATE SERVER [ IF NOT EXISTS ] <replaceable class="parameter">server_name</repl
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>{ FOR | NO } SUBSCRIPTION</literal></term>
+    <listitem>
+     <para>
+      This clause specifies whether the foreign server may be used for a
+      subscription (see <xref linkend="sql-createsubscription"/>). The default
+      is <literal>NO SUBSCRIPTION</literal>. Only members of the role
+      <literal>pg_create_connection</literal> may specify <literal>FOR
+      SUBSCRIPTION</literal>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>OPTIONS ( <replaceable class="parameter">option</replaceable> '<replaceable class="parameter">value</replaceable>' [, ... ] )</literal></term>
     <listitem>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 24538baf98..f80a027ddc 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -81,7 +81,9 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
     <listitem>
      <para>
-      A foreign server to use for the connection.
+      A foreign server to use for the connection. The foreign server must have
+      been created with <literal>FOR SUBSCRIPTION</literal> (see <xref
+      linkend="sql-createserver"/>).
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/user-manag.sgml b/doc/src/sgml/user-manag.sgml
index 1c011ac62b..da1a37e60b 100644
--- a/doc/src/sgml/user-manag.sgml
+++ b/doc/src/sgml/user-manag.sgml
@@ -687,11 +687,19 @@ DROP ROLE doomed_role;
        <entry>Allow use of connection slots reserved via
        <xref linkend="guc-reserved-connections"/>.</entry>
       </row>
+      <row>
+       <entry>pg_create_connection</entry>
+       <entry>Allow users to specify a connection string directly in <link
+       linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link>.</entry>
+      </row>
       <row>
        <entry>pg_create_subscription</entry>
        <entry>Allow users with <literal>CREATE</literal> permission on the
-       database to issue
-       <link linkend="sql-createsubscription"><command>CREATE SUBSCRIPTION</command></link>.</entry>
+       database to issue <link
+       linkend="sql-createsubscription"><command>CREATE
+       SUBSCRIPTION</command></link>.  This role is a member of
+       <literal>pg_create_connection</literal>.</entry>
       </row>
      </tbody>
     </tgroup>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index f315fecf18..73512688de 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -781,3 +781,5 @@ GRANT pg_read_all_settings TO pg_monitor;
 GRANT pg_read_all_stats TO pg_monitor;
 
 GRANT pg_stat_scan_tables TO pg_monitor;
+
+GRANT pg_create_connection TO pg_create_subscription;
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index cf61bbac1f..f76689b8a7 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -21,6 +21,7 @@
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/objectaccess.h"
+#include "catalog/pg_authid_d.h"
 #include "catalog/pg_foreign_data_wrapper.h"
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_foreign_table.h"
@@ -923,6 +924,18 @@ CreateForeignServer(CreateForeignServerStmt *stmt)
 	else
 		nulls[Anum_pg_foreign_server_srvversion - 1] = true;
 
+	if (stmt->forsubscription)
+	{
+		if (!has_privs_of_role(ownerId, ROLE_PG_CREATE_CONNECTION))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to create server for subscription"),
+					 errdetail("Only roles with privileges of the \"%s\" role may create foreign servers with FOR SUBSCRIPTION specified.",
+							   "pg_create_subscription")));
+
+		values[Anum_pg_foreign_server_srvforsubscription - 1] = true;
+	}
+
 	/* Start with a blank acl */
 	nulls[Anum_pg_foreign_server_srvacl - 1] = true;
 
@@ -979,6 +992,7 @@ AlterForeignServer(AlterForeignServerStmt *stmt)
 	bool		repl_null[Natts_pg_foreign_server];
 	bool		repl_repl[Natts_pg_foreign_server];
 	Oid			srvId;
+	bool		forsubscription;
 	Form_pg_foreign_server srvForm;
 	ObjectAddress address;
 
@@ -1020,6 +1034,23 @@ AlterForeignServer(AlterForeignServerStmt *stmt)
 		repl_repl[Anum_pg_foreign_server_srvversion - 1] = true;
 	}
 
+	if (stmt->has_forsubscription)
+	{
+		repl_val[Anum_pg_foreign_server_srvforsubscription - 1] = stmt->forsubscription;
+		repl_repl[Anum_pg_foreign_server_srvforsubscription - 1] = true;
+		forsubscription = stmt->forsubscription;
+	}
+	else
+		forsubscription = srvForm->srvforsubscription;
+
+	if (forsubscription &&
+		!has_privs_of_role(srvForm->srvowner, ROLE_PG_CREATE_CONNECTION))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("permission denied to alter server for subscription"),
+				 errdetail("Only roles with privileges of the \"%s\" role may alter foreign servers with FOR SUBSCRIPTION specified.",
+						   "pg_create_connection")));
+
 	if (stmt->options)
 	{
 		ForeignDataWrapper *fdw = GetForeignDataWrapper(srvForm->srvfdw);
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 983b5d17fe..1e0c2e5b99 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -608,9 +608,9 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		PreventInTransactionBlock(isTopLevel, "CREATE SUBSCRIPTION ... WITH (create_slot = true)");
 
 	/*
-	 * We don't want to allow unprivileged users to be able to trigger
-	 * attempts to access arbitrary network destinations, so require the user
-	 * to have been specifically authorized to create subscriptions.
+	 * We don't want to allow unprivileged users to utilize the resources that
+	 * a subscription requires (such as a background worker), so require the
+	 * user to have been specifically authorized to create subscriptions.
 	 */
 	if (!has_privs_of_role(owner, ROLE_PG_CREATE_SUBSCRIPTION))
 		ereport(ERROR,
@@ -685,6 +685,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 		if (aclresult != ACLCHECK_OK)
 			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
 
+		if (!server->forsubscription)
+			ereport(ERROR,
+					(errmsg("foreign server \"%s\" not usable for subscription",
+							server->servername),
+					 errhint("Specify FOR SUBSCRIPTION when creating the foreign server.")));
+
 		/* make sure a user mapping exists */
 		GetUserMapping(owner, server->serverid);
 
@@ -695,6 +701,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	{
 		Assert(stmt->conninfo);
 
+		/*
+		 * We don't want to allow unprivileged users to be able to trigger
+		 * attempts to access arbitrary network destinations, so require the user
+		 * to have been specifically authorized to create connections.
+		 */
+		if (!has_privs_of_role(owner, ROLE_PG_CREATE_CONNECTION))
+			ereport(ERROR,
+					(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+					 errmsg("permission denied to create subscription with a connection string"),
+					 errdetail("Only roles with privileges of the \"%s\" role may create subscriptions with CONNECTION specified.",
+							   "pg_create_connection"),
+					 errhint("Create a subscription to a foreign server by specifying SERVER instead.")));
+
 		serverid = InvalidOid;
 		conninfo = stmt->conninfo;
 	}
@@ -1334,6 +1353,12 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 									GetUserNameFromId(form->subowner, false),
 									ForeignServerName(new_server->serverid))));
 
+				if (!new_server->forsubscription)
+					ereport(ERROR,
+							(errmsg("foreign server \"%s\" not usable for subscription",
+									new_server->servername),
+							 errhint("Specify FOR SUBSCRIPTION when creating the foreign server.")));
+
 				/* make sure a user mapping exists */
 				GetUserMapping(form->subowner, new_server->serverid);
 
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index db2cf6780d..8606d57b39 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -148,6 +148,7 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
 	server->servername = pstrdup(NameStr(serverform->srvname));
 	server->owner = serverform->srvowner;
 	server->fdwid = serverform->srvfdw;
+	server->forsubscription = serverform->srvforsubscription;
 
 	/* Extract server type */
 	datum = SysCacheGetAttr(FOREIGNSERVEROID,
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index c27e0b8b5d..3abcebd8b3 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -366,6 +366,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 %type <str>		opt_type
 %type <str>		foreign_server_version opt_foreign_server_version
+%type <boolean>	for_subscription opt_for_subscription
 %type <str>		opt_in_database
 
 %type <str>		parameter_name
@@ -5397,7 +5398,7 @@ generic_option_arg:
  *****************************************************************************/
 
 CreateForeignServerStmt: CREATE SERVER name opt_type opt_foreign_server_version
-						 FOREIGN DATA_P WRAPPER name create_generic_options
+						 FOREIGN DATA_P WRAPPER name opt_for_subscription create_generic_options
 				{
 					CreateForeignServerStmt *n = makeNode(CreateForeignServerStmt);
 
@@ -5405,12 +5406,13 @@ CreateForeignServerStmt: CREATE SERVER name opt_type opt_foreign_server_version
 					n->servertype = $4;
 					n->version = $5;
 					n->fdwname = $9;
-					n->options = $10;
+					n->forsubscription = $10;
+					n->options = $11;
 					n->if_not_exists = false;
 					$$ = (Node *) n;
 				}
 				| CREATE SERVER IF_P NOT EXISTS name opt_type opt_foreign_server_version
-						 FOREIGN DATA_P WRAPPER name create_generic_options
+						 FOREIGN DATA_P WRAPPER name opt_for_subscription create_generic_options
 				{
 					CreateForeignServerStmt *n = makeNode(CreateForeignServerStmt);
 
@@ -5418,7 +5420,8 @@ CreateForeignServerStmt: CREATE SERVER name opt_type opt_foreign_server_version
 					n->servertype = $7;
 					n->version = $8;
 					n->fdwname = $12;
-					n->options = $13;
+					n->forsubscription = $13;
+					n->options = $14;
 					n->if_not_exists = true;
 					$$ = (Node *) n;
 				}
@@ -5440,6 +5443,16 @@ opt_foreign_server_version:
 			| /*EMPTY*/				{ $$ = NULL; }
 		;
 
+for_subscription:
+			FOR SUBSCRIPTION		{ $$ = true; }
+			| NO SUBSCRIPTION		{ $$ = false; }
+		;
+
+opt_for_subscription:
+			for_subscription		{ $$ = $1; }
+			| /*EMPTY*/				{ $$ = false; }
+		;
+
 /*****************************************************************************
  *
  *		QUERY :
@@ -5457,6 +5470,15 @@ AlterForeignServerStmt: ALTER SERVER name foreign_server_version alter_generic_o
 					n->has_version = true;
 					$$ = (Node *) n;
 				}
+			| ALTER SERVER name for_subscription
+				{
+					AlterForeignServerStmt *n = makeNode(AlterForeignServerStmt);
+
+					n->servername = $3;
+					n->forsubscription = $4;
+					n->has_forsubscription = true;
+					$$ = (Node *) n;
+				}
 			| ALTER SERVER name foreign_server_version
 				{
 					AlterForeignServerStmt *n = makeNode(AlterForeignServerStmt);
diff --git a/src/include/catalog/pg_authid.dat b/src/include/catalog/pg_authid.dat
index 82a2ec2862..dcfad7a0c0 100644
--- a/src/include/catalog/pg_authid.dat
+++ b/src/include/catalog/pg_authid.dat
@@ -94,5 +94,10 @@
   rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
   rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
   rolpassword => '_null_', rolvaliduntil => '_null_' },
+{ oid => '6122', oid_symbol => 'ROLE_PG_CREATE_CONNECTION',
+  rolname => 'pg_create_connection', rolsuper => 'f', rolinherit => 't',
+  rolcreaterole => 'f', rolcreatedb => 'f', rolcanlogin => 'f',
+  rolreplication => 'f', rolbypassrls => 'f', rolconnlimit => '-1',
+  rolpassword => '_null_', rolvaliduntil => '_null_' },
 
 ]
diff --git a/src/include/catalog/pg_foreign_server.h b/src/include/catalog/pg_foreign_server.h
index a4b81936b0..6736af24f5 100644
--- a/src/include/catalog/pg_foreign_server.h
+++ b/src/include/catalog/pg_foreign_server.h
@@ -31,6 +31,7 @@ CATALOG(pg_foreign_server,1417,ForeignServerRelationId)
 	NameData	srvname;		/* foreign server name */
 	Oid			srvowner BKI_LOOKUP(pg_authid); /* server owner */
 	Oid			srvfdw BKI_LOOKUP(pg_foreign_data_wrapper); /* server FDW */
+	bool		srvforsubscription BKI_DEFAULT(f); /* usable for subscription */
 
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	text		srvtype;
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index a2f04ce9af..e1d93c26ba 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -36,6 +36,7 @@ typedef struct ForeignServer
 	Oid			serverid;		/* server Oid */
 	Oid			fdwid;			/* foreign-data wrapper */
 	Oid			owner;			/* server owner user Oid */
+	bool		forsubscription;	/* usable for a subscription */
 	char	   *servername;		/* name of the server */
 	char	   *servertype;		/* server type, optional */
 	char	   *serverversion;	/* server version, optional */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 6d6b242cec..00547bbd88 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2726,6 +2726,7 @@ typedef struct CreateForeignServerStmt
 	char	   *version;		/* optional server version */
 	char	   *fdwname;		/* FDW name */
 	bool		if_not_exists;	/* just do nothing if it already exists? */
+	bool		forsubscription;	/* usable for subscription */
 	List	   *options;		/* generic options to server */
 } CreateForeignServerStmt;
 
@@ -2734,8 +2735,10 @@ typedef struct AlterForeignServerStmt
 	NodeTag		type;
 	char	   *servername;		/* server name */
 	char	   *version;		/* optional server version */
+	bool		forsubscription;	/* usable for subscription */
 	List	   *options;		/* generic options to server */
 	bool		has_version;	/* version specified */
+	bool		has_forsubscription; /* [FOR|NO] SUBSCRIPTION specified */
 } AlterForeignServerStmt;
 
 /* ----------------------
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b0a1a3cc26..5bd812b393 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -153,19 +153,58 @@ HINT:  To initiate replication, you must manually create the replication slot, e
 DROP SUBSCRIPTION regress_testsub6;
 -- test using a server object instead of connection string
 RESET SESSION AUTHORIZATION;
+CREATE ROLE regress_connection_role;
 CREATE FOREIGN DATA WRAPPER regress_connection_fdw
   VALIDATOR pg_connection_validator;
-CREATE SERVER regress_testserver1 FOREIGN DATA WRAPPER regress_connection_fdw;
+CREATE SERVER regress_testserver1 FOREIGN DATA WRAPPER regress_connection_fdw
+  FOR SUBSCRIPTION;
 CREATE SERVER regress_testserver2 FOREIGN DATA WRAPPER regress_connection_fdw;
+ALTER SERVER regress_testserver1 OWNER TO regress_connection_role;
+ALTER SERVER regress_testserver2 OWNER TO regress_connection_role;
 CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1
   OPTIONS (password 'secret');
 CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2
   OPTIONS (password 'secret');
 GRANT USAGE ON FOREIGN SERVER regress_testserver2 TO regress_subscription_user3;
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+REVOKE pg_create_connection FROM pg_create_subscription;
 SET SESSION AUTHORIZATION regress_subscription_user3;
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+ERROR:  permission denied to create subscription with a connection string
+DETAIL:  Only roles with privileges of the "pg_create_connection" role may create subscriptions with CONNECTION specified.
+HINT:  Create a subscription to a foreign server by specifying SERVER instead.
 CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver1 PUBLICATION testpub
-  WITH (slot_name = NONE, connect = false); -- fails
+  WITH (slot_name = NONE, connect = false); -- fail - no USAGE
 ERROR:  permission denied for foreign server regress_testserver1
+RESET SESSION AUTHORIZATION;
+GRANT USAGE ON FOREIGN SERVER regress_testserver1 TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver1 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver2; -- fail - not FOR SUBSCRIPTION
+ERROR:  foreign server "regress_testserver2" not usable for subscription
+HINT:  Specify FOR SUBSCRIPTION when creating the foreign server.
+DROP SUBSCRIPTION regress_testsub6;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver2 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false); -- fail - not FOR SUBSCRIPTION
+ERROR:  foreign server "regress_testserver2" not usable for subscription
+HINT:  Specify FOR SUBSCRIPTION when creating the foreign server.
+RESET SESSION AUTHORIZATION;
+REVOKE USAGE ON FOREIGN SERVER regress_testserver1 FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_connection_role;
+ALTER SERVER regress_testserver2 FOR SUBSCRIPTION; -- fails - need pg_create_connection
+ERROR:  permission denied to alter server for subscription
+DETAIL:  Only roles with privileges of the "pg_create_connection" role may alter foreign servers with FOR SUBSCRIPTION specified.
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO regress_connection_role;
+SET SESSION AUTHORIZATION regress_connection_role;
+ALTER SERVER regress_testserver2 FOR SUBSCRIPTION;
+SET SESSION AUTHORIZATION regress_subscription_user3;
 CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver2 PUBLICATION testpub
   WITH (slot_name = NONE, connect = false);
 WARNING:  subscription was created, but is not connected
@@ -180,7 +219,7 @@ DROP SERVER regress_testserver2;
 -- test an FDW with no validator
 CREATE FOREIGN DATA WRAPPER regress_fdw;
 CREATE SERVER regress_testserver3 FOREIGN DATA WRAPPER regress_fdw
-  OPTIONS (abc 'xyz');
+  FOR SUBSCRIPTION OPTIONS (abc 'xyz');
 CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver3
   OPTIONS (password 'secret');
 GRANT USAGE ON FOREIGN SERVER regress_testserver3 TO regress_subscription_user3;
@@ -196,6 +235,8 @@ DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1;
 DROP SERVER regress_testserver1;
 DROP FOREIGN DATA WRAPPER regress_connection_fdw;
 REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+-- re-grant pg_create_connection to pg_create_subscription
+GRANT pg_create_connection TO pg_create_subscription;
 SET SESSION AUTHORIZATION regress_subscription_user;
 \dRs+
                                                                                                            List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 4d44f141b7..068a8f8c47 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -98,19 +98,56 @@ DROP SUBSCRIPTION regress_testsub6;
 -- test using a server object instead of connection string
 
 RESET SESSION AUTHORIZATION;
+CREATE ROLE regress_connection_role;
 CREATE FOREIGN DATA WRAPPER regress_connection_fdw
   VALIDATOR pg_connection_validator;
-CREATE SERVER regress_testserver1 FOREIGN DATA WRAPPER regress_connection_fdw;
+CREATE SERVER regress_testserver1 FOREIGN DATA WRAPPER regress_connection_fdw
+  FOR SUBSCRIPTION;
 CREATE SERVER regress_testserver2 FOREIGN DATA WRAPPER regress_connection_fdw;
+ALTER SERVER regress_testserver1 OWNER TO regress_connection_role;
+ALTER SERVER regress_testserver2 OWNER TO regress_connection_role;
 CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1
   OPTIONS (password 'secret');
 CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2
   OPTIONS (password 'secret');
 GRANT USAGE ON FOREIGN SERVER regress_testserver2 TO regress_subscription_user3;
 
+-- temporarily revoke pg_create_connection from pg_create_subscription
+-- to test that CREATE SUBSCRIPTION ... CONNECTION fails
+REVOKE pg_create_connection FROM pg_create_subscription;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+-- fail - not a member of pg_create_connection, cannot use CONNECTION
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver1 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false); -- fail - no USAGE
+
+RESET SESSION AUTHORIZATION;
+GRANT USAGE ON FOREIGN SERVER regress_testserver1 TO regress_subscription_user3;
 SET SESSION AUTHORIZATION regress_subscription_user3;
+
 CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver1 PUBLICATION testpub
-  WITH (slot_name = NONE, connect = false); -- fails
+  WITH (slot_name = NONE, connect = false);
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver2; -- fail - not FOR SUBSCRIPTION
+DROP SUBSCRIPTION regress_testsub6;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver2 PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false); -- fail - not FOR SUBSCRIPTION
+
+RESET SESSION AUTHORIZATION;
+REVOKE USAGE ON FOREIGN SERVER regress_testserver1 FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_connection_role;
+ALTER SERVER regress_testserver2 FOR SUBSCRIPTION; -- fails - need pg_create_connection
+RESET SESSION AUTHORIZATION;
+GRANT pg_create_connection TO regress_connection_role;
+SET SESSION AUTHORIZATION regress_connection_role;
+ALTER SERVER regress_testserver2 FOR SUBSCRIPTION;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
 CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver2 PUBLICATION testpub
   WITH (slot_name = NONE, connect = false);
 RESET SESSION AUTHORIZATION;
@@ -124,7 +161,7 @@ DROP SERVER regress_testserver2;
 -- test an FDW with no validator
 CREATE FOREIGN DATA WRAPPER regress_fdw;
 CREATE SERVER regress_testserver3 FOREIGN DATA WRAPPER regress_fdw
-  OPTIONS (abc 'xyz');
+  FOR SUBSCRIPTION OPTIONS (abc 'xyz');
 CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver3
   OPTIONS (password 'secret');
 GRANT USAGE ON FOREIGN SERVER regress_testserver3 TO regress_subscription_user3;
@@ -144,6 +181,10 @@ DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver1;
 DROP SERVER regress_testserver1;
 DROP FOREIGN DATA WRAPPER regress_connection_fdw;
 REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+
+-- re-grant pg_create_connection to pg_create_subscription
+GRANT pg_create_connection TO pg_create_subscription;
+
 SET SESSION AUTHORIZATION regress_subscription_user;
 
 \dRs+
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 8653423d08..81861f77e1 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -119,7 +119,7 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE FOREIGN DATA WRAPPER test_connection_fdw VALIDATOR pg_connection_validator"
 );
 $node_subscriber->safe_psql('postgres',
-	"CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER test_connection_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+	"CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER test_connection_fdw FOR SUBSCRIPTION OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
 );
 
 $node_subscriber->safe_psql('postgres',
-- 
2.34.1



view thread (63+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected]
  Subject: Re: [17] CREATE SUBSCRIPTION ... SERVER
  In-Reply-To: <[email protected]>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox