public inbox for [email protected]
help / color / mirror / Atom feed
From: Kumar, Sachin <[email protected]>
To: Tom Lane <[email protected]>
Cc: Jacob Champion <[email protected]>
Cc: Nathan Bossart <[email protected]>
Cc: Jan Wieck <[email protected]>
Cc: Bruce Momjian <[email protected]>
Cc: Zhihong Yu <[email protected]>
Cc: Andrew Dunstan <[email protected]>
Cc: Magnus Hagander <[email protected]>
Cc: Robins Tharakan <[email protected]>
Cc: Peter Eisentraut <[email protected]>
Cc: [email protected] <[email protected]>
Subject: Re: pg_upgrade failing for 200+ million Large Objects
Date: Mon, 4 Dec 2023 16:07:59 +0000
Message-ID: <[email protected]> (raw)
In-Reply-To: <[email protected]>
References: <[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<[email protected]>
<20220825003227.GA1456581@nathanxps13>
<[email protected]>
<20220908231807.GA2242918@nathanxps13>
<CAAWbhmgUb8p7ff_ZX5jCvqM=ipPxbbDJTXMNVzH-Ho_CXVkRHA@mail.gmail.com>
<[email protected]>
<[email protected]>
> "Tom Lane" <[email protected] <mailto:[email protected]>> wrote:
> FWIW, I agree with Jacob's concern about it being a bad idea to let
> users of pg_upgrade pass down arbitrary options to pg_dump/pg_restore.
> I think we'd regret going there, because it'd hugely expand the set
> of cases pg_upgrade has to deal with.
> Also, pg_upgrade is often invoked indirectly via scripts, so I do
> not especially buy the idea that we're going to get useful control
> input from some human somewhere. I think we'd be better off to
> assume that pg_upgrade is on its own to manage the process, so that
> if we need to switch strategies based on object count or whatever,
> we should put in a heuristic to choose the strategy automatically.
> It might not be perfect, but that will give better results for the
> pretty large fraction of users who are not going to mess with
> weird little switches.
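I have updated the patch to use a heuristic. During pg_upgrade we count the
large objects in each database. When running pg_restore, if a database's large
object count is greater than LARGE_OBJECTS_THRESOLD (1k; note that the attached
patch currently defines it as 200), we pass --restore-blob-batch-size.
I also changed the pg_upgrade --jobs behavior for databases with many large objects
(> LARGE_OBJECTS_THRESOLD): databases below the threshold are still restored in
parallel, one pg_restore per database, and the remaining databases are then
restored one at a time, each with pg_restore --jobs: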
+ /* Restore all the dbs where LARGE_OBJECTS_THRESOLD is not breached */
+ restore_dbs(stats, true);
+ /* reap all children */
+ while (reap_child(true) == true)
+ ;
+ /* Restore rest of the dbs one by one with pg_restore --jobs = user_opts.jobs */
+ restore_dbs(stats, false);
/* reap all children */
while (reap_child(true) == true)
;
Regards
Sachin
Attachments:
[application/octet-stream] pg_upgrade_improvements_v7.diff (27.9K, 2-pg_upgrade_improvements_v7.diff)
download | inline diff:
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 85e6515ac2..9328830b2e 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -858,6 +858,11 @@ RunWorker(ArchiveHandle *AH, ParallelSlot *slot)
*/
WaitForCommands(AH, pipefd);
+ /*
+ * Close an eventually open BLOB batch transaction.
+ */
+ CommitBlobTransaction((Archive *)AH);
+
/*
* Disconnect from database and clean up.
*/
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 9ef2f2017e..65519791e9 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -223,6 +223,8 @@ typedef struct Archive
int numWorkers; /* number of parallel processes */
char *sync_snapshot_id; /* sync snapshot id for parallel operation */
+ int blobBatchSize; /* # of blobs to restore per transaction */
+
/* info needed for string escaping */
int encoding; /* libpq code for client_encoding */
bool std_strings; /* standard_conforming_strings */
@@ -293,6 +295,7 @@ extern void WriteData(Archive *AHX, const void *data, size_t dLen);
extern int StartLO(Archive *AHX, Oid oid);
extern int EndLO(Archive *AHX, Oid oid);
+extern void CommitBlobTransaction(Archive *AH);
extern void CloseArchive(Archive *AHX);
extern void SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 256d1e35a4..43be945064 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -45,6 +45,7 @@
#define TEXT_DUMP_HEADER "--\n-- PostgreSQL database dump\n--\n\n"
#define TEXT_DUMPALL_HEADER "--\n-- PostgreSQL database cluster dump\n--\n\n"
+/*
+ * Number of BLOB TOC entries emitted into the currently open restore batch
+ * (see _printTocEntry and CommitBlobTransaction); 0 means no batch is open.
+ * NOTE(review): this is file-level state rather than per-ArchiveHandle;
+ * confirm that is safe wherever parallel restore workers share one address
+ * space.
+ */
+static int blobBatchCount = 0;
static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
const pg_compress_specification compression_spec,
@@ -258,6 +259,23 @@ CloseArchive(Archive *AHX)
pg_fatal("could not close output file: %m");
}
+/* Public */
+/*
+ * CommitBlobTransaction
+ *		If a BLOB restore batch is open (blobBatchCount > 0), emit the
+ *		closing COMMIT and mark the batch as closed.  When no batch is open
+ *		nothing is written, so callers may invoke this unconditionally, e.g.
+ *		at the end of RestoreArchive or when a parallel worker finishes.
+ */
+void
+CommitBlobTransaction(Archive *AHX)
+{
+	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+	/* Nothing to do unless _printTocEntry() started a batch */
+	if (blobBatchCount <= 0)
+		return;
+
+	ahprintf(AH, "--\n");
+	ahprintf(AH, "-- End BLOB restore batch\n");
+	ahprintf(AH, "--\n");
+	ahprintf(AH, "COMMIT;\n\n");
+
+	blobBatchCount = 0;
+}
+
/* Public */
void
SetArchiveOptions(Archive *AH, DumpOptions *dopt, RestoreOptions *ropt)
@@ -719,6 +737,8 @@ RestoreArchive(Archive *AHX)
ahprintf(AH, "COMMIT;\n\n");
}
+ CommitBlobTransaction(AHX);
+
if (AH->public.verbose)
dumpTimestamp(AH, "Completed on", time(NULL));
@@ -3543,6 +3563,57 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, bool isData)
{
RestoreOptions *ropt = AH->public.ropt;
+ /* We restore BLOBs in batches to reduce XID consumption */
+ if (strcmp(te->desc, "BLOB") == 0 && AH->public.blobBatchSize > 1)
+ {
+ if (blobBatchCount > 0)
+ {
+ /* We are inside a BLOB restore transaction */
+ if (blobBatchCount >= AH->public.blobBatchSize)
+ {
+ /*
+ * We did reach the batch size with the previous BLOB.
+ * Commit and start a new batch.
+ */
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- BLOB batch size reached\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "COMMIT;\n");
+ ahprintf(AH, "BEGIN;\n\n");
+
+ blobBatchCount = 1;
+ }
+ else
+ {
+ /* This one still fits into the current batch */
+ blobBatchCount++;
+ }
+ }
+ else
+ {
+ /* Not inside a transaction, start a new batch */
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- Start BLOB restore batch\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "BEGIN;\n\n");
+
+ blobBatchCount = 1;
+ }
+ }
+ else
+ {
+ /* Not a BLOB. If we have a BLOB batch open, close it. */
+ if (blobBatchCount > 0)
+ {
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "-- End BLOB restore batch\n");
+ ahprintf(AH, "--\n");
+ ahprintf(AH, "COMMIT;\n\n");
+
+ blobBatchCount = 0;
+ }
+ }
+
/* Select owner, schema, tablespace and default AM as necessary */
_becomeOwner(AH, te);
_selectOutputSchema(AH, te->namespace);
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index e863913849..2c6d49732b 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -205,11 +205,20 @@ static inline void dumpComment(Archive *fout, const char *type,
const char *name, const char *namespace,
const char *owner, CatalogId catalogId,
int subid, DumpId dumpId);
+static bool dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+ const char *type, const char *name,
+ const char *namespace, const char *owner,
+ CatalogId catalogId, int subid, DumpId dumpId,
+ const char *initdb_comment);
static int findComments(Oid classoid, Oid objoid, CommentItem **items);
static void collectComments(Archive *fout);
static void dumpSecLabel(Archive *fout, const char *type, const char *name,
const char *namespace, const char *owner,
CatalogId catalogId, int subid, DumpId dumpId);
+static bool dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+ const char *type, const char *name,
+ const char *namespace, const char *owner,
+ CatalogId catalogId, int subid, DumpId dumpId);
static int findSecLabels(Oid classoid, Oid objoid, SecLabelItem **items);
static void collectSecLabels(Archive *fout);
static void dumpDumpableObject(Archive *fout, DumpableObject *dobj);
@@ -265,6 +274,12 @@ static DumpId dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
const char *type, const char *name, const char *subname,
const char *nspname, const char *owner,
const DumpableAcl *dacl);
+static bool dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+ DumpId objDumpId, DumpId altDumpId,
+ const char *type, const char *name,
+ const char *subname,
+ const char *nspname, const char *owner,
+ const DumpableAcl *dacl);
static void getDependencies(Archive *fout);
static void BuildArchiveDependencies(Archive *fout);
@@ -3641,10 +3656,42 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
{
PQExpBuffer cquery = createPQExpBuffer();
PQExpBuffer dquery = createPQExpBuffer();
+ PQExpBuffer tag = createPQExpBuffer();
+ teSection section = SECTION_PRE_DATA;
appendPQExpBuffer(cquery,
"SELECT pg_catalog.lo_create('%s');\n",
loinfo->dobj.name);
+ /*
+ * In binary upgrade mode we put all the queries to restore
+ * one large object into a single TOC entry and emit it as
+ * SECTION_DATA so that they can be restored in parallel.
+ */
+ if (fout->dopt->binary_upgrade)
+ {
+ section = SECTION_DATA;
+
+ /* Dump comment if any */
+ if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+ dumpCommentQuery(fout, cquery, tag, "LARGE OBJECT",
+ loinfo->dobj.name, NULL, loinfo->rolname,
+ loinfo->dobj.catId, 0, loinfo->dobj.dumpId, NULL);
+
+ /* Dump security label if any */
+ if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+ dumpSecLabelQuery(fout, cquery, tag, "LARGE OBJECT",
+ loinfo->dobj.name,
+ NULL, loinfo->rolname,
+ loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+
+ /* Dump ACL if any */
+ if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
+ dumpACLQuery(fout, cquery, tag,
+ loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
+ loinfo->dobj.name, NULL,
+ NULL, loinfo->rolname, &loinfo->dacl);
+ }
+
appendPQExpBuffer(dquery,
"SELECT pg_catalog.lo_unlink('%s');\n",
@@ -3655,27 +3702,28 @@ dumpLO(Archive *fout, const LoInfo *loinfo)
ARCHIVE_OPTS(.tag = loinfo->dobj.name,
.owner = loinfo->rolname,
.description = "BLOB",
- .section = SECTION_PRE_DATA,
+ .section = section,
.createStmt = cquery->data,
.dropStmt = dquery->data));
- /* Dump comment if any */
- if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
- dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
- NULL, loinfo->rolname,
- loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
-
- /* Dump security label if any */
- if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
- dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
- NULL, loinfo->rolname,
- loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
-
- /* Dump ACL if any */
- if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
- dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
- loinfo->dobj.name, NULL,
- NULL, loinfo->rolname, &loinfo->dacl);
+ if (!fout->dopt->binary_upgrade)
+ {
+ /* Dump comment if any */
+ if (loinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+ dumpComment(fout, "LARGE OBJECT", loinfo->dobj.name,
+ NULL, loinfo->rolname,
+ loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+ /* Dump security label if any */
+ if (loinfo->dobj.dump & DUMP_COMPONENT_SECLABEL)
+ dumpSecLabel(fout, "LARGE OBJECT", loinfo->dobj.name,
+ NULL, loinfo->rolname,
+ loinfo->dobj.catId, 0, loinfo->dobj.dumpId);
+ /* Dump ACL if any */
+ if (loinfo->dobj.dump & DUMP_COMPONENT_ACL)
+ dumpACL(fout, loinfo->dobj.dumpId, InvalidDumpId, "LARGE OBJECT",
+ loinfo->dobj.name, NULL,
+ NULL, loinfo->rolname, &loinfo->dacl);
+ }
destroyPQExpBuffer(cquery);
destroyPQExpBuffer(dquery);
@@ -9899,6 +9947,38 @@ dumpCommentExtended(Archive *fout, const char *type,
const char *owner, CatalogId catalogId,
int subid, DumpId dumpId,
const char *initdb_comment)
+{
+ /*
+ * Let dumpCommentQuery() decide whether a comment should be dumped and
+ * build the COMMENT ON command plus its TOC tag; if it returns true,
+ * archive that command as a separate COMMENT entry depending on dumpId.
+ */
+ PQExpBuffer query = createPQExpBuffer();
+ PQExpBuffer tag = createPQExpBuffer();
+
+ if (dumpCommentQuery(fout, query, tag, type, name, namespace, owner,
+ catalogId, subid, dumpId, initdb_comment))
+ {
+ /*
+ * We mark comments as SECTION_NONE because they really belong in the
+ * same section as their parent, whether that is pre-data or
+ * post-data.
+ */
+ ArchiveEntry(fout, nilCatalogId, createDumpId(),
+ ARCHIVE_OPTS(.tag = tag->data,
+ .namespace = namespace,
+ .owner = owner,
+ .description = "COMMENT",
+ .section = SECTION_NONE,
+ .createStmt = query->data,
+ .deps = &dumpId,
+ .nDeps = 1));
+ }
+ destroyPQExpBuffer(query);
+ destroyPQExpBuffer(tag);
+}
+
+static bool
+dumpCommentQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+ const char *type, const char *name,
+ const char *namespace, const char *owner,
+ CatalogId catalogId, int subid, DumpId dumpId,
+ const char *initdb_comment)
{
DumpOptions *dopt = fout->dopt;
CommentItem *comments;
@@ -9906,19 +9986,19 @@ dumpCommentExtended(Archive *fout, const char *type,
/* do nothing, if --no-comments is supplied */
if (dopt->no_comments)
- return;
+ return false;
/* Comments are schema not data ... except LO comments are data */
if (strcmp(type, "LARGE OBJECT") != 0)
{
if (dopt->dataOnly)
- return;
+ return false;
}
else
{
/* We do dump LO comments in binary-upgrade mode */
if (dopt->schemaOnly && !dopt->binary_upgrade)
- return;
+ return false;
}
/* Search for comments associated with catalogId, using table */
@@ -9956,9 +10036,6 @@ dumpCommentExtended(Archive *fout, const char *type,
/* If a comment exists, build COMMENT ON statement */
if (ncomments > 0)
{
- PQExpBuffer query = createPQExpBuffer();
- PQExpBuffer tag = createPQExpBuffer();
-
appendPQExpBuffer(query, "COMMENT ON %s ", type);
if (namespace && *namespace)
appendPQExpBuffer(query, "%s.", fmtId(namespace));
@@ -9968,24 +10045,10 @@ dumpCommentExtended(Archive *fout, const char *type,
appendPQExpBuffer(tag, "%s %s", type, name);
- /*
- * We mark comments as SECTION_NONE because they really belong in the
- * same section as their parent, whether that is pre-data or
- * post-data.
- */
- ArchiveEntry(fout, nilCatalogId, createDumpId(),
- ARCHIVE_OPTS(.tag = tag->data,
- .namespace = namespace,
- .owner = owner,
- .description = "COMMENT",
- .section = SECTION_NONE,
- .createStmt = query->data,
- .deps = &dumpId,
- .nDeps = 1));
-
- destroyPQExpBuffer(query);
- destroyPQExpBuffer(tag);
+ return true;
}
+
+ return false;
}
/*
@@ -14939,23 +15002,65 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
const DumpableAcl *dacl)
{
DumpId aclDumpId = InvalidDumpId;
+	PQExpBuffer query = createPQExpBuffer();
+	PQExpBuffer tag = createPQExpBuffer();
+
+	/*
+	 * dumpACLQuery() appends the GRANT/REVOKE commands to "query" and, when
+	 * it produced any, the TOC tag to "tag" -- so the tag must not be built a
+	 * second time here, or it would be duplicated.  dumpACLQuery() may also
+	 * return true without having appended any commands; in that case no ACL
+	 * archive entry is created and InvalidDumpId is returned, as before this
+	 * refactoring.
+	 */
+	if (dumpACLQuery(fout, query, tag, objDumpId, altDumpId,
+					 type, name, subname, nspname, owner, dacl) &&
+		query->len > 0)
+	{
+		DumpId		aclDeps[2];
+		int			nDeps = 0;
+
+		/* The ACL depends on its object, and optionally on altDumpId */
+		aclDeps[nDeps++] = objDumpId;
+		if (altDumpId != InvalidDumpId)
+			aclDeps[nDeps++] = altDumpId;
+
+		aclDumpId = createDumpId();
+
+		ArchiveEntry(fout, nilCatalogId, aclDumpId,
+					 ARCHIVE_OPTS(.tag = tag->data,
+								  .namespace = nspname,
+								  .owner = owner,
+								  .description = "ACL",
+								  .section = SECTION_NONE,
+								  .createStmt = query->data,
+								  .deps = aclDeps,
+								  .nDeps = nDeps));
+	}
+
+	destroyPQExpBuffer(query);
+	destroyPQExpBuffer(tag);
+
+	return aclDumpId;
+}
+
+static bool
+dumpACLQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+ DumpId objDumpId, DumpId altDumpId,
+ const char *type, const char *name, const char *subname,
+ const char *nspname, const char *owner,
+ const DumpableAcl *dacl)
+{
DumpOptions *dopt = fout->dopt;
const char *acls = dacl->acl;
const char *acldefault = dacl->acldefault;
char privtype = dacl->privtype;
const char *initprivs = dacl->initprivs;
const char *baseacls;
- PQExpBuffer sql;
/* Do nothing if ACL dump is not enabled */
if (dopt->aclsSkip)
- return InvalidDumpId;
+ return false;
/* --data-only skips ACLs *except* large object ACLs */
if (dopt->dataOnly && strcmp(type, "LARGE OBJECT") != 0)
- return InvalidDumpId;
-
- sql = createPQExpBuffer();
+ return false;
/*
* In binary upgrade mode, we don't run an extension's script but instead
@@ -14973,13 +15078,13 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
if (dopt->binary_upgrade && privtype == 'e' &&
initprivs && *initprivs != '\0')
{
- appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
+ appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(true);\n");
if (!buildACLCommands(name, subname, nspname, type,
initprivs, acldefault, owner,
- "", fout->remoteVersion, sql))
+ "", fout->remoteVersion, query))
pg_fatal("could not parse initial ACL list (%s) or default (%s) for object \"%s\" (%s)",
initprivs, acldefault, name, type);
- appendPQExpBufferStr(sql, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
+ appendPQExpBufferStr(query, "SELECT pg_catalog.binary_upgrade_set_record_init_privs(false);\n");
}
/*
@@ -15001,43 +15106,19 @@ dumpACL(Archive *fout, DumpId objDumpId, DumpId altDumpId,
if (!buildACLCommands(name, subname, nspname, type,
acls, baseacls, owner,
- "", fout->remoteVersion, sql))
+ "", fout->remoteVersion, query))
pg_fatal("could not parse ACL list (%s) or default (%s) for object \"%s\" (%s)",
acls, baseacls, name, type);
- if (sql->len > 0)
+ if (query->len > 0 && tag != NULL)
{
- PQExpBuffer tag = createPQExpBuffer();
- DumpId aclDeps[2];
- int nDeps = 0;
-
if (subname)
appendPQExpBuffer(tag, "COLUMN %s.%s", name, subname);
else
appendPQExpBuffer(tag, "%s %s", type, name);
-
- aclDeps[nDeps++] = objDumpId;
- if (altDumpId != InvalidDumpId)
- aclDeps[nDeps++] = altDumpId;
-
- aclDumpId = createDumpId();
-
- ArchiveEntry(fout, nilCatalogId, aclDumpId,
- ARCHIVE_OPTS(.tag = tag->data,
- .namespace = nspname,
- .owner = owner,
- .description = "ACL",
- .section = SECTION_NONE,
- .createStmt = sql->data,
- .deps = aclDeps,
- .nDeps = nDeps));
-
- destroyPQExpBuffer(tag);
}
- destroyPQExpBuffer(sql);
-
- return aclDumpId;
+ return true;
}
/*
@@ -15062,16 +15143,42 @@ static void
dumpSecLabel(Archive *fout, const char *type, const char *name,
const char *namespace, const char *owner,
CatalogId catalogId, int subid, DumpId dumpId)
+{
+ /*
+ * Let dumpSecLabelQuery() decide whether any security labels should be
+ * dumped and build the commands plus their TOC tag; if it returns true,
+ * archive them as a separate SECURITY LABEL entry depending on dumpId.
+ */
+ PQExpBuffer query = createPQExpBuffer();
+ PQExpBuffer tag = createPQExpBuffer();
+
+ if (dumpSecLabelQuery(fout, query, tag, type, name,
+ namespace, owner, catalogId, subid, dumpId))
+ {
+ ArchiveEntry(fout, nilCatalogId, createDumpId(),
+ ARCHIVE_OPTS(.tag = tag->data,
+ .namespace = namespace,
+ .owner = owner,
+ .description = "SECURITY LABEL",
+ .section = SECTION_NONE,
+ .createStmt = query->data,
+ .deps = &dumpId,
+ .nDeps = 1));
+ }
+
+ destroyPQExpBuffer(query);
+ destroyPQExpBuffer(tag);
+}
+
+static bool
+dumpSecLabelQuery(Archive *fout, PQExpBuffer query, PQExpBuffer tag,
+ const char *type, const char *name,
+ const char *namespace, const char *owner,
+ CatalogId catalogId, int subid, DumpId dumpId)
{
DumpOptions *dopt = fout->dopt;
SecLabelItem *labels;
int nlabels;
int i;
- PQExpBuffer query;
/* do nothing, if --no-security-labels is supplied */
if (dopt->no_security_labels)
- return;
+ return false;
/*
* Security labels are schema not data ... except large object labels are
@@ -15080,20 +15187,18 @@ dumpSecLabel(Archive *fout, const char *type, const char *name,
if (strcmp(type, "LARGE OBJECT") != 0)
{
if (dopt->dataOnly)
- return;
+ return false;
}
else
{
/* We do dump large object security labels in binary-upgrade mode */
if (dopt->schemaOnly && !dopt->binary_upgrade)
- return;
+ return false;
}
/* Search for security labels associated with catalogId, using table */
nlabels = findSecLabels(catalogId.tableoid, catalogId.oid, &labels);
- query = createPQExpBuffer();
-
for (i = 0; i < nlabels; i++)
{
/*
@@ -15114,22 +15219,11 @@ dumpSecLabel(Archive *fout, const char *type, const char *name,
if (query->len > 0)
{
- PQExpBuffer tag = createPQExpBuffer();
-
appendPQExpBuffer(tag, "%s %s", type, name);
- ArchiveEntry(fout, nilCatalogId, createDumpId(),
- ARCHIVE_OPTS(.tag = tag->data,
- .namespace = namespace,
- .owner = owner,
- .description = "SECURITY LABEL",
- .section = SECTION_NONE,
- .createStmt = query->data,
- .deps = &dumpId,
- .nDeps = 1));
- destroyPQExpBuffer(tag);
+ return true;
}
- destroyPQExpBuffer(query);
+ return false;
}
/*
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 049a100634..2159f72ffb 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -60,6 +60,7 @@ main(int argc, char **argv)
int c;
int exit_code;
int numWorkers = 1;
+ int blobBatchSize = 0;
Archive *AH;
char *inputFileSpec;
static int disable_triggers = 0;
@@ -123,6 +124,7 @@ main(int argc, char **argv)
{"no-publications", no_argument, &no_publications, 1},
{"no-security-labels", no_argument, &no_security_labels, 1},
{"no-subscriptions", no_argument, &no_subscriptions, 1},
+ {"restore-blob-batch-size", required_argument, NULL, 4},
{NULL, 0, NULL, 0}
};
@@ -286,6 +288,10 @@ main(int argc, char **argv)
set_dump_section(optarg, &(opts->dumpSections));
break;
+ case 4: /* # of blobs to restore per transaction */
+ blobBatchSize = atoi(optarg);
+ break;
+
default:
/* getopt_long already emitted a complaint */
pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -405,6 +411,7 @@ main(int argc, char **argv)
SortTocFromFile(AH);
AH->numWorkers = numWorkers;
+ AH->blobBatchSize = blobBatchSize;
if (opts->tocSummary)
PrintTOCSummary(AH);
@@ -478,6 +485,8 @@ usage(const char *progname)
printf(_(" --use-set-session-authorization\n"
" use SET SESSION AUTHORIZATION commands instead of\n"
" ALTER OWNER commands to set ownership\n"));
+ printf(_(" --restore-blob-batch-size=NUM\n"
+ " attempt to restore NUM large objects per transaction\n"));
printf(_("\nConnection options:\n"));
printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fa52aa2c22..459d834ac3 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -84,7 +84,7 @@ output_check_banner(bool live_check)
void
-check_and_dump_old_cluster(bool live_check)
+check_and_dump_old_cluster(bool live_check, DbDumpStats **stats)
{
/* -- OLD -- */
@@ -202,12 +202,36 @@ check_and_dump_old_cluster(bool live_check)
* the old server is running.
*/
if (!user_opts.check)
+ {
+ *stats = collect_db_stats();
generate_old_dump();
+ }
if (!live_check)
stop_postmaster(false);
}
+/*
+ * collect_db_stats()
+ *
+ * Connect to each database of the old cluster and record how many large
+ * objects it contains.  The counts are stored in the same order as
+ * old_cluster.dbarr.dbs, so that restore_dbs() can look them up by database
+ * index when choosing a restore strategy.  The result is pg_malloc'd.
+ */
+DbDumpStats *
+collect_db_stats(void)
+{
+	int			dbnum;
+	DbDumpStats *stats = (DbDumpStats *) pg_malloc(sizeof(DbDumpStats));
+
+	/*
+	 * Allocate one counter per database.  The size is the product itself;
+	 * wrapping it in sizeof() would allocate only a size_t's worth of bytes
+	 * and overrun the buffer as soon as there is more than one database.
+	 */
+	stats->large_objects = (uint64 *)
+		pg_malloc(old_cluster.dbarr.ndbs * sizeof(uint64));
+
+	prep_status("Collecting database stats");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *active_db = &old_cluster.dbarr.dbs[dbnum];
+		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
+		PGresult   *res;
+
+		res = executeQueryOrDie(conn, "SELECT count(*) from pg_largeobject_metadata");
+		stats->large_objects[dbnum] = atoll(PQgetvalue(res, 0, 0));
+
+		PQclear(res);
+		PQfinish(conn);
+	}
+
+	check_ok();
+
+	return stats;
+}
+
void
check_new_cluster(void)
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 3960af4036..12605200b5 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -54,7 +54,8 @@
static void set_locale_and_encoding(void);
static void prepare_new_cluster(void);
static void prepare_new_globals(void);
-static void create_new_objects(void);
+static void restore_dbs(DbDumpStats *stats, bool parallel_restore);
+static void create_new_objects(DbDumpStats *stats);
static void copy_xact_xlog_xid(void);
static void set_frozenxids(bool minmxid_only);
static void make_outputdirs(char *pgdata);
@@ -82,6 +83,7 @@ main(int argc, char **argv)
{
char *deletion_script_file_name = NULL;
bool live_check = false;
+ DbDumpStats *stats;
/*
* pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -127,7 +129,7 @@ main(int argc, char **argv)
check_cluster_compatibility(live_check);
- check_and_dump_old_cluster(live_check);
+ check_and_dump_old_cluster(live_check, &stats);
/* -- NEW -- */
@@ -160,7 +162,7 @@ main(int argc, char **argv)
prepare_new_globals();
- create_new_objects();
+ create_new_objects(stats);
stop_postmaster(false);
@@ -508,9 +510,72 @@ prepare_new_globals(void)
check_ok();
}
+/*
+ * restore_dbs()
+ *
+ * Restore the old cluster's databases (except template1, which the caller
+ * handles separately) into the new cluster.  Called twice:
+ *
+ * parallel_restore = true: restores every database with at most
+ *		LARGE_OBJECTS_THRESOLD large objects, one pg_restore per database,
+ *		run concurrently via parallel_exec_prog().  The caller must reap the
+ *		children afterwards.
+ *
+ * parallel_restore = false: restores every database with more large objects
+ *		than that, one at a time, each with pg_restore --jobs and
+ *		--restore-blob-batch-size so its large objects are restored in
+ *		batched transactions.
+ *
+ * Each database is handled by exactly one of the two passes.  If stats is
+ * NULL, every database is treated as below the threshold.
+ */
+static void
+restore_dbs(DbDumpStats *stats, bool parallel_restore)
+{
+	int			dbnum;
+	int			jobs = user_opts.jobs ? user_opts.jobs : 1;
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		sql_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		const char *create_opts;
+		bool		many_large_objects;
+
+		many_large_objects = stats != NULL &&
+			stats->large_objects[dbnum] > LARGE_OBJECTS_THRESOLD;
+
+		/* Skip template1 in this pass */
+		if (strcmp(old_db->db_name, "template1") == 0)
+			continue;
+
+		/*
+		 * Each database belongs to exactly one pass.  Without the second
+		 * test, the serial pass would try to restore (with --create) the
+		 * databases that the parallel pass already restored.
+		 */
+		if (parallel_restore && many_large_objects)
+			continue;			/* done by the serial pass */
+		if (!parallel_restore && !many_large_objects)
+			continue;			/* done by the parallel pass */
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+		snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		/*
+		 * postgres database will already exist in the target installation, so
+		 * tell pg_restore to drop and recreate it; otherwise we would fail to
+		 * propagate its database-level properties.
+		 */
+		if (strcmp(old_db->db_name, "postgres") == 0)
+			create_opts = "--clean --create";
+		else
+			create_opts = "--create";
+
+		if (parallel_restore)
+			parallel_exec_prog(log_file_name,
+							   NULL,
+							   "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+							   "--dbname template1 \"%s/%s\"",
+							   new_cluster.bindir,
+							   cluster_conn_opts(&new_cluster),
+							   create_opts,
+							   log_opts.dumpdir,
+							   sql_file_name);
+		else
+			exec_prog(log_file_name,
+					  NULL,
+					  true,
+					  true,
+					  "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
+					  "--restore-blob-batch-size %d --jobs %d "
+					  "--dbname template1 \"%s/%s\"",
+					  new_cluster.bindir,
+					  cluster_conn_opts(&new_cluster),
+					  create_opts,
+					  LARGE_OBJECTS_THRESOLD,
+					  jobs,
+					  log_opts.dumpdir,
+					  sql_file_name);
+	}
+}
+
static void
-create_new_objects(void)
+create_new_objects(DbDumpStats *stats)
{
int dbnum;
@@ -557,43 +622,13 @@ create_new_objects(void)
break; /* done once we've processed template1 */
}
-
- for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
- {
- char sql_file_name[MAXPGPATH],
- log_file_name[MAXPGPATH];
- DbInfo *old_db = &old_cluster.dbarr.dbs[dbnum];
- const char *create_opts;
-
- /* Skip template1 in this pass */
- if (strcmp(old_db->db_name, "template1") == 0)
- continue;
-
- pg_log(PG_STATUS, "%s", old_db->db_name);
- snprintf(sql_file_name, sizeof(sql_file_name), DB_DUMP_FILE_MASK, old_db->db_oid);
- snprintf(log_file_name, sizeof(log_file_name), DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
-
- /*
- * postgres database will already exist in the target installation, so
- * tell pg_restore to drop and recreate it; otherwise we would fail to
- * propagate its database-level properties.
- */
- if (strcmp(old_db->db_name, "postgres") == 0)
- create_opts = "--clean --create";
- else
- create_opts = "--create";
-
- parallel_exec_prog(log_file_name,
- NULL,
- "\"%s/pg_restore\" %s %s --exit-on-error --verbose "
- "--dbname template1 \"%s/%s\"",
- new_cluster.bindir,
- cluster_conn_opts(&new_cluster),
- create_opts,
- log_opts.dumpdir,
- sql_file_name);
- }
-
+ /* Restore all the dbs where LARGE_OBJECTS_THRESOLD is not breached */
+ restore_dbs(stats, true);
+ /* reap all children */
+ while (reap_child(true) == true)
+ ;
+ /* Restore rest of the dbs one by one with pg_restore --jobs = user_opts.jobs */
+ restore_dbs(stats, false);
/* reap all children */
while (reap_child(true) == true)
;
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index a710f325de..f41063dbc7 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -24,6 +24,8 @@
#define MESSAGE_WIDTH 62
+/*
+ * Databases with more large objects than this are restored one at a time
+ * with pg_restore --jobs and --restore-blob-batch-size; the same value is
+ * used as the batch size.  XXX: the value is still under discussion (maybe
+ * 10k?).
+ */
+#define LARGE_OBJECTS_THRESOLD 200
+
#define GET_MAJOR_VERSION(v) ((v) / 100)
/* contains both global db information and CREATE DATABASE commands */
@@ -347,6 +349,15 @@ typedef struct
ClusterInfo *running_cluster;
} OSInfo;
+/*
+ * Per-database statistics about the old cluster, collected before the dump
+ * is taken and used by pg_upgrade to choose an efficient pg_restore strategy.
+ */
+typedef struct
+{
+	uint64	   *large_objects;	/* large object count of each database,
+								 * indexed like old_cluster.dbarr.dbs */
+} DbDumpStats;
+
/*
* Global variables
@@ -361,7 +372,7 @@ extern OSInfo os_info;
/* check.c */
void output_check_banner(bool live_check);
-void check_and_dump_old_cluster(bool live_check);
+void check_and_dump_old_cluster(bool live_check, DbDumpStats **stats);
void check_new_cluster(void);
void report_clusters_compatible(void);
void issue_warnings_and_set_wal_level(void);
@@ -369,6 +380,7 @@ void output_completion_banner(char *deletion_script_file_name);
void check_cluster_versions(void);
void check_cluster_compatibility(bool live_check);
void create_script_for_old_cluster_deletion(char **deletion_script_file_name);
+DbDumpStats* collect_db_stats(void);
/* controldata.c */
view thread (5+ messages) latest in thread
reply
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Reply to all the recipients using the --to and --cc options:
reply via email
To: [email protected]
Cc: [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected], [email protected]
Subject: Re: pg_upgrade failing for 200+ million Large Objects
In-Reply-To: <[email protected]>
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
This inbox is served by agora; see mirroring instructions
for how to clone and mirror all data and code used for this inbox