From 4663f28197b2fda75bb3f99c8447853cd1af159f Mon Sep 17 00:00:00 2001
From: Jeff Davis <jeff@j-davis.com>
Date: Tue, 2 Jan 2024 13:42:48 -0800
Subject: [PATCH v6 3/4] CREATE SUSBCRIPTION ... SERVER.

---
 contrib/postgres_fdw/Makefile                 |   2 +
 .../postgres_fdw/expected/postgres_fdw.out    |  14 ++
 contrib/postgres_fdw/meson.build              |   5 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |   8 +
 contrib/postgres_fdw/t/010_subscription.pl    |  68 +++++++
 doc/src/sgml/ref/alter_subscription.sgml      |  18 +-
 doc/src/sgml/ref/create_subscription.sgml     |  11 +-
 src/backend/catalog/pg_subscription.c         |  31 ++-
 src/backend/commands/subscriptioncmds.c       | 182 ++++++++++++++++--
 src/backend/foreign/foreign.c                 |  25 +++
 src/backend/parser/gram.y                     |  20 ++
 src/backend/replication/logical/worker.c      |  12 +-
 src/bin/pg_dump/pg_dump.c                     |  27 ++-
 src/bin/pg_dump/pg_dump.h                     |   1 +
 src/bin/psql/tab-complete.c                   |   2 +-
 src/include/catalog/pg_subscription.h         |   4 +-
 src/include/foreign/foreign.h                 |   1 +
 src/include/nodes/parsenodes.h                |   3 +
 src/test/regress/expected/foreign_data.out    |  14 ++
 src/test/regress/expected/subscription.out    |  40 ++++
 src/test/regress/sql/foreign_data.sql         |  17 ++
 src/test/regress/sql/subscription.sql         |  45 +++++
 src/test/subscription/t/001_rep_changes.pl    |  57 ++++++
 23 files changed, 579 insertions(+), 28 deletions(-)
 create mode 100644 contrib/postgres_fdw/t/010_subscription.pl

diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile
index c1b0cad453..c3498ea6b4 100644
--- a/contrib/postgres_fdw/Makefile
+++ b/contrib/postgres_fdw/Makefile
@@ -18,6 +18,8 @@ DATA = postgres_fdw--1.0.sql postgres_fdw--1.0--1.1.sql
 
 REGRESS = postgres_fdw
 
+TAP_TESTS = 1
+
 ifdef USE_PGXS
 PG_CONFIG = pg_config
 PGXS := $(shell $(PG_CONFIG) --pgxs)
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index d83f6ae8cb..0aa751e099 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -256,6 +256,20 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 -- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+SELECT pg_conninfo_from_server('testserver1', CURRENT_USER, false);
+                                                                                                                                                                                                                                   pg_conninfo_from_server                                                                                                                                                                                                                                    
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ service = 'value' connect_timeout = 'value' dbname = 'value' host = 'value' hostaddr = 'value' port = 'value' application_name = 'value' keepalives = 'value' keepalives_idle = 'value' keepalives_interval = 'value' tcp_user_timeout = 'value' sslcompression = 'value' sslmode = 'value' sslcert = 'value' sslkey = 'value' sslrootcert = 'value' sslcrl = 'value' krbsrvname = 'value' gsslib = 'value' gssdelegation = 'value' sslpassword = 'dummy' sslkey = 'value' sslcert = 'value'
+(1 row)
+
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+-- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
 CREATE PUBLICATION testpub_ftbl FOR TABLE ft1;  -- should fail
diff --git a/contrib/postgres_fdw/meson.build b/contrib/postgres_fdw/meson.build
index 2b86d8a6ee..cf7071dbf8 100644
--- a/contrib/postgres_fdw/meson.build
+++ b/contrib/postgres_fdw/meson.build
@@ -39,4 +39,9 @@ tests += {
     ],
     'regress_args': ['--dlpath', meson.build_root() / 'src/test/regress'],
   },
+  'tap': {
+    'tests': [
+      't/010_subscription.pl',
+    ],
+  },
 }
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 90c8fa4b70..3b2716b82e 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -248,6 +248,14 @@ SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1;  -- should work again
 ANALYZE ft1;
 ALTER FOREIGN TABLE ft2 OPTIONS (use_remote_estimate 'true');
 
+-- ===================================================================
+-- test subscription
+-- ===================================================================
+CREATE SUBSCRIPTION regress_pgfdw_subscription SERVER testserver1
+  PUBLICATION pub1 WITH (slot_name = NONE, connect = false);
+SELECT pg_conninfo_from_server('testserver1', CURRENT_USER, false);
+DROP SUBSCRIPTION regress_pgfdw_subscription;
+
 -- ===================================================================
 -- test error case for create publication on foreign table
 -- ===================================================================
diff --git a/contrib/postgres_fdw/t/010_subscription.pl b/contrib/postgres_fdw/t/010_subscription.pl
new file mode 100644
index 0000000000..daa0b9edd2
--- /dev/null
+++ b/contrib/postgres_fdw/t/010_subscription.pl
@@ -0,0 +1,68 @@
+
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+
+# Replicate the changes without columns
+$node_publisher->safe_psql('postgres', "CREATE TABLE tab_no_col()");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_no_col default VALUES");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE EXTENSION postgres_fdw");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_ins");
+
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_server FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_server"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub SERVER tap_server PUBLICATION tap_pub WITH (password_required=false)"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins SELECT generate_series(1,50)");
+
+$node_publisher->wait_for_catchup('tap_sub');
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
+is($result, qq(1052), 'check initial data was copied to subscriber');
+
+done_testing();
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 6d36ff0dc9..6d219145a9 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -21,6 +21,7 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
+ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SERVER <replaceable>servername</replaceable>
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> CONNECTION '<replaceable>conninfo</replaceable>'
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> SET PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> ADD PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...] [ WITH ( <replaceable class="parameter">publication_option</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
@@ -94,13 +95,24 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-altersubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the foreign server
+      <replaceable>servername</replaceable>.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-altersubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
      <para>
-      This clause replaces the connection string originally set by
-      <xref linkend="sql-createsubscription"/>.  See there for more
-      information.
+      This clause replaces the foreign server or connection string originally
+      set by <xref linkend="sql-createsubscription"/> with the connection
+      string <replaceable>conninfo</replaceable>.
      </para>
     </listitem>
    </varlistentry>
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index c7ace922f9..24538baf98 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -22,7 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
 <synopsis>
 CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceable>
-    CONNECTION '<replaceable class="parameter">conninfo</replaceable>'
+    { SERVER <replaceable class="parameter">servername</replaceable> | CONNECTION '<replaceable class="parameter">conninfo</replaceable>' }
     PUBLICATION <replaceable class="parameter">publication_name</replaceable> [, ...]
     [ WITH ( <replaceable class="parameter">subscription_parameter</replaceable> [= <replaceable class="parameter">value</replaceable>] [, ... ] ) ]
 </synopsis>
@@ -77,6 +77,15 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
     </listitem>
    </varlistentry>
 
+   <varlistentry id="sql-createsubscription-params-server">
+    <term><literal>SERVER <replaceable class="parameter">servername</replaceable></literal></term>
+    <listitem>
+     <para>
+      A foreign server to use for the connection.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry id="sql-createsubscription-params-connection">
     <term><literal>CONNECTION '<replaceable class="parameter">conninfo</replaceable>'</literal></term>
     <listitem>
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index c516c25ac7..b3cbc170d5 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -20,12 +20,15 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "catalog/indexing.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "storage/lmgr.h"
+#include "utils/acl.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -75,10 +78,30 @@ GetSubscription(Oid subid, bool missing_ok)
 	sub->runasowner = subform->subrunasowner;
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
-								   tup,
-								   Anum_pg_subscription_subconninfo);
-	sub->conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(subform->subserver))
+	{
+		AclResult	aclresult;
+
+		/* recheck ACL */
+		aclresult = object_aclcheck(ForeignServerRelationId,
+									subform->subserver,
+									subform->subowner, ACL_USAGE);
+
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER,
+						   ForeignServerName(subform->subserver));
+
+		sub->conninfo = ForeignServerConnectionString(subform->subowner,
+													  subform->subserver,
+													  true);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
+									   tup,
+									   Anum_pg_subscription_subconninfo);
+		sub->conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID,
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 75e6cd8ae3..60287c73e7 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -25,14 +25,17 @@
 #include "catalog/objectaddress.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_database_d.h"
+#include "catalog/pg_foreign_server.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
+#include "catalog/pg_user_mapping.h"
 #include "commands/dbcommands.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/subscriptioncmds.h"
 #include "executor/executor.h"
+#include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "pgstat.h"
@@ -574,6 +577,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	Datum		values[Natts_pg_subscription];
 	Oid			owner = GetUserId();
 	HeapTuple	tup;
+	Oid			serverid;
+	Oid			umid;
 	char	   *conninfo;
 	char		originname[NAMEDATALEN];
 	List	   *publications;
@@ -666,15 +671,42 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.synchronous_commit == NULL)
 		opts.synchronous_commit = "off";
 
-	conninfo = stmt->conninfo;
-	publications = stmt->publication;
-
 	/* Load the library providing us libpq calls. */
 	load_file("libpqwalreceiver", false);
 
+	if (stmt->servername)
+	{
+		ForeignServer	*server;
+		UserMapping		*um;
+
+		Assert(!stmt->conninfo);
+		conninfo = NULL;
+
+		server = GetForeignServerByName(stmt->servername, false);
+		aclresult = object_aclcheck(ForeignServerRelationId, server->serverid, owner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
+
+		um = GetUserMapping(owner, server->serverid);
+
+		serverid = server->serverid;
+		umid = um->umid;
+		conninfo = ForeignServerConnectionString(owner, serverid, true);
+	}
+	else
+	{
+		Assert(stmt->conninfo);
+
+		serverid = InvalidOid;
+		umid = InvalidOid;
+		conninfo = stmt->conninfo;
+	}
+
 	/* Check the connection info string. */
 	walrcv_check_conninfo(conninfo, opts.passwordrequired && !superuser());
 
+	publications = stmt->publication;
+
 	/* Everything ok, form a new tuple. */
 	memset(values, 0, sizeof(values));
 	memset(nulls, false, sizeof(nulls));
@@ -697,8 +729,12 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
 	values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
 	values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
-	values[Anum_pg_subscription_subconninfo - 1] =
-		CStringGetTextDatum(conninfo);
+	values[Anum_pg_subscription_subserver - 1] = serverid;
+	if (!OidIsValid(serverid))
+		values[Anum_pg_subscription_subconninfo - 1] =
+			CStringGetTextDatum(conninfo);
+	else
+		nulls[Anum_pg_subscription_subconninfo - 1] = true;
 	if (opts.slot_name)
 		values[Anum_pg_subscription_subslotname - 1] =
 			DirectFunctionCall1(namein, CStringGetDatum(opts.slot_name));
@@ -719,6 +755,20 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
+	if (stmt->servername)
+	{
+		ObjectAddress referenced;
+		Assert(OidIsValid(serverid) && OidIsValid(umid));
+
+		ObjectAddressSet(referenced, ForeignServerRelationId, serverid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+		ObjectAddressSet(referenced, UserMappingRelationId, umid);
+		recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+	}
+
 	ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
 	replorigin_create(originname);
 
@@ -835,8 +885,6 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	if (opts.enabled)
 		ApplyLauncherWakeupAtCommit();
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostCreateHook(SubscriptionRelationId, subid, 0);
 
 	return myself;
@@ -1124,6 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	memset(nulls, false, sizeof(nulls));
 	memset(replaces, false, sizeof(replaces));
 
+	ObjectAddressSet(myself, SubscriptionRelationId, subid);
+
 	switch (stmt->kind)
 	{
 		case ALTER_SUBSCRIPTION_OPTIONS:
@@ -1244,7 +1294,89 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				break;
 			}
 
+		case ALTER_SUBSCRIPTION_SERVER:
+			{
+				ForeignServer	*new_server;
+				UserMapping		*new_um;
+				ObjectAddress	 referenced;
+				AclResult		 aclresult;
+				char			*conninfo;
+
+				/*
+				 * Remove what was there before, either another foreign server
+				 * or a connection string.
+				 */
+				if (form->subserver)
+				{
+					UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   ForeignServerRelationId, form->subserver);
+					deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+													   DEPENDENCY_NORMAL,
+													   UserMappingRelationId, old_um->umid);
+				}
+				else
+				{
+					nulls[Anum_pg_subscription_subconninfo - 1] = true;
+					replaces[Anum_pg_subscription_subconninfo - 1] = true;
+				}
+
+				/*
+				 * Find the new server and user mapping. Check ACL of server
+				 * based on current user ID, but find the user mapping based
+				 * on the subscription owner.
+				 */
+				new_server = GetForeignServerByName(stmt->servername, false);
+				aclresult = object_aclcheck(ForeignServerRelationId,
+											new_server->serverid,
+											form->subowner, ACL_USAGE);
+				if (aclresult != ACLCHECK_OK)
+					aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER,
+								   new_server->servername);
+
+				new_um = GetUserMapping(form->subowner, new_server->serverid);
+
+				conninfo = ForeignServerConnectionString(form->subowner,
+														 new_server->serverid,
+														 true);
+
+				/* Load the library providing us libpq calls. */
+				load_file("libpqwalreceiver", false);
+				/* Check the connection info string. */
+				walrcv_check_conninfo(conninfo,
+									  sub->passwordrequired && !sub->ownersuperuser);
+
+				values[Anum_pg_subscription_subserver - 1] = new_server->serverid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+
+				ObjectAddressSet(referenced, ForeignServerRelationId, new_server->serverid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+
+				ObjectAddressSet(referenced, UserMappingRelationId, new_um->umid);
+				recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
+				update_tuple = true;
+			}
+			break;
+
 		case ALTER_SUBSCRIPTION_CONNECTION:
+			/* remove reference to foreign server and dependencies, if present */
+			if (form->subserver)
+			{
+				UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   ForeignServerRelationId, form->subserver);
+				deleteDependencyRecordsForSpecific(SubscriptionRelationId, form->oid,
+												   DEPENDENCY_NORMAL,
+												   UserMappingRelationId, old_um->umid);
+
+				values[Anum_pg_subscription_subserver - 1] = InvalidOid;
+				replaces[Anum_pg_subscription_subserver - 1] = true;
+			}
+
 			/* Load the library providing us libpq calls. */
 			load_file("libpqwalreceiver", false);
 			/* Check the connection info string. */
@@ -1455,8 +1587,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 	table_close(rel, RowExclusiveLock);
 
-	ObjectAddressSet(myself, SubscriptionRelationId, subid);
-
 	InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
 
 	/* Wake up related replication workers to handle this change quickly. */
@@ -1541,9 +1671,25 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	subname = pstrdup(NameStr(*DatumGetName(datum)));
 
 	/* Get conninfo */
-	datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
-								   Anum_pg_subscription_subconninfo);
-	conninfo = TextDatumGetCString(datum);
+	if (OidIsValid(form->subserver))
+	{
+		AclResult aclresult;
+
+		aclresult = object_aclcheck(ForeignServerRelationId, form->subserver,
+									form->subowner, ACL_USAGE);
+		if (aclresult != ACLCHECK_OK)
+			aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER,
+						   ForeignServerName(form->subserver));
+
+		conninfo = ForeignServerConnectionString(form->subowner,
+												 form->subserver, true);
+	}
+	else
+	{
+		datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, tup,
+									   Anum_pg_subscription_subconninfo);
+		conninfo = TextDatumGetCString(datum);
+	}
 
 	/* Get slotname */
 	datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
@@ -1644,6 +1790,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	}
 
 	/* Clean up dependencies */
+	deleteDependencyRecordsFor(SubscriptionRelationId, subid, false);
 	deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0);
 
 	/* Remove any associated relation synchronization states. */
@@ -1853,6 +2000,17 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 		aclcheck_error(aclresult, OBJECT_DATABASE,
 					   get_database_name(MyDatabaseId));
 
+	if (form->subserver)
+	{
+		UserMapping *old_um = GetUserMapping(form->subowner, form->subserver);
+		UserMapping *new_um = GetUserMapping(newOwnerId, form->subserver);
+
+		if (changeDependencyFor(SubscriptionRelationId, form->oid,
+								UserMappingRelationId, old_um->umid, new_um->umid) != 1)
+			elog(ERROR, "could not change user mapping dependency for subscription %u",
+				 form->oid);
+	}
+
 	form->subowner = newOwnerId;
 	CatalogTupleUpdate(rel, &tup->t_self, tup);
 
diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index fd1b644d72..d5d78c347e 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -180,6 +180,31 @@ GetForeignServerExtended(Oid serverid, bits16 flags)
 }
 
 
+/*
+ * ForeignServerName - get name of foreign server.
+ */
+char *
+ForeignServerName(Oid serverid)
+{
+	Form_pg_foreign_server serverform;
+	char *servername;
+	HeapTuple	tp;
+
+	tp = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverid));
+
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for foreign server %u", serverid);
+
+	serverform = (Form_pg_foreign_server) GETSTRUCT(tp);
+
+	servername = pstrdup(NameStr(serverform->srvname));
+
+	ReleaseSysCache(tp);
+
+	return servername;
+}
+
+
 /*
  * GetForeignServerByName - look up the foreign server definition by name.
  */
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 6b88096e8e..0a024ab637 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -10648,6 +10648,16 @@ CreateSubscriptionStmt:
 					n->options = $8;
 					$$ = (Node *) n;
 				}
+			| CREATE SUBSCRIPTION name SERVER name PUBLICATION name_list opt_definition
+				{
+					CreateSubscriptionStmt *n =
+						makeNode(CreateSubscriptionStmt);
+					n->subname = $3;
+					n->servername = $5;
+					n->publication = $7;
+					n->options = $8;
+					$$ = (Node *) n;
+				}
 		;
 
 /*****************************************************************************
@@ -10677,6 +10687,16 @@ AlterSubscriptionStmt:
 					n->conninfo = $5;
 					$$ = (Node *) n;
 				}
+			| ALTER SUBSCRIPTION name SERVER name
+				{
+					AlterSubscriptionStmt *n =
+						makeNode(AlterSubscriptionStmt);
+
+					n->kind = ALTER_SUBSCRIPTION_SERVER;
+					n->subname = $3;
+					n->servername = $5;
+					$$ = (Node *) n;
+				}
 			| ALTER SUBSCRIPTION name REFRESH PUBLICATION opt_definition
 				{
 					AlterSubscriptionStmt *n =
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 911835c5cb..4cb94a1ee8 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4003,7 +4003,9 @@ maybe_reread_subscription(void)
 }
 
 /*
- * Callback from subscription syscache invalidation.
+ * Callback from subscription syscache invalidation. Also needed for server or
+ * user mapping invalidation, which can change the connection information for
+ * subscriptions that connect using a server object.
  */
 static void
 subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -4639,6 +4641,14 @@ InitializeLogRepWorker(void)
 	CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
 								  subscription_change_cb,
 								  (Datum) 0);
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
+								  subscription_change_cb,
+								  (Datum) 0);
+	/* Keep us informed about subscription changes. */
+	CacheRegisterSyscacheCallback(USERMAPPINGOID,
+								  subscription_change_cb,
+								  (Datum) 0);
 
 	CacheRegisterSyscacheCallback(AUTHOID,
 								  subscription_change_cb,
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 7b9c79005e..6456416ad3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -4655,6 +4655,7 @@ getSubscriptions(Archive *fout)
 	int			i_subdisableonerr;
 	int			i_subpasswordrequired;
 	int			i_subrunasowner;
+	int			i_subservername;
 	int			i_subconninfo;
 	int			i_subslotname;
 	int			i_subsynccommit;
@@ -4726,10 +4727,12 @@ getSubscriptions(Archive *fout)
 						  LOGICALREP_ORIGIN_ANY);
 
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
-		appendPQExpBufferStr(query, " o.remote_lsn AS suboriginremotelsn,\n"
+		appendPQExpBufferStr(query, " fs.srvname AS subservername,\n"
+							 " o.remote_lsn AS suboriginremotelsn,\n"
 							 " s.subenabled\n");
 	else
-		appendPQExpBufferStr(query, " NULL AS suboriginremotelsn,\n"
+		appendPQExpBufferStr(query, " NULL AS subservername,\n"
+							 " NULL AS suboriginremotelsn,\n"
 							 " false AS subenabled\n");
 
 	appendPQExpBufferStr(query,
@@ -4737,6 +4740,8 @@ getSubscriptions(Archive *fout)
 
 	if (dopt->binary_upgrade && fout->remoteVersion >= 170000)
 		appendPQExpBufferStr(query,
+							 "LEFT JOIN pg_catalog.pg_foreign_server fs \n"
+							 "    ON fs.oid = s.subserver \n"
 							 "LEFT JOIN pg_catalog.pg_replication_origin_status o \n"
 							 "    ON o.external_id = 'pg_' || s.oid::text \n");
 
@@ -4762,6 +4767,7 @@ getSubscriptions(Archive *fout)
 	i_subdisableonerr = PQfnumber(res, "subdisableonerr");
 	i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
 	i_subrunasowner = PQfnumber(res, "subrunasowner");
+	i_subservername = PQfnumber(res, "subservername");
 	i_subconninfo = PQfnumber(res, "subconninfo");
 	i_subslotname = PQfnumber(res, "subslotname");
 	i_subsynccommit = PQfnumber(res, "subsynccommit");
@@ -4781,7 +4787,10 @@ getSubscriptions(Archive *fout)
 		AssignDumpId(&subinfo[i].dobj);
 		subinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_subname));
 		subinfo[i].rolname = getRoleName(PQgetvalue(res, i, i_subowner));
-
+		if (PQgetisnull(res, i, i_subservername))
+			subinfo[i].subservername = NULL;
+		else
+			subinfo[i].subservername = pg_strdup(PQgetvalue(res, i, i_subservername));
 		subinfo[i].subbinary =
 			pg_strdup(PQgetvalue(res, i, i_subbinary));
 		subinfo[i].substream =
@@ -5007,9 +5016,17 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo)
 	appendPQExpBuffer(delq, "DROP SUBSCRIPTION %s;\n",
 					  qsubname);
 
-	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s CONNECTION ",
+	appendPQExpBuffer(query, "CREATE SUBSCRIPTION %s ",
 					  qsubname);
-	appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	if (subinfo->subservername)
+	{
+		appendPQExpBuffer(query, "SERVER %s", fmtId(subinfo->subservername));
+	}
+	else
+	{
+		appendPQExpBuffer(query, "CONNECTION ");
+		appendStringLiteralAH(query, subinfo->subconninfo, fout);
+	}
 
 	/* Build list of quoted publications and append them to query. */
 	if (!parsePGArray(subinfo->subpublications, &pubnames, &npubnames))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9a34347cfc..d3aaa26861 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -669,6 +669,7 @@ typedef struct _SubscriptionInfo
 	char	   *subdisableonerr;
 	char	   *subpasswordrequired;
 	char	   *subrunasowner;
+	char	   *subservername;
 	char	   *subconninfo;
 	char	   *subslotname;
 	char	   *subsynccommit;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 09914165e4..995f4f7f22 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -3322,7 +3322,7 @@ psql_completion(const char *text, int start, int end)
 
 /* CREATE SUBSCRIPTION */
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny))
-		COMPLETE_WITH("CONNECTION");
+		COMPLETE_WITH("SERVER", "CONNECTION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION", MatchAny))
 		COMPLETE_WITH("PUBLICATION");
 	else if (Matches("CREATE", "SUBSCRIPTION", MatchAny, "CONNECTION",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index ca32625585..74e904b11d 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -93,9 +93,11 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
 	bool		subrunasowner;	/* True if replication should execute as the
 								 * subscription owner */
 
+	Oid			subserver;		/* Set if connecting with server */
+
 #ifdef CATALOG_VARLEN			/* variable-length fields start here */
 	/* Connection string to the publisher */
-	text		subconninfo BKI_FORCE_NOT_NULL;
+	text		subconninfo;	/* Set if connecting with connection string */
 
 	/* Slot name on publisher */
 	NameData	subslotname BKI_FORCE_NULL;
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index b5b9b97f4d..a2f04ce9af 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -65,6 +65,7 @@ typedef struct ForeignTable
 
 
 extern ForeignServer *GetForeignServer(Oid serverid);
+extern char *ForeignServerName(Oid serverid);
 extern ForeignServer *GetForeignServerExtended(Oid serverid,
 											   bits16 flags);
 extern ForeignServer *GetForeignServerByName(const char *srvname,
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b3181f34ae..6d6b242cec 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -4041,6 +4041,7 @@ typedef struct CreateSubscriptionStmt
 {
 	NodeTag		type;
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
@@ -4049,6 +4050,7 @@ typedef struct CreateSubscriptionStmt
 typedef enum AlterSubscriptionType
 {
 	ALTER_SUBSCRIPTION_OPTIONS,
+	ALTER_SUBSCRIPTION_SERVER,
 	ALTER_SUBSCRIPTION_CONNECTION,
 	ALTER_SUBSCRIPTION_SET_PUBLICATION,
 	ALTER_SUBSCRIPTION_ADD_PUBLICATION,
@@ -4063,6 +4065,7 @@ typedef struct AlterSubscriptionStmt
 	NodeTag		type;
 	AlterSubscriptionType kind; /* ALTER_SUBSCRIPTION_OPTIONS, etc */
 	char	   *subname;		/* Name of the subscription */
+	char	   *servername;		/* Server name of publisher */
 	char	   *conninfo;		/* Connection string to publisher */
 	List	   *publication;	/* One or more publication to subscribe to */
 	List	   *options;		/* List of DefElem nodes */
diff --git a/src/test/regress/expected/foreign_data.out b/src/test/regress/expected/foreign_data.out
index d51e83ff92..6685c134c6 100644
--- a/src/test/regress/expected/foreign_data.out
+++ b/src/test/regress/expected/foreign_data.out
@@ -432,6 +432,20 @@ SELECT pg_conninfo_from_server('connection_server', 'regress_test_role2', false)
 DROP USER MAPPING FOR regress_test_role SERVER connection_server;
 DROP USER MAPPING FOR PUBLIC SERVER connection_server;
 DROP SERVER connection_server;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;   -- ERROR: no permissions on FDW
+ERROR:  permission denied for foreign-data wrapper pg_connection_fdw
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+ERROR:  schema "bar" does not exist
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+RESET ROLE;
+REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role;
 -- ALTER SERVER
 ALTER SERVER s0;                                            -- ERROR
 ERROR:  syntax error at or near ";"
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b15eddbff3..7b866a6fe6 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -144,6 +144,46 @@ ERROR:  could not connect to the publisher: invalid port number: "-1"
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
+RESET SESSION AUTHORIZATION;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+DROP SUBSCRIPTION regress_testsub6;
+-- test using a server object instead of connection string
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+RESET SESSION AUTHORIZATION;
+-- test an FDW with no validator
+CREATE FOREIGN DATA WRAPPER regress_fdw;
+CREATE SERVER regress_testserver2 FOREIGN DATA WRAPPER regress_fdw
+  OPTIONS (abc 'xyz');
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver2 TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver2;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver;
+RESET SESSION AUTHORIZATION;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2;
+DROP SERVER regress_testserver2;
+DROP FOREIGN DATA WRAPPER regress_fdw;
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+ERROR:  user mapping not found for user "regress_subscription_user", server "regress_testserver"
+DROP SUBSCRIPTION regress_testsub6;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
 \dRs+
                                                                                                            List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit |          Conninfo           | Skip LSN 
diff --git a/src/test/regress/sql/foreign_data.sql b/src/test/regress/sql/foreign_data.sql
index 43461de7f9..337acafc2d 100644
--- a/src/test/regress/sql/foreign_data.sql
+++ b/src/test/regress/sql/foreign_data.sql
@@ -212,6 +212,23 @@ DROP USER MAPPING FOR regress_test_role SERVER connection_server;
 DROP USER MAPPING FOR PUBLIC SERVER connection_server;
 DROP SERVER connection_server;
 
+SET ROLE regress_test_role;
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;   -- ERROR: no permissions on FDW
+RESET ROLE;
+GRANT USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw TO regress_test_role;
+SET ROLE regress_test_role;
+
+CREATE SERVER t3 FOREIGN DATA WRAPPER pg_connection_fdw;
+
+IMPORT FOREIGN SCHEMA foo FROM SERVER t3 INTO bar; -- fails
+
+CREATE USER MAPPING FOR PUBLIC SERVER t3 OPTIONS (user 'x', password 'secret');
+DROP USER MAPPING FOR PUBLIC SERVER t3;
+DROP SERVER t3;
+
+RESET ROLE;
+REVOKE USAGE ON FOREIGN DATA WRAPPER pg_connection_fdw FROM regress_test_role;
+
 -- ALTER SERVER
 ALTER SERVER s0;                                            -- ERROR
 ALTER SERVER s0 OPTIONS (a '1');                            -- ERROR
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 444e563ff3..95c826030b 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -88,6 +88,51 @@ CREATE SUBSCRIPTION regress_testsub5 CONNECTION 'port=-1' PUBLICATION testpub;
 -- fail - invalid connection string during ALTER
 ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 
+RESET SESSION AUTHORIZATION;
+GRANT CREATE ON DATABASE REGRESSION TO regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user3;
+
+CREATE SUBSCRIPTION regress_testsub6 CONNECTION 'dbname=regress_doesnotexist password=regress_fakepassword' PUBLICATION testpub WITH (slot_name = NONE, connect = false);
+DROP SUBSCRIPTION regress_testsub6;
+
+-- test using a server object instead of connection string
+
+RESET SESSION AUTHORIZATION;
+CREATE SERVER regress_testserver FOREIGN DATA WRAPPER pg_connection_fdw;
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+CREATE SUBSCRIPTION regress_testsub6 SERVER regress_testserver PUBLICATION testpub
+  WITH (slot_name = NONE, connect = false);
+RESET SESSION AUTHORIZATION;
+
+-- test an FDW with no validator
+CREATE FOREIGN DATA WRAPPER regress_fdw;
+CREATE SERVER regress_testserver2 FOREIGN DATA WRAPPER regress_fdw
+  OPTIONS (abc 'xyz');
+CREATE USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2
+  OPTIONS (password 'secret');
+GRANT USAGE ON FOREIGN SERVER regress_testserver2 TO regress_subscription_user3;
+
+SET SESSION AUTHORIZATION regress_subscription_user3;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver2;
+ALTER SUBSCRIPTION regress_testsub6 SERVER regress_testserver;
+
+RESET SESSION AUTHORIZATION;
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver2;
+DROP SERVER regress_testserver2;
+DROP FOREIGN DATA WRAPPER regress_fdw;
+
+ALTER SUBSCRIPTION regress_testsub6 OWNER TO regress_subscription_user; -- fail, no user mapping
+DROP SUBSCRIPTION regress_testsub6;
+
+DROP USER MAPPING FOR regress_subscription_user3 SERVER regress_testserver;
+DROP SERVER regress_testserver;
+REVOKE CREATE ON DATABASE regression FROM regress_subscription_user3;
+SET SESSION AUTHORIZATION regress_subscription_user;
+
 \dRs+
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 9ccebd890a..4cbf2dceaa 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -27,6 +27,8 @@ $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_notrep AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_ins AS SELECT generate_series(1,1002) AS a");
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE tab_ins2 AS SELECT generate_series(1,1002) AS a");
 $node_publisher->safe_psql('postgres',
 	"CREATE TABLE tab_full AS SELECT generate_series(1,10) AS a");
 $node_publisher->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
@@ -65,6 +67,7 @@ $node_publisher->safe_psql('postgres',
 # Setup structure on subscriber
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_notrep (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins (a int)");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_ins2 (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full (a int)");
 $node_subscriber->safe_psql('postgres', "CREATE TABLE tab_full2 (x text)");
 $node_subscriber->safe_psql('postgres',
@@ -110,6 +113,22 @@ $node_subscriber->safe_psql('postgres',
 	"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub, tap_pub_ins_only"
 );
 
+my $publisher_host = $node_publisher->host;
+my $publisher_port = $node_publisher->port;
+$node_subscriber->safe_psql('postgres',
+	"CREATE SERVER tap_sub2_server FOREIGN DATA WRAPPER pg_connection_fdw OPTIONS (host '$publisher_host', port '$publisher_port', dbname 'postgres')"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE USER MAPPING FOR PUBLIC SERVER tap_sub2_server"
+);
+
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_simple_pub FOR TABLE tab_ins2");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub2 SERVER tap_sub2_server PUBLICATION tap_simple_pub WITH (password_required=false)"
+);
+
 # Wait for initial table sync to finish
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
@@ -121,11 +140,22 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins");
 is($result, qq(1002), 'check initial data was copied to subscriber');
 
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins2");
+is($result, qq(1002), 'check initial data was copied to subscriber');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 CONNECTION '$publisher_connstr'");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20");
 $node_publisher->safe_psql('postgres', "UPDATE tab_ins SET a = -a");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1,50)");
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub2 SERVER tap_sub2_server");
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_rep SELECT generate_series(1,50)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep WHERE a > 20");
@@ -158,6 +188,10 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_ins");
 is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1052|1|1002), 'check replicated inserts on subscriber');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1), 'check replicated changes on subscriber');
@@ -449,10 +483,27 @@ $node_publisher->poll_query_until('postgres',
   or die
   "Timed out while waiting for apply to restart after changing PUBLICATION";
 
+# test that changes to a foreign server subscription cause the worker
+# to restart
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER SERVER tap_sub2_server OPTIONS (sslmode 'disable')"
+);
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub2' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply to restart after changing PUBLICATION";
+
 $node_publisher->safe_psql('postgres',
 	"INSERT INTO tab_ins SELECT generate_series(1001,1100)");
 $node_publisher->safe_psql('postgres', "DELETE FROM tab_rep");
 
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab_ins2 SELECT generate_series(1001,1100)");
+
 # Restart the publisher and check the state of the subscriber which
 # should be in a streaming state after catching up.
 $node_publisher->stop('fast');
@@ -465,6 +516,11 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, qq(1152|1|1100),
 	'check replicated inserts after subscription publication change');
 
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*), min(a), max(a) FROM tab_ins2");
+is($result, qq(1152|1|1100),
+	'check replicated inserts after subscription publication change');
+
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*), min(a), max(a) FROM tab_rep");
 is($result, qq(20|-20|-1),
@@ -533,6 +589,7 @@ $node_publisher->poll_query_until('postgres',
 
 # check all the cleanup
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_renamed");
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub2");
 
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
-- 
2.34.1

