Added long-standing transaction when restoring BLOBS (uses commit every BLOB_BATCH_SIZE)
authorPhilip Warner <pjw@rhyme.com.au>
Tue, 31 Oct 2000 14:20:30 +0000 (14:20 +0000)
committerPhilip Warner <pjw@rhyme.com.au>
Tue, 31 Oct 2000 14:20:30 +0000 (14:20 +0000)
Prevent dumping of languages from template1.

src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h
src/bin/pg_dump/pg_backup_custom.c
src/bin/pg_dump/pg_backup_db.c
src/bin/pg_dump/pg_backup_db.h
src/bin/pg_dump/pg_backup_files.c
src/bin/pg_dump/pg_backup_tar.c
src/bin/pg_dump/pg_dump.c

index b6487d342596cd041088c91b0637cf0d8172158b..d8a969b41e0279623081d0869737c869be389845 100644 (file)
  * Modifications - 31-Jul-2000 - pjw@rhyme.com.au (1.46, 1.47)
  *     Fixed version number initialization in _allocAH (pg_backup_archiver.c)
  *
+ *
+ * Modifications - 30-Oct-2000 - pjw@rhyme.com.au
+ *     Added {Start,End}RestoreBlobs to allow extended TX during BLOB restore.
+ *
  *-------------------------------------------------------------------------
  */
 
@@ -590,6 +594,34 @@ int EndBlob(Archive* AHX, int oid)
  * BLOB Restoration
  **********/
 
+/*
+ * Called by a format handler before any blobs are restored 
+ */
+void StartRestoreBlobs(ArchiveHandle* AH)
+{
+   AH->blobCount = 0;
+}
+
+/*
+ * Called by a format handler after all blobs are restored 
+ */
+void EndRestoreBlobs(ArchiveHandle* AH)
+{
+   if (AH->txActive)
+   {
+       ahlog(AH, 2, "Committing BLOB transactions\n");
+       CommitTransaction(AH);
+   }
+
+   if (AH->blobTxActive)
+   {
+       CommitTransactionXref(AH);
+   }
+
+   ahlog(AH, 1, "Restored %d BLOBs\n", AH->blobCount);
+}
+
+
 /*
  * Called by a format handler to initiate restoration of a blob
  */
@@ -597,6 +629,8 @@ void StartRestoreBlob(ArchiveHandle* AH, int oid)
 {
    int         loOid;
 
+   AH->blobCount++;
+
    if (!AH->createdBlobXref)
    {
        if (!AH->connection)
@@ -606,7 +640,18 @@ void StartRestoreBlob(ArchiveHandle* AH, int oid)
        AH->createdBlobXref = 1;
    }
 
-   StartTransaction(AH);
+   /*
+    * Start long-running TXs if necessary
+    */
+   if (!AH->txActive)
+   {
+       ahlog(AH, 2, "Starting BLOB transactions\n");
+       StartTransaction(AH);
+   }
+   if (!AH->blobTxActive)
+   {
+       StartTransactionXref(AH);
+   }
 
    loOid = lo_creat(AH->connection, INV_READ | INV_WRITE);
    if (loOid == 0)
@@ -628,7 +673,15 @@ void EndRestoreBlob(ArchiveHandle* AH, int oid)
     lo_close(AH->connection, AH->loFd);
     AH->writingBlob = 0;
 
-   CommitTransaction(AH);
+   /*
+    * Commit every BLOB_BATCH_SIZE blobs...
+    */
+   if ( ((AH->blobCount / BLOB_BATCH_SIZE) * BLOB_BATCH_SIZE) == AH->blobCount) 
+   {
+       ahlog(AH, 2, "Committing BLOB transactions\n");
+       CommitTransaction(AH);
+       CommitTransactionXref(AH);
+   }
 }
 
 /***********
index 41fbb5c9c06a0918aeb480969105629cd0974a94..2c7291e6c6917f678c6711c2ecac34c532546412 100644 (file)
@@ -62,7 +62,7 @@ typedef z_stream *z_streamp;
 
 #define K_VERS_MAJOR 1
 #define K_VERS_MINOR 4 
-#define K_VERS_REV 21 
+#define K_VERS_REV 22 
 
 /* Data block types */
 #define BLK_DATA 1
@@ -76,6 +76,9 @@ typedef z_stream *z_streamp;
 #define K_VERS_1_4 (( (1 * 256 + 4) * 256 + 0) * 256 + 0) /* Date & name in header */
 #define K_VERS_MAX (( (1 * 256 + 4) * 256 + 255) * 256 + 0)
 
+/* No of BLOBs to restore in 1 TX */
+#define BLOB_BATCH_SIZE    100
+
 struct _archiveHandle;
 struct _tocEntry;
 struct _restoreList;
@@ -186,6 +189,8 @@ typedef struct _archiveHandle {
    char                *pgport;
    PGconn              *connection;
    PGconn              *blobConnection;    /* Connection for BLOB xref */
+   int                 txActive;           /* Flag set if TX active on connection */
+   int                 blobTxActive;       /* Flag set if TX active on blobConnection */
    int                 connectToDB;        /* Flag to indicate if direct DB connection is required */
    int                 pgCopyIn;           /* Currently in libpq 'COPY IN' mode. */
    PQExpBuffer         pgCopyBuf;          /* Left-over data from incomplete lines in COPY IN */
@@ -193,6 +198,7 @@ typedef struct _archiveHandle {
    int                 loFd;               /* BLOB fd */
    int                 writingBlob;        /* Flag */
    int                 createdBlobXref;    /* Flag */
+   int                 blobCount;          /* # of blobs restored */
 
    int                 lastID;             /* Last internal ID for a TOC entry */
    char*               fSpec;              /* Archive File Spec */
@@ -256,8 +262,10 @@ extern int                 ReadInt(ArchiveHandle* AH);
 extern char*           ReadStr(ArchiveHandle* AH);
 extern int                 WriteStr(ArchiveHandle* AH, char* s);
 
+extern void                StartRestoreBlobs(ArchiveHandle* AH);
 extern void            StartRestoreBlob(ArchiveHandle* AH, int oid);
 extern void            EndRestoreBlob(ArchiveHandle* AH, int oid);
+extern void                EndRestoreBlobs(ArchiveHandle* AH);
 
 extern void            InitArchiveFmt_Custom(ArchiveHandle* AH);
 extern void            InitArchiveFmt_Files(ArchiveHandle* AH);
index f5b208e233ebc933b11e00a73390c51a1ce858e7..e44f02259c00e572ddbc5da2dd66d83779075e04 100644 (file)
@@ -585,6 +585,8 @@ static void _LoadBlobs(ArchiveHandle* AH)
 {
     int        oid;
 
+   StartRestoreBlobs(AH);
+
     oid = ReadInt(AH);
     while(oid != 0)
     {
@@ -593,6 +595,9 @@ static void _LoadBlobs(ArchiveHandle* AH)
        EndRestoreBlob(AH, oid);
        oid = ReadInt(AH);
     }
+
+   EndRestoreBlobs(AH);
+
 }
 
 /*
@@ -608,8 +613,8 @@ static void _skipBlobs(ArchiveHandle* AH)
     oid = ReadInt(AH);
     while(oid != 0)
     {
-   _skipData(AH);
-   oid = ReadInt(AH);
+       _skipData(AH);
+       oid = ReadInt(AH);
     }
 }
 
index d606508a36a4c8d80520be8cbbcab0e7effe3fb9..4b8873c3a2360e420ff520128148d2481b84c490 100644 (file)
@@ -675,6 +675,17 @@ void StartTransaction(ArchiveHandle* AH)
    appendPQExpBuffer(qry, "Begin;");
 
    ExecuteSqlCommand(AH, qry, "can not start database transaction");
+   AH->txActive = true;
+}
+
+void StartTransactionXref(ArchiveHandle* AH)
+{
+   PQExpBuffer     qry = createPQExpBuffer();
+
+   appendPQExpBuffer(qry, "Begin;");
+
+   _executeSqlCommand(AH, AH->blobConnection, qry, "can not start BLOB xref transaction");
+   AH->blobTxActive = true;
 }
 
 void CommitTransaction(ArchiveHandle* AH)
@@ -684,6 +695,15 @@ void CommitTransaction(ArchiveHandle* AH)
     appendPQExpBuffer(qry, "Commit;");
 
     ExecuteSqlCommand(AH, qry, "can not commit database transaction");
+   AH->txActive = false;
 }
 
+void CommitTransactionXref(ArchiveHandle* AH)
+{
+   PQExpBuffer     qry = createPQExpBuffer();
 
+   appendPQExpBuffer(qry, "Commit;");
+
+   _executeSqlCommand(AH, AH->blobConnection, qry, "can not commit BLOB xref transaction");
+   AH->blobTxActive = false;
+}
index 5d03967f583df5c1524116682c91922d8318fb64..3dfc6664fc934f349342ddf25b0ef28cde541f9e 100644 (file)
@@ -12,5 +12,7 @@ extern int ExecuteSqlCommandBuf(ArchiveHandle* AH, void *qry, int bufLen);
 extern void CreateBlobXrefTable(ArchiveHandle* AH);
 extern void InsertBlobXref(ArchiveHandle* AH, int old, int new);
 extern void StartTransaction(ArchiveHandle* AH);
+extern void StartTransactionXref(ArchiveHandle* AH);
 extern void CommitTransaction(ArchiveHandle* AH);
+extern void CommitTransactionXref(ArchiveHandle* AH);
 
index 1583a497b9c87e4c27f1d040c60e681327f75a91..1624bf14355a791fa02a53c57e13caff09df5c60 100644 (file)
@@ -318,6 +318,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
    lclContext*     ctx = (lclContext*)AH->formatData;
    char            fname[K_STD_BUF_SIZE];
 
+   StartRestoreBlobs(AH);
+
    ctx->blobToc = fopen("blobs.toc", PG_BINARY_R);
 
    _getBlobTocEntry(AH, &oid, fname);
@@ -331,6 +333,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
     }
 
    fclose(ctx->blobToc);
+
+   EndRestoreBlobs(AH);
 }
 
 
index a137513e78c2e6f15bcb1352df851465bd4a8347..cb4a9e906d22e23952e12e70b44cd3d646216ea8 100644 (file)
@@ -627,6 +627,8 @@ static void _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
    int             cnt;
    char            buf[4096];
 
+   StartRestoreBlobs(AH);
+
    th = tarOpen(AH, NULL, 'r'); /* Open next file */
    while (th != NULL)
    {
@@ -652,21 +654,8 @@ static void    _LoadBlobs(ArchiveHandle* AH, RestoreOptions *ropt)
        th = tarOpen(AH, NULL, 'r');
    }
 
-   /*
-    * ctx->blobToc = tarOpen(AH, "blobs.toc", 'r');
-    *
-    * _getBlobTocEntry(AH, &oid, fname);
-    *
-     * while(oid != 0)
-     * {
-    *      StartRestoreBlob(AH, oid);
-    *      _PrintFileData(AH, fname, ropt);
-    *      EndRestoreBlob(AH, oid);
-    *      _getBlobTocEntry(AH, &oid, fname);
-     * }
-    *
-    * tarClose(AH, ctx->blobToc);
-    */ 
+   EndRestoreBlobs(AH);
+
 }
 
 
index 0ae0ee00144182773c2843f96627b660b03c27e2..4b765f528806ad1c216b60ed65ba70888766a79d 100644 (file)
@@ -22,7 +22,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.176 2000/10/24 13:24:30 pjw Exp $
+ *   $Header: /cvsroot/pgsql/src/bin/pg_dump/pg_dump.c,v 1.177 2000/10/31 14:20:30 pjw Exp $
  *
  * Modifications - 6/10/96 - dave@bensoft.com - version 1.13.dhb
  *
@@ -2872,6 +2872,7 @@ dumpProcLangs(Archive *fout, FuncInfo *finfo, int numFuncs,
    int         i_lanpltrusted;
    int         i_lanplcallfoid;
    int         i_lancompiler;
+   Oid         lanoid;
    char       *lanname;
    char       *lancompiler;
    const char *lanplcallfoid;
@@ -2898,7 +2899,13 @@ dumpProcLangs(Archive *fout, FuncInfo *finfo, int numFuncs,
 
    for (i = 0; i < ntups; i++)
    {
+       lanoid = atoi(PQgetvalue(res, i, i_oid));
+       if (lanoid <= g_last_builtin_oid)
+           continue;
+
        lanplcallfoid = PQgetvalue(res, i, i_lanplcallfoid);
+
+
        for (fidx = 0; fidx < numFuncs; fidx++)
        {
            if (!strcmp(finfo[fidx].oid, lanplcallfoid))