From e8bb2d0bd71fd0e022f81d9cc132840dc47b5f63 Mon Sep 17 00:00:00 2001 From: Gyan Sreejith Date: Sun, 25 Jan 2026 20:32:21 -0500 Subject: [PATCH v3] Add a new argument -l to pg_createsubscriber. Enabling the option to write messages to log files in the specified directory. A new directory is created if required, and two new logfiles (with timestamps in their names) are added to the directory: 1. pg_createsubscriber_server_timestamp.log - captures messages related to starting and stopping the standby server. 2. pg_createsubscriber_internal_timestamp.log - captures internal diagnostic output from pg_createsubscriber. For example, if we specify -l abc as an argument, and if the timestamp on running it is 2026-01-19-20-43-17.204240, a directory abc is created if it doesn't exist already, and the directory will contain the two log files pg_createsubscriber_server_2026-01-19-20-43-17.204240.log and pg_createsubscriber_internal_2026-01-19-20-43-17.204240.log --- doc/src/sgml/ref/pg_createsubscriber.sgml | 22 ++ src/bin/pg_basebackup/pg_createsubscriber.c | 373 ++++++++++++------ .../t/040_pg_createsubscriber.pl | 36 +- 3 files changed, 319 insertions(+), 112 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index cf45ff3573d..8cb70403931 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -136,6 +136,28 @@ PostgreSQL documentation + + + + + + Specify the name of the log directory. A new directory is created with this name if it does not exist. The following two log files will be created with a umask of 077 so that access is disallowed to other users by default. The timestamp that is added to the name will indicate the time at which pg_createsubscriber was run. + + + + pg_createsubscriber_server_timestamp.log which captures logs related to stopping and starting the standby server, + + + + + pg_createsubscriber_internal_timestamp.log which captures internal diagnostic output (validations, checks, etc.) + + + + + + + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index c8ace1c7732..060aaca3491 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -49,10 +49,35 @@ #define INCLUDED_CONF_FILE "pg_createsubscriber.conf" #define INCLUDED_CONF_FILE_DISABLED INCLUDED_CONF_FILE ".disabled" +#define SERVER_LOG_FILE_NAME "pg_createsubscriber_server" +#define INTERNAL_LOG_FILE_NAME "pg_createsubscriber_internal" + +#define INFO(...) do{\ + if (internal_log_file_fp != NULL) \ + internal_log_file_write(__VA_ARGS__); \ + else \ + pg_log_info(__VA_ARGS__);\ +} while(0) + +#define INFO_HINT(...) do{\ + if (internal_log_file_fp != NULL) \ + internal_log_file_write(__VA_ARGS__); \ + else \ + pg_log_info_hint(__VA_ARGS__);\ +} while(0) + +#define DEBUG(...) do{\ + if (internal_log_file_fp != NULL) \ + internal_log_file_write(__VA_ARGS__); \ + else \ + pg_log_debug(__VA_ARGS__);\ +} while(0) + /* Command-line options */ struct CreateSubscriberOptions { char *config_file; /* configuration file */ + char *log_dir; /* log directory name */ char *pub_conninfo_str; /* publisher connection string */ char *socket_dir; /* directory for Unix-domain socket, if any */ char *sub_port; /* subscriber port number */ @@ -146,6 +171,9 @@ static void drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname); static void get_publisher_databases(struct CreateSubscriberOptions *opt, bool dbnamespecified); +static void + internal_log_file_write(const char *format,...) __attribute__((format(printf, 1, 2))); + #define WAIT_INTERVAL 1 /* 1 second */ @@ -167,6 +195,9 @@ static pg_prng_state prng_state; static char *pg_ctl_path = NULL; static char *pg_resetwal_path = NULL; +static FILE *internal_log_file_fp = NULL; /* File ptr to log all messages to */ +static char *log_timestamp = NULL; /* Timestamp to be used in all log file names */ + /* standby / subscriber data directory */ static char *subscriber_dir = NULL; @@ -174,6 +205,78 @@ static bool recovery_ended = false; static bool standby_running = false; static bool recovery_params_set = false; +/* + * Print the current time, with microseconds, into a caller-supplied + * buffer. + * Same as pq_TraceFormatTimestamp() in src/interfaces/libfq/fe-trace.c + */ +static void +populate_timestamp(char *timestr, size_t ts_len) +{ + struct timeval tval; + time_t now; + struct tm tmbuf; + + gettimeofday(&tval, NULL); + + /* + * MSVC's implementation of timeval uses a long for tv_sec, however, + * localtime() expects a time_t pointer. Here we'll assign tv_sec to a + * local time_t variable so that we pass localtime() the correct pointer + * type. + */ + now = tval.tv_sec; + strftime(timestr, ts_len, + "%Y-%m-%d-%H-%M-%S", + localtime_r(&now, &tmbuf)); + /* append microseconds */ + snprintf(timestr + strlen(timestr), ts_len - strlen(timestr), + ".%06u", (unsigned int) (tval.tv_usec)); +} + +static void +internal_log_file_write(const char *format,...) +{ + if (internal_log_file_fp != NULL) + { + va_list args; + + va_start(args, format); + vfprintf(internal_log_file_fp, format, args); + fprintf(internal_log_file_fp, "\n"); + va_end(args); + } +} + +/* + * Open a new logfile with proper permissions. + * From src/backend/postmaster/syslogger.c + */ +static FILE * +logfile_open(const char *filename, const char *mode) +{ + FILE *fh; + mode_t oumask; + + oumask = umask((mode_t) ((~(S_IRUSR | S_IWUSR)) & (S_IRWXU | S_IRWXG | S_IRWXO))); + fh = fopen(filename, mode); + umask(oumask); + + if (fh) + { + setvbuf(fh, NULL, PG_IOLBF, 0); + +#ifdef WIN32 + /* use CRLF line endings on Windows */ + _setmode(_fileno(fh), _O_TEXT); +#endif + } + else + pg_fatal("could not open log file \"%s\": %m", + filename); + + return fh; +} /* * Clean up objects created by pg_createsubscriber. @@ -212,6 +315,9 @@ cleanup_objects_atexit(void) if (success) return; + if (internal_log_file_fp != NULL) + fclose(internal_log_file_fp); + /* * If the server is promoted, there is no way to use the current setup * again. Warn the user that a new replication setup should be done before @@ -283,6 +389,7 @@ usage(void) " databases and databases that don't allow connections\n")); printf(_(" -d, --database=DBNAME database in which to create a subscription\n")); printf(_(" -D, --pgdata=DATADIR location for the subscriber data directory\n")); + printf(_(" -l, --logdir=LOGDIR location for the new log directory\n")); printf(_(" -n, --dry-run dry run, just show what would be done\n")); printf(_(" -p, --subscriber-port=PORT subscriber port number (default %s)\n"), DEFAULT_SUB_PORT); printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n")); @@ -426,7 +533,7 @@ get_exec_path(const char *argv0, const char *progname) progname, full_path, "pg_createsubscriber"); } - pg_log_debug("%s path is: %s", progname, exec_path); + DEBUG("%s path is: %s", progname, exec_path); return exec_path; } @@ -443,8 +550,8 @@ check_data_directory(const char *datadir) uint32 major_version; char *version_str; - pg_log_info("checking if directory \"%s\" is a cluster data directory", - datadir); + INFO("checking if directory \"%s\" is a cluster data directory", + datadir); if (stat(datadir, &statbuf) != 0) { @@ -547,14 +654,14 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, dbinfo[i].subname = NULL; /* Other fields will be filled later */ - pg_log_debug("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i, - dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", - dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)", - dbinfo[i].pubconninfo); - pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i, - dbinfo[i].subname ? dbinfo[i].subname : "(auto)", - dbinfo[i].subconninfo, - dbinfos.two_phase ? "true" : "false"); + DEBUG("publisher(%d): publication: %s ; replication slot: %s ; connection string: %s", i, + dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", + dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)", + dbinfo[i].pubconninfo); + DEBUG("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i, + dbinfo[i].subname ? dbinfo[i].subname : "(auto)", + dbinfo[i].subconninfo, + dbinfos.two_phase ? "true" : "false"); if (num_pubs > 0) pubcell = pubcell->next; @@ -635,7 +742,7 @@ get_primary_sysid(const char *conninfo) PGresult *res; uint64 sysid; - pg_log_info("getting system identifier from publisher"); + INFO("getting system identifier from publisher"); conn = connect_database(conninfo, true); @@ -655,7 +762,7 @@ get_primary_sysid(const char *conninfo) sysid = strtou64(PQgetvalue(res, 0, 0), NULL, 10); - pg_log_info("system identifier is %" PRIu64 " on publisher", sysid); + INFO("system identifier is %" PRIu64 " on publisher", sysid); PQclear(res); disconnect_database(conn, false); @@ -675,7 +782,7 @@ get_standby_sysid(const char *datadir) bool crc_ok; uint64 sysid; - pg_log_info("getting system identifier from subscriber"); + INFO("getting system identifier from subscriber"); cf = get_controlfile(datadir, &crc_ok); if (!crc_ok) @@ -683,7 +790,7 @@ get_standby_sysid(const char *datadir) sysid = cf->system_identifier; - pg_log_info("system identifier is %" PRIu64 " on subscriber", sysid); + INFO("system identifier is %" PRIu64 " on subscriber", sysid); pg_free(cf); @@ -702,9 +809,10 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt) bool crc_ok; struct timeval tv; + char *out_file; char *cmd_str; - pg_log_info("modifying system identifier of subscriber"); + INFO("modifying system identifier of subscriber"); cf = get_controlfile(subscriber_dir, &crc_ok); if (!crc_ok) @@ -713,7 +821,7 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt) /* * Select a new system identifier. * - * XXX this code was extracted from BootStrapXLOG(). + * XXX this code was extracted from BootStrapXINFO(). */ gettimeofday(&tv, NULL); cf->system_identifier = ((uint64) tv.tv_sec) << 32; @@ -721,31 +829,37 @@ modify_subscriber_sysid(const struct CreateSubscriberOptions *opt) cf->system_identifier |= getpid() & 0xFFF; if (dry_run) - pg_log_info("dry-run: would set system identifier to %" PRIu64 " on subscriber", - cf->system_identifier); + INFO("dry-run: would set system identifier to %" PRIu64 " on subscriber", + cf->system_identifier); else { update_controlfile(subscriber_dir, cf, true); - pg_log_info("system identifier is %" PRIu64 " on subscriber", - cf->system_identifier); + INFO("system identifier is %" PRIu64 " on subscriber", + cf->system_identifier); } if (dry_run) - pg_log_info("dry-run: would run pg_resetwal on the subscriber"); + INFO("dry-run: would run pg_resetwal on the subscriber"); + else + INFO("running pg_resetwal on the subscriber"); + + + if (opt->log_dir != NULL) + out_file = psprintf("%s/%s_%s.log", opt->log_dir, SERVER_LOG_FILE_NAME, log_timestamp); else - pg_log_info("running pg_resetwal on the subscriber"); + out_file = DEVNULL; - cmd_str = psprintf("\"%s\" -D \"%s\" > \"%s\"", pg_resetwal_path, - subscriber_dir, DEVNULL); + cmd_str = psprintf("\"%s\" -D \"%s\" >> \"%s\"", pg_resetwal_path, + subscriber_dir, out_file); - pg_log_debug("pg_resetwal command is: %s", cmd_str); + DEBUG("pg_resetwal command is: %s", cmd_str); if (!dry_run) { int rc = system(cmd_str); if (rc == 0) - pg_log_info("successfully reset WAL on the subscriber"); + INFO("successfully reset WAL on the subscriber"); else pg_fatal("could not reset WAL on subscriber: %s", wait_result_to_str(rc)); } @@ -873,8 +987,8 @@ setup_publisher(struct LogicalRepInfo *dbinfo) if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname)) { /* Reuse existing publication on publisher. */ - pg_log_info("use existing publication \"%s\" in database \"%s\"", - dbinfo[i].pubname, dbinfo[i].dbname); + INFO("use existing publication \"%s\" in database \"%s\"", + dbinfo[i].pubname, dbinfo[i].dbname); /* Don't remove pre-existing publication if an error occurs. */ dbinfo[i].made_publication = false; } @@ -971,7 +1085,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo) int max_prepared_transactions; char *max_slot_wal_keep_size; - pg_log_info("checking settings on publisher"); + INFO("checking settings on publisher"); conn = connect_database(dbinfo[0].pubconninfo, true); @@ -1022,15 +1136,15 @@ check_publisher(const struct LogicalRepInfo *dbinfo) PQclear(res); - pg_log_debug("publisher: wal_level: %s", wal_level); - pg_log_debug("publisher: max_replication_slots: %d", max_repslots); - pg_log_debug("publisher: current replication slots: %d", cur_repslots); - pg_log_debug("publisher: max_wal_senders: %d", max_walsenders); - pg_log_debug("publisher: current wal senders: %d", cur_walsenders); - pg_log_debug("publisher: max_prepared_transactions: %d", - max_prepared_transactions); - pg_log_debug("publisher: max_slot_wal_keep_size: %s", - max_slot_wal_keep_size); + DEBUG("publisher: wal_level: %s", wal_level); + DEBUG("publisher: max_replication_slots: %d", max_repslots); + DEBUG("publisher: current replication slots: %d", cur_repslots); + DEBUG("publisher: max_wal_senders: %d", max_walsenders); + DEBUG("publisher: current wal senders: %d", cur_walsenders); + DEBUG("publisher: max_prepared_transactions: %d", + max_prepared_transactions); + DEBUG("publisher: max_slot_wal_keep_size: %s", + max_slot_wal_keep_size); disconnect_database(conn, false); @@ -1106,7 +1220,7 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) int max_reporigins; int max_wprocs; - pg_log_info("checking settings on subscriber"); + INFO("checking settings on subscriber\n"); conn = connect_database(dbinfo[0].subconninfo, true); @@ -1148,12 +1262,12 @@ check_subscriber(const struct LogicalRepInfo *dbinfo) if (strcmp(PQgetvalue(res, 3, 0), "") != 0) primary_slot_name = pg_strdup(PQgetvalue(res, 3, 0)); - pg_log_debug("subscriber: max_logical_replication_workers: %d", - max_lrworkers); - pg_log_debug("subscriber: max_active_replication_origins: %d", max_reporigins); - pg_log_debug("subscriber: max_worker_processes: %d", max_wprocs); + DEBUG("subscriber: max_logical_replication_workers: %d", + max_lrworkers); + DEBUG("subscriber: max_active_replication_origins: %d", max_reporigins); + DEBUG("subscriber: max_worker_processes: %d", max_wprocs); if (primary_slot_name) - pg_log_debug("subscriber: primary_slot_name: %s", primary_slot_name); + DEBUG("subscriber: primary_slot_name: %s", primary_slot_name); PQclear(res); @@ -1215,12 +1329,12 @@ drop_existing_subscription(PGconn *conn, const char *subname, const char *dbname appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname); if (dry_run) - pg_log_info("dry-run: would drop subscription \"%s\" in database \"%s\"", - subname, dbname); + INFO("dry-run: would drop subscription \"%s\" in database \"%s\"", + subname, dbname); else { - pg_log_info("dropping subscription \"%s\" in database \"%s\"", - subname, dbname); + INFO("dropping subscription \"%s\" in database \"%s\"", + subname, dbname); res = PQexec(conn, query->data); @@ -1373,7 +1487,7 @@ setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir, const c lsn); } - pg_log_debug("recovery parameters:\n%s", recoveryconfcontents->data); + DEBUG("recovery parameters:\n%s", recoveryconfcontents->data); if (!dry_run) { @@ -1494,11 +1608,11 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo) Assert(conn != NULL); if (dry_run) - pg_log_info("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher", - slot_name, dbinfo->dbname); + INFO("dry-run: would create the replication slot \"%s\" in database \"%s\" on publisher", + slot_name, dbinfo->dbname); else - pg_log_info("creating the replication slot \"%s\" in database \"%s\" on publisher", - slot_name, dbinfo->dbname); + INFO("creating the replication slot \"%s\" in database \"%s\" on publisher", + slot_name, dbinfo->dbname); slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name)); @@ -1509,7 +1623,7 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo) PQfreemem(slot_name_esc); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -1547,11 +1661,11 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, Assert(conn != NULL); if (dry_run) - pg_log_info("dry-run: would drop the replication slot \"%s\" in database \"%s\"", - slot_name, dbinfo->dbname); + INFO("dry-run: would drop the replication slot \"%s\" in database \"%s\"", + slot_name, dbinfo->dbname); else - pg_log_info("dropping the replication slot \"%s\" in database \"%s\"", - slot_name, dbinfo->dbname); + INFO("dropping the replication slot \"%s\" in database \"%s\"", + slot_name, dbinfo->dbname); slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name)); @@ -1559,7 +1673,7 @@ drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo, PQfreemem(slot_name_esc); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -1650,12 +1764,17 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_ if (restrict_logical_worker) appendPQExpBufferStr(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\""); - pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data); + if (opt->log_dir != NULL) + { + appendPQExpBuffer(pg_ctl_cmd, " -l %s/%s_%s.log", opt->log_dir, SERVER_LOG_FILE_NAME, log_timestamp); + } + + DEBUG("pg_ctl command is: %s", pg_ctl_cmd->data); rc = system(pg_ctl_cmd->data); pg_ctl_status(pg_ctl_cmd->data, rc); standby_running = true; destroyPQExpBuffer(pg_ctl_cmd); - pg_log_info("server was started"); + INFO("server was started"); } static void @@ -1666,11 +1785,11 @@ stop_standby_server(const char *datadir) pg_ctl_cmd = psprintf("\"%s\" stop -D \"%s\" -s", pg_ctl_path, datadir); - pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd); + DEBUG("pg_ctl command is: %s", pg_ctl_cmd); rc = system(pg_ctl_cmd); pg_ctl_status(pg_ctl_cmd, rc); standby_running = false; - pg_log_info("server was stopped"); + INFO("server was stopped"); } /* @@ -1689,7 +1808,7 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions bool ready = false; int timer = 0; - pg_log_info("waiting for the target server to reach the consistent state"); + INFO("waiting for the target server to reach the consistent state"); conn = connect_database(conninfo, true); @@ -1721,8 +1840,8 @@ wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions if (!ready) pg_fatal("server did not end recovery"); - pg_log_info("target server reached the consistent state"); - pg_log_info_hint("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing."); + INFO("target server reached the consistent state"); + INFO_HINT("If pg_createsubscriber fails after this point, you must recreate the physical replica before continuing."); } /* @@ -1772,16 +1891,16 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo) resetPQExpBuffer(str); if (dry_run) - pg_log_info("dry-run: would create publication \"%s\" in database \"%s\"", - dbinfo->pubname, dbinfo->dbname); + INFO("dry-run: would create publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); else - pg_log_info("creating publication \"%s\" in database \"%s\"", - dbinfo->pubname, dbinfo->dbname); + INFO("creating publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); appendPQExpBuffer(str, "CREATE PUBLICATION %s FOR ALL TABLES", ipubname_esc); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -1819,17 +1938,17 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname, pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname)); if (dry_run) - pg_log_info("dry-run: would drop publication \"%s\" in database \"%s\"", - pubname, dbname); + INFO("dry-run: would drop publication \"%s\" in database \"%s\"", + pubname, dbname); else - pg_log_info("dropping publication \"%s\" in database \"%s\"", - pubname, dbname); + INFO("dropping publication \"%s\" in database \"%s\"", + pubname, dbname); appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc); PQfreemem(pubname_esc); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -1872,8 +1991,8 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) if (drop_all_pubs) { - pg_log_info("dropping all existing publications in database \"%s\"", - dbinfo->dbname); + INFO("dropping all existing publications in database \"%s\"", + dbinfo->dbname); /* Fetch all publication names */ res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;"); @@ -1903,11 +2022,11 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) else { if (dry_run) - pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"", - dbinfo->pubname, dbinfo->dbname); + INFO("dry-run: would preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); else - pg_log_info("preserve existing publication \"%s\" in database \"%s\"", - dbinfo->pubname, dbinfo->dbname); + INFO("preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); } } } @@ -1941,11 +2060,11 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) replslotname_esc = PQescapeLiteral(conn, dbinfo->replslotname, strlen(dbinfo->replslotname)); if (dry_run) - pg_log_info("dry-run: would create subscription \"%s\" in database \"%s\"", - dbinfo->subname, dbinfo->dbname); + INFO("dry-run: would create subscription \"%s\" in database \"%s\"", + dbinfo->subname, dbinfo->dbname); else - pg_log_info("creating subscription \"%s\" in database \"%s\"", - dbinfo->subname, dbinfo->dbname); + INFO("creating subscription \"%s\" in database \"%s\"", + dbinfo->subname, dbinfo->dbname); appendPQExpBuffer(str, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " @@ -1959,7 +2078,7 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) PQfreemem(pubconninfo_esc); PQfreemem(replslotname_esc); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -2043,18 +2162,18 @@ set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo, cons originname = psprintf("pg_%u", suboid); if (dry_run) - pg_log_info("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"", - originname, lsnstr, dbinfo->dbname); + INFO("dry-run: would set the replication progress (node name \"%s\", LSN %s) in database \"%s\"", + originname, lsnstr, dbinfo->dbname); else - pg_log_info("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"", - originname, lsnstr, dbinfo->dbname); + INFO("setting the replication progress (node name \"%s\", LSN %s) in database \"%s\"", + originname, lsnstr, dbinfo->dbname); resetPQExpBuffer(str); appendPQExpBuffer(str, "SELECT pg_catalog.pg_replication_origin_advance('%s', '%s')", originname, lsnstr); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -2093,15 +2212,15 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) subname = PQescapeIdentifier(conn, dbinfo->subname, strlen(dbinfo->subname)); if (dry_run) - pg_log_info("dry-run: would enable subscription \"%s\" in database \"%s\"", - dbinfo->subname, dbinfo->dbname); + INFO("dry-run: would enable subscription \"%s\" in database \"%s\"", + dbinfo->subname, dbinfo->dbname); else - pg_log_info("enabling subscription \"%s\" in database \"%s\"", - dbinfo->subname, dbinfo->dbname); + INFO("enabling subscription \"%s\" in database \"%s\"", + dbinfo->subname, dbinfo->dbname); appendPQExpBuffer(str, "ALTER SUBSCRIPTION %s ENABLE", subname); - pg_log_debug("command is: %s", str->data); + DEBUG("command is: %s", str->data); if (!dry_run) { @@ -2181,6 +2300,7 @@ main(int argc, char **argv) {"all", no_argument, NULL, 'a'}, {"database", required_argument, NULL, 'd'}, {"pgdata", required_argument, NULL, 'D'}, + {"logdir", required_argument, NULL, 'l'}, {"dry-run", no_argument, NULL, 'n'}, {"subscriber-port", required_argument, NULL, 'p'}, {"publisher-server", required_argument, NULL, 'P'}, @@ -2215,6 +2335,7 @@ main(int argc, char **argv) char *consistent_lsn; char pidfile[MAXPGPATH]; + char *internal_log_file; pg_logging_init(argv[0]); pg_logging_set_level(PG_LOG_WARNING); @@ -2239,6 +2360,7 @@ main(int argc, char **argv) /* Default settings */ subscriber_dir = NULL; opt.config_file = NULL; + opt.log_dir = NULL; opt.pub_conninfo_str = NULL; opt.socket_dir = NULL; opt.sub_port = DEFAULT_SUB_PORT; @@ -2267,9 +2389,11 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "ad:D:np:P:s:t:TU:v", + while ((c = getopt_long(argc, argv, "ad:D:l:np:P:s:t:TU:v", long_options, &option_index)) != -1) { + char timestamp[128]; + switch (c) { case 'a': @@ -2287,6 +2411,30 @@ main(int argc, char **argv) case 'D': subscriber_dir = pg_strdup(optarg); canonicalize_path(subscriber_dir); + break; + case 'l': + populate_timestamp(timestamp, sizeof(timestamp)); + log_timestamp = pg_strdup(timestamp); + + opt.log_dir = pg_strdup(optarg); + canonicalize_path(opt.log_dir); + + if (stat(opt.log_dir, &statbuf) != 0) + { + if (errno == ENOENT) + { + mkdir(opt.log_dir, S_IRWXU); + INFO("log directory created"); + } + else if (errno == EACCES) + pg_fatal("permission denied trying to access directory \"%s\": %m", opt.log_dir); + else + pg_fatal("could not access directory \"%s\": %m", opt.log_dir); + } + internal_log_file = psprintf("%s/%s_%s.log", opt.log_dir, INTERNAL_LOG_FILE_NAME, log_timestamp); + if ((internal_log_file_fp = logfile_open(internal_log_file, "a")) == NULL) + pg_fatal("could not open log file \"%s\": %m", internal_log_file); + break; case 'n': dry_run = true; @@ -2425,16 +2573,16 @@ main(int argc, char **argv) } if (dry_run) - pg_log_info("Executing in dry-run mode.\n" - "The target directory will not be modified."); + INFO("Executing in dry-run mode.\n" + "The target directory will not be modified."); - pg_log_info("validating publisher connection string"); + INFO("validating publisher connection string"); pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str, &dbname_conninfo); if (pub_base_conninfo == NULL) exit(1); - pg_log_info("validating subscriber connection string"); + INFO("validating subscriber connection string"); sub_base_conninfo = get_sub_conninfo(&opt); /* @@ -2451,7 +2599,7 @@ main(int argc, char **argv) if (opt.database_names.head == NULL) { - pg_log_info("no database was specified"); + INFO("no database was specified"); /* * Try to obtain the dbname from the publisher conninfo. If dbname @@ -2462,8 +2610,8 @@ main(int argc, char **argv) simple_string_list_append(&opt.database_names, dbname_conninfo); num_dbs++; - pg_log_info("database name \"%s\" was extracted from the publisher connection string", - dbname_conninfo); + INFO("database name \"%s\" was extracted from the publisher connection string", + dbname_conninfo); } else { @@ -2560,7 +2708,7 @@ main(int argc, char **argv) * by command-line options). The goal is to avoid connections during the * transformation steps. */ - pg_log_info("starting the standby server with command-line options"); + INFO("starting the standby server with command-line options"); start_standby_server(&opt, true, false); /* Check if the standby server is ready for logical replication */ @@ -2576,7 +2724,7 @@ main(int argc, char **argv) * guarantees it) *before* creating the replication slots in * setup_publisher(). */ - pg_log_info("stopping the subscriber"); + INFO("stopping the subscriber"); stop_standby_server(subscriber_dir); /* Create the required objects for each database on publisher */ @@ -2590,7 +2738,7 @@ main(int argc, char **argv) * until accepting connections. We don't want to start logical replication * during setup. */ - pg_log_info("starting the subscriber"); + INFO("starting the subscriber"); start_standby_server(&opt, true, true); /* Waiting the subscriber to be promoted */ @@ -2611,7 +2759,7 @@ main(int argc, char **argv) drop_failover_replication_slots(dbinfos.dbinfo); /* Stop the subscriber */ - pg_log_info("stopping the subscriber"); + INFO("stopping the subscriber"); stop_standby_server(subscriber_dir); /* Change system identifier from subscriber */ @@ -2619,7 +2767,10 @@ main(int argc, char **argv) success = true; - pg_log_info("Done!"); + INFO("Done!"); + + if (internal_log_file_fp != NULL) + fclose(internal_log_file_fp); return 0; } diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 0c27fca7bb7..cdb12623b3b 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -13,7 +13,8 @@ program_help_ok('pg_createsubscriber'); program_version_ok('pg_createsubscriber'); program_options_handling_ok('pg_createsubscriber'); -my $datadir = PostgreSQL::Test::Utils::tempdir; +my $datadir = PostgreSQL::Test::Utils::tempdir + "/datadir"; +my $logdir = PostgreSQL::Test::Utils::tempdir + "/logdir"; # Generate a database with a name made of a range of ASCII characters. # Extracted from 002_pg_upgrade.pl. @@ -629,10 +630,43 @@ command_ok( # not being tracked, something that is set within $node->start(). system_log('pg_ctl', 'stop', '--pgdata', $node_k->data_dir); +$node_p->backup('backup_3'); + +# Set up node R as a logical replica node +my $node_r = PostgreSQL::Test::Cluster->new('node_r'); +$node_r->init_from_backup($node_p, 'backup_3', has_streaming => 1); +$node_r->append_conf( + 'postgresql.conf', qq[ +primary_conninfo = '$pconnstr dbname=postgres' +hot_standby_feedback = on +]); +$node_r->set_standby_mode(); + +# Test that --logdir works for pg_createsubscriber +command_ok( + [ + 'pg_createsubscriber', + '--verbose', + '--pgdata' => $node_r->data_dir, + '--publisher-server' => $pconnstr, + '--database' => 'postgres', + '--logdir' => $logdir, + ], + 'check for log file creation for pg_createSubscriber'); + +# Check that the log files were created +my @server_log_files = glob "$logdir/pg_createsubscriber_server_*.log"; +is( scalar(@server_log_files), 1, " + pg_createsubscriber_server.log file was created"); +my @internal_log_files = glob "$logdir/pg_createsubscriber_internal_*.log"; +is( scalar(@internal_log_files), 1, " + pg_createsubscriber_internal.log file was created"); + # clean up $node_p->teardown_node; $node_s->teardown_node; $node_t->teardown_node; $node_f->teardown_node; +$node_r->teardown_node; done_testing(); -- 2.43.0