public inbox for [email protected]  
help / color / mirror / Atom feed
From: Hayato Kuroda (Fujitsu) <[email protected]>
To: 'Amit Kapila' <[email protected]>
To: Gyan Sreejith <[email protected]>
Cc: Shlok Kyal <[email protected]>
Cc: vignesh C <[email protected]>
Cc: Euler Taveira <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Peter Smith <[email protected]>
Subject: RE: [Proposal] Adding Log File Capability to pg_createsubscriber
Date: Wed, 18 Mar 2026 13:44:37 +0000
Message-ID: <OS9PR01MB1214902CC518A8102333FA88DF54EA@OS9PR01MB12149.jpnprd01.prod.outlook.com> (raw)
In-Reply-To: <CAA4eK1K1GLKiuFmcpcKJzhS-o2o2-aKEuaGWmHz-MfJhttaKFg@mail.gmail.com>
References: <CAEqnbaUthOQARV1dscGvB_EsqC-YfxiM6rWkVDHc+G+f4oSUHw@mail.gmail.com>
	<CAHut+PvizcpeHA1Twf_hwe=wANQ1LV5zY6_q+39gTFJc7+bCKg@mail.gmail.com>
	<CAEqnbaVNZYbB_YufchM49d=XC1ZGrVV8ikCPmGotWoCZASY3Uw@mail.gmail.com>
	<CAEqnbaV8QMbXZtt25QtGUPAaQZnD-B0HniFSroapMh+QTZgKsQ@mail.gmail.com>
	<OSCPR01MB14966FD0961F512B29BD46D6BF5AAA@OSCPR01MB14966.jpnprd01.prod.outlook.com>
	<CALDaNm0gX2D6fD5yur-R5gagA7+AfLmLZU5Z8+tgjo61b-Y01w@mail.gmail.com>
	<[email protected]>
	<CAA4eK1+2PYU-y2weY_zFeysP8Ux45iBNdaYMZuJtMTqXwCxawQ@mail.gmail.com>
	<CAEqnbaVVp_g1m1nhOBZjtiAz5W-xwPQCmWH4hjYYehA+ktYg9Q@mail.gmail.com>
	<CALDaNm2T__Uha1fn274bP3jKDSwm37ewWYvA+vOo75FTT2t3SA@mail.gmail.com>
	<CALDaNm1fF6yBa2PCv9gO_MVcjKLQFgi9eAL=g5cU=xtbOr=qZw@mail.gmail.com>
	<CAEqnbaUa27R9OcVDHkNuwsZ1uCpy8sTXpH78N1ug0kj60mnchw@mail.gmail.com>
	<CALDaNm1Mj+eNVETmYAYd4ojrbTbQU5iCX20Fso24r2VuXF69AQ@mail.gmail.com>
	<CAEqnbaUWSPGHLL2nuyNHVKU0TB7uGBx9w0NvnisWFyf5TtwRTQ@mail.gmail.com>
	<CALDaNm3Dj7WRdfD+fQ86XzS07vewaFSWx--kB5689_sC7rt1Uw@mail.gmail.com>
	<CAEqnbaVgFU2Pr=xhhDmA=sK7XPBDBxECovqbSht91ZbHmnteUg@mail.gmail.com>
	<CALDaNm17ujrJ2xHyz6r81WAiFUs38RcT8D5ebdxssASpGko0HA@mail.gmail.com>
	<CAEqnbaUbVgGMXBNFbk=sqFJT=_TkAUYGYsFHJKFw5K492M_B=A@mail.gmail.com>
	<CALDaNm2SYq_dO7qXHn+BnxmxvmBupH6tofUnx4DKC09h5xfAyQ@mail.gmail.com>
	<CAEqnbaWjoSby+_FQOKqTiDwf9AsWVjcGzRn-BQtdivC8xn0ADw@mail.gmail.com>
	<[email protected]>
	<CAA4eK1+jd6zH-LL8FOrw01chrNetRoZGKXSqV7GNo_--UXYc_w@mail.gmail.com>
	<CAEqnbaU50vLy031AbvmfXJ3_qv9iS4pVaMdmsnmpF87r=EqW7Q@mail.gmail.com>
	<CALDaNm1P5pZ2tTcaVRvjUAXLAyT7S18GPwp-4bc0xKXKZoXi0w@mail.gmail.com>
	<CAEqnbaWxYp7TbnhKfkjW8c=jRPARsQ1on86bt_pcqvK9vCtLhQ@mail.gmail.com>
	<CANhcyEWaCV7B54oOhuE_BhVCF887h23jKmkHDDWvJRKZ=pBbBQ@mail.gmail.com>
	<CAEqnbaVRQwPNYqXVJu3YmPiS0d-kG00H81fEBR8ZULP0GJrp7Q@mail.gmail.com>
	<CAA4eK1K1GLKiuFmcpcKJzhS-o2o2-aKEuaGWmHz-MfJhttaKFg@mail.gmail.com>

Dear Amit,

> 
> This looks odd to me. I think it would be better to encapsulate this
> in one function (something like we have in pg_log_v) and then based on
> log level, do required handling.
>

Based on the suggestion, I updated patches. Please see attached series.

v11-0001 replaces existing pg_log_xxx families to new reporting function
pg_createsub_log(). Now only pg_fatal is overwritten, which is same as pg_uprade.

v11-0002 is the rebased version of v10. Nothing new, but codes are adjusted based
on 0001.

v11-0003 contains changes from me [1]. 0002 and 0003 can be combined if it's OK.

[1]: https://www.postgresql.org/message-id/OS9PR01MB12149C7DE09F13C3BCBD9357DF54EA%40OS9PR01MB12149.jpnpr...

Best regards,
Hayato Kuroda
FUJITSU LIMITED



Attachments:

  [application/octet-stream] v11-0001-pg_createsubscriber-use-own-reporting-functions.patch (55.0K, 2-v11-0001-pg_createsubscriber-use-own-reporting-functions.patch)
  download | inline diff:
From 1048938cae87dd47eb486ccf01718b4be1b31eea Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 18 Mar 2026 20:20:50 +0900
Subject: [PATCH v11 1/3] pg_createsubscriber: use own reporting functions

This commit converts all pg_log_xxx families to call a new reporting function
pg_createsub_log(). Also, pg_fatal() is overwritten to use its own.

This commit changes nothing from the outside, but is needed for the upcoming
commit.
---
 src/bin/pg_basebackup/pg_createsubscriber.c | 706 +++++++++++++-------
 1 file changed, 453 insertions(+), 253 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 2bc84505aab..b02af9a4cc0 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -31,6 +31,12 @@
 #include "fe_utils/version.h"
 #include "getopt_long.h"
 
+/*
+ * For now, pg_createsubscriber does not use common/logging.c; use our own
+ * pg_fatal.
+ */
+#undef pg_fatal
+
 #define	DEFAULT_SUB_PORT	"50432"
 #define	OBJECTTYPE_PUBLICATIONS  0x0001
 
@@ -146,6 +152,11 @@ static void drop_existing_subscription(PGconn *conn, const char *subname,
 									   const char *dbname);
 static void get_publisher_databases(struct CreateSubscriberOptions *opt,
 									bool dbnamespecified);
+static void pg_createsub_log(enum pg_log_level, enum pg_log_part,
+							 const char *pg_restrict fmt,...)
+			pg_attribute_printf(3, 4);
+pg_noreturn static void pg_fatal(const char *pg_restrict fmt,...)
+			pg_attribute_printf(1, 2);
 
 #define	WAIT_INTERVAL	1		/* 1 second */
 
@@ -174,6 +185,27 @@ static bool recovery_ended = false;
 static bool standby_running = false;
 static bool recovery_params_set = false;
 
+static void
+pg_createsub_log(enum pg_log_level level, enum pg_log_part part,
+				 const char *pg_restrict fmt,...)
+{
+	va_list		args;
+
+	va_start(args, fmt);
+	pg_log_generic_v(level, part, fmt, args);
+	va_end(args);
+}
+
+static void
+pg_fatal(const char *pg_restrict fmt,...)
+{
+	va_list		args;
+
+	va_start(args, fmt);
+	pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
+	va_end(args);
+	exit(1);
+}
 
 /*
  * Clean up objects created by pg_createsubscriber.
@@ -205,7 +237,8 @@ cleanup_objects_atexit(void)
 		if (durable_rename(conf_filename, conf_filename_disabled) != 0)
 		{
 			/* durable_rename() has already logged something. */
-			pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
+			pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+							 "A manual removal of the recovery parameters may be required.");
 		}
 	}
 
@@ -219,9 +252,11 @@ cleanup_objects_atexit(void)
 	 */
 	if (recovery_ended)
 	{
-		pg_log_warning("failed after the end of recovery");
-		pg_log_warning_hint("The target server cannot be used as a physical replica anymore.  "
-							"You must recreate the physical replica before continuing.");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+						 "failed after the end of recovery");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+						 "The target server cannot be used as a physical replica anymore.  "
+						 "You must recreate the physical replica before continuing.");
 	}
 
 	for (int i = 0; i < num_dbs; i++)
@@ -251,17 +286,21 @@ cleanup_objects_atexit(void)
 				 */
 				if (dbinfo->made_publication)
 				{
-					pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
-								   dbinfo->pubname,
-								   dbinfo->dbname);
-					pg_log_warning_hint("Drop this publication before trying again.");
+					pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+									 "publication \"%s\" created in database \"%s\" on primary was left behind",
+									 dbinfo->pubname,
+									 dbinfo->dbname);
+					pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+									 "Drop this publication before trying again.");
 				}
 				if (dbinfo->made_replslot)
 				{
-					pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
-								   dbinfo->replslotname,
-								   dbinfo->dbname);
-					pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+					pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+									 "replication slot \"%s\" created in database \"%s\" on primary was left behind",
+									 dbinfo->replslotname,
+									 dbinfo->dbname);
+					pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+									 "Drop this replication slot soon to avoid retention of WAL files.");
 				}
 			}
 		}
@@ -342,7 +381,8 @@ get_base_conninfo(const char *conninfo, char **dbname)
 	conn_opts = PQconninfoParse(conninfo, &errmsg);
 	if (conn_opts == NULL)
 	{
-		pg_log_error("could not parse connection string: %s", errmsg);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not parse connection string: %s", errmsg);
 		PQfreemem(errmsg);
 		return NULL;
 	}
@@ -426,7 +466,8 @@ get_exec_path(const char *argv0, const char *progname)
 					 progname, full_path, "pg_createsubscriber");
 	}
 
-	pg_log_debug("%s path is:  %s", progname, exec_path);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "%s path is:  %s", progname, exec_path);
 
 	return exec_path;
 }
@@ -443,8 +484,9 @@ check_data_directory(const char *datadir)
 	uint32		major_version;
 	char	   *version_str;
 
-	pg_log_info("checking if directory \"%s\" is a cluster data directory",
-				datadir);
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "checking if directory \"%s\" is a cluster data directory",
+					 datadir);
 
 	if (stat(datadir, &statbuf) != 0)
 	{
@@ -462,9 +504,11 @@ check_data_directory(const char *datadir)
 	major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
 	if (major_version != PG_MAJORVERSION_NUM)
 	{
-		pg_log_error("data directory is of wrong version");
-		pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
-							"PG_VERSION", version_str, PG_MAJORVERSION);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "data directory is of wrong version");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+						 "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
+						 "PG_VERSION", version_str, PG_MAJORVERSION);
 		exit(1);
 	}
 }
@@ -547,14 +591,16 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 			dbinfo[i].subname = NULL;
 		/* Other fields will be filled later */
 
-		pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
-					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
-					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
-					 dbinfo[i].pubconninfo);
-		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
-					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-					 dbinfo[i].subconninfo,
-					 dbinfos.two_phase ? "true" : "false");
+		pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+						 "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+						 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
+						 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
+						 dbinfo[i].pubconninfo);
+		pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+						 "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
+						 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
+						 dbinfo[i].subconninfo,
+						 dbinfos.two_phase ? "true" : "false");
 
 		if (num_pubs > 0)
 			pubcell = pubcell->next;
@@ -582,8 +628,9 @@ connect_database(const char *conninfo, bool exit_on_error)
 	conn = PQconnectdb(conninfo);
 	if (PQstatus(conn) != CONNECTION_OK)
 	{
-		pg_log_error("connection to database failed: %s",
-					 PQerrorMessage(conn));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "connection to database failed: %s",
+						 PQerrorMessage(conn));
 		PQfinish(conn);
 
 		if (exit_on_error)
@@ -595,8 +642,9 @@ connect_database(const char *conninfo, bool exit_on_error)
 	res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not clear \"search_path\": %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not clear \"search_path\": %s",
+						 PQresultErrorMessage(res));
 		PQclear(res);
 		PQfinish(conn);
 
@@ -635,27 +683,31 @@ get_primary_sysid(const char *conninfo)
 	PGresult   *res;
 	uint64		sysid;
 
-	pg_log_info("getting system identifier from publisher");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "getting system identifier from publisher");
 
 	conn = connect_database(conninfo, true);
 
 	res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not get system identifier: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not get system identifier: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 	if (PQntuples(res) != 1)
 	{
-		pg_log_error("could not get system identifier: got %d rows, expected %d row",
-					 PQntuples(res), 1);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not get system identifier: got %d rows, expected %d row",
+						 PQntuples(res), 1);
 		disconnect_database(conn, true);
 	}
 
 	sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
 
-	pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "system identifier is %" PRIu64 " on publisher", sysid);
 
 	PQclear(res);
 	disconnect_database(conn, false);
@@ -675,7 +727,8 @@ get_standby_sysid(const char *datadir)
 	bool		crc_ok;
 	uint64		sysid;
 
-	pg_log_info("getting system identifier from subscriber");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "getting system identifier from subscriber");
 
 	cf = get_controlfile(datadir, &crc_ok);
 	if (!crc_ok)
@@ -683,7 +736,8 @@ get_standby_sysid(const char *datadir)
 
 	sysid = cf->system_identifier;
 
-	pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "system identifier is %" PRIu64 " on subscriber", sysid);
 
 	pg_free(cf);
 
@@ -704,7 +758,8 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
 
 	char	   *cmd_str;
 
-	pg_log_info("modifying system identifier of subscriber");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "modifying system identifier of subscriber");
 
 	cf = get_controlfile(subscriber_dir, &crc_ok);
 	if (!crc_ok)
@@ -721,31 +776,37 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
 	cf->system_identifier |= getpid() & 0xFFF;
 
 	if (dry_run)
-		pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
-					cf->system_identifier);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would set system identifier to %" PRIu64 " on subscriber",
+						 cf->system_identifier);
 	else
 	{
 		update_controlfile(subscriber_dir, cf, true);
-		pg_log_info("system identifier is %" PRIu64 " on subscriber",
-					cf->system_identifier);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "system identifier is %" PRIu64 " on subscriber",
+						 cf->system_identifier);
 	}
 
 	if (dry_run)
-		pg_log_info("dry-run: would run pg_resetwal on the subscriber");
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would run pg_resetwal on the subscriber");
 	else
-		pg_log_info("running pg_resetwal on the subscriber");
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "running pg_resetwal on the subscriber");
 
 	cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
 					   subscriber_dir, DEVNULL);
 
-	pg_log_debug("pg_resetwal command is: %s", cmd_str);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "pg_resetwal command is: %s", cmd_str);
 
 	if (!dry_run)
 	{
 		int			rc = system(cmd_str);
 
 		if (rc == 0)
-			pg_log_info("successfully reset WAL on the subscriber");
+			pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+							 "successfully reset WAL on the subscriber");
 		else
 			pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
 	}
@@ -771,15 +832,17 @@ generate_object_name(PGconn *conn)
 				 "WHERE datname = pg_catalog.current_database()");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain database OID: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain database OID: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
 	if (PQntuples(res) != 1)
 	{
-		pg_log_error("could not obtain database OID: got %d rows, expected %d row",
-					 PQntuples(res), 1);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain database OID: got %d rows, expected %d row",
+						 PQntuples(res), 1);
 		disconnect_database(conn, true);
 	}
 
@@ -819,8 +882,9 @@ find_publication(PGconn *conn, const char *pubname, const char *dbname)
 	res = PQexec(conn, str->data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
-					 pubname, dbname, PQerrorMessage(conn));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not find publication \"%s\" in database \"%s\": %s",
+						 pubname, dbname, PQerrorMessage(conn));
 		disconnect_database(conn, true);
 	}
 
@@ -873,8 +937,9 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
 		{
 			/* Reuse existing publication on publisher. */
-			pg_log_info("use existing publication \"%s\" in database \"%s\"",
-						dbinfo[i].pubname, dbinfo[i].dbname);
+			pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+							 "use existing publication \"%s\" in database \"%s\"",
+							 dbinfo[i].pubname, dbinfo[i].dbname);
 			/* Don't remove pre-existing publication if an error occurs. */
 			dbinfo[i].made_publication = false;
 		}
@@ -912,8 +977,9 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 			res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
 			if (PQresultStatus(res) != PGRES_TUPLES_OK)
 			{
-				pg_log_error("could not write an additional WAL record: %s",
-							 PQresultErrorMessage(res));
+				pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+								 "could not write an additional WAL record: %s",
+								 PQresultErrorMessage(res));
 				disconnect_database(conn, true);
 			}
 			PQclear(res);
@@ -938,8 +1004,9 @@ server_is_in_recovery(PGconn *conn)
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain recovery progress: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain recovery progress: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
@@ -971,7 +1038,8 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 	int			max_prepared_transactions;
 	char	   *max_slot_wal_keep_size;
 
-	pg_log_info("checking settings on publisher");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "checking settings on publisher");
 
 	conn = connect_database(dbinfo[0].pubconninfo, true);
 
@@ -981,7 +1049,8 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 	 */
 	if (server_is_in_recovery(conn))
 	{
-		pg_log_error("primary server cannot be in recovery");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "primary server cannot be in recovery");
 		disconnect_database(conn, true);
 	}
 
@@ -1007,8 +1076,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain publisher settings: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain publisher settings: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
@@ -1022,48 +1092,63 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 
 	PQclear(res);
 
-	pg_log_debug("publisher: wal_level: %s", wal_level);
-	pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
-	pg_log_debug("publisher: current replication slots: %d", cur_repslots);
-	pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
-	pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
-	pg_log_debug("publisher: max_prepared_transactions: %d",
-				 max_prepared_transactions);
-	pg_log_debug("publisher: max_slot_wal_keep_size: %s",
-				 max_slot_wal_keep_size);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: wal_level: %s", wal_level);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: max_replication_slots: %d", max_repslots);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: current replication slots: %d", cur_repslots);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: max_wal_senders: %d", max_walsenders);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: current wal senders: %d", cur_walsenders);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: max_prepared_transactions: %d",
+					 max_prepared_transactions);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "publisher: max_slot_wal_keep_size: %s",
+					 max_slot_wal_keep_size);
 
 	disconnect_database(conn, false);
 
 	if (strcmp(wal_level, "minimal") == 0)
 	{
-		pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "publisher requires \"wal_level\" >= \"replica\"");
 		failed = true;
 	}
 
 	if (max_repslots - cur_repslots < num_dbs)
 	{
-		pg_log_error("publisher requires %d replication slots, but only %d remain",
-					 num_dbs, max_repslots - cur_repslots);
-		pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-						  "max_replication_slots", cur_repslots + num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "publisher requires %d replication slots, but only %d remain",
+						 num_dbs, max_repslots - cur_repslots);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Increase the configuration parameter \"%s\" to at least %d.",
+						 "max_replication_slots", cur_repslots + num_dbs);
 		failed = true;
 	}
 
 	if (max_walsenders - cur_walsenders < num_dbs)
 	{
-		pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
-					 num_dbs, max_walsenders - cur_walsenders);
-		pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-						  "max_wal_senders", cur_walsenders + num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "publisher requires %d WAL sender processes, but only %d remain",
+						 num_dbs, max_walsenders - cur_walsenders);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Increase the configuration parameter \"%s\" to at least %d.",
+						 "max_wal_senders", cur_walsenders + num_dbs);
 		failed = true;
 	}
 
 	if (max_prepared_transactions != 0 && !dbinfos.two_phase)
 	{
-		pg_log_warning("two_phase option will not be enabled for replication slots");
-		pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled.  "
-							  "Prepared transactions will be replicated at COMMIT PREPARED.");
-		pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+						 "two_phase option will not be enabled for replication slots");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
+						 "Subscriptions will be created with the two_phase option disabled.  "
+						 "Prepared transactions will be replicated at COMMIT PREPARED.");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+						 "You can use the command-line option --enable-two-phase to enable two_phase.");
 	}
 
 	/*
@@ -1073,9 +1158,11 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 	 */
 	if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
 	{
-		pg_log_warning("required WAL could be removed from the publisher");
-		pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
-							"max_slot_wal_keep_size");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+						 "required WAL could be removed from the publisher");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+						 "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
+						 "max_slot_wal_keep_size");
 	}
 
 	pg_free(wal_level);
@@ -1106,14 +1193,16 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 	int			max_replorigins;
 	int			max_wprocs;
 
-	pg_log_info("checking settings on subscriber");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "checking settings on subscriber");
 
 	conn = connect_database(dbinfo[0].subconninfo, true);
 
 	/* The target server must be a standby */
 	if (!server_is_in_recovery(conn))
 	{
-		pg_log_error("target server must be a standby");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "target server must be a standby");
 		disconnect_database(conn, true);
 	}
 
@@ -1137,8 +1226,9 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain subscriber settings: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain subscriber settings: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
@@ -1148,12 +1238,16 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 	if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
 		primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
 
-	pg_log_debug("subscriber: max_logical_replication_workers: %d",
-				 max_lrworkers);
-	pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
-	pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "subscriber: max_logical_replication_workers: %d",
+					 max_lrworkers);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "subscriber: max_active_replication_origins: %d", max_replorigins);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "subscriber: max_worker_processes: %d", max_wprocs);
 	if (primary_slot_name)
-		pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
+		pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+						 "subscriber: primary_slot_name: %s", primary_slot_name);
 
 	PQclear(res);
 
@@ -1161,28 +1255,34 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 
 	if (max_replorigins < num_dbs)
 	{
-		pg_log_error("subscriber requires %d active replication origins, but only %d remain",
-					 num_dbs, max_replorigins);
-		pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-						  "max_active_replication_origins", num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "subscriber requires %d active replication origins, but only %d remain",
+						 num_dbs, max_replorigins);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Increase the configuration parameter \"%s\" to at least %d.",
+						 "max_active_replication_origins", num_dbs);
 		failed = true;
 	}
 
 	if (max_lrworkers < num_dbs)
 	{
-		pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
-					 num_dbs, max_lrworkers);
-		pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-						  "max_logical_replication_workers", num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "subscriber requires %d logical replication workers, but only %d remain",
+						 num_dbs, max_lrworkers);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Increase the configuration parameter \"%s\" to at least %d.",
+						 "max_logical_replication_workers", num_dbs);
 		failed = true;
 	}
 
 	if (max_wprocs < num_dbs + 1)
 	{
-		pg_log_error("subscriber requires %d worker processes, but only %d remain",
-					 num_dbs + 1, max_wprocs);
-		pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
-						  "max_worker_processes", num_dbs + 1);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "subscriber requires %d worker processes, but only %d remain",
+						 num_dbs + 1, max_wprocs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Increase the configuration parameter \"%s\" to at least %d.",
+						 "max_worker_processes", num_dbs + 1);
 		failed = true;
 	}
 
@@ -1215,19 +1315,22 @@ drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname
 	appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
 
 	if (dry_run)
-		pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
-					subname, dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would drop subscription \"%s\" in database \"%s\"",
+						 subname, dbname);
 	else
 	{
-		pg_log_info("dropping subscription \"%s\" in database \"%s\"",
-					subname, dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dropping subscription \"%s\" in database \"%s\"",
+						 subname, dbname);
 
 		res = PQexec(conn, query->data);
 
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("could not drop subscription \"%s\": %s",
-						 subname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not drop subscription \"%s\": %s",
+							 subname, PQresultErrorMessage(res));
 			disconnect_database(conn, true);
 		}
 
@@ -1261,8 +1364,9 @@ check_and_drop_existing_subscriptions(PGconn *conn,
 
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain pre-existing subscriptions: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain pre-existing subscriptions: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
@@ -1373,7 +1477,8 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c
 						  lsn);
 	}
 
-	pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "recovery parameters:\n%s", recoveryconfcontents->data);
 
 	if (!dry_run)
 	{
@@ -1427,9 +1532,11 @@ drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotnam
 	}
 	else
 	{
-		pg_log_warning("could not drop replication slot \"%s\" on primary",
-					   slotname);
-		pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+						 "could not drop replication slot \"%s\" on primary",
+						 slotname);
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+						 "Drop this replication slot soon to avoid retention of WAL files.");
 	}
 }
 
@@ -1461,9 +1568,11 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
 		}
 		else
 		{
-			pg_log_warning("could not obtain failover replication slot information: %s",
-						   PQresultErrorMessage(res));
-			pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+			pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+							 "could not obtain failover replication slot information: %s",
+							 PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+							 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
 		}
 
 		PQclear(res);
@@ -1471,8 +1580,10 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
 	}
 	else
 	{
-		pg_log_warning("could not drop failover replication slot");
-		pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+						 "could not drop failover replication slot");
+		pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+						 "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
 	}
 }
 
@@ -1494,11 +1605,13 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	Assert(conn != NULL);
 
 	if (dry_run)
-		pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
-					slot_name, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
+						 slot_name, dbinfo->dbname);
 	else
-		pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
-					slot_name, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "creating the replication slot \"%s\" in database \"%s\" on publisher",
+						 slot_name, dbinfo->dbname);
 
 	slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
 
@@ -1509,16 +1622,18 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
 	PQfreemem(slot_name_esc);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
-			pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
-						 slot_name, dbinfo->dbname,
-						 PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not create replication slot \"%s\" in database \"%s\": %s",
+							 slot_name, dbinfo->dbname,
+							 PQresultErrorMessage(res));
 			PQclear(res);
 			destroyPQExpBuffer(str);
 			return NULL;
@@ -1547,11 +1662,13 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 	Assert(conn != NULL);
 
 	if (dry_run)
-		pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
-					slot_name, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
+						 slot_name, dbinfo->dbname);
 	else
-		pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
-					slot_name, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dropping the replication slot \"%s\" in database \"%s\"",
+						 slot_name, dbinfo->dbname);
 
 	slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
 
@@ -1559,15 +1676,17 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 
 	PQfreemem(slot_name_esc);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
-			pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
-						 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not drop replication slot \"%s\" in database \"%s\": %s",
+							 slot_name, dbinfo->dbname, PQresultErrorMessage(res));
 			dbinfo->made_replslot = false;	/* don't try again. */
 		}
 
@@ -1587,25 +1706,32 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
 	{
 		if (WIFEXITED(rc))
 		{
-			pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "pg_ctl failed with exit code %d",
+							 WEXITSTATUS(rc));
 		}
 		else if (WIFSIGNALED(rc))
 		{
 #if defined(WIN32)
-			pg_log_error("pg_ctl was terminated by exception 0x%X",
-						 WTERMSIG(rc));
-			pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "pg_ctl was terminated by exception 0x%X",
+							 WTERMSIG(rc));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+							 "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
 #else
-			pg_log_error("pg_ctl was terminated by signal %d: %s",
-						 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "pg_ctl was terminated by signal %d: %s",
+							 WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
 #endif
 		}
 		else
 		{
-			pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "pg_ctl exited with unrecognized status %d", rc);
 		}
 
-		pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+						 "The failed command was: %s", pg_ctl_cmd);
 		exit(1);
 	}
 }
@@ -1650,12 +1776,14 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	if (restrict_logical_worker)
 		appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
 
-	pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "pg_ctl command is: %s", pg_ctl_cmd->data);
 	rc = system(pg_ctl_cmd->data);
 	pg_ctl_status(pg_ctl_cmd->data, rc);
 	standby_running = true;
 	destroyPQExpBuffer(pg_ctl_cmd);
-	pg_log_info("server was started");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "server was started");
 }
 
 static void
@@ -1666,11 +1794,13 @@ stop_standby_server(const char *datadir)
 
 	pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
 						  datadir);
-	pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "pg_ctl command is: %s", pg_ctl_cmd);
 	rc = system(pg_ctl_cmd);
 	pg_ctl_status(pg_ctl_cmd, rc);
 	standby_running = false;
-	pg_log_info("server was stopped");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "server was stopped");
 }
 
 /*
@@ -1689,7 +1819,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
 	bool		ready = false;
 	int			timer = 0;
 
-	pg_log_info("waiting for the target server to reach the consistent state");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "waiting for the target server to reach the consistent state");
 
 	conn = connect_database(conninfo, true);
 
@@ -1707,7 +1838,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
 		if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
 		{
 			stop_standby_server(subscriber_dir);
-			pg_log_error("recovery timed out");
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "recovery timed out");
 			disconnect_database(conn, true);
 		}
 
@@ -1721,8 +1853,10 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
 	if (!ready)
 		pg_fatal("server did not end recovery");
 
-	pg_log_info("target server reached the consistent state");
-	pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "target server reached the consistent state");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
+					 "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
 }
 
 /*
@@ -1749,8 +1883,9 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	res = PQexec(conn, str->data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain publication information: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain publication information: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
@@ -1763,8 +1898,10 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		 * pg_createsubscriber_ prefix followed by the exact database oid and
 		 * a random number.
 		 */
-		pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
-		pg_log_error_hint("Consider renaming this publication before continuing.");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "publication \"%s\" already exists", dbinfo->pubname);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Consider renaming this publication before continuing.");
 		disconnect_database(conn, true);
 	}
 
@@ -1772,24 +1909,28 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	resetPQExpBuffer(str);
 
 	if (dry_run)
-		pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
-					dbinfo->pubname, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would create publication \"%s\" in database \"%s\"",
+						 dbinfo->pubname, dbinfo->dbname);
 	else
-		pg_log_info("creating publication \"%s\" in database \"%s\"",
-					dbinfo->pubname, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "creating publication \"%s\" in database \"%s\"",
+						 dbinfo->pubname, dbinfo->dbname);
 
 	appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
 					  ipubname_esc);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
-						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not create publication \"%s\" in database \"%s\": %s",
+							 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
 			disconnect_database(conn, true);
 		}
 		PQclear(res);
@@ -1819,25 +1960,29 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
 	pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
 
 	if (dry_run)
-		pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
-					pubname, dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would drop publication \"%s\" in database \"%s\"",
+						 pubname, dbname);
 	else
-		pg_log_info("dropping publication \"%s\" in database \"%s\"",
-					pubname, dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dropping publication \"%s\" in database \"%s\"",
+						 pubname, dbname);
 
 	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
 	PQfreemem(pubname_esc);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-						 pubname, dbname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not drop publication \"%s\" in database \"%s\": %s",
+							 pubname, dbname, PQresultErrorMessage(res));
 			*made_publication = false;	/* don't try again. */
 
 			/*
@@ -1872,15 +2017,17 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
 	if (drop_all_pubs)
 	{
-		pg_log_info("dropping all existing publications in database \"%s\"",
-					dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dropping all existing publications in database \"%s\"",
+						 dbinfo->dbname);
 
 		/* Fetch all publication names */
 		res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
-			pg_log_error("could not obtain publication information: %s",
-						 PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not obtain publication information: %s",
+							 PQresultErrorMessage(res));
 			PQclear(res);
 			disconnect_database(conn, true);
 		}
@@ -1903,11 +2050,13 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		else
 		{
 			if (dry_run)
-				pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
-							dbinfo->pubname, dbinfo->dbname);
+				pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+								 "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+								 dbinfo->pubname, dbinfo->dbname);
 			else
-				pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
-							dbinfo->pubname, dbinfo->dbname);
+				pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+								 "preserve existing publication \"%s\" in database \"%s\"",
+								 dbinfo->pubname, dbinfo->dbname);
 		}
 	}
 }
@@ -1941,11 +2090,13 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
 
 	if (dry_run)
-		pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
-					dbinfo->subname, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would create subscription \"%s\" in database \"%s\"",
+						 dbinfo->subname, dbinfo->dbname);
 	else
-		pg_log_info("creating subscription \"%s\" in database \"%s\"",
-					dbinfo->subname, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "creating subscription \"%s\" in database \"%s\"",
+						 dbinfo->subname, dbinfo->dbname);
 
 	appendPQExpBuffer(str,
 					  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
@@ -1959,15 +2110,17 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	PQfreemem(pubconninfo_esc);
 	PQfreemem(replslotname_esc);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
-						 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not create subscription \"%s\" in database \"%s\": %s",
+							 dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
 			disconnect_database(conn, true);
 		}
 		PQclear(res);
@@ -2011,15 +2164,17 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
 	res = PQexec(conn, str->data);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain subscription OID: %s",
-					 PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain subscription OID: %s",
+						 PQresultErrorMessage(res));
 		disconnect_database(conn, true);
 	}
 
 	if (PQntuples(res) != 1 && !dry_run)
 	{
-		pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
-					 PQntuples(res), 1);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain subscription OID: got %d rows, expected %d row",
+						 PQntuples(res), 1);
 		disconnect_database(conn, true);
 	}
 
@@ -2043,26 +2198,30 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
 	originname = psprintf("pg_%u", suboid);
 
 	if (dry_run)
-		pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
-					originname, lsnstr, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
+						 originname, lsnstr, dbinfo->dbname);
 	else
-		pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
-					originname, lsnstr, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
+						 originname, lsnstr, dbinfo->dbname);
 
 	resetPQExpBuffer(str);
 	appendPQExpBuffer(str,
 					  "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
 					  originname, lsnstr);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
-			pg_log_error("could not set replication progress for subscription \"%s\": %s",
-						 dbinfo->subname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not set replication progress for subscription \"%s\": %s",
+							 dbinfo->subname, PQresultErrorMessage(res));
 			disconnect_database(conn, true);
 		}
 		PQclear(res);
@@ -2093,23 +2252,27 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
 
 	if (dry_run)
-		pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
-					dbinfo->subname, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "dry-run: would enable subscription \"%s\" in database \"%s\"",
+						 dbinfo->subname, dbinfo->dbname);
 	else
-		pg_log_info("enabling subscription \"%s\" in database \"%s\"",
-					dbinfo->subname, dbinfo->dbname);
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "enabling subscription \"%s\" in database \"%s\"",
+						 dbinfo->subname, dbinfo->dbname);
 
 	appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
 
-	pg_log_debug("command is: %s", str->data);
+	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+					 "command is: %s", str->data);
 
 	if (!dry_run)
 	{
 		res = PQexec(conn, str->data);
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
-			pg_log_error("could not enable subscription \"%s\": %s",
-						 dbinfo->subname, PQresultErrorMessage(res));
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "could not enable subscription \"%s\": %s",
+							 dbinfo->subname, PQresultErrorMessage(res));
 			disconnect_database(conn, true);
 		}
 
@@ -2154,7 +2317,9 @@ get_publisher_databases(struct CreateSubscriberOptions *opt,
 	res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
-		pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "could not obtain a list of databases: %s",
+						 PQresultErrorMessage(res));
 		PQclear(res);
 		disconnect_database(conn, true);
 	}
@@ -2258,9 +2423,11 @@ main(int argc, char **argv)
 #ifndef WIN32
 	if (geteuid() == 0)
 	{
-		pg_log_error("cannot be executed by \"root\"");
-		pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
-						  progname);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "cannot be executed by \"root\"");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "You must run %s as the PostgreSQL superuser.",
+						 progname);
 		exit(1);
 	}
 #endif
@@ -2351,7 +2518,9 @@ main(int argc, char **argv)
 				break;
 			default:
 				/* getopt_long already emitted a complaint */
-				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+				pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+								 "Try \"%s --help\" for more information.",
+								 progname);
 				exit(1);
 		}
 	}
@@ -2372,9 +2541,12 @@ main(int argc, char **argv)
 
 		if (bad_switch)
 		{
-			pg_log_error("options %s and %s cannot be used together",
-						 bad_switch, "-a/--all");
-			pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "options %s and %s cannot be used together",
+							 bad_switch, "-a/--all");
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+							 "Try \"%s --help\" for more information.",
+							 progname);
 			exit(1);
 		}
 	}
@@ -2382,17 +2554,21 @@ main(int argc, char **argv)
 	/* Any non-option arguments? */
 	if (optind < argc)
 	{
-		pg_log_error("too many command-line arguments (first is \"%s\")",
-					 argv[optind]);
-		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "too many command-line arguments (first is \"%s\")",
+						 argv[optind]);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Try \"%s --help\" for more information.", progname);
 		exit(1);
 	}
 
 	/* Required arguments */
 	if (subscriber_dir == NULL)
 	{
-		pg_log_error("no subscriber data directory specified");
-		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "no subscriber data directory specified");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Try \"%s --help\" for more information.", progname);
 		exit(1);
 	}
 
@@ -2419,22 +2595,27 @@ main(int argc, char **argv)
 		 * identical entries for physical and logical replication. If there is
 		 * not, we would fail anyway.
 		 */
-		pg_log_error("no publisher connection string specified");
-		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "no publisher connection string specified");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Try \"%s --help\" for more information.", progname);
 		exit(1);
 	}
 
 	if (dry_run)
-		pg_log_info("Executing in dry-run mode.\n"
-					"The target directory will not be modified.");
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "Executing in dry-run mode.\n"
+						 "The target directory will not be modified.");
 
-	pg_log_info("validating publisher connection string");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "validating publisher connection string");
 	pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
 										  &dbname_conninfo);
 	if (pub_base_conninfo == NULL)
 		exit(1);
 
-	pg_log_info("validating subscriber connection string");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "validating subscriber connection string");
 	sub_base_conninfo = get_sub_conninfo(&opt);
 
 	/*
@@ -2451,7 +2632,8 @@ main(int argc, char **argv)
 
 	if (opt.database_names.head == NULL)
 	{
-		pg_log_info("no database was specified");
+		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+						 "no database was specified");
 
 		/*
 		 * Try to obtain the dbname from the publisher conninfo. If dbname
@@ -2462,14 +2644,17 @@ main(int argc, char **argv)
 			simple_string_list_append(&opt.database_names, dbname_conninfo);
 			num_dbs++;
 
-			pg_log_info("database name \"%s\" was extracted from the publisher connection string",
-						dbname_conninfo);
+			pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+							 "database name \"%s\" was extracted from the publisher connection string",
+							 dbname_conninfo);
 		}
 		else
 		{
-			pg_log_error("no database name specified");
-			pg_log_error_hint("Try \"%s --help\" for more information.",
-							  progname);
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "no database name specified");
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+							 "Try \"%s --help\" for more information.",
+							 progname);
 			exit(1);
 		}
 	}
@@ -2477,23 +2662,29 @@ main(int argc, char **argv)
 	/* Number of object names must match number of databases */
 	if (num_pubs > 0 && num_pubs != num_dbs)
 	{
-		pg_log_error("wrong number of publication names specified");
-		pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
-							num_pubs, num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "wrong number of publication names specified");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+						 "The number of specified publication names (%d) must match the number of specified database names (%d).",
+						 num_pubs, num_dbs);
 		exit(1);
 	}
 	if (num_subs > 0 && num_subs != num_dbs)
 	{
-		pg_log_error("wrong number of subscription names specified");
-		pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
-							num_subs, num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "wrong number of subscription names specified");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+						 "The number of specified subscription names (%d) must match the number of specified database names (%d).",
+						 num_subs, num_dbs);
 		exit(1);
 	}
 	if (num_replslots > 0 && num_replslots != num_dbs)
 	{
-		pg_log_error("wrong number of replication slot names specified");
-		pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
-							num_replslots, num_dbs);
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "wrong number of replication slot names specified");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+						 "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
+						 num_replslots, num_dbs);
 		exit(1);
 	}
 
@@ -2504,9 +2695,11 @@ main(int argc, char **argv)
 			dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
 		else
 		{
-			pg_log_error("invalid object type \"%s\" specified for %s",
-						 cell->val, "--clean");
-			pg_log_error_hint("The valid value is: \"%s\"", "publications");
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+							 "invalid object type \"%s\" specified for %s",
+							 cell->val, "--clean");
+			pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+							 "The valid value is: \"%s\"", "publications");
 			exit(1);
 		}
 	}
@@ -2550,8 +2743,10 @@ main(int argc, char **argv)
 	 */
 	if (stat(pidfile, &statbuf) == 0)
 	{
-		pg_log_error("standby server is running");
-		pg_log_error_hint("Stop the standby server and try again.");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+						 "standby server is running");
+		pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+						 "Stop the standby server and try again.");
 		exit(1);
 	}
 
@@ -2560,7 +2755,8 @@ main(int argc, char **argv)
 	 * by command-line options). The goal is to avoid connections during the
 	 * transformation steps.
 	 */
-	pg_log_info("starting the standby server with command-line options");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "starting the standby server with command-line options");
 	start_standby_server(&opt, true, false);
 
 	/* Check if the standby server is ready for logical replication */
@@ -2576,7 +2772,8 @@ main(int argc, char **argv)
 	 * guarantees it) *before* creating the replication slots in
 	 * setup_publisher().
 	 */
-	pg_log_info("stopping the subscriber");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "stopping the subscriber");
 	stop_standby_server(subscriber_dir);
 
 	/* Create the required objects for each database on publisher */
@@ -2590,7 +2787,8 @@ main(int argc, char **argv)
 	 * until accepting connections. We don't want to start logical replication
 	 * during setup.
 	 */
-	pg_log_info("starting the subscriber");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "starting the subscriber");
 	start_standby_server(&opt, true, true);
 
 	/* Waiting the subscriber to be promoted */
@@ -2611,7 +2809,8 @@ main(int argc, char **argv)
 	drop_failover_replication_slots(dbinfos.dbinfo);
 
 	/* Stop the subscriber */
-	pg_log_info("stopping the subscriber");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "stopping the subscriber");
 	stop_standby_server(subscriber_dir);
 
 	/* Change system identifier from subscriber */
@@ -2619,7 +2818,8 @@ main(int argc, char **argv)
 
 	success = true;
 
-	pg_log_info("Done!");
+	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+					 "Done!");
 
 	return 0;
 }
-- 
2.47.3



  [application/octet-stream] v11-0002-Add-a-new-argument-l-logdir-to-pg_createsubscrib.patch (14.4K, 3-v11-0002-Add-a-new-argument-l-logdir-to-pg_createsubscrib.patch)
  download | inline diff:
From ca05dfade11209fdcfa4223a9fbb9d43814eda78 Mon Sep 17 00:00:00 2001
From: Gyan Sreejith <[email protected]>
Date: Tue, 17 Mar 2026 19:10:28 -0400
Subject: [PATCH v11 2/3] Add a new argument -l <logdir> to
 pg_createsubscriber.

Enabling the option to write messages to log files in the specified directory.
A new directory is created if required. A subdirectory is created with timestamp as its name, and it will contain two new logfiles:
1. pg_createsubscriber_server.log  - captures messages related to starting and stopping the standby server.
2. pg_createsubscriber_internal.log - captures internal diagnostic output from pg_createsubscriber.

For example, if we specify -l abc as an argument, and if the timestamp on running it is 20260119T204317.204, a directory abc is created if it doesn't exist already, with 20260119T204317.204 as its subdirectory and it will contain the two log files pg_createsubscriber_server.log and pg_createsubscriber_internal.log
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  28 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 166 +++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              |  41 ++++-
 3 files changed, 225 insertions(+), 10 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index cf45ff3573d..2898a5ea111 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -136,6 +136,34 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-l <replaceable class="parameter">directory</replaceable></option></term>
+     <term><option>--logdir=<replaceable class="parameter">directory</replaceable></option></term>
+     <listitem>
+      <para>
+       Specify the name of the log directory. A new directory is created with
+       this name if it does not exist. A subdirectory with a timestamp
+       indicating the time at which pg_createsubscriber was run will be created.
+       The following two log files will be created in the subdirectory with a
+       umask of 077 so that access is disallowed to other users by default.
+       <itemizedlist>
+        <listitem>
+         <para>
+          pg_createsubscriber_server.log which captures logs related to stopping
+          and starting the standby server,
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          pg_createsubscriber_internal.log which captures internal diagnostic
+          output (validations, checks, etc.)
+         </para>
+        </listitem>
+       </itemizedlist>
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-n</option></term>
      <term><option>--dry-run</option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index b02af9a4cc0..87e4f95b22e 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -55,10 +55,14 @@
 #define INCLUDED_CONF_FILE			"pg_createsubscriber.conf"
 #define INCLUDED_CONF_FILE_DISABLED	INCLUDED_CONF_FILE ".disabled"
 
+#define SERVER_LOG_FILE_NAME "pg_createsubscriber_server"
+#define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal"
+
 /* Command-line options */
 struct CreateSubscriberOptions
 {
 	char	   *config_file;	/* configuration file */
+	char	   *log_dir;		/* log directory name */
 	char	   *pub_conninfo_str;	/* publisher connection string */
 	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
 	char	   *sub_port;		/* subscriber port number */
@@ -157,6 +161,9 @@ static void pg_createsub_log(enum pg_log_level, enum pg_log_part,
 			pg_attribute_printf(3, 4);
 pg_noreturn static void pg_fatal(const char *pg_restrict fmt,...)
 			pg_attribute_printf(1, 2);
+static void internal_log_file_write(enum pg_log_level level,
+									const char *pg_restrict fmt, va_list args)
+			pg_attribute_printf(2, 0);
 
 #define	WAIT_INTERVAL	1		/* 1 second */
 
@@ -178,6 +185,10 @@ static pg_prng_state prng_state;
 static char *pg_ctl_path = NULL;
 static char *pg_resetwal_path = NULL;
 
+static FILE *internal_log_file_fp = NULL;	/* File ptr to log all messages to */
+static char *log_timestamp = NULL;	/* Timestamp to be used in all log file
+									 * names */
+
 /* standby / subscriber data directory */
 static char *subscriber_dir = NULL;
 
@@ -192,7 +203,13 @@ pg_createsub_log(enum pg_log_level level, enum pg_log_part part,
 	va_list		args;
 
 	va_start(args, fmt);
-	pg_log_generic_v(level, part, fmt, args);
+
+	if (internal_log_file_fp != NULL)
+		internal_log_file_write(level, fmt, args);
+
+	if (internal_log_file_fp == NULL || level > PG_LOG_INFO)
+		pg_log_generic_v(level, part, fmt, args);
+
 	va_end(args);
 }
 
@@ -308,6 +325,12 @@ cleanup_objects_atexit(void)
 
 	if (standby_running)
 		stop_standby_server(subscriber_dir);
+
+	if (internal_log_file_fp != NULL)
+	{
+		fclose(internal_log_file_fp);
+		internal_log_file_fp = NULL;
+	}
 }
 
 static void
@@ -322,6 +345,7 @@ usage(void)
 			 "                                  databases and databases that don't allow connections\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
+	printf(_("  -l, --logdir=LOGDIR             location for the new log directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
 	printf(_("  -p, --subscriber-port=PORT      subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
@@ -756,6 +780,7 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
 	bool		crc_ok;
 	struct timeval tv;
 
+	char	   *out_file;
 	char	   *cmd_str;
 
 	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
@@ -794,8 +819,13 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
 		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
 						 "running pg_resetwal on the subscriber");
 
-	cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
-					   subscriber_dir, DEVNULL);
+	if (opt->log_dir != NULL)
+		out_file = psprintf("%s/%s/%s.log", opt->log_dir, log_timestamp, SERVER_LOG_FILE_NAME);
+	else
+		out_file = DEVNULL;
+
+	cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
+					   subscriber_dir, out_file);
 
 	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
 					 "pg_resetwal command is: %s", cmd_str);
@@ -1018,6 +1048,110 @@ server_is_in_recovery(PGconn *conn)
 	return ret == 0;
 }
 
+static void
+internal_log_file_write(enum pg_log_level level, const char *pg_restrict fmt,
+						va_list args)
+{
+	if (level < __pg_log_level)
+		return;
+
+	if (internal_log_file_fp == NULL)
+		return;
+
+	vfprintf(internal_log_file_fp, fmt, args);
+
+	fprintf(internal_log_file_fp, "\n");
+	fflush(internal_log_file_fp);
+}
+
+/*
+ * Open a new logfile with proper permissions.
+ * From src/backend/postmaster/syslogger.c
+ */
+static FILE *
+logfile_open(const char *filename, const char *mode)
+{
+	FILE	   *fh;
+	mode_t		oumask;
+
+	oumask = umask((mode_t) ((~(S_IRUSR | S_IWUSR)) & (S_IRWXU | S_IRWXG | S_IRWXO)));
+	fh = fopen(filename, mode);
+	umask(oumask);
+
+	if (fh)
+	{
+		setvbuf(fh, NULL, PG_IOLBF, 0);
+
+#ifdef WIN32
+		/* use CRLF line endings on Windows */
+		_setmode(_fileno(fh), _O_TEXT);
+#endif
+	}
+	else
+		pg_fatal("could not open log file \"%s\": %m",
+				 filename);
+
+	return fh;
+}
+
+static void
+make_dir(const char *dir)
+{
+	struct stat statbuf;
+
+	if (stat(dir, &statbuf) == 0)
+		return;
+
+	if (errno != ENOENT)
+		pg_fatal("could not stat directory \"%s\": %m", dir);
+
+	if (mkdir(dir, S_IRWXU) == 0)
+	{
+		pg_log_info("directory %s created", dir);
+		return;
+	}
+
+	pg_fatal("could not create log directory \"%s\": %m", dir);
+}
+
+static void
+make_output_dirs(const char *log_dir)
+{
+	char		timestamp[128];
+	struct timeval tval;
+	time_t		now;
+	struct tm	tmbuf;
+	char		timestamp_dir[MAXPGPATH];
+	int			len;
+
+	/* Generate timestamp */
+	gettimeofday(&tval, NULL);
+	now = tval.tv_sec;
+
+	strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
+			 localtime_r(&now, &tmbuf));
+
+	/* append milliseconds */
+	snprintf(timestamp + strlen(timestamp),
+			 sizeof(timestamp) - strlen(timestamp), ".%03u",
+			 (unsigned int) (tval.tv_usec / 1000));
+
+	log_timestamp = pg_strdup(timestamp);
+
+	/* Create base directory (ignore if exists) */
+	make_dir(log_dir);
+
+	/* Build timestamp directory path */
+	len = snprintf(timestamp_dir, MAXPGPATH, "%s/%s", log_dir, timestamp);
+
+	if (len >= MAXPGPATH)
+		pg_fatal("directory path for log files, %s/%s, is too long",
+				 log_dir, timestamp);
+
+	/* Create timestamp directory */
+	make_dir(timestamp_dir);
+}
+
 /*
  * Is the primary server ready for logical replication?
  *
@@ -1776,6 +1910,9 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	if (restrict_logical_worker)
 		appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
 
+	if (opt->log_dir != NULL)
+		appendPQExpBuffer(pg_ctl_cmd, " -l %s/%s/%s.log", opt->log_dir, log_timestamp, SERVER_LOG_FILE_NAME);
+
 	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
 					 "pg_ctl command is: %s", pg_ctl_cmd->data);
 	rc = system(pg_ctl_cmd->data);
@@ -2346,6 +2483,7 @@ main(int argc, char **argv)
 		{"all", no_argument, NULL, 'a'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
+		{"logdir", required_argument, NULL, 'l'},
 		{"dry-run", no_argument, NULL, 'n'},
 		{"subscriber-port", required_argument, NULL, 'p'},
 		{"publisher-server", required_argument, NULL, 'P'},
@@ -2404,6 +2542,7 @@ main(int argc, char **argv)
 	/* Default settings */
 	subscriber_dir = NULL;
 	opt.config_file = NULL;
+	opt.log_dir = NULL;
 	opt.pub_conninfo_str = NULL;
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
@@ -2434,7 +2573,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
+	while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2455,6 +2594,10 @@ main(int argc, char **argv)
 				subscriber_dir = pg_strdup(optarg);
 				canonicalize_path(subscriber_dir);
 				break;
+			case 'l':
+				opt.log_dir = pg_strdup(optarg);
+				canonicalize_path(opt.log_dir);
+				break;
 			case 'n':
 				dry_run = true;
 				break;
@@ -2525,6 +2668,18 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (opt.log_dir != NULL)
+	{
+		char	   *internal_log_file;
+
+		make_output_dirs(opt.log_dir);
+		internal_log_file = psprintf("%s/%s/%s.log", opt.log_dir, log_timestamp,
+									 INTERNAL_LOG_FILE_NAME);
+
+		if ((internal_log_file_fp = logfile_open(internal_log_file, "a")) == NULL)
+			pg_fatal("could not open log file \"%s\": %m", internal_log_file);
+	}
+
 	/* Validate that --all is not used with incompatible options */
 	if (opt.all_dbs)
 	{
@@ -2821,5 +2976,8 @@ main(int argc, char **argv)
 	pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
 					 "Done!");
 
+	if (internal_log_file_fp != NULL)
+		fclose(internal_log_file_fp);
+
 	return 0;
 }
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 0c27fca7bb7..4ddfb621a5d 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -14,6 +14,7 @@ program_version_ok('pg_createsubscriber');
 program_options_handling_ok('pg_createsubscriber');
 
 my $datadir = PostgreSQL::Test::Utils::tempdir;
+my $logdir = PostgreSQL::Test::Utils::tempdir + "/logdir";
 
 # Generate a database with a name made of a range of ASCII characters.
 # Extracted from 002_pg_upgrade.pl.
@@ -362,9 +363,35 @@ command_ok(
 		'--subscription' => 'sub2',
 		'--database' => $db1,
 		'--database' => $db2,
+		'--logdir' => $logdir,
 	],
 	'run pg_createsubscriber --dry-run on node S');
 
+# Check that the log files were created
+my @server_log_files = glob "$logdir/*/pg_createsubscriber_server.log";
+is( scalar(@server_log_files), 1, "
+    pg_createsubscriber_server.log file was created");
+my $server_log_file_size = -s $server_log_files[0];
+isnt($server_log_file_size, 0,
+	"pg_createsubscriber_server.log file not empty");
+my $server_log = slurp_file($server_log_files[0]);
+like(
+	$server_log,
+	qr/consistent recovery state reached/,
+	"server reached consistent recovery state");
+
+my @internal_log_files = glob "$logdir/*/pg_createsubscriber_internal.log";
+is( scalar(@internal_log_files), 1, "
+    pg_createsubscriber_internal.log file was created");
+my $internal_log_file_size = -s $internal_log_files[0];
+isnt($internal_log_file_size, 0,
+	"pg_createsubscriber_internal.log file not empty");
+my $internal_log = slurp_file($internal_log_files[0]);
+like(
+	$internal_log,
+	qr/target server reached the consistent state/,
+	"log shows consistent state reached");
+
 # Check if node S is still a standby
 $node_s->start;
 is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
@@ -444,7 +471,8 @@ is(scalar(() = $stderr =~ /would create subscription/g),
 
 # Create a user-defined publication, and a table that is not a member of that
 # publication.
-$node_p->safe_psql($db1, qq(
+$node_p->safe_psql(
+	$db1, qq(
 	CREATE PUBLICATION test_pub3 FOR TABLE tbl1;
 	CREATE TABLE not_replicated (a int);
 ));
@@ -540,8 +568,7 @@ second row
 third row),
 	"logical replication works in database $db1");
 $result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated');
-is($result, qq(),
-	"table is not replicated in database $db1");
+is($result, qq(), "table is not replicated in database $db1");
 
 # Check result in database $db2
 $result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
@@ -555,8 +582,10 @@ my $sysid_s = $node_s->safe_psql('postgres',
 isnt($sysid_p, $sysid_s, 'system identifier was changed');
 
 # Verify that pub2 was created in $db2
-is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
-	'1', "publication pub2 was created in $db2");
+is( $node_p->safe_psql(
+		$db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
+	'1',
+	"publication pub2 was created in $db2");
 
 # Get subscription and publication names
 $result = $node_s->safe_psql(
@@ -581,7 +610,7 @@ $result = $node_s->safe_psql(
     )
 );
 
-is($result, qq($db1|{test_pub3}
+is( $result, qq($db1|{test_pub3}
 $db2|{pub2}),
 	"subscriptions use the correct publications");
 
-- 
2.47.3



  [application/octet-stream] v11-0003-Address-comments-from-Hayato-Kuroda.patch (5.3K, 4-v11-0003-Address-comments-from-Hayato-Kuroda.patch)
  download | inline diff:
From 1f72429ebf20958973edb84822c2fbbf603165f5 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 18 Mar 2026 21:43:08 +0900
Subject: [PATCH v11 3/3] Address comments from Hayato Kuroda

---
 src/bin/pg_basebackup/pg_createsubscriber.c | 89 +++++++++------------
 1 file changed, 36 insertions(+), 53 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 87e4f95b22e..83b64960da8 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -186,8 +186,7 @@ static char *pg_ctl_path = NULL;
 static char *pg_resetwal_path = NULL;
 
 static FILE *internal_log_file_fp = NULL;	/* File ptr to log all messages to */
-static char *log_timestamp = NULL;	/* Timestamp to be used in all log file
-									 * names */
+static char logdir[MAXPGPATH];	/* Directory log files are put (if specified) */
 
 /* standby / subscriber data directory */
 static char *subscriber_dir = NULL;
@@ -819,8 +818,13 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
 		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
 						 "running pg_resetwal on the subscriber");
 
-	if (opt->log_dir != NULL)
-		out_file = psprintf("%s/%s/%s.log", opt->log_dir, log_timestamp, SERVER_LOG_FILE_NAME);
+	/*
+	 * Redirecting the output to the logfile if specified. Since the output
+	 * would be very short, around one line, we do not provide a separate file
+	 * for it; it's done as a part of the server log.
+	 */
+	if (opt->log_dir)
+		out_file = psprintf("%s/%s.log", logdir, SERVER_LOG_FILE_NAME);
 	else
 		out_file = DEVNULL;
 
@@ -1052,13 +1056,13 @@ static void
 internal_log_file_write(enum pg_log_level level, const char *pg_restrict fmt,
 						va_list args)
 {
-	if (level < __pg_log_level)
-		return;
+	Assert(internal_log_file_fp);
 
-	if (internal_log_file_fp == NULL)
+	/* Do nothing if log level is too low. */
+	if (level < __pg_log_level)
 		return;
 
-	vfprintf(internal_log_file_fp, fmt, args);
+	vfprintf(internal_log_file_fp, _(fmt), args);
 
 	fprintf(internal_log_file_fp, "\n");
 	fflush(internal_log_file_fp);
@@ -1095,33 +1099,12 @@ logfile_open(const char *filename, const char *mode)
 }
 
 static void
-make_dir(const char *dir)
-{
-	struct stat statbuf;
-
-	if (stat(dir, &statbuf) == 0)
-		return;
-
-	if (errno != ENOENT)
-		pg_fatal("could not stat directory \"%s\": %m", dir);
-
-	if (mkdir(dir, S_IRWXU) == 0)
-	{
-		pg_log_info("directory %s created", dir);
-		return;
-	}
-
-	pg_fatal("could not create log directory \"%s\": %m", dir);
-}
-
-static void
-make_output_dirs(const char *log_dir)
+make_output_dirs(const char *log_basedir)
 {
 	char		timestamp[128];
 	struct timeval tval;
 	time_t		now;
 	struct tm	tmbuf;
-	char		timestamp_dir[MAXPGPATH];
 	int			len;
 
 	/* Generate timestamp */
@@ -1136,20 +1119,20 @@ make_output_dirs(const char *log_dir)
 			 sizeof(timestamp) - strlen(timestamp), ".%03u",
 			 (unsigned int) (tval.tv_usec / 1000));
 
-	log_timestamp = pg_strdup(timestamp);
-
-	/* Create base directory (ignore if exists) */
-	make_dir(log_dir);
-
 	/* Build timestamp directory path */
-	len = snprintf(timestamp_dir, MAXPGPATH, "%s/%s", log_dir, timestamp);
+	len = snprintf(logdir, MAXPGPATH, "%s/%s", log_basedir, timestamp);
 
 	if (len >= MAXPGPATH)
 		pg_fatal("directory path for log files, %s/%s, is too long",
-				 log_dir, timestamp);
+				 logdir, timestamp);
+
+	/* Create base directory (ignore if exists) */
+	if (mkdir(log_basedir, S_IRWXU) < 0 && errno != EEXIST)
+		pg_fatal("could not create directory \"%s\": %m", log_basedir);
 
-	/* Create timestamp directory */
-	make_dir(timestamp_dir);
+	/* Create BASE_DIR/$timestamp */
+	if (mkdir(logdir, S_IRWXU) < 0)
+		pg_fatal("could not create directory \"%s\": %m", logdir);
 }
 
 /*
@@ -1910,8 +1893,8 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	if (restrict_logical_worker)
 		appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
 
-	if (opt->log_dir != NULL)
-		appendPQExpBuffer(pg_ctl_cmd, " -l %s/%s/%s.log", opt->log_dir, log_timestamp, SERVER_LOG_FILE_NAME);
+	if (opt->log_dir)
+		appendPQExpBuffer(pg_ctl_cmd, " -l %s/%s.log", logdir, SERVER_LOG_FILE_NAME);
 
 	pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
 					 "pg_ctl command is: %s", pg_ctl_cmd->data);
@@ -2668,18 +2651,6 @@ main(int argc, char **argv)
 		}
 	}
 
-	if (opt.log_dir != NULL)
-	{
-		char	   *internal_log_file;
-
-		make_output_dirs(opt.log_dir);
-		internal_log_file = psprintf("%s/%s/%s.log", opt.log_dir, log_timestamp,
-									 INTERNAL_LOG_FILE_NAME);
-
-		if ((internal_log_file_fp = logfile_open(internal_log_file, "a")) == NULL)
-			pg_fatal("could not open log file \"%s\": %m", internal_log_file);
-	}
-
 	/* Validate that --all is not used with incompatible options */
 	if (opt.all_dbs)
 	{
@@ -2757,6 +2728,18 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (opt.log_dir != NULL)
+	{
+		char	   *internal_log_file;
+
+		make_output_dirs(opt.log_dir);
+		internal_log_file = psprintf("%s/%s.log", logdir,
+									 INTERNAL_LOG_FILE_NAME);
+
+		if ((internal_log_file_fp = logfile_open(internal_log_file, "a")) == NULL)
+			pg_fatal("could not open log file \"%s\": %m", internal_log_file);
+	}
+
 	if (dry_run)
 		pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
 						 "Executing in dry-run mode.\n"
-- 
2.47.3



view thread (65+ messages)  latest in thread

reply

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Reply to all the recipients using the --to and --cc options:
  reply via email

  To: [email protected]
  Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
  Subject: RE: [Proposal] Adding Log File Capability to pg_createsubscriber
  In-Reply-To: <OS9PR01MB1214902CC518A8102333FA88DF54EA@OS9PR01MB12149.jpnprd01.prod.outlook.com>

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox