diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 85e6515ac2..9328830b2e 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -858,6 +858,11 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot) */ WaitForCommands(AH, pipefd); + /* + * Close an eventually open BLOB batch transaction. + */ + CommitBlobTransaction((Archive *)AH); + /* * Disconnect from database and clean up. */ diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 9ef2f2017e..65519791e9 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -223,6 +223,8 @@ typedef struct Archive int numWorkers; /* number of parallel processes */ char *sync_snapshot_id; /* sync snapshot id for parallel operation */ + int blobBatchSize; /* # of blobs to restore per transaction */ + /* info needed for string escaping */ int encoding; /* libpq code for client_encoding */ bool std_strings; /* standard_conforming_strings */ @@ -293,6 +295,7 @@ extern void WriteData(Archive *AHX, const void *data, size_t dLen); extern int StartLO(Archive *AHX, Oid oid); extern int EndLO(Archive *AHX, Oid oid); +extern void CommitBlobTransaction(Archive *AH); extern void CloseArchive(Archive *AHX); extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 256d1e35a4..43be945064 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -45,6 +45,7 @@ #define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n" #define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n" +static int blobBatchCount = 0; static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt, const pg_compress_specification compression_spec, @@ -258,6 +259,23 @@ CloseArchive(Archive *AHX) pg_fatal("could not close output file: %m"); } +/* Public */ +void +CommitBlobTransaction(Archive *AHX) +{ + ArchiveHandle *AH = (ArchiveHandle *) AHX; + + if (blobBatchCount > 0) + { + ahprintf(AH, "--\n"); + ahprintf(AH, "-- End BLOB restore batch\n"); + ahprintf(AH, "--\n"); + ahprintf(AH, "COMMIT;\n\n"); + + blobBatchCount = 0; + } +} + /* Public */ void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt) @@ -719,6 +737,8 @@ RestoreArchive(Archive *AHX) ahprintf(AH, "COMMIT;\n\n"); } + CommitBlobTransaction(AHX); + if (AH->public.verbose) dumpTimestamp(AH, "Completed on", time(NULL)); @@ -3543,6 +3563,57 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData) { RestoreOptions *ropt = AH->public.ropt; + /* We restore BLOBs in batches to reduce XID consumption */ + if (strcmp(te->desc, "BLOB") == 0 && AH->public.blobBatchSize > 1) + { + if (blobBatchCount > 0) + { + /* We are inside a BLOB restore transaction */ + if (blobBatchCount >= AH->public.blobBatchSize) + { + /* + * We did reach the batch size with the previous BLOB. + * Commit and start a new batch. + */ + ahprintf(AH, "--\n"); + ahprintf(AH, "-- BLOB batch size reached\n"); + ahprintf(AH, "--\n"); + ahprintf(AH, "COMMIT;\n"); + ahprintf(AH, "BEGIN;\n\n"); + + blobBatchCount = 1; + } + else + { + /* This one still fits into the current batch */ + blobBatchCount++; + } + } + else + { + /* Not inside a transaction, start a new batch */ + ahprintf(AH, "--\n"); + ahprintf(AH, "-- Start BLOB restore batch\n"); + ahprintf(AH, "--\n"); + ahprintf(AH, "BEGIN;\n\n"); + + blobBatchCount = 1; + } + } + else + { + /* Not a BLOB. If we have a BLOB batch open, close it. */ + if (blobBatchCount > 0) + { + ahprintf(AH, "--\n"); + ahprintf(AH, "-- End BLOB restore batch\n"); + ahprintf(AH, "--\n"); + ahprintf(AH, "COMMIT;\n\n"); + + blobBatchCount = 0; + } + } + /* Select owner, schema, tablespace and default AM as necessary */ _becomeOwner(AH, te); _selectOutputSchema(AH, te->namespace); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index e863913849..2c6d49732b 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -205,11 +205,20 @@ static inline void dumpComment(Archive *fout, const char *type, const char *name, const char *namespace, const char *owner, CatalogId catalogId, int subid, DumpId dumpId); +static bool dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag, + const char *type, const char *name, + const char *namespace, const char *owner, + CatalogId catalogId, int subid, DumpId dumpId, + const char *initdb_comment); static int findComments(Oid classoid, Oid objoid, CommentItem **items); static void collectComments(Archive *fout); static void dumpSecLabel(Archive *fout, const char *type, const char *name, const char *namespace, const char *owner, CatalogId catalogId, int subid, DumpId dumpId); +static bool dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag, + const char *type, const char *name, + const char *namespace, const char *owner, + CatalogId catalogId, int subid, DumpId dumpId); static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items); static void collectSecLabels(Archive *fout); static void dumpDumpableObject(Archive *fout, DumpableObject *dobj); @@ -265,6 +274,12 @@ static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId, const char *type, const char *name, const char *subname, const char *nspname, const char *owner, const DumpableAcl *dacl); +static bool dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag, + DumpId objDumpId, DumpId altDumpId, + const char *type, const char *name, + const char *subname, + const char *nspname, const char *owner, + const DumpableAcl *dacl); static void getDependencies(Archive *fout); static void BuildArchiveDependencies(Archive *fout); @@ -3641,10 +3656,42 @@ dumpLO(Archive *fout, const LoInfo *loinfo) { PQExpBuffer cquery = createPQExpBuffer(); PQExpBuffer dquery = createPQExpBuffer(); + PQExpBuffer tag = createPQExpBuffer(); + teSection section = SECTION_PRE_DATA; appendPQExpBuffer(cquery, "SELECT pg_catalog.lo_create('%s');\n", loinfo->dobj.name); + /* + * In binary upgrade mode we put all the queries to restore + * one large object into a single TOC entry and emit it as + * SECTION_DATA so that they can be restored in parallel. + */ + if (fout->dopt->binary_upgrade) + { + section = SECTION_DATA; + + /* Dump comment if any */ + if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT) + dumpCommentQuery(fout, cquery, tag, "LARGE OBJECT", + loinfo->dobj.name, NULL, loinfo->rolname, + loinfo->dobj.catId, 0, loinfo->dobj.dumpId, NULL); + + /* Dump security label if any */ + if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL) + dumpSecLabelQuery(fout, cquery, tag, "LARGE OBJECT", + loinfo->dobj.name, + NULL, loinfo->rolname, + loinfo->dobj.catId, 0, loinfo->dobj.dumpId); + + /* Dump ACL if any */ + if (loinfo->dobj.dump & DUMP_COMPONENT_ACL) + dumpACLQuery(fout, cquery, tag, + loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT", + loinfo->dobj.name, NULL, + NULL, loinfo->rolname, &loinfo->dacl); + } + appendPQExpBuffer(dquery, "SELECT pg_catalog.lo_unlink('%s');\n", @@ -3655,27 +3702,28 @@ dumpLO(Archive *fout, const LoInfo *loinfo) ARCHIVE_OPTS(.tag = loinfo->dobj.name, .owner = loinfo->rolname, .description = "BLOB", - .section = SECTION_PRE_DATA, + .section = section, .createStmt = cquery->data, .dropStmt = dquery->data)); - /* Dump comment if any */ - if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT) - dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name, - NULL, loinfo->rolname, - loinfo->dobj.catId, 0, loinfo->dobj.dumpId); - - /* Dump security label if any */ - if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL) - dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name, - NULL, loinfo->rolname, - loinfo->dobj.catId, 0, loinfo->dobj.dumpId); - - /* Dump ACL if any */ - if (loinfo->dobj.dump & DUMP_COMPONENT_ACL) - dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT", - loinfo->dobj.name, NULL, - NULL, loinfo->rolname, &loinfo->dacl); + if (!fout->dopt->binary_upgrade) + { + /* Dump comment if any */ + if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT) + dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name, + NULL, loinfo->rolname, + loinfo->dobj.catId, 0, loinfo->dobj.dumpId); + /* Dump security label if any */ + if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL) + dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name, + NULL, loinfo->rolname, + loinfo->dobj.catId, 0, loinfo->dobj.dumpId); + /* Dump ACL if any */ + if (loinfo->dobj.dump & DUMP_COMPONENT_ACL) + dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT", + loinfo->dobj.name, NULL, + NULL, loinfo->rolname, &loinfo->dacl); + } destroyPQExpBuffer(cquery); destroyPQExpBuffer(dquery); @@ -9899,6 +9947,38 @@ dumpCommentExtended(Archive *fout, const char *type, const char *owner, CatalogId catalogId, int subid, DumpId dumpId, const char *initdb_comment) +{ + PQExpBuffer query = createPQExpBuffer(); + PQExpBuffer tag = createPQExpBuffer(); + + if (dumpCommentQuery(fout, query, tag, type, name, namespace, owner, + catalogId, subid, dumpId, initdb_comment)) + { + /* + * We mark comments as SECTION_NONE because they really belong in the + * same section as their parent, whether that is pre-data or + * post-data. + */ + ArchiveEntry(fout, nilCatalogId, createDumpId(), + ARCHIVE_OPTS(.tag = tag->data, + .namespace = namespace, + .owner = owner, + .description = "COMMENT", + .section = SECTION_NONE, + .createStmt = query->data, + .deps = &dumpId, + .nDeps = 1)); + } + destroyPQExpBuffer(query); + destroyPQExpBuffer(tag); +} + +static bool +dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag, + const char *type, const char *name, + const char *namespace, const char *owner, + CatalogId catalogId, int subid, DumpId dumpId, + const char *initdb_comment) { DumpOptions *dopt = fout->dopt; CommentItem *comments; @@ -9906,19 +9986,19 @@ dumpCommentExtended(Archive *fout, const char *type, /* do nothing, if --no-comments is supplied */ if (dopt->no_comments) - return; + return false; /* Comments are schema not data ... except LO comments are data */ if (strcmp(type, "LARGE OBJECT") != 0) { if (dopt->dataOnly) - return; + return false; } else { /* We do dump LO comments in binary-upgrade mode */ if (dopt->schemaOnly && !dopt->binary_upgrade) - return; + return false; } /* Search for comments associated with catalogId, using table */ @@ -9956,9 +10036,6 @@ dumpCommentExtended(Archive *fout, const char *type, /* If a comment exists, build COMMENT ON statement */ if (ncomments > 0) { - PQExpBuffer query = createPQExpBuffer(); - PQExpBuffer tag = createPQExpBuffer(); - appendPQExpBuffer(query, "COMMENT ON %s ", type); if (namespace && *namespace) appendPQExpBuffer(query, "%s.", fmtId(namespace)); @@ -9968,24 +10045,10 @@ dumpCommentExtended(Archive *fout, const char *type, appendPQExpBuffer(tag, "%s %s", type, name); - /* - * We mark comments as SECTION_NONE because they really belong in the - * same section as their parent, whether that is pre-data or - * post-data. - */ - ArchiveEntry(fout, nilCatalogId, createDumpId(), - ARCHIVE_OPTS(.tag = tag->data, - .namespace = namespace, - .owner = owner, - .description = "COMMENT", - .section = SECTION_NONE, - .createStmt = query->data, - .deps = &dumpId, - .nDeps = 1)); - - destroyPQExpBuffer(query); - destroyPQExpBuffer(tag); + return true; } + + return false; } /* @@ -14939,23 +15002,65 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId, const DumpableAcl *dacl) { DumpId aclDumpId = InvalidDumpId; + PQExpBuffer query = createPQExpBuffer(); + PQExpBuffer tag = createPQExpBuffer(); + + if (dumpACLQuery(fout, query, tag, objDumpId, altDumpId, + type, name, subname, nspname, owner, dacl)) + { + DumpId aclDeps[2]; + int nDeps = 0; + + if (subname) + appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname); + else + appendPQExpBuffer(tag, "%s %s", type, name); + + aclDeps[nDeps++] = objDumpId; + if (altDumpId != InvalidDumpId) + aclDeps[nDeps++] = altDumpId; + + aclDumpId = createDumpId(); + + ArchiveEntry(fout, nilCatalogId, aclDumpId, + ARCHIVE_OPTS(.tag = tag->data, + .namespace = nspname, + .owner = owner, + .description = "ACL", + .section = SECTION_NONE, + .createStmt = query->data, + .deps = aclDeps, + .nDeps = nDeps)); + + } + + destroyPQExpBuffer(query); + destroyPQExpBuffer(tag); + + return aclDumpId; +} + +static bool +dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag, + DumpId objDumpId, DumpId altDumpId, + const char *type, const char *name, const char *subname, + const char *nspname, const char *owner, + const DumpableAcl *dacl) +{ DumpOptions *dopt = fout->dopt; const char *acls = dacl->acl; const char *acldefault = dacl->acldefault; char privtype = dacl->privtype; const char *initprivs = dacl->initprivs; const char *baseacls; - PQExpBuffer sql; /* Do nothing if ACL dump is not enabled */ if (dopt->aclsSkip) - return InvalidDumpId; + return false; /* --data-only skips ACLs *except* large object ACLs */ if (dopt->dataOnly && strcmp(type, "LARGE OBJECT") != 0) - return InvalidDumpId; - - sql = createPQExpBuffer(); + return false; /* * In binary upgrade mode, we don't run an extension's script but instead @@ -14973,13 +15078,13 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId, if (dopt->binary_upgrade && privtype == 'e' && initprivs && *initprivs != '\0') { - appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n"); + appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n"); if (!buildACLCommands(name, subname, nspname, type, initprivs, acldefault, owner, - "", fout->remoteVersion, sql)) + "", fout->remoteVersion, query)) pg_fatal("could not parse initial ACL list (%s) or default (%s) for object \"%s\" (%s)", initprivs, acldefault, name, type); - appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n"); + appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n"); } /* @@ -15001,43 +15106,19 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId, if (!buildACLCommands(name, subname, nspname, type, acls, baseacls, owner, - "", fout->remoteVersion, sql)) + "", fout->remoteVersion, query)) pg_fatal("could not parse ACL list (%s) or default (%s) for object \"%s\" (%s)", acls, baseacls, name, type); - if (sql->len > 0) + if (query->len > 0 && tag != NULL) { - PQExpBuffer tag = createPQExpBuffer(); - DumpId aclDeps[2]; - int nDeps = 0; - if (subname) appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname); else appendPQExpBuffer(tag, "%s %s", type, name); - - aclDeps[nDeps++] = objDumpId; - if (altDumpId != InvalidDumpId) - aclDeps[nDeps++] = altDumpId; - - aclDumpId = createDumpId(); - - ArchiveEntry(fout, nilCatalogId, aclDumpId, - ARCHIVE_OPTS(.tag = tag->data, - .namespace = nspname, - .owner = owner, - .description = "ACL", - .section = SECTION_NONE, - .createStmt = sql->data, - .deps = aclDeps, - .nDeps = nDeps)); - - destroyPQExpBuffer(tag); } - destroyPQExpBuffer(sql); - - return aclDumpId; + return true; } /* @@ -15062,16 +15143,42 @@ static void dumpSecLabel(Archive *fout, const char *type, const char *name, const char *namespace, const char *owner, CatalogId catalogId, int subid, DumpId dumpId) +{ + PQExpBuffer query = createPQExpBuffer(); + PQExpBuffer tag = createPQExpBuffer(); + + if (dumpSecLabelQuery(fout, query, tag, type, name, + namespace, owner, catalogId, subid, dumpId)) + { + ArchiveEntry(fout, nilCatalogId, createDumpId(), + ARCHIVE_OPTS(.tag = tag->data, + .namespace = namespace, + .owner = owner, + .description = "SECURITY LABEL", + .section = SECTION_NONE, + .createStmt = query->data, + .deps = &dumpId, + .nDeps = 1)); + } + + destroyPQExpBuffer(query); + destroyPQExpBuffer(tag); +} + +static bool +dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag, + const char *type, const char *name, + const char *namespace, const char *owner, + CatalogId catalogId, int subid, DumpId dumpId) { DumpOptions *dopt = fout->dopt; SecLabelItem *labels; int nlabels; int i; - PQExpBuffer query; /* do nothing, if --no-security-labels is supplied */ if (dopt->no_security_labels) - return; + return false; /* * Security labels are schema not data ... except large object labels are @@ -15080,20 +15187,18 @@ dumpSecLabel(Archive *fout, const char *type, const char *name, if (strcmp(type, "LARGE OBJECT") != 0) { if (dopt->dataOnly) - return; + return false; } else { /* We do dump large object security labels in binary-upgrade mode */ if (dopt->schemaOnly && !dopt->binary_upgrade) - return; + return false; } /* Search for security labels associated with catalogId, using table */ nlabels = findSecLabels(catalogId.tableoid, catalogId.oid, &labels); - query = createPQExpBuffer(); - for (i = 0; i < nlabels; i++) { /* @@ -15114,22 +15219,11 @@ dumpSecLabel(Archive *fout, const char *type, const char *name, if (query->len > 0) { - PQExpBuffer tag = createPQExpBuffer(); - appendPQExpBuffer(tag, "%s %s", type, name); - ArchiveEntry(fout, nilCatalogId, createDumpId(), - ARCHIVE_OPTS(.tag = tag->data, - .namespace = namespace, - .owner = owner, - .description = "SECURITY LABEL", - .section = SECTION_NONE, - .createStmt = query->data, - .deps = &dumpId, - .nDeps = 1)); - destroyPQExpBuffer(tag); + return true; } - destroyPQExpBuffer(query); + return false; } /* diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c index 049a100634..2159f72ffb 100644 --- a/src/bin/pg_dump/pg_restore.c +++ b/src/bin/pg_dump/pg_restore.c @@ -60,6 +60,7 @@ main(int argc, char **argv) int c; int exit_code; int numWorkers = 1; + int blobBatchSize = 0; Archive *AH; char *inputFileSpec; static int disable_triggers = 0; @@ -123,6 +124,7 @@ main(int argc, char **argv) {"no-publications", no_argument, &no_publications, 1}, {"no-security-labels", no_argument, &no_security_labels, 1}, {"no-subscriptions", no_argument, &no_subscriptions, 1}, + {"restore-blob-batch-size", required_argument, NULL, 4}, {NULL, 0, NULL, 0} }; @@ -286,6 +288,10 @@ main(int argc, char **argv) set_dump_section(optarg, &(opts->dumpSections)); break; + case 4: /* # of blobs to restore per transaction */ + blobBatchSize = atoi(optarg); + break; + default: /* getopt_long already emitted a complaint */ pg_log_error_hint("Try \"%s --help\" for more information.", progname); @@ -405,6 +411,7 @@ main(int argc, char **argv) SortTocFromFile(AH); AH->numWorkers = numWorkers; + AH->blobBatchSize = blobBatchSize; if (opts->tocSummary) PrintTOCSummary(AH); @@ -478,6 +485,8 @@ usage(const char *progname) printf(_(" --use-set-session-authorization\n" " use SET SESSION AUTHORIZATION commands instead of\n" " ALTER OWNER commands to set ownership\n")); + printf(_(" --restore-blob-batch-size=NUM\n" + " attempt to restore NUM large objects per transaction\n")); printf(_("\nConnection options:\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index fa52aa2c22..459d834ac3 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -84,7 +84,7 @@ output_check_banner(bool live_check) void -check_and_dump_old_cluster(bool live_check) +check_and_dump_old_cluster(bool live_check, DbDumpStats **stats) { /* -- OLD -- */ @@ -202,12 +202,36 @@ check_and_dump_old_cluster(bool live_check) * the old server is running. */ if (!user_opts.check) + { + *stats = collect_db_stats(); generate_old_dump(); + } if (!live_check) stop_postmaster(false); } +DbDumpStats* collect_db_stats(void) +{ + uint dbnum; + DbDumpStats *stats = (DbDumpStats *)pg_malloc(sizeof(DbDumpStats)); + stats->large_objects = (uint64 *)pg_malloc(sizeof(old_cluster.dbarr.ndbs * sizeof(uint64))); + prep_status("Collecting database stats"); + for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) + { + PGresult *res; + DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum]; + PGconn *conn = connectToServer(&old_cluster, active_db->db_name); + + res = executeQueryOrDie(conn, "SELECT count(*) from pg_largeobject_metadata"); + stats->large_objects[dbnum] = atoll(PQgetvalue(res, 0, 0)); + PQclear(res); + PQfinish(conn); + } + check_ok(); + + return stats; +} void check_new_cluster(void) diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 3960af4036..12605200b5 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -54,7 +54,8 @@ static void set_locale_and_encoding(void); static void prepare_new_cluster(void); static void prepare_new_globals(void); -static void create_new_objects(void); +static void restore_dbs(DbDumpStats *stats, bool parallel_restore); +static void create_new_objects(DbDumpStats *stats); static void copy_xact_xlog_xid(void); static void set_frozenxids(bool minmxid_only); static void make_outputdirs(char *pgdata); @@ -82,6 +83,7 @@ main(int argc, char **argv) { char *deletion_script_file_name = NULL; bool live_check = false; + DbDumpStats *stats; /* * pg_upgrade doesn't currently use common/logging.c, but initialize it @@ -127,7 +129,7 @@ main(int argc, char **argv) check_cluster_compatibility(live_check); - check_and_dump_old_cluster(live_check); + check_and_dump_old_cluster(live_check, &stats); /* -- NEW -- */ @@ -160,7 +162,7 @@ main(int argc, char **argv) prepare_new_globals(); - create_new_objects(); + create_new_objects(stats); stop_postmaster(false); @@ -508,9 +510,72 @@ prepare_new_globals(void) check_ok(); } +static void +restore_dbs(DbDumpStats *stats, bool parallel_restore) +{ + int dbnum; + for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) + { + char sql_file_name[MAXPGPATH], + log_file_name[MAXPGPATH]; + DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; + const char *create_opts; + int jobs = user_opts.jobs ? user_opts.jobs : 1 ; + bool large_objects_thresold_breached = stats && stats->large_objects[dbnum] > LARGE_OBJECTS_THRESOLD; + + /* Skip template1 in this pass */ + if (strcmp(old_db->db_name, "template1") == 0) + continue; + /* Skip dbs where LARGE_OBJECTS_THRESOLD is breached and parallel_restore is enabled*/ + if (large_objects_thresold_breached && parallel_restore) + continue; + + pg_log(PG_STATUS, "%s", old_db->db_name); + snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); + snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid); + + /* + * postgres database will already exist in the target installation, so + * tell pg_restore to drop and recreate it; otherwise we would fail to + * propagate its database-level properties. + */ + if (strcmp(old_db->db_name, "postgres") == 0) + create_opts = "--clean --create"; + else + create_opts = "--create"; + + if (parallel_restore) + parallel_exec_prog(log_file_name, + NULL, + "\"%s/pg_restore\" %s %s --exit-on-error --verbose " + "--dbname template1 \"%s/%s\"", + new_cluster.bindir, + cluster_conn_opts(&new_cluster), + create_opts, + log_opts.dumpdir, + sql_file_name); + else + exec_prog(log_file_name, + NULL, + true, + true, + "\"%s/pg_restore\" %s %s --exit-on-error --verbose " + "--restore-blob-batch-size %d --jobs %d " + "--dbname template1 \"%s/%s\"", + new_cluster.bindir, + cluster_conn_opts(&new_cluster), + create_opts, + large_objects_thresold_breached ? + LARGE_OBJECTS_THRESOLD : 0, + large_objects_thresold_breached ? + jobs : 1, + log_opts.dumpdir, + sql_file_name); + } +} static void -create_new_objects(void) +create_new_objects(DbDumpStats *stats) { int dbnum; @@ -557,43 +622,13 @@ create_new_objects(void) break; /* done once we've processed template1 */ } - - for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++) - { - char sql_file_name[MAXPGPATH], - log_file_name[MAXPGPATH]; - DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum]; - const char *create_opts; - - /* Skip template1 in this pass */ - if (strcmp(old_db->db_name, "template1") == 0) - continue; - - pg_log(PG_STATUS, "%s", old_db->db_name); - snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid); - snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid); - - /* - * postgres database will already exist in the target installation, so - * tell pg_restore to drop and recreate it; otherwise we would fail to - * propagate its database-level properties. - */ - if (strcmp(old_db->db_name, "postgres") == 0) - create_opts = "--clean --create"; - else - create_opts = "--create"; - - parallel_exec_prog(log_file_name, - NULL, - "\"%s/pg_restore\" %s %s --exit-on-error --verbose " - "--dbname template1 \"%s/%s\"", - new_cluster.bindir, - cluster_conn_opts(&new_cluster), - create_opts, - log_opts.dumpdir, - sql_file_name); - } - + /* Restore all the dbs where LARGE_OBJECTS_THRESOLD is not breached */ + restore_dbs(stats, true); + /* reap all children */ + while (reap_child(true) == true) + ; + /* Restore rest of the dbs one by one with pg_restore --jobs = user_opts.jobs */ + restore_dbs(stats, false); /* reap all children */ while (reap_child(true) == true) ; diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index a710f325de..f41063dbc7 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -24,6 +24,8 @@ #define MESSAGE_WIDTH 62 +#define LARGE_OBJECTS_THRESOLD 200 // maybe 10k ? + #define GET_MAJOR_VERSION(v) ((v) / 100) /* contains both global db information and CREATE DATABASE commands */ @@ -347,6 +349,15 @@ typedef struct ClusterInfo *running_cluster; } OSInfo; +/* + * Dump stats, will be used by pg_upgrade to efficently run pg_restore + */ + +typedef struct +{ + uint64 *large_objects; +}DbDumpStats; + /* * Global variables @@ -361,7 +372,7 @@ extern OSInfo os_info; /* check.c */ void output_check_banner(bool live_check); -void check_and_dump_old_cluster(bool live_check); +void check_and_dump_old_cluster(bool live_check, DbDumpStats **stats); void check_new_cluster(void); void report_clusters_compatible(void); void issue_warnings_and_set_wal_level(void); @@ -369,6 +380,7 @@ void output_completion_banner(char *deletion_script_file_name); void check_cluster_versions(void); void check_cluster_compatibility(bool live_check); void create_script_for_old_cluster_deletion(char **deletion_script_file_name); +DbDumpStats* collect_db_stats(void); /* controldata.c */