public inbox for [email protected]
From: Gyan Sreejith <[email protected]>
To: Kuroda, Hayato/黒田 隼人 <[email protected]>
Cc: Amit Kapila <[email protected]>
Cc: Shlok Kyal <[email protected]>
Cc: vignesh C <[email protected]>
Cc: Euler Taveira <[email protected]>
Cc: [email protected] <[email protected]>
Cc: Peter Smith <[email protected]>
Subject: Re: [Proposal] Adding Log File Capability to pg_createsubscriber
Date: Fri, 20 Mar 2026 13:00:14 -0400
Message-ID: <CAEqnbaXb-TKUm7P-=_zrgQ=shRXkkZscPOHEL9OS6Cb2V8YT8Q@mail.gmail.com>
In-Reply-To: <TYRPR01MB12156F431E4B83ECFF14034BDF54CA@TYRPR01MB12156.jpnprd01.prod.outlook.com>
References: <OSOPR01MB1215385EAC6A76650444E40B0F54FA@OSOPR01MB12153.jpnprd01.prod.outlook.com>
<CAEqnbaU=EgqhxEx0ig4TdY8pdt0Vn+vmCJBuoORRHOzovW9dWA@mail.gmail.com>
<TYRPR01MB12156F431E4B83ECFF14034BDF54CA@TYRPR01MB12156.jpnprd01.prod.outlook.com>
Thank you, Kuroda-san and Nisha!
I have fixed everything that Nisha suggested. And I agree that all messages
should be written to both the log file and the terminal.
Thank you,
Gyan
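To illustrate the behavior agreed on above (every message goes to both the terminal and the log file), here is a minimal standalone sketch. It is not the patch code: the names tee_log and tee_log_v are hypothetical, and the streams are passed in explicitly so the example does not depend on any PostgreSQL headers. The point it shows is that a va_list can be consumed only once, so it must be duplicated with va_copy before the first of the two writes.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdio.h>

/*
 * Hypothetical helper: write one formatted message, followed by a newline,
 * to the terminal stream and (if given) to a log file stream.
 * Returns the number of characters formatted for the terminal stream.
 */
int
tee_log_v(FILE *term, FILE *logfp, const char *fmt, va_list args)
{
	va_list		copy;
	int			n;

	assert(term != NULL && fmt != NULL);

	/* A va_list may be traversed only once, so use a copy for the first write. */
	va_copy(copy, args);
	n = vfprintf(term, fmt, copy);
	va_end(copy);
	fputc('\n', term);

	if (logfp != NULL)
	{
		/* The original list is still untouched and can be used here. */
		vfprintf(logfp, fmt, args);
		fputc('\n', logfp);
		fflush(logfp);
	}

	return n;
}

/* Variadic front end, analogous in shape to a printf-style logging call. */
int
tee_log(FILE *term, FILE *logfp, const char *fmt, ...)
{
	va_list		args;
	int			n;

	va_start(args, fmt);
	n = tee_log_v(term, logfp, fmt, args);
	va_end(args);

	return n;
}
```

A caller would use it as tee_log(stderr, fp, "checking %s", name), passing NULL for the second stream when no log directory was requested.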
Attachments:
[application/octet-stream] v15-0002-Add-a-new-argument-l-logdir-to-pg_createsubscrib.patch (15.7K, 3-v15-0002-Add-a-new-argument-l-logdir-to-pg_createsubscrib.patch)
From e1c62305bda2ae932621d31ee55058b7326122aa Mon Sep 17 00:00:00 2001
From: Gyan Sreejith <[email protected]>
Date: Fri, 20 Mar 2026 12:39:36 -0400
Subject: [PATCH v15 2/2] Add a new argument -l <logdir> to
pg_createsubscriber.
Add an option to write messages to log files in the specified directory.
The directory is created if it does not already exist. A subdirectory named after the current timestamp is created inside it, containing two log files:
1. pg_createsubscriber_server.log - captures messages related to starting and stopping the standby server.
2. pg_createsubscriber_internal.log - captures internal diagnostic output from pg_createsubscriber.
For example, if -l abc is specified and the tool is run at timestamp 20260119T204317.204, the directory abc is created if it does not already exist, the subdirectory abc/20260119T204317.204 is created inside it, and that subdirectory contains the two log files pg_createsubscriber_server.log and pg_createsubscriber_internal.log.
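The naming scheme described above can be sketched as follows. This is an illustrative standalone helper, not the code in the patch: build_log_subdir is a hypothetical name, and the time is passed in as a broken-down struct tm plus milliseconds (rather than read from the clock) so the result is reproducible.

```c
#include <assert.h>
#include <stdio.h>
#include <time.h>

/*
 * Hypothetical helper: format "<base>/<YYYYmmddTHHMMSS.mmm>" into buf.
 * Returns the length of the path, or -1 if it does not fit in buflen bytes
 * or the timestamp could not be formatted.
 */
int
build_log_subdir(char *buf, size_t buflen, const char *base,
				 const struct tm *tm, unsigned int millis)
{
	char		stamp[32];
	int			len;

	assert(buf != NULL && base != NULL && tm != NULL && millis < 1000);

	/* Date and time down to whole seconds, e.g. 20260119T204317 */
	if (strftime(stamp, sizeof(stamp), "%Y%m%dT%H%M%S", tm) == 0)
		return -1;

	/* Append zero-padded milliseconds and prefix the base directory. */
	len = snprintf(buf, buflen, "%s/%s.%03u", base, stamp, millis);
	if (len < 0 || (size_t) len >= buflen)
		return -1;				/* would have been truncated */

	return len;
}
```

With base "abc" and the time from the example above, the helper fills the buffer with abc/20260119T204317.204. Note that a truncated result is reported as an error instead of being used as a path.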
---
doc/src/sgml/ref/pg_createsubscriber.sgml | 29 +++
src/bin/pg_basebackup/pg_createsubscriber.c | 174 +++++++++++++++++-
.../t/040_pg_createsubscriber.pl | 41 ++++-
3 files changed, 232 insertions(+), 12 deletions(-)
diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index cf45ff3573d..ff635ba26cb 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -136,6 +136,35 @@ PostgreSQL documentation
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>-l <replaceable class="parameter">directory</replaceable></option></term>
+ <term><option>--logdir=<replaceable class="parameter">directory</replaceable></option></term>
+ <listitem>
+ <para>
+ Specify the name of the log directory. A new directory is created with
+ this name if it does not exist. A subdirectory with a timestamp
+ indicating the time at which <application>pg_createsubscriber</application>
+ was run will be created. The following two log files will be created in
+ the subdirectory with a umask of 077 so that access is disallowed to
+ other users by default.
+ <itemizedlist>
+ <listitem>
+ <para>
+ <literal>pg_createsubscriber_server.log</literal>, which captures logs
+ related to stopping and starting the standby server.
+ </para>
+ </listitem>
+ <listitem>
+ <para>
+ <literal>pg_createsubscriber_internal.log</literal>, which captures
+ internal diagnostic output (validations, checks, etc.).
+ </para>
+ </listitem>
+ </itemizedlist>
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><option>-n</option></term>
<term><option>--dry-run</option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index c3df83025d7..f2d8cd38abf 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -55,10 +55,14 @@
#define INCLUDED_CONF_FILE "pg_createsubscriber.conf"
#define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled"
+#define SERVER_LOG_FILE_NAME "pg_createsubscriber_server"
+#define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal"
+
/* Command-line options */
struct CreateSubscriberOptions
{
char *config_file; /* configuration file */
+ char *log_dir; /* log directory name */
char *pub_conninfo_str; /* publisher connection string */
char *socket_dir; /* directory for Unix-domain socket, if any */
char *sub_port; /* subscriber port number */
@@ -155,8 +159,14 @@ static void get_publisher_databases(struct CreateSubscriberOptions *opt,
static void pg_createsub_log(enum pg_log_level, enum pg_log_part,
const char *pg_restrict fmt,...)
pg_attribute_printf(3, 4);
+static void pg_createsub_log_v(enum pg_log_level level, enum pg_log_part part,
+ const char *pg_restrict fmt, va_list args)
+ pg_attribute_printf(3, 0);
pg_noreturn static void pg_fatal(const char *pg_restrict fmt,...)
pg_attribute_printf(1, 2);
+static void internal_log_file_write(enum pg_log_level level,
+ const char *pg_restrict fmt, va_list args)
+ pg_attribute_printf(2, 0);
#define WAIT_INTERVAL 1 /* 1 second */
@@ -178,6 +188,9 @@ static pg_prng_state prng_state;
static char *pg_ctl_path = NULL;
static char *pg_resetwal_path = NULL;
+static FILE *internal_log_file_fp = NULL; /* File ptr to log all messages to */
+static char logdir[MAXPGPATH]; /* Directory where log files are placed (if specified) */
+
/* standby / subscriber data directory */
static char *subscriber_dir = NULL;
@@ -186,8 +199,28 @@ static bool standby_running = false;
static bool recovery_params_set = false;
/*
- * Report a message with a given log level
+ * Report a message with a given log level to stderr and log file
+ * (if specified).
*/
+static void
+pg_createsub_log_v(enum pg_log_level level, enum pg_log_part part,
+ const char *pg_restrict fmt, va_list args)
+{
+ if (internal_log_file_fp != NULL)
+ {
+ /* Output to both stderr and the log file */
+ va_list arg_cpy;
+
+ va_copy(arg_cpy, args);
+ pg_log_generic_v(level, part, fmt, arg_cpy);
+ va_end(arg_cpy);
+
+ internal_log_file_write(level, fmt, args);
+ }
+ else
+ pg_log_generic_v(level, part, fmt, args);
+}
+
static void
pg_createsub_log(enum pg_log_level level, enum pg_log_part part,
const char *pg_restrict fmt,...)
@@ -196,7 +229,7 @@ pg_createsub_log(enum pg_log_level level, enum pg_log_part part,
va_start(args, fmt);
- pg_log_generic_v(level, part, fmt, args);
+ pg_createsub_log_v(level, part, fmt, args);
va_end(args);
}
@@ -211,7 +244,7 @@ pg_fatal(const char *pg_restrict fmt,...)
va_start(args, fmt);
- pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
+ pg_createsub_log_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
va_end(args);
@@ -319,6 +352,12 @@ cleanup_objects_atexit(void)
if (standby_running)
stop_standby_server(subscriber_dir);
+
+ if (internal_log_file_fp != NULL)
+ {
+ fclose(internal_log_file_fp);
+ internal_log_file_fp = NULL;
+ }
}
static void
@@ -333,6 +372,7 @@ usage(void)
" databases and databases that don't allow connections\n"));
printf(_(" -d, --database=DBNAME database in which to create a subscription\n"));
printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n"));
+ printf(_(" -l, --logdir=LOGDIR location for the log directory\n"));
printf(_(" -n, --dry-run dry run, just show what would be done\n"));
printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT);
printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n"));
@@ -767,6 +807,7 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
bool crc_ok;
struct timeval tv;
+ char *out_file;
char *cmd_str;
pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
@@ -805,8 +846,20 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
"running pg_resetwal on the subscriber");
- cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
- subscriber_dir, DEVNULL);
+ /*
+ * Redirect the output to the log file if one was specified. Since the
+ * output is very short, around one line, we do not provide a separate file
+ * for it; it is appended to the server log.
+ */
+ if (opt->log_dir)
+ out_file = psprintf("%s/%s.log", logdir, SERVER_LOG_FILE_NAME);
+ else
+ out_file = DEVNULL;
+
+ cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path,
+ subscriber_dir, out_file);
+ if (opt->log_dir)
+ pg_free(out_file);
pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
"pg_resetwal command is: %s", cmd_str);
@@ -823,6 +876,7 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
}
pg_free(cf);
+ pg_free(cmd_str);
}
/*
@@ -1029,6 +1083,89 @@ server_is_in_recovery(PGconn *conn)
return ret == 0;
}
+static void
+internal_log_file_write(enum pg_log_level level, const char *pg_restrict fmt,
+ va_list args)
+{
+ Assert(internal_log_file_fp);
+
+ /* Do nothing if log level is too low. */
+ if (level < __pg_log_level)
+ return;
+
+ vfprintf(internal_log_file_fp, _(fmt), args);
+
+ fprintf(internal_log_file_fp, "\n");
+ fflush(internal_log_file_fp);
+}
+
+/*
+ * Open a new logfile with proper permissions.
+ * From src/backend/postmaster/syslogger.c
+ */
+static FILE *
+logfile_open(const char *filename, const char *mode)
+{
+ FILE *fh;
+ mode_t oumask;
+
+ oumask = umask((mode_t) ((~(S_IRUSR | S_IWUSR)) & (S_IRWXU | S_IRWXG | S_IRWXO)));
+ fh = fopen(filename, mode);
+ umask(oumask);
+
+ if (fh)
+ {
+ setvbuf(fh, NULL, PG_IOLBF, 0);
+
+#ifdef WIN32
+ /* use CRLF line endings on Windows */
+ _setmode(_fileno(fh), _O_TEXT);
+#endif
+ }
+ else
+ pg_fatal("could not open log file \"%s\": %m",
+ filename);
+
+ return fh;
+}
+
+static void
+make_output_dirs(const char *log_basedir)
+{
+ char timestamp[128];
+ struct timeval tval;
+ time_t now;
+ struct tm tmbuf;
+ int len;
+
+ /* Generate timestamp */
+ gettimeofday(&tval, NULL);
+ now = tval.tv_sec;
+
+ strftime(timestamp, sizeof(timestamp), "%Y%m%dT%H%M%S",
+ localtime_r(&now, &tmbuf));
+
+ /* append milliseconds */
+ snprintf(timestamp + strlen(timestamp),
+ sizeof(timestamp) - strlen(timestamp), ".%03u",
+ (unsigned int) (tval.tv_usec / 1000));
+
+ /* Build timestamp directory path */
+ len = snprintf(logdir, MAXPGPATH, "%s/%s", log_basedir, timestamp);
+
+ if (len >= MAXPGPATH)
+ pg_fatal("directory path for log files, %s/%s, is too long",
+ log_basedir, timestamp);
+
+ /* Create base directory (ignore if exists) */
+ if (mkdir(log_basedir, S_IRWXU) < 0 && errno != EEXIST)
+ pg_fatal("could not create directory \"%s\": %m", log_basedir);
+
+ /* Create BASE_DIR/$timestamp */
+ if (mkdir(logdir, S_IRWXU) < 0)
+ pg_fatal("could not create directory \"%s\": %m", logdir);
+}
+
/*
* Is the primary server ready for logical replication?
*
@@ -1787,6 +1924,9 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
if (restrict_logical_worker)
appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+ if (opt->log_dir)
+ appendPQExpBuffer(pg_ctl_cmd, " -l \"%s/%s.log\"", logdir, SERVER_LOG_FILE_NAME);
+
pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
"pg_ctl command is: %s", pg_ctl_cmd->data);
rc = system(pg_ctl_cmd->data);
@@ -2357,6 +2497,7 @@ main(int argc, char **argv)
{"all", no_argument, NULL, 'a'},
{"database", required_argument, NULL, 'd'},
{"pgdata", required_argument, NULL, 'D'},
+ {"logdir", required_argument, NULL, 'l'},
{"dry-run", no_argument, NULL, 'n'},
{"subscriber-port", required_argument, NULL, 'p'},
{"publisher-server", required_argument, NULL, 'P'},
@@ -2415,6 +2556,7 @@ main(int argc, char **argv)
/* Default settings */
subscriber_dir = NULL;
opt.config_file = NULL;
+ opt.log_dir = NULL;
opt.pub_conninfo_str = NULL;
opt.socket_dir = NULL;
opt.sub_port = DEFAULT_SUB_PORT;
@@ -2445,7 +2587,7 @@ main(int argc, char **argv)
get_restricted_token();
- while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v",
+ while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v",
long_options, &option_index)) != -1)
{
switch (c)
@@ -2466,6 +2608,10 @@ main(int argc, char **argv)
subscriber_dir = pg_strdup(optarg);
canonicalize_path(subscriber_dir);
break;
+ case 'l':
+ opt.log_dir = pg_strdup(optarg);
+ canonicalize_path(opt.log_dir);
+ break;
case 'n':
dry_run = true;
break;
@@ -2613,6 +2759,19 @@ main(int argc, char **argv)
exit(1);
}
+ if (opt.log_dir != NULL)
+ {
+ char *internal_log_file;
+
+ make_output_dirs(opt.log_dir);
+ internal_log_file = psprintf("%s/%s.log", logdir,
+ INTERNAL_LOG_FILE_NAME);
+
+ /* logfile_open() will exit if there is an error */
+ internal_log_file_fp = logfile_open(internal_log_file, "a");
+ pg_free(internal_log_file);
+ }
+
if (dry_run)
pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
"Executing in dry-run mode.\n"
@@ -2832,5 +2991,8 @@ main(int argc, char **argv)
pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
"Done!");
+ if (internal_log_file_fp != NULL)
+ fclose(internal_log_file_fp);
+
return 0;
}
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 0c27fca7bb7..858082c70df 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -14,6 +14,7 @@ program_version_ok('pg_createsubscriber');
program_options_handling_ok('pg_createsubscriber');
my $datadir = PostgreSQL::Test::Utils::tempdir;
+my $logdir = PostgreSQL::Test::Utils::tempdir;
# Generate a database with a name made of a range of ASCII characters.
# Extracted from 002_pg_upgrade.pl.
@@ -362,9 +363,35 @@ command_ok(
'--subscription' => 'sub2',
'--database' => $db1,
'--database' => $db2,
+ '--logdir' => $logdir,
],
'run pg_createsubscriber --dry-run on node S');
+# Check that the log files were created
+my @server_log_files = glob "$logdir/*/pg_createsubscriber_server.log";
+is(scalar(@server_log_files),
+ 1, "pg_createsubscriber_server.log file was created");
+my $server_log_file_size = -s $server_log_files[0];
+isnt($server_log_file_size, 0,
+ "pg_createsubscriber_server.log file not empty");
+my $server_log = slurp_file($server_log_files[0]);
+like(
+ $server_log,
+ qr/consistent recovery state reached/,
+ "server reached consistent recovery state");
+
+my @internal_log_files = glob "$logdir/*/pg_createsubscriber_internal.log";
+is(scalar(@internal_log_files),
+ 1, "pg_createsubscriber_internal.log file was created");
+my $internal_log_file_size = -s $internal_log_files[0];
+isnt($internal_log_file_size, 0,
+ "pg_createsubscriber_internal.log file not empty");
+my $internal_log = slurp_file($internal_log_files[0]);
+like(
+ $internal_log,
+ qr/target server reached the consistent state/,
+ "log shows consistent state reached");
+
# Check if node S is still a standby
$node_s->start;
is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
@@ -444,7 +471,8 @@ is(scalar(() = $stderr =~ /would create subscription/g),
# Create a user-defined publication, and a table that is not a member of that
# publication.
-$node_p->safe_psql($db1, qq(
+$node_p->safe_psql(
+ $db1, qq(
CREATE PUBLICATION test_pub3 FOR TABLE tbl1;
CREATE TABLE not_replicated (a int);
));
@@ -540,8 +568,7 @@ second row
third row),
"logical replication works in database $db1");
$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated');
-is($result, qq(),
- "table is not replicated in database $db1");
+is($result, qq(), "table is not replicated in database $db1");
# Check result in database $db2
$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
@@ -555,8 +582,10 @@ my $sysid_s = $node_s->safe_psql('postgres',
isnt($sysid_p, $sysid_s, 'system identifier was changed');
# Verify that pub2 was created in $db2
-is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
- '1', "publication pub2 was created in $db2");
+is( $node_p->safe_psql(
+ $db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
+ '1',
+ "publication pub2 was created in $db2");
# Get subscription and publication names
$result = $node_s->safe_psql(
@@ -581,7 +610,7 @@ $result = $node_s->safe_psql(
)
);
-is($result, qq($db1|{test_pub3}
+is( $result, qq($db1|{test_pub3}
$db2|{pub2}),
"subscriptions use the correct publications");
--
2.43.0
[application/octet-stream] v15-0001-pg_createsubscriber-use-own-reporting-functions.patch (55.1K, 4-v15-0001-pg_createsubscriber-use-own-reporting-functions.patch)
From ee6b65eb5c777c5063dd1b1cf0b39fd604539da6 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <[email protected]>
Date: Wed, 18 Mar 2026 20:20:50 +0900
Subject: [PATCH v15 1/2] pg_createsubscriber: use own reporting functions
This commit converts all calls to the pg_log_xxx family of functions to call a
new reporting function, pg_createsub_log(). Also, pg_fatal() is overridden with
a local implementation. This commit makes no externally visible change, but it
is needed for the upcoming commit.
---
src/bin/pg_basebackup/pg_createsubscriber.c | 717 +++++++++++++-------
1 file changed, 464 insertions(+), 253 deletions(-)
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 2bc84505aab..c3df83025d7 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -31,6 +31,12 @@
#include "fe_utils/version.h"
#include "getopt_long.h"
+/*
+ * For now, pg_createsubscriber does not use common/logging.c; use our own
+ * pg_fatal.
+ */
+#undef pg_fatal
+
#define DEFAULT_SUB_PORT "50432"
#define OBJECTTYPE_PUBLICATIONS 0x0001
@@ -146,6 +152,11 @@ static void drop_existing_subscription(PGconn *conn, const char *subname,
const char *dbname);
static void get_publisher_databases(struct CreateSubscriberOptions *opt,
bool dbnamespecified);
+static void pg_createsub_log(enum pg_log_level, enum pg_log_part,
+ const char *pg_restrict fmt,...)
+ pg_attribute_printf(3, 4);
+pg_noreturn static void pg_fatal(const char *pg_restrict fmt,...)
+ pg_attribute_printf(1, 2);
#define WAIT_INTERVAL 1 /* 1 second */
@@ -174,6 +185,38 @@ static bool recovery_ended = false;
static bool standby_running = false;
static bool recovery_params_set = false;
+/*
+ * Report a message with a given log level
+ */
+static void
+pg_createsub_log(enum pg_log_level level, enum pg_log_part part,
+ const char *pg_restrict fmt,...)
+{
+ va_list args;
+
+ va_start(args, fmt);
+
+ pg_log_generic_v(level, part, fmt, args);
+
+ va_end(args);
+}
+
+/*
+ * Report a fatal error and exit
+ */
+static void
+pg_fatal(const char *pg_restrict fmt,...)
+{
+ va_list args;
+
+ va_start(args, fmt);
+
+ pg_log_generic_v(PG_LOG_ERROR, PG_LOG_PRIMARY, fmt, args);
+
+ va_end(args);
+
+ exit(1);
+}
/*
* Clean up objects created by pg_createsubscriber.
@@ -205,7 +248,8 @@ cleanup_objects_atexit(void)
if (durable_rename(conf_filename, conf_filename_disabled) != 0)
{
/* durable_rename() has already logged something. */
- pg_log_warning_hint("A manual removal of the recovery parameters may be required.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "A manual removal of the recovery parameters may be required.");
}
}
@@ -219,9 +263,11 @@ cleanup_objects_atexit(void)
*/
if (recovery_ended)
{
- pg_log_warning("failed after the end of recovery");
- pg_log_warning_hint("The target server cannot be used as a physical replica anymore. "
- "You must recreate the physical replica before continuing.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "failed after the end of recovery");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "The target server cannot be used as a physical replica anymore. "
+ "You must recreate the physical replica before continuing.");
}
for (int i = 0; i < num_dbs; i++)
@@ -251,17 +297,21 @@ cleanup_objects_atexit(void)
*/
if (dbinfo->made_publication)
{
- pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind",
- dbinfo->pubname,
- dbinfo->dbname);
- pg_log_warning_hint("Drop this publication before trying again.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "publication \"%s\" created in database \"%s\" on primary was left behind",
+ dbinfo->pubname,
+ dbinfo->dbname);
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "Drop this publication before trying again.");
}
if (dbinfo->made_replslot)
{
- pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind",
- dbinfo->replslotname,
- dbinfo->dbname);
- pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "replication slot \"%s\" created in database \"%s\" on primary was left behind",
+ dbinfo->replslotname,
+ dbinfo->dbname);
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "Drop this replication slot soon to avoid retention of WAL files.");
}
}
}
@@ -342,7 +392,8 @@ get_base_conninfo(const char *conninfo, char **dbname)
conn_opts = PQconninfoParse(conninfo, &errmsg);
if (conn_opts == NULL)
{
- pg_log_error("could not parse connection string: %s", errmsg);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not parse connection string: %s", errmsg);
PQfreemem(errmsg);
return NULL;
}
@@ -426,7 +477,8 @@ get_exec_path(const char *argv0, const char *progname)
progname, full_path, "pg_createsubscriber");
}
- pg_log_debug("%s path is: %s", progname, exec_path);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "%s path is: %s", progname, exec_path);
return exec_path;
}
@@ -443,8 +495,9 @@ check_data_directory(const char *datadir)
uint32 major_version;
char *version_str;
- pg_log_info("checking if directory \"%s\" is a cluster data directory",
- datadir);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "checking if directory \"%s\" is a cluster data directory",
+ datadir);
if (stat(datadir, &statbuf) != 0)
{
@@ -462,9 +515,11 @@ check_data_directory(const char *datadir)
major_version = GET_PG_MAJORVERSION_NUM(get_pg_version(datadir, &version_str));
if (major_version != PG_MAJORVERSION_NUM)
{
- pg_log_error("data directory is of wrong version");
- pg_log_error_detail("File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
- "PG_VERSION", version_str, PG_MAJORVERSION);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "data directory is of wrong version");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+ "File \"%s\" contains \"%s\", which is not compatible with this program's version \"%s\".",
+ "PG_VERSION", version_str, PG_MAJORVERSION);
exit(1);
}
}
@@ -547,14 +602,16 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
dbinfo[i].subname = NULL;
/* Other fields will be filled later */
- pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
- dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
- dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
- dbinfo[i].pubconninfo);
- pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
- dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
- dbinfo[i].subconninfo,
- dbinfos.two_phase ? "true" : "false");
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i,
+ dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
+ dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
+ dbinfo[i].pubconninfo);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
+ dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
+ dbinfo[i].subconninfo,
+ dbinfos.two_phase ? "true" : "false");
if (num_pubs > 0)
pubcell = pubcell->next;
@@ -582,8 +639,9 @@ connect_database(const char *conninfo, bool exit_on_error)
conn = PQconnectdb(conninfo);
if (PQstatus(conn) != CONNECTION_OK)
{
- pg_log_error("connection to database failed: %s",
- PQerrorMessage(conn));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "connection to database failed: %s",
+ PQerrorMessage(conn));
PQfinish(conn);
if (exit_on_error)
@@ -595,8 +653,9 @@ connect_database(const char *conninfo, bool exit_on_error)
res = PQexec(conn, ALWAYS_SECURE_SEARCH_PATH_SQL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not clear \"search_path\": %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not clear \"search_path\": %s",
+ PQresultErrorMessage(res));
PQclear(res);
PQfinish(conn);
@@ -635,27 +694,31 @@ get_primary_sysid(const char *conninfo)
PGresult *res;
uint64 sysid;
- pg_log_info("getting system identifier from publisher");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "getting system identifier from publisher");
conn = connect_database(conninfo, true);
res = PQexec(conn, "SELECT system_identifier FROM pg_catalog.pg_control_system()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not get system identifier: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not get system identifier: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
if (PQntuples(res) != 1)
{
- pg_log_error("could not get system identifier: got %d rows, expected %d row",
- PQntuples(res), 1);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not get system identifier: got %d rows, expected %d row",
+ PQntuples(res), 1);
disconnect_database(conn, true);
}
sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10);
- pg_log_info("system identifier is %" PRIu64 " on publisher", sysid);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "system identifier is %" PRIu64 " on publisher", sysid);
PQclear(res);
disconnect_database(conn, false);
@@ -675,7 +738,8 @@ get_standby_sysid(const char *datadir)
bool crc_ok;
uint64 sysid;
- pg_log_info("getting system identifier from subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "getting system identifier from subscriber");
cf = get_controlfile(datadir, &crc_ok);
if (!crc_ok)
@@ -683,7 +747,8 @@ get_standby_sysid(const char *datadir)
sysid = cf->system_identifier;
- pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "system identifier is %" PRIu64 " on subscriber", sysid);
pg_free(cf);
@@ -704,7 +769,8 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
char *cmd_str;
- pg_log_info("modifying system identifier of subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "modifying system identifier of subscriber");
cf = get_controlfile(subscriber_dir, &crc_ok);
if (!crc_ok)
@@ -721,31 +787,37 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt)
cf->system_identifier |= getpid() & 0xFFF;
if (dry_run)
- pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber",
- cf->system_identifier);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would set system identifier to %" PRIu64 " on subscriber",
+ cf->system_identifier);
else
{
update_controlfile(subscriber_dir, cf, true);
- pg_log_info("system identifier is %" PRIu64 " on subscriber",
- cf->system_identifier);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "system identifier is %" PRIu64 " on subscriber",
+ cf->system_identifier);
}
if (dry_run)
- pg_log_info("dry-run: would run pg_resetwal on the subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would run pg_resetwal on the subscriber");
else
- pg_log_info("running pg_resetwal on the subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "running pg_resetwal on the subscriber");
cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path,
subscriber_dir, DEVNULL);
- pg_log_debug("pg_resetwal command is: %s", cmd_str);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "pg_resetwal command is: %s", cmd_str);
if (!dry_run)
{
int rc = system(cmd_str);
if (rc == 0)
- pg_log_info("successfully reset WAL on the subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "successfully reset WAL on the subscriber");
else
pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc));
}
@@ -771,15 +843,17 @@ generate_object_name(PGconn *conn)
"WHERE datname = pg_catalog.current_database()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain database OID: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain database OID: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
if (PQntuples(res) != 1)
{
- pg_log_error("could not obtain database OID: got %d rows, expected %d row",
- PQntuples(res), 1);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain database OID: got %d rows, expected %d row",
+ PQntuples(res), 1);
disconnect_database(conn, true);
}
@@ -819,8 +893,9 @@ find_publication(PGconn *conn, const char *pubname, const char *dbname)
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
- pubname, dbname, PQerrorMessage(conn));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not find publication \"%s\" in database \"%s\": %s",
+ pubname, dbname, PQerrorMessage(conn));
disconnect_database(conn, true);
}
@@ -873,8 +948,9 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
{
/* Reuse existing publication on publisher. */
- pg_log_info("use existing publication \"%s\" in database \"%s\"",
- dbinfo[i].pubname, dbinfo[i].dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "use existing publication \"%s\" in database \"%s\"",
+ dbinfo[i].pubname, dbinfo[i].dbname);
/* Don't remove pre-existing publication if an error occurs. */
dbinfo[i].made_publication = false;
}
@@ -912,8 +988,9 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
res = PQexec(conn, "SELECT pg_log_standby_snapshot()");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not write an additional WAL record: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not write an additional WAL record: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
PQclear(res);
@@ -938,8 +1015,9 @@ server_is_in_recovery(PGconn *conn)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain recovery progress: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain recovery progress: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -971,7 +1049,8 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
int max_prepared_transactions;
char *max_slot_wal_keep_size;
- pg_log_info("checking settings on publisher");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "checking settings on publisher");
conn = connect_database(dbinfo[0].pubconninfo, true);
@@ -981,7 +1060,8 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
*/
if (server_is_in_recovery(conn))
{
- pg_log_error("primary server cannot be in recovery");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "primary server cannot be in recovery");
disconnect_database(conn, true);
}
@@ -1007,8 +1087,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain publisher settings: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain publisher settings: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -1022,48 +1103,63 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
PQclear(res);
- pg_log_debug("publisher: wal_level: %s", wal_level);
- pg_log_debug("publisher: max_replication_slots: %d", max_repslots);
- pg_log_debug("publisher: current replication slots: %d", cur_repslots);
- pg_log_debug("publisher: max_wal_senders: %d", max_walsenders);
- pg_log_debug("publisher: current wal senders: %d", cur_walsenders);
- pg_log_debug("publisher: max_prepared_transactions: %d",
- max_prepared_transactions);
- pg_log_debug("publisher: max_slot_wal_keep_size: %s",
- max_slot_wal_keep_size);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: wal_level: %s", wal_level);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: max_replication_slots: %d", max_repslots);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: current replication slots: %d", cur_repslots);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: max_wal_senders: %d", max_walsenders);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: current wal senders: %d", cur_walsenders);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: max_prepared_transactions: %d",
+ max_prepared_transactions);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "publisher: max_slot_wal_keep_size: %s",
+ max_slot_wal_keep_size);
disconnect_database(conn, false);
if (strcmp(wal_level, "minimal") == 0)
{
- pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "publisher requires \"wal_level\" >= \"replica\"");
failed = true;
}
if (max_repslots - cur_repslots < num_dbs)
{
- pg_log_error("publisher requires %d replication slots, but only %d remain",
- num_dbs, max_repslots - cur_repslots);
- pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
- "max_replication_slots", cur_repslots + num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "publisher requires %d replication slots, but only %d remain",
+ num_dbs, max_repslots - cur_repslots);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Increase the configuration parameter \"%s\" to at least %d.",
+ "max_replication_slots", cur_repslots + num_dbs);
failed = true;
}
if (max_walsenders - cur_walsenders < num_dbs)
{
- pg_log_error("publisher requires %d WAL sender processes, but only %d remain",
- num_dbs, max_walsenders - cur_walsenders);
- pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
- "max_wal_senders", cur_walsenders + num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "publisher requires %d WAL sender processes, but only %d remain",
+ num_dbs, max_walsenders - cur_walsenders);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Increase the configuration parameter \"%s\" to at least %d.",
+ "max_wal_senders", cur_walsenders + num_dbs);
failed = true;
}
if (max_prepared_transactions != 0 && !dbinfos.two_phase)
{
- pg_log_warning("two_phase option will not be enabled for replication slots");
- pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. "
- "Prepared transactions will be replicated at COMMIT PREPARED.");
- pg_log_warning_hint("You can use the command-line option --enable-two-phase to enable two_phase.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "two_phase option will not be enabled for replication slots");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_DETAIL,
+ "Subscriptions will be created with the two_phase option disabled. "
+ "Prepared transactions will be replicated at COMMIT PREPARED.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "You can use the command-line option --enable-two-phase to enable two_phase.");
}
/*
@@ -1073,9 +1169,11 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
*/
if (dry_run && (strcmp(max_slot_wal_keep_size, "-1") != 0))
{
- pg_log_warning("required WAL could be removed from the publisher");
- pg_log_warning_hint("Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
- "max_slot_wal_keep_size");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "required WAL could be removed from the publisher");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "Set the configuration parameter \"%s\" to -1 to ensure that required WAL files are not prematurely removed.",
+ "max_slot_wal_keep_size");
}
pg_free(wal_level);
@@ -1106,14 +1204,16 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
int max_replorigins;
int max_wprocs;
- pg_log_info("checking settings on subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "checking settings on subscriber");
conn = connect_database(dbinfo[0].subconninfo, true);
/* The target server must be a standby */
if (!server_is_in_recovery(conn))
{
- pg_log_error("target server must be a standby");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "target server must be a standby");
disconnect_database(conn, true);
}
@@ -1137,8 +1237,9 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain subscriber settings: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain subscriber settings: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -1148,12 +1249,16 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
if (strcmp(PQgetvalue(res, 3, 0), "") != 0)
primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0));
- pg_log_debug("subscriber: max_logical_replication_workers: %d",
- max_lrworkers);
- pg_log_debug("subscriber: max_active_replication_origins: %d", max_replorigins);
- pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "subscriber: max_logical_replication_workers: %d",
+ max_lrworkers);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "subscriber: max_active_replication_origins: %d", max_replorigins);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "subscriber: max_worker_processes: %d", max_wprocs);
if (primary_slot_name)
- pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "subscriber: primary_slot_name: %s", primary_slot_name);
PQclear(res);
@@ -1161,28 +1266,34 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
if (max_replorigins < num_dbs)
{
- pg_log_error("subscriber requires %d active replication origins, but only %d remain",
- num_dbs, max_replorigins);
- pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
- "max_active_replication_origins", num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "subscriber requires %d active replication origins, but only %d remain",
+ num_dbs, max_replorigins);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Increase the configuration parameter \"%s\" to at least %d.",
+ "max_active_replication_origins", num_dbs);
failed = true;
}
if (max_lrworkers < num_dbs)
{
- pg_log_error("subscriber requires %d logical replication workers, but only %d remain",
- num_dbs, max_lrworkers);
- pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
- "max_logical_replication_workers", num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "subscriber requires %d logical replication workers, but only %d remain",
+ num_dbs, max_lrworkers);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Increase the configuration parameter \"%s\" to at least %d.",
+ "max_logical_replication_workers", num_dbs);
failed = true;
}
if (max_wprocs < num_dbs + 1)
{
- pg_log_error("subscriber requires %d worker processes, but only %d remain",
- num_dbs + 1, max_wprocs);
- pg_log_error_hint("Increase the configuration parameter \"%s\" to at least %d.",
- "max_worker_processes", num_dbs + 1);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "subscriber requires %d worker processes, but only %d remain",
+ num_dbs + 1, max_wprocs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Increase the configuration parameter \"%s\" to at least %d.",
+ "max_worker_processes", num_dbs + 1);
failed = true;
}
@@ -1215,19 +1326,22 @@ drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname
appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
if (dry_run)
- pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"",
- subname, dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would drop subscription \"%s\" in database \"%s\"",
+ subname, dbname);
else
{
- pg_log_info("dropping subscription \"%s\" in database \"%s\"",
- subname, dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dropping subscription \"%s\" in database \"%s\"",
+ subname, dbname);
res = PQexec(conn, query->data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("could not drop subscription \"%s\": %s",
- subname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not drop subscription \"%s\": %s",
+ subname, PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -1261,8 +1375,9 @@ check_and_drop_existing_subscriptions(PGconn *conn,
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain pre-existing subscriptions: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain pre-existing subscriptions: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -1373,7 +1488,8 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c
lsn);
}
- pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "recovery parameters:\n%s", recoveryconfcontents->data);
if (!dry_run)
{
@@ -1427,9 +1543,11 @@ drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotnam
}
else
{
- pg_log_warning("could not drop replication slot \"%s\" on primary",
- slotname);
- pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "could not drop replication slot \"%s\" on primary",
+ slotname);
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "Drop this replication slot soon to avoid retention of WAL files.");
}
}
@@ -1461,9 +1579,11 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
}
else
{
- pg_log_warning("could not obtain failover replication slot information: %s",
- PQresultErrorMessage(res));
- pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "could not obtain failover replication slot information: %s",
+ PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
}
PQclear(res);
@@ -1471,8 +1591,10 @@ drop_failover_replication_slots(struct LogicalRepInfo *dbinfo)
}
else
{
- pg_log_warning("could not drop failover replication slot");
- pg_log_warning_hint("Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_PRIMARY,
+ "could not drop failover replication slot");
+ pg_createsub_log(PG_LOG_WARNING, PG_LOG_HINT,
+ "Drop the failover replication slots on subscriber soon to avoid retention of WAL files.");
}
}
@@ -1494,11 +1616,13 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
Assert(conn != NULL);
if (dry_run)
- pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
- slot_name, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher",
+ slot_name, dbinfo->dbname);
else
- pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher",
- slot_name, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "creating the replication slot \"%s\" in database \"%s\" on publisher",
+ slot_name, dbinfo->dbname);
slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
@@ -1509,16 +1633,18 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
PQfreemem(slot_name_esc);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not create replication slot \"%s\" in database \"%s\": %s",
- slot_name, dbinfo->dbname,
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not create replication slot \"%s\" in database \"%s\": %s",
+ slot_name, dbinfo->dbname,
+ PQresultErrorMessage(res));
PQclear(res);
destroyPQExpBuffer(str);
return NULL;
@@ -1547,11 +1673,13 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
Assert(conn != NULL);
if (dry_run)
- pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"",
- slot_name, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would drop the replication slot \"%s\" in database \"%s\"",
+ slot_name, dbinfo->dbname);
else
- pg_log_info("dropping the replication slot \"%s\" in database \"%s\"",
- slot_name, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dropping the replication slot \"%s\" in database \"%s\"",
+ slot_name, dbinfo->dbname);
slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name));
@@ -1559,15 +1687,17 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
PQfreemem(slot_name_esc);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not drop replication slot \"%s\" in database \"%s\": %s",
- slot_name, dbinfo->dbname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not drop replication slot \"%s\" in database \"%s\": %s",
+ slot_name, dbinfo->dbname, PQresultErrorMessage(res));
dbinfo->made_replslot = false; /* don't try again. */
}
@@ -1587,25 +1717,32 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
{
if (WIFEXITED(rc))
{
- pg_log_error("pg_ctl failed with exit code %d", WEXITSTATUS(rc));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "pg_ctl failed with exit code %d",
+ WEXITSTATUS(rc));
}
else if (WIFSIGNALED(rc))
{
#if defined(WIN32)
- pg_log_error("pg_ctl was terminated by exception 0x%X",
- WTERMSIG(rc));
- pg_log_error_detail("See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "pg_ctl was terminated by exception 0x%X",
+ WTERMSIG(rc));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+ "See C include file \"ntstatus.h\" for a description of the hexadecimal value.");
#else
- pg_log_error("pg_ctl was terminated by signal %d: %s",
- WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "pg_ctl was terminated by signal %d: %s",
+ WTERMSIG(rc), pg_strsignal(WTERMSIG(rc)));
#endif
}
else
{
- pg_log_error("pg_ctl exited with unrecognized status %d", rc);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "pg_ctl exited with unrecognized status %d", rc);
}
- pg_log_error_detail("The failed command was: %s", pg_ctl_cmd);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+ "The failed command was: %s", pg_ctl_cmd);
exit(1);
}
}
@@ -1650,12 +1787,14 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
if (restrict_logical_worker)
appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
- pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "pg_ctl command is: %s", pg_ctl_cmd->data);
rc = system(pg_ctl_cmd->data);
pg_ctl_status(pg_ctl_cmd->data, rc);
standby_running = true;
destroyPQExpBuffer(pg_ctl_cmd);
- pg_log_info("server was started");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "server was started");
}
static void
@@ -1666,11 +1805,13 @@ stop_standby_server(const char *datadir)
pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path,
datadir);
- pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "pg_ctl command is: %s", pg_ctl_cmd);
rc = system(pg_ctl_cmd);
pg_ctl_status(pg_ctl_cmd, rc);
standby_running = false;
- pg_log_info("server was stopped");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "server was stopped");
}
/*
@@ -1689,7 +1830,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
bool ready = false;
int timer = 0;
- pg_log_info("waiting for the target server to reach the consistent state");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "waiting for the target server to reach the consistent state");
conn = connect_database(conninfo, true);
@@ -1707,7 +1849,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
{
stop_standby_server(subscriber_dir);
- pg_log_error("recovery timed out");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "recovery timed out");
disconnect_database(conn, true);
}
@@ -1721,8 +1864,10 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions
if (!ready)
pg_fatal("server did not end recovery");
- pg_log_info("target server reached the consistent state");
- pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "target server reached the consistent state");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_HINT,
+ "If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing.");
}
/*
@@ -1749,8 +1894,9 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain publication information: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain publication information: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -1763,8 +1909,10 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
* pg_createsubscriber_ prefix followed by the exact database oid and
* a random number.
*/
- pg_log_error("publication \"%s\" already exists", dbinfo->pubname);
- pg_log_error_hint("Consider renaming this publication before continuing.");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "publication \"%s\" already exists", dbinfo->pubname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Consider renaming this publication before continuing.");
disconnect_database(conn, true);
}
@@ -1772,24 +1920,28 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
resetPQExpBuffer(str);
if (dry_run)
- pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"",
- dbinfo->pubname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would create publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
else
- pg_log_info("creating publication \"%s\" in database \"%s\"",
- dbinfo->pubname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "creating publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES",
ipubname_esc);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("could not create publication \"%s\" in database \"%s\": %s",
- dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not create publication \"%s\" in database \"%s\": %s",
+ dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
disconnect_database(conn, true);
}
PQclear(res);
@@ -1819,25 +1971,29 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
if (dry_run)
- pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"",
- pubname, dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would drop publication \"%s\" in database \"%s\"",
+ pubname, dbname);
else
- pg_log_info("dropping publication \"%s\" in database \"%s\"",
- pubname, dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dropping publication \"%s\" in database \"%s\"",
+ pubname, dbname);
appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
PQfreemem(pubname_esc);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
- pubname, dbname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not drop publication \"%s\" in database \"%s\": %s",
+ pubname, dbname, PQresultErrorMessage(res));
*made_publication = false; /* don't try again. */
/*
@@ -1872,15 +2028,17 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
if (drop_all_pubs)
{
- pg_log_info("dropping all existing publications in database \"%s\"",
- dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dropping all existing publications in database \"%s\"",
+ dbinfo->dbname);
/* Fetch all publication names */
res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain publication information: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain publication information: %s",
+ PQresultErrorMessage(res));
PQclear(res);
disconnect_database(conn, true);
}
@@ -1903,11 +2061,13 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
else
{
if (dry_run)
- pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
- dbinfo->pubname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
else
- pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
- dbinfo->pubname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "preserve existing publication \"%s\" in database \"%s\"",
+ dbinfo->pubname, dbinfo->dbname);
}
}
}
@@ -1941,11 +2101,13 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname));
if (dry_run)
- pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"",
- dbinfo->subname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would create subscription \"%s\" in database \"%s\"",
+ dbinfo->subname, dbinfo->dbname);
else
- pg_log_info("creating subscription \"%s\" in database \"%s\"",
- dbinfo->subname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "creating subscription \"%s\" in database \"%s\"",
+ dbinfo->subname, dbinfo->dbname);
appendPQExpBuffer(str,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
@@ -1959,15 +2121,17 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
PQfreemem(pubconninfo_esc);
PQfreemem(replslotname_esc);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("could not create subscription \"%s\" in database \"%s\": %s",
- dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not create subscription \"%s\" in database \"%s\": %s",
+ dbinfo->subname, dbinfo->dbname, PQresultErrorMessage(res));
disconnect_database(conn, true);
}
PQclear(res);
@@ -2011,15 +2175,17 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain subscription OID: %s",
- PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain subscription OID: %s",
+ PQresultErrorMessage(res));
disconnect_database(conn, true);
}
if (PQntuples(res) != 1 && !dry_run)
{
- pg_log_error("could not obtain subscription OID: got %d rows, expected %d row",
- PQntuples(res), 1);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain subscription OID: got %d rows, expected %d row",
+ PQntuples(res), 1);
disconnect_database(conn, true);
}
@@ -2043,26 +2209,30 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons
originname = psprintf("pg_%u", suboid);
if (dry_run)
- pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
- originname, lsnstr, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
+ originname, lsnstr, dbinfo->dbname);
else
- pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
- originname, lsnstr, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"",
+ originname, lsnstr, dbinfo->dbname);
resetPQExpBuffer(str);
appendPQExpBuffer(str,
"SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')",
originname, lsnstr);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not set replication progress for subscription \"%s\": %s",
- dbinfo->subname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not set replication progress for subscription \"%s\": %s",
+ dbinfo->subname, PQresultErrorMessage(res));
disconnect_database(conn, true);
}
PQclear(res);
@@ -2093,23 +2263,27 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname));
if (dry_run)
- pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"",
- dbinfo->subname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "dry-run: would enable subscription \"%s\" in database \"%s\"",
+ dbinfo->subname, dbinfo->dbname);
else
- pg_log_info("enabling subscription \"%s\" in database \"%s\"",
- dbinfo->subname, dbinfo->dbname);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "enabling subscription \"%s\" in database \"%s\"",
+ dbinfo->subname, dbinfo->dbname);
appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname);
- pg_log_debug("command is: %s", str->data);
+ pg_createsub_log(PG_LOG_DEBUG, PG_LOG_PRIMARY,
+ "command is: %s", str->data);
if (!dry_run)
{
res = PQexec(conn, str->data);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
{
- pg_log_error("could not enable subscription \"%s\": %s",
- dbinfo->subname, PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not enable subscription \"%s\": %s",
+ dbinfo->subname, PQresultErrorMessage(res));
disconnect_database(conn, true);
}
@@ -2154,7 +2328,9 @@ get_publisher_databases(struct CreateSubscriberOptions *opt,
res = PQexec(conn, "SELECT datname FROM pg_database WHERE datistemplate = false AND datallowconn AND datconnlimit <> -2 ORDER BY 1");
if (PQresultStatus(res) != PGRES_TUPLES_OK)
{
- pg_log_error("could not obtain a list of databases: %s", PQresultErrorMessage(res));
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "could not obtain a list of databases: %s",
+ PQresultErrorMessage(res));
PQclear(res);
disconnect_database(conn, true);
}
@@ -2258,9 +2434,11 @@ main(int argc, char **argv)
#ifndef WIN32
if (geteuid() == 0)
{
- pg_log_error("cannot be executed by \"root\"");
- pg_log_error_hint("You must run %s as the PostgreSQL superuser.",
- progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "cannot be executed by \"root\"");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "You must run %s as the PostgreSQL superuser.",
+ progname);
exit(1);
}
#endif
@@ -2351,7 +2529,9 @@ main(int argc, char **argv)
break;
default:
/* getopt_long already emitted a complaint */
- pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Try \"%s --help\" for more information.",
+ progname);
exit(1);
}
}
@@ -2372,9 +2552,12 @@ main(int argc, char **argv)
if (bad_switch)
{
- pg_log_error("options %s and %s cannot be used together",
- bad_switch, "-a/--all");
- pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "options %s and %s cannot be used together",
+ bad_switch, "-a/--all");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Try \"%s --help\" for more information.",
+ progname);
exit(1);
}
}
@@ -2382,17 +2565,21 @@ main(int argc, char **argv)
/* Any non-option arguments? */
if (optind < argc)
{
- pg_log_error("too many command-line arguments (first is \"%s\")",
- argv[optind]);
- pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "too many command-line arguments (first is \"%s\")",
+ argv[optind]);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Try \"%s --help\" for more information.", progname);
exit(1);
}
/* Required arguments */
if (subscriber_dir == NULL)
{
- pg_log_error("no subscriber data directory specified");
- pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "no subscriber data directory specified");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Try \"%s --help\" for more information.", progname);
exit(1);
}
@@ -2419,22 +2606,27 @@ main(int argc, char **argv)
* identical entries for physical and logical replication. If there is
* not, we would fail anyway.
*/
- pg_log_error("no publisher connection string specified");
- pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "no publisher connection string specified");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Try \"%s --help\" for more information.", progname);
exit(1);
}
if (dry_run)
- pg_log_info("Executing in dry-run mode.\n"
- "The target directory will not be modified.");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "Executing in dry-run mode.\n"
+ "The target directory will not be modified.");
- pg_log_info("validating publisher connection string");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "validating publisher connection string");
pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
&dbname_conninfo);
if (pub_base_conninfo == NULL)
exit(1);
- pg_log_info("validating subscriber connection string");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "validating subscriber connection string");
sub_base_conninfo = get_sub_conninfo(&opt);
/*
@@ -2451,7 +2643,8 @@ main(int argc, char **argv)
if (opt.database_names.head == NULL)
{
- pg_log_info("no database was specified");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "no database was specified");
/*
* Try to obtain the dbname from the publisher conninfo. If dbname
@@ -2462,14 +2655,17 @@ main(int argc, char **argv)
simple_string_list_append(&opt.database_names, dbname_conninfo);
num_dbs++;
- pg_log_info("database name \"%s\" was extracted from the publisher connection string",
- dbname_conninfo);
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "database name \"%s\" was extracted from the publisher connection string",
+ dbname_conninfo);
}
else
{
- pg_log_error("no database name specified");
- pg_log_error_hint("Try \"%s --help\" for more information.",
- progname);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "no database name specified");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Try \"%s --help\" for more information.",
+ progname);
exit(1);
}
}
@@ -2477,23 +2673,29 @@ main(int argc, char **argv)
/* Number of object names must match number of databases */
if (num_pubs > 0 && num_pubs != num_dbs)
{
- pg_log_error("wrong number of publication names specified");
- pg_log_error_detail("The number of specified publication names (%d) must match the number of specified database names (%d).",
- num_pubs, num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "wrong number of publication names specified");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+ "The number of specified publication names (%d) must match the number of specified database names (%d).",
+ num_pubs, num_dbs);
exit(1);
}
if (num_subs > 0 && num_subs != num_dbs)
{
- pg_log_error("wrong number of subscription names specified");
- pg_log_error_detail("The number of specified subscription names (%d) must match the number of specified database names (%d).",
- num_subs, num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "wrong number of subscription names specified");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+ "The number of specified subscription names (%d) must match the number of specified database names (%d).",
+ num_subs, num_dbs);
exit(1);
}
if (num_replslots > 0 && num_replslots != num_dbs)
{
- pg_log_error("wrong number of replication slot names specified");
- pg_log_error_detail("The number of specified replication slot names (%d) must match the number of specified database names (%d).",
- num_replslots, num_dbs);
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "wrong number of replication slot names specified");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_DETAIL,
+ "The number of specified replication slot names (%d) must match the number of specified database names (%d).",
+ num_replslots, num_dbs);
exit(1);
}
@@ -2504,9 +2706,11 @@ main(int argc, char **argv)
dbinfos.objecttypes_to_clean |= OBJECTTYPE_PUBLICATIONS;
else
{
- pg_log_error("invalid object type \"%s\" specified for %s",
- cell->val, "--clean");
- pg_log_error_hint("The valid value is: \"%s\"", "publications");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "invalid object type \"%s\" specified for %s",
+ cell->val, "--clean");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "The valid value is: \"%s\"", "publications");
exit(1);
}
}
@@ -2550,8 +2754,10 @@ main(int argc, char **argv)
*/
if (stat(pidfile, &statbuf) == 0)
{
- pg_log_error("standby server is running");
- pg_log_error_hint("Stop the standby server and try again.");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_PRIMARY,
+ "standby server is running");
+ pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT,
+ "Stop the standby server and try again.");
exit(1);
}
@@ -2560,7 +2766,8 @@ main(int argc, char **argv)
* by command-line options). The goal is to avoid connections during the
* transformation steps.
*/
- pg_log_info("starting the standby server with command-line options");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "starting the standby server with command-line options");
start_standby_server(&opt, true, false);
/* Check if the standby server is ready for logical replication */
@@ -2576,7 +2783,8 @@ main(int argc, char **argv)
* guarantees it) *before* creating the replication slots in
* setup_publisher().
*/
- pg_log_info("stopping the subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "stopping the subscriber");
stop_standby_server(subscriber_dir);
/* Create the required objects for each database on publisher */
@@ -2590,7 +2798,8 @@ main(int argc, char **argv)
* until accepting connections. We don't want to start logical replication
* during setup.
*/
- pg_log_info("starting the subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "starting the subscriber");
start_standby_server(&opt, true, true);
/* Waiting the subscriber to be promoted */
@@ -2611,7 +2820,8 @@ main(int argc, char **argv)
drop_failover_replication_slots(dbinfos.dbinfo);
/* Stop the subscriber */
- pg_log_info("stopping the subscriber");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "stopping the subscriber");
stop_standby_server(subscriber_dir);
/* Change system identifier from subscriber */
@@ -2619,7 +2829,8 @@ main(int argc, char **argv)
success = true;
- pg_log_info("Done!");
+ pg_createsub_log(PG_LOG_INFO, PG_LOG_PRIMARY,
+ "Done!");
return 0;
}
--
2.43.0
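
For readers following the conversions above, here is a minimal sketch of how a call such as pg_createsub_log(PG_LOG_ERROR, PG_LOG_HINT, ...) could send one message to both the terminal and a log file, as the cover note says all messages should be. It is not the implementation from this patch: sketch_log, sketch_prefix, sketch_logfile and the SKETCH_* enums are hypothetical stand-ins, and the "error: ", "detail: " and "hint: " prefixes are an assumption about the desired output.

```c
#include <stdarg.h>
#include <stdio.h>

/* Hypothetical stand-ins for the PG_LOG_* values seen in the patch. */
typedef enum { SKETCH_INFO, SKETCH_ERROR } sketch_level;
typedef enum { SKETCH_PRIMARY, SKETCH_DETAIL, SKETCH_HINT } sketch_part;

/* Log file handle; NULL means no log directory was requested (assumption). */
static FILE *sketch_logfile = NULL;

/* Pick a line prefix from the message part and severity (assumed format). */
static const char *
sketch_prefix(sketch_level level, sketch_part part)
{
	if (part == SKETCH_DETAIL)
		return "detail: ";
	if (part == SKETCH_HINT)
		return "hint: ";
	return (level == SKETCH_ERROR) ? "error: " : "";
}

/*
 * Write one message to stderr and, if a log file is open, to that file too.
 * Returns the number of sinks written to.
 */
static int
sketch_log(sketch_level level, sketch_part part, const char *fmt, ...)
{
	FILE	   *sinks[2];
	int			nsinks = 0;
	int			i;

	sinks[nsinks++] = stderr;
	if (sketch_logfile != NULL)
		sinks[nsinks++] = sketch_logfile;

	for (i = 0; i < nsinks; i++)
	{
		va_list		ap;

		fputs(sketch_prefix(level, part), sinks[i]);
		va_start(ap, fmt);		/* restart: a va_list is consumed by one use */
		vfprintf(sinks[i], fmt, ap);
		va_end(ap);
		fputc('\n', sinks[i]);
		fflush(sinks[i]);
	}
	return nsinks;
}
```

With sketch_logfile left as NULL the helper behaves like a plain stderr logger, so call sites would not need to know whether a log directory was given.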