Fix up pg_dump's treatment of large object ownership and ACLs. We now emit
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 18 Feb 2010 01:29:10 +0000 (01:29 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 18 Feb 2010 01:29:10 +0000 (01:29 +0000)
a separate archive entry for each BLOB, and use pg_dump's standard methods
for dealing with its ownership, ACL if any, and comment if any.  This means
that switches like --no-owner and --no-privileges do what they're supposed
to.  Preliminary testing says that performance is still reasonable even
with many blobs, though we'll have to see how that shakes out in the field.

KaiGai Kohei, revised by me

src/bin/pg_dump/dumputils.c
src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h
src/bin/pg_dump/pg_backup_db.c
src/bin/pg_dump/pg_backup_null.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/pg_dump/pg_dump_sort.c

index e448e388d408aff8fc968cd03e8c95ce529e6255..3452944a426badb2d8e754e7465db3275f68a59a 100644 (file)
@@ -8,7 +8,7 @@
  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.53 2010/01/02 16:57:59 momjian Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/dumputils.c,v 1.54 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -486,7 +486,8 @@ parsePGArray(const char *atext, char ***itemarray, int *nitems)
  *     name: the object name, in the form to use in the commands (already quoted)
  *     subname: the sub-object name, if any (already quoted); NULL if none
  *     type: the object type (as seen in GRANT command: must be one of
- *             TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE)
+ *             TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
+ *             FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT)
  *     acls: the ACL string fetched from the database
  *     owner: username of object owner (will be passed through fmtId); can be
  *             NULL or empty string to indicate "no owner known"
index 4357d1197175707fda2d8e40e8237b954ddddfeb..f227ed1590ead380ea90d77aea06d959ee19d5b4 100644 (file)
@@ -15,7 +15,7 @@
  *
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.178 2010/01/19 18:39:19 tgl Exp $
+ *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.179 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -98,6 +98,7 @@ static void _selectTablespace(ArchiveHandle *AH, const char *tablespace);
 static void processEncodingEntry(ArchiveHandle *AH, TocEntry *te);
 static void processStdStringsEntry(ArchiveHandle *AH, TocEntry *te);
 static teReqs _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls);
+static bool _tocEntryIsACL(TocEntry *te);
 static void _disableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static void _enableTriggersIfNecessary(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
 static TocEntry *getTocEntryByDumpId(ArchiveHandle *AH, DumpId id);
@@ -329,9 +330,9 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
                        AH->currentTE = te;
 
                        reqs = _tocEntryRequired(te, ropt, false /* needn't drop ACLs */ );
-                       if (((reqs & REQ_SCHEMA) != 0) && te->dropStmt)
+                       /* We want anything that's selected and has a dropStmt */
+                       if (((reqs & (REQ_SCHEMA|REQ_DATA)) != 0) && te->dropStmt)
                        {
-                               /* We want the schema */
                                ahlog(AH, 1, "dropping %s %s\n", te->desc, te->tag);
                                /* Select owner and schema as necessary */
                                _becomeOwner(AH, te);
@@ -381,7 +382,8 @@ RestoreArchive(Archive *AHX, RestoreOptions *ropt)
                /* Work out what, if anything, we want from this entry */
                reqs = _tocEntryRequired(te, ropt, true);
 
-               if ((reqs & REQ_SCHEMA) != 0)   /* We want the schema */
+               /* Both schema and data objects might now have ownership/ACLs */
+               if ((reqs & (REQ_SCHEMA|REQ_DATA)) != 0)
                {
                        ahlog(AH, 1, "setting owner and privileges for %s %s\n",
                                  te->desc, te->tag);
@@ -905,6 +907,7 @@ EndRestoreBlobs(ArchiveHandle *AH)
 void
 StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
 {
+       bool            old_blob_style = (AH->version < K_VERS_1_12);
        Oid                     loOid;
 
        AH->blobCount++;
@@ -914,24 +917,32 @@ StartRestoreBlob(ArchiveHandle *AH, Oid oid, bool drop)
 
        ahlog(AH, 2, "restoring large object with OID %u\n", oid);
 
-       if (drop)
+       /* With an old archive we must do drop and create logic here */
+       if (old_blob_style && drop)
                DropBlobIfExists(AH, oid);
 
        if (AH->connection)
        {
-               loOid = lo_create(AH->connection, oid);
-               if (loOid == 0 || loOid != oid)
-                       die_horribly(AH, modulename, "could not create large object %u\n",
-                                                oid);
-
+               if (old_blob_style)
+               {
+                       loOid = lo_create(AH->connection, oid);
+                       if (loOid == 0 || loOid != oid)
+                               die_horribly(AH, modulename, "could not create large object %u\n",
+                                                        oid);
+               }
                AH->loFd = lo_open(AH->connection, oid, INV_WRITE);
                if (AH->loFd == -1)
-                       die_horribly(AH, modulename, "could not open large object\n");
+                       die_horribly(AH, modulename, "could not open large object %u\n",
+                                                oid);
        }
        else
        {
-               ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
-                                oid, INV_WRITE);
+               if (old_blob_style)
+                       ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+                                        oid, INV_WRITE);
+               else
+                       ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+                                        oid, INV_WRITE);
        }
 
        AH->writingBlob = 1;
@@ -1829,6 +1840,9 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
        AH->vmin = K_VERS_MINOR;
        AH->vrev = K_VERS_REV;
 
+       /* Make a convenient integer <maj><min><rev>00 */
+       AH->version = ((AH->vmaj * 256 + AH->vmin) * 256 + AH->vrev) * 256 + 0;
+
        /* initialize for backwards compatible string processing */
        AH->public.encoding = 0;        /* PG_SQL_ASCII */
        AH->public.std_strings = false;
@@ -2068,12 +2082,13 @@ ReadToc(ArchiveHandle *AH)
                else
                {
                        /*
-                        * rules for pre-8.4 archives wherein pg_dump hasn't classified
-                        * the entries into sections
+                        * Rules for pre-8.4 archives wherein pg_dump hasn't classified
+                        * the entries into sections.  This list need not cover entry
+                        * types added later than 8.4.
                         */
                        if (strcmp(te->desc, "COMMENT") == 0 ||
                                strcmp(te->desc, "ACL") == 0 ||
-                               strcmp(te->desc, "DEFAULT ACL") == 0)
+                               strcmp(te->desc, "ACL LANGUAGE") == 0)
                                te->section = SECTION_NONE;
                        else if (strcmp(te->desc, "TABLE DATA") == 0 ||
                                         strcmp(te->desc, "BLOBS") == 0 ||
@@ -2228,10 +2243,10 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
                return 0;
 
        /* If it's an ACL, maybe ignore it */
-       if ((!include_acls || ropt->aclsSkip) &&
-               (strcmp(te->desc, "ACL") == 0 || strcmp(te->desc, "DEFAULT ACL") == 0))
+       if ((!include_acls || ropt->aclsSkip) && _tocEntryIsACL(te))
                return 0;
 
+       /* Ignore DATABASE entry unless we should create it */
        if (!ropt->create && strcmp(te->desc, "DATABASE") == 0)
                return 0;
 
@@ -2286,9 +2301,18 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
        if (!te->hadDumper)
        {
                /*
-                * Special Case: If 'SEQUENCE SET' then it is considered a data entry
+                * Special Case: If 'SEQUENCE SET' or anything to do with BLOBs,
+                * then it is considered a data entry.  We don't need to check for
+                * the BLOBS entry or old-style BLOB COMMENTS, because they will
+                * have hadDumper = true ... but we do need to check new-style
+                * BLOB comments.
                 */
-               if (strcmp(te->desc, "SEQUENCE SET") == 0)
+               if (strcmp(te->desc, "SEQUENCE SET") == 0 ||
+                       strcmp(te->desc, "BLOB") == 0 ||
+                       (strcmp(te->desc, "ACL") == 0 &&
+                        strncmp(te->tag, "LARGE OBJECT ", 13) == 0) ||
+                       (strcmp(te->desc, "COMMENT") == 0 &&
+                        strncmp(te->tag, "LARGE OBJECT ", 13) == 0))
                        res = res & REQ_DATA;
                else
                        res = res & ~REQ_DATA;
@@ -2320,6 +2344,20 @@ _tocEntryRequired(TocEntry *te, RestoreOptions *ropt, bool include_acls)
        return res;
 }
 
+/*
+ * Identify TOC entries that are ACLs.
+ */
+static bool
+_tocEntryIsACL(TocEntry *te)
+{
+       /* "ACL LANGUAGE" was a crock emitted only in PG 7.4 */
+       if (strcmp(te->desc, "ACL") == 0 ||
+               strcmp(te->desc, "ACL LANGUAGE") == 0 ||
+               strcmp(te->desc, "DEFAULT ACL") == 0)
+               return true;
+       return false;
+}
+
 /*
  * Issue SET commands for parameters that we want to have set the same way
  * at all times during execution of a restore script.
@@ -2685,6 +2723,13 @@ _getObjectDescription(PQExpBuffer buf, TocEntry *te, ArchiveHandle *AH)
                return;
        }
 
+       /* BLOBs just have a name, but it's numeric so must not use fmtId */
+       if (strcmp(type, "BLOB") == 0)
+       {
+               appendPQExpBuffer(buf, "LARGE OBJECT %s", te->tag);
+               return;
+       }
+
        /*
         * These object types require additional decoration.  Fortunately, the
         * information needed is exactly what's in the DROP command.
@@ -2723,14 +2768,12 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
        /* ACLs are dumped only during acl pass */
        if (acl_pass)
        {
-               if (!(strcmp(te->desc, "ACL") == 0 ||
-                         strcmp(te->desc, "DEFAULT ACL") == 0))
+               if (!_tocEntryIsACL(te))
                        return;
        }
        else
        {
-               if (strcmp(te->desc, "ACL") == 0 ||
-                       strcmp(te->desc, "DEFAULT ACL") == 0)
+               if (_tocEntryIsACL(te))
                        return;
        }
 
@@ -2824,6 +2867,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
                strlen(te->owner) > 0 && strlen(te->dropStmt) > 0)
        {
                if (strcmp(te->desc, "AGGREGATE") == 0 ||
+                       strcmp(te->desc, "BLOB") == 0 ||
                        strcmp(te->desc, "CONVERSION") == 0 ||
                        strcmp(te->desc, "DATABASE") == 0 ||
                        strcmp(te->desc, "DOMAIN") == 0 ||
@@ -2873,7 +2917,7 @@ _printTocEntry(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt, bool isDat
         * If it's an ACL entry, it might contain SET SESSION AUTHORIZATION
         * commands, so we can no longer assume we know the current auth setting.
         */
-       if (strncmp(te->desc, "ACL", 3) == 0)
+       if (acl_pass)
        {
                if (AH->currUser)
                        free(AH->currUser);
index b3eb8b7b106317eaea1ade3b3e2959c82b7de6d4..dc2ccb5e95a70bd9e2e3308a82cc7ec5dc3090cf 100644 (file)
@@ -17,7 +17,7 @@
  *
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.83 2009/12/14 00:39:11 itagaki Exp $
+ *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.84 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -61,16 +61,16 @@ typedef struct _z_stream
 typedef z_stream *z_streamp;
 #endif
 
+/* Current archive version number (the format we can output) */
 #define K_VERS_MAJOR 1
-#define K_VERS_MINOR 11
+#define K_VERS_MINOR 12
 #define K_VERS_REV 0
 
 /* Data block types */
 #define BLK_DATA 1
-#define BLK_BLOB 2
 #define BLK_BLOBS 3
 
-/* Some important version numbers (checked in code) */
+/* Historical version numbers (checked in code) */
 #define K_VERS_1_0 (( (1 * 256 + 0) * 256 + 0) * 256 + 0)
 #define K_VERS_1_2 (( (1 * 256 + 2) * 256 + 0) * 256 + 0)              /* Allow No ZLIB */
 #define K_VERS_1_3 (( (1 * 256 + 3) * 256 + 0) * 256 + 0)              /* BLOBs */
@@ -87,8 +87,11 @@ typedef z_stream *z_streamp;
 #define K_VERS_1_10 (( (1 * 256 + 10) * 256 + 0) * 256 + 0)            /* add tablespace */
 #define K_VERS_1_11 (( (1 * 256 + 11) * 256 + 0) * 256 + 0)            /* add toc section
                                                                                                                                 * indicator */
+#define K_VERS_1_12 (( (1 * 256 + 12) * 256 + 0) * 256 + 0)            /* add separate BLOB
+                                                                                                                                * entries */
 
-#define K_VERS_MAX (( (1 * 256 + 11) * 256 + 255) * 256 + 0)
+/* Newest format we can read */
+#define K_VERS_MAX (( (1 * 256 + 12) * 256 + 255) * 256 + 0)
 
 
 /* Flags to indicate disposition of offsets stored in files */
index 53711de11912a489a6375678bdf3a8e8f53900bc..1a9ddc9580009afdd0ba2594d5177b7a3a14fb1a 100644 (file)
@@ -5,7 +5,7 @@
  *     Implements the basic DB functions used by the archiver.
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.87 2010/02/17 04:19:40 tgl Exp $
+ *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_db.c,v 1.88 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -703,16 +703,26 @@ CommitTransaction(ArchiveHandle *AH)
 void
 DropBlobIfExists(ArchiveHandle *AH, Oid oid)
 {
-       /* Call lo_unlink only if exists to avoid not-found error. */
-       if (PQserverVersion(AH->connection) >= 90000)
+       /*
+        * If we are not restoring to a direct database connection, we have to
+        * guess about how to detect whether the blob exists.  Assume new-style.
+        */
+       if (AH->connection == NULL ||
+               PQserverVersion(AH->connection) >= 90000)
        {
-               ahprintf(AH, "SELECT pg_catalog.lo_unlink(oid) "
-                                        "FROM pg_catalog.pg_largeobject_metadata "
-                                        "WHERE oid = %u;\n", oid);
+               ahprintf(AH,
+                                "SELECT pg_catalog.lo_unlink(oid) "
+                                "FROM pg_catalog.pg_largeobject_metadata "
+                                "WHERE oid = '%u';\n",
+                                oid);
        }
        else
        {
-               ahprintf(AH, "SELECT CASE WHEN EXISTS(SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u') THEN pg_catalog.lo_unlink('%u') END;\n",
+               /* Restoring to pre-9.0 server, so do it the old way */
+               ahprintf(AH,
+                                "SELECT CASE WHEN EXISTS("
+                                "SELECT 1 FROM pg_catalog.pg_largeobject WHERE loid = '%u'"
+                                ") THEN pg_catalog.lo_unlink('%u') END;\n",
                                 oid, oid);
        }
 }
index 127ff5721976120c55403940cd125c7cbeb54b90..35b9a18a4ed3e6e6e0c7d259b570a93a4d76377c 100644 (file)
@@ -17,7 +17,7 @@
  *
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.23 2009/12/14 00:39:11 itagaki Exp $
+ *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_null.c,v 1.24 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -147,14 +147,21 @@ _StartBlobs(ArchiveHandle *AH, TocEntry *te)
 static void
 _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
+       bool            old_blob_style = (AH->version < K_VERS_1_12);
+
        if (oid == 0)
                die_horribly(AH, NULL, "invalid OID for large object\n");
 
-       if (AH->ropt->dropSchema)
+       /* With an old archive we must do drop and create logic here */
+       if (old_blob_style && AH->ropt->dropSchema)
                DropBlobIfExists(AH, oid);
 
-       ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
-                        oid, INV_WRITE);
+       if (old_blob_style)
+               ahprintf(AH, "SELECT pg_catalog.lo_open(pg_catalog.lo_create('%u'), %d);\n",
+                                oid, INV_WRITE);
+       else
+               ahprintf(AH, "SELECT pg_catalog.lo_open('%u', %d);\n",
+                                oid, INV_WRITE);
 
        AH->WriteDataPtr = _WriteBlobData;
 }
index ace4ffe34bb298c846a18d7be755daf86a2e88fd..84d12b118f45bae8783a9200adee3444b34975fd 100644 (file)
@@ -12,7 +12,7 @@
  *     by PostgreSQL
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.571 2010/02/17 04:19:40 tgl Exp $
+ *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.c,v 1.572 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -190,9 +190,9 @@ static void selectSourceSchema(const char *schemaName);
 static char *getFormattedTypeName(Oid oid, OidOptions opts);
 static char *myFormatType(const char *typname, int32 typmod);
 static const char *fmtQualifiedId(const char *schema, const char *id);
-static bool hasBlobs(Archive *AH);
+static void getBlobs(Archive *AH);
+static void dumpBlob(Archive *AH, BlobInfo *binfo);
 static int     dumpBlobs(Archive *AH, void *arg);
-static int     dumpBlobComments(Archive *AH, void *arg);
 static void dumpDatabase(Archive *AH);
 static void dumpEncoding(Archive *AH);
 static void dumpStdStrings(Archive *AH);
@@ -701,25 +701,8 @@ main(int argc, char **argv)
                        getTableDataFKConstraints();
        }
 
-       if (outputBlobs && hasBlobs(g_fout))
-       {
-               /* Add placeholders to allow correct sorting of blobs */
-               DumpableObject *blobobj;
-               DumpableObject *blobcobj;
-
-               blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
-               blobobj->objType = DO_BLOBS;
-               blobobj->catId = nilCatalogId;
-               AssignDumpId(blobobj);
-               blobobj->name = strdup("BLOBS");
-
-               blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
-               blobcobj->objType = DO_BLOB_COMMENTS;
-               blobcobj->catId = nilCatalogId;
-               AssignDumpId(blobcobj);
-               blobcobj->name = strdup("BLOB COMMENTS");
-               addObjectDependency(blobcobj, blobobj->dumpId);
-       }
+       if (outputBlobs)
+               getBlobs(g_fout);
 
        /*
         * Collect dependency data to assist in ordering the objects.
@@ -1808,7 +1791,7 @@ dumpDatabase(Archive *AH)
 
                appendPQExpBuffer(loFrozenQry, "SELECT relfrozenxid\n"
                                                        "FROM pg_catalog.pg_class\n"
-                                                       "WHERE oid = %d;\n",
+                                                       "WHERE oid = %u;\n",
                                                        LargeObjectRelationId);
 
                lo_res = PQexec(g_conn, loFrozenQry->data);
@@ -1825,7 +1808,7 @@ dumpDatabase(Archive *AH)
                appendPQExpBuffer(loOutQry, "\n-- For binary upgrade, set pg_largeobject relfrozenxid.\n");
                appendPQExpBuffer(loOutQry, "UPDATE pg_catalog.pg_class\n"
                                                  "SET relfrozenxid = '%u'\n"
-                                                 "WHERE oid = %d;\n",
+                                                 "WHERE oid = %u;\n",
                                                  atoi(PQgetvalue(lo_res, 0, i_relfrozenxid)),
                                                  LargeObjectRelationId);
                ArchiveEntry(AH, nilCatalogId, createDumpId(),
@@ -1938,40 +1921,135 @@ dumpStdStrings(Archive *AH)
 
 
 /*
- * hasBlobs:
- *     Test whether database contains any large objects
+ * getBlobs:
+ *     Collect schema-level data about large objects
  */
-static bool
-hasBlobs(Archive *AH)
+static void
+getBlobs(Archive *AH)
 {
-       bool            result;
-       const char *blobQry;
-       PGresult   *res;
+       PQExpBuffer             blobQry = createPQExpBuffer();
+       BlobInfo           *binfo;
+       DumpableObject *bdata;
+       PGresult           *res;
+       int                     ntups;
+       int                     i;
+
+       /* Verbose message */
+       if (g_verbose)
+               write_msg(NULL, "reading large objects\n");
 
        /* Make sure we are in proper schema */
        selectSourceSchema("pg_catalog");
 
-       /* Check for BLOB OIDs */
+       /* Fetch BLOB OIDs, and owner/ACL data if >= 9.0 */
        if (AH->remoteVersion >= 90000)
-               blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
+               appendPQExpBuffer(blobQry,
+                                                 "SELECT oid, (%s lomowner) AS rolname, lomacl"
+                                                 " FROM pg_largeobject_metadata",
+                                                 username_subquery);
        else if (AH->remoteVersion >= 70100)
-               blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
+               appendPQExpBuffer(blobQry,
+                                                 "SELECT DISTINCT loid, NULL::oid, NULL::oid"
+                                                 " FROM pg_largeobject");
        else
-               blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
+               appendPQExpBuffer(blobQry,
+                                                 "SELECT oid, NULL::oid, NULL::oid"
+                                                 " FROM pg_class WHERE relkind = 'l'");
 
-       res = PQexec(g_conn, blobQry);
-       check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
+       res = PQexec(g_conn, blobQry->data);
+       check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
+
+       ntups = PQntuples(res);
+       if (ntups > 0)
+       {
+               /*
+                * Each large object has its own BLOB archive entry.
+                */
+               binfo = (BlobInfo *) malloc(ntups * sizeof(BlobInfo));
 
-       result = PQntuples(res) > 0;
+               for (i = 0; i < ntups; i++)
+               {
+                       binfo[i].dobj.objType = DO_BLOB;
+                       binfo[i].dobj.catId.tableoid = LargeObjectRelationId;
+                       binfo[i].dobj.catId.oid = atooid(PQgetvalue(res, i, 0));
+                       AssignDumpId(&binfo[i].dobj);
+
+                       binfo[i].dobj.name = strdup(PQgetvalue(res, i, 0));
+                       if (!PQgetisnull(res, i, 1))
+                               binfo[i].rolname = strdup(PQgetvalue(res, i, 1));
+                       else
+                               binfo[i].rolname = "";
+                       if (!PQgetisnull(res, i, 2))
+                               binfo[i].blobacl = strdup(PQgetvalue(res, i, 2));
+                       else
+                               binfo[i].blobacl = NULL;
+               }
+
+               /*
+                * If we have any large objects, a "BLOBS" archive entry is needed.
+                * This is just a placeholder for sorting; it carries no data now.
+                */
+               bdata = (DumpableObject *) malloc(sizeof(DumpableObject));
+               bdata->objType = DO_BLOB_DATA;
+               bdata->catId = nilCatalogId;
+               AssignDumpId(bdata);
+               bdata->name = strdup("BLOBS");
+       }
 
        PQclear(res);
+       destroyPQExpBuffer(blobQry);
+}
 
-       return result;
+/*
+ * dumpBlob
+ *
+ * dump the definition (metadata) of the given large object
+ */
+static void
+dumpBlob(Archive *AH, BlobInfo *binfo)
+{
+       PQExpBuffer             cquery = createPQExpBuffer();
+       PQExpBuffer             dquery = createPQExpBuffer();
+
+       appendPQExpBuffer(cquery,
+                                         "SELECT pg_catalog.lo_create('%s');\n",
+                                         binfo->dobj.name);
+
+       appendPQExpBuffer(dquery,
+                                         "SELECT pg_catalog.lo_unlink('%s');\n",
+                                         binfo->dobj.name);
+
+       ArchiveEntry(AH, binfo->dobj.catId, binfo->dobj.dumpId,
+                                binfo->dobj.name,
+                                NULL, NULL,
+                                binfo->rolname, false,
+                                "BLOB", SECTION_PRE_DATA,
+                                cquery->data, dquery->data, NULL,
+                                binfo->dobj.dependencies, binfo->dobj.nDeps,
+                                NULL, NULL);
+
+       /* set up tag for comment and/or ACL */
+       resetPQExpBuffer(cquery);
+       appendPQExpBuffer(cquery, "LARGE OBJECT %s", binfo->dobj.name);
+
+       /* Dump comment if any */
+       dumpComment(AH, cquery->data,
+                               NULL, binfo->rolname,
+                               binfo->dobj.catId, 0, binfo->dobj.dumpId);
+
+       /* Dump ACL if any */
+       if (binfo->blobacl)
+               dumpACL(AH, binfo->dobj.catId, binfo->dobj.dumpId, "LARGE OBJECT",
+                               binfo->dobj.name, NULL, cquery->data,
+                               NULL, binfo->rolname, binfo->blobacl);
+
+       destroyPQExpBuffer(cquery);
+       destroyPQExpBuffer(dquery);
 }
 
 /*
  * dumpBlobs:
- *     dump all blobs
+ *     dump the data contents of all large objects
  */
 static int
 dumpBlobs(Archive *AH, void *arg)
@@ -1980,6 +2058,7 @@ dumpBlobs(Archive *AH, void *arg)
        const char *blobFetchQry;
        PGresult   *res;
        char            buf[LOBBUFSIZE];
+       int                     ntups;
        int                     i;
        int                     cnt;
 
@@ -1989,7 +2068,10 @@ dumpBlobs(Archive *AH, void *arg)
        /* Make sure we are in proper schema */
        selectSourceSchema("pg_catalog");
 
-       /* Cursor to get all BLOB OIDs */
+       /*
+        * Currently, we re-fetch all BLOB OIDs using a cursor.  Consider
+        * scanning the already-in-memory dumpable objects instead...
+        */
        if (AH->remoteVersion >= 90000)
                blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
        else if (AH->remoteVersion >= 70100)
@@ -2012,7 +2094,8 @@ dumpBlobs(Archive *AH, void *arg)
                check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
 
                /* Process the tuples, if any */
-               for (i = 0; i < PQntuples(res); i++)
+               ntups = PQntuples(res);
+               for (i = 0; i < ntups; i++)
                {
                        Oid                     blobOid;
                        int                     loFd;
@@ -2022,8 +2105,8 @@ dumpBlobs(Archive *AH, void *arg)
                        loFd = lo_open(g_conn, blobOid, INV_READ);
                        if (loFd == -1)
                        {
-                               write_msg(NULL, "dumpBlobs(): could not open large object: %s",
-                                                 PQerrorMessage(g_conn));
+                               write_msg(NULL, "dumpBlobs(): could not open large object %u: %s",
+                                                 blobOid, PQerrorMessage(g_conn));
                                exit_nicely();
                        }
 
@@ -2035,8 +2118,8 @@ dumpBlobs(Archive *AH, void *arg)
                                cnt = lo_read(g_conn, loFd, buf, LOBBUFSIZE);
                                if (cnt < 0)
                                {
-                                       write_msg(NULL, "dumpBlobs(): error reading large object: %s",
-                                                         PQerrorMessage(g_conn));
+                                       write_msg(NULL, "dumpBlobs(): error reading large object %u: %s",
+                                                         blobOid, PQerrorMessage(g_conn));
                                        exit_nicely();
                                }
 
@@ -2047,141 +2130,13 @@ dumpBlobs(Archive *AH, void *arg)
 
                        EndBlob(AH, blobOid);
                }
-       } while (PQntuples(res) > 0);
+       } while (ntups > 0);
 
        PQclear(res);
 
        return 1;
 }
 
-/*
- * dumpBlobComments
- *     dump all blob properties.
- *  It has "BLOB COMMENTS" tag due to the historical reason, but note
- *  that it is the routine to dump all the properties of blobs.
- *
- * Since we don't provide any way to be selective about dumping blobs,
- * there's no need to be selective about their comments either.  We put
- * all the comments into one big TOC entry.
- */
-static int
-dumpBlobComments(Archive *AH, void *arg)
-{
-       const char *blobQry;
-       const char *blobFetchQry;
-       PQExpBuffer cmdQry = createPQExpBuffer();
-       PGresult   *res;
-       int                     i;
-
-       if (g_verbose)
-               write_msg(NULL, "saving large object properties\n");
-
-       /* Make sure we are in proper schema */
-       selectSourceSchema("pg_catalog");
-
-       /* Cursor to get all BLOB comments */
-       if (AH->remoteVersion >= 90000)
-               blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
-                       "obj_description(oid, 'pg_largeobject'), "
-                       "pg_get_userbyid(lomowner), lomacl "
-                       "FROM pg_largeobject_metadata";
-       else if (AH->remoteVersion >= 70300)
-               blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-                       "obj_description(loid, 'pg_largeobject'), NULL, NULL "
-                       "FROM (SELECT DISTINCT loid FROM "
-                       "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
-                       "WHERE classoid = 'pg_largeobject'::regclass) ss";
-       else if (AH->remoteVersion >= 70200)
-               blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-                       "obj_description(loid, 'pg_largeobject'), NULL, NULL "
-                       "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
-       else if (AH->remoteVersion >= 70100)
-               blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
-                       "obj_description(loid), NULL, NULL "
-                       "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
-       else
-               blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
-                       "       ( "
-                       "               SELECT description "
-                       "               FROM pg_description pd "
-                       "               WHERE pd.objoid=pc.oid "
-                       "       ), NULL, NULL "
-                       "FROM pg_class pc WHERE relkind = 'l'";
-
-       res = PQexec(g_conn, blobQry);
-       check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
-
-       /* Command to fetch from cursor */
-       blobFetchQry = "FETCH 100 IN blobcmt";
-
-       do
-       {
-               PQclear(res);
-
-               /* Do a fetch */
-               res = PQexec(g_conn, blobFetchQry);
-               check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
-
-               /* Process the tuples, if any */
-               for (i = 0; i < PQntuples(res); i++)
-               {
-                       Oid                     blobOid = atooid(PQgetvalue(res, i, 0));
-                       char       *lo_comment = PQgetvalue(res, i, 1);
-                       char       *lo_owner = PQgetvalue(res, i, 2);
-                       char       *lo_acl = PQgetvalue(res, i, 3);
-                       char            lo_name[32];
-
-                       resetPQExpBuffer(cmdQry);
-
-                       /* comment on the blob */
-                       if (!PQgetisnull(res, i, 1))
-                       {
-                               appendPQExpBuffer(cmdQry,
-                                                                 "COMMENT ON LARGE OBJECT %u IS ", blobOid);
-                               appendStringLiteralAH(cmdQry, lo_comment, AH);
-                               appendPQExpBuffer(cmdQry, ";\n");
-                       }
-
-                       /* dump blob ownership, if necessary */
-                       if (!PQgetisnull(res, i, 2))
-                       {
-                               appendPQExpBuffer(cmdQry,
-                                                                 "ALTER LARGE OBJECT %u OWNER TO %s;\n",
-                                                                 blobOid, lo_owner);
-                       }
-
-                       /* dump blob privileges, if necessary */
-                       if (!PQgetisnull(res, i, 3) &&
-                               !dataOnly && !aclsSkip)
-                       {
-                               snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
-                               if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
-                                                                         lo_acl, lo_owner, "",
-                                                                         AH->remoteVersion, cmdQry))
-                               {
-                                       write_msg(NULL, "could not parse ACL (%s) for "
-                                                         "large object %u", lo_acl, blobOid);
-                                       exit_nicely();
-                               }
-                       }
-
-                       if (cmdQry->len > 0)
-                       {
-                               appendPQExpBuffer(cmdQry, "\n");
-                               archputs(cmdQry->data, AH);
-                       }
-               }
-       } while (PQntuples(res) > 0);
-
-       PQclear(res);
-
-       archputs("\n", AH);
-
-       destroyPQExpBuffer(cmdQry);
-
-       return 1;
-}
-
 static void
 binary_upgrade_set_type_oids_by_type_oid(PQExpBuffer upgrade_buffer,
                                                                                           Oid pg_type_oid)
@@ -6140,9 +6095,17 @@ dumpComment(Archive *fout, const char *target,
        CommentItem *comments;
        int                     ncomments;
 
-       /* Comments are SCHEMA not data */
-       if (dataOnly)
-               return;
+       /* Comments are schema not data ... except blob comments are data */
+       if (strncmp(target, "LARGE OBJECT ", 13) != 0)
+       {
+               if (dataOnly)
+                       return;
+       }
+       else
+       {
+               if (schemaOnly)
+                       return;
+       }
 
        /* Search for comments associated with catalogId, using table */
        ncomments = findComments(fout, catalogId.tableoid, catalogId.oid,
@@ -6530,7 +6493,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
                case DO_DEFAULT_ACL:
                        dumpDefaultACL(fout, (DefaultACLInfo *) dobj);
                        break;
-               case DO_BLOBS:
+               case DO_BLOB:
+                       dumpBlob(fout, (BlobInfo *) dobj);
+                       break;
+               case DO_BLOB_DATA:
                        ArchiveEntry(fout, dobj->catId, dobj->dumpId,
                                                 dobj->name, NULL, NULL, "",
                                                 false, "BLOBS", SECTION_DATA,
@@ -6538,14 +6504,6 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
                                                 dobj->dependencies, dobj->nDeps,
                                                 dumpBlobs, NULL);
                        break;
-               case DO_BLOB_COMMENTS:
-                       ArchiveEntry(fout, dobj->catId, dobj->dumpId,
-                                                dobj->name, NULL, NULL, "",
-                                                false, "BLOB COMMENTS", SECTION_DATA,
-                                                "", "", NULL,
-                                                dobj->dependencies, dobj->nDeps,
-                                                dumpBlobComments, NULL);
-                       break;
        }
 }
 
@@ -10382,14 +10340,16 @@ dumpDefaultACL(Archive *fout, DefaultACLInfo *daclinfo)
  *
  * 'objCatId' is the catalog ID of the underlying object.
  * 'objDumpId' is the dump ID of the underlying object.
- * 'type' must be TABLE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, or TABLESPACE.
+ * 'type' must be one of
+ *             TABLE, SEQUENCE, FUNCTION, LANGUAGE, SCHEMA, DATABASE, TABLESPACE,
+ *             FOREIGN DATA WRAPPER, SERVER, or LARGE OBJECT.
  * 'name' is the formatted name of the object. Must be quoted etc. already.
  * 'subname' is the formatted name of the sub-object, if any.  Must be quoted.
  * 'tag' is the tag for the archive entry (typ. unquoted name of object).
  * 'nspname' is the namespace the object is in (NULL if none).
  * 'owner' is the owner, NULL if there is no owner (for languages).
  * 'acls' is the string read out of the fooacl system catalog field;
- * it will be parsed here.
+ *             it will be parsed here.
  *----------
  */
 static void
@@ -10401,7 +10361,11 @@ dumpACL(Archive *fout, CatalogId objCatId, DumpId objDumpId,
        PQExpBuffer sql;
 
        /* Do nothing if ACL dump is not enabled */
-       if (dataOnly || aclsSkip)
+       if (aclsSkip)
+               return;
+
+       /* --data-only skips ACLs *except* BLOB ACLs */
+       if (dataOnly && strcmp(type, "LARGE OBJECT") != 0)
                return;
 
        sql = createPQExpBuffer();
index fa5972a55ff61c3cd16a9881a452881e52406c09..e71d03604b7f69626377a22820eb6c0a84e50fee 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.162 2010/01/28 23:21:12 petere Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump.h,v 1.163 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -115,8 +115,8 @@ typedef enum
        DO_FDW,
        DO_FOREIGN_SERVER,
        DO_DEFAULT_ACL,
-       DO_BLOBS,
-       DO_BLOB_COMMENTS
+       DO_BLOB,
+       DO_BLOB_DATA
 } DumpableObjectType;
 
 typedef struct _dumpableObject
@@ -443,6 +443,13 @@ typedef struct _defaultACLInfo
        char       *defaclacl;
 } DefaultACLInfo;
 
+typedef struct _blobInfo
+{
+       DumpableObject  dobj;
+       char       *rolname;
+       char       *blobacl;
+} BlobInfo;
+
 /* global decls */
 extern bool force_quotes;              /* double-quotes for identifiers flag */
 extern bool g_verbose;                 /* verbose flag */
index 1551af3dbef6d741d54501947d073ee48416e8f8..f3761217d1c9ff92ceb9a84dc9dd4889b059badc 100644 (file)
@@ -9,7 +9,7 @@
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.28 2010/02/15 19:59:47 petere Exp $
+ *       $PostgreSQL: pgsql/src/bin/pg_dump/pg_dump_sort.c,v 1.29 2010/02/18 01:29:10 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@ static const int oldObjectTypePriority[] =
        16,                                                     /* DO_FK_CONSTRAINT */
        2,                                                      /* DO_PROCLANG */
        2,                                                      /* DO_CAST */
-       9,                                                      /* DO_TABLE_DATA */
+       10,                                                     /* DO_TABLE_DATA */
        7,                                                      /* DO_DUMMY_TYPE */
        3,                                                      /* DO_TSPARSER */
        4,                                                      /* DO_TSDICT */
@@ -55,8 +55,8 @@ static const int oldObjectTypePriority[] =
        3,                                                      /* DO_FDW */
        4,                                                      /* DO_FOREIGN_SERVER */
        17,                                                     /* DO_DEFAULT_ACL */
-       10,                                                     /* DO_BLOBS */
-       11                                                      /* DO_BLOB_COMMENTS */
+       9,                                                      /* DO_BLOB */
+       11                                                      /* DO_BLOB_DATA */
 };
 
 /*
@@ -83,7 +83,7 @@ static const int newObjectTypePriority[] =
        26,                                                     /* DO_FK_CONSTRAINT */
        2,                                                      /* DO_PROCLANG */
        8,                                                      /* DO_CAST */
-       19,                                                     /* DO_TABLE_DATA */
+       20,                                                     /* DO_TABLE_DATA */
        17,                                                     /* DO_DUMMY_TYPE */
        10,                                                     /* DO_TSPARSER */
        12,                                                     /* DO_TSDICT */
@@ -92,8 +92,8 @@ static const int newObjectTypePriority[] =
        14,                                                     /* DO_FDW */
        15,                                                     /* DO_FOREIGN_SERVER */
        27,                                                     /* DO_DEFAULT_ACL */
-       20,                                                     /* DO_BLOBS */
-       21                                                      /* DO_BLOB_COMMENTS */
+       19,                                                     /* DO_BLOB */
+       21                                                      /* DO_BLOB_DATA */
 };
 
 
@@ -1157,14 +1157,14 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
                                         "DEFAULT ACL %s  (ID %d OID %u)",
                                         obj->name, obj->dumpId, obj->catId.oid);
                        return;
-               case DO_BLOBS:
+               case DO_BLOB:
                        snprintf(buf, bufsize,
-                                        "BLOBS  (ID %d)",
-                                        obj->dumpId);
+                                        "BLOB  (ID %d OID %u)",
+                                        obj->dumpId, obj->catId.oid);
                        return;
-               case DO_BLOB_COMMENTS:
+               case DO_BLOB_DATA:
                        snprintf(buf, bufsize,
-                                        "BLOB COMMENTS  (ID %d)",
+                                        "BLOB DATA  (ID %d)",
                                         obj->dumpId);
                        return;
        }