Widen xl_len field of XLogRecord header to 32 bits, so that we'll have
authorTom Lane <tgl@sss.pgh.pa.us>
Sun, 29 Aug 2004 16:34:48 +0000 (16:34 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sun, 29 Aug 2004 16:34:48 +0000 (16:34 +0000)
a more tolerable limit on the number of subtransactions or deleted files
in COMMIT and ABORT records.  Buy back the extra space by eliminating the
xl_xact_prev field, which isn't being used for anything and is rather
unlikely ever to be used for anything.
This does not force initdb, but you do need to do pg_resetxlog if you
want to upgrade an existing 8.0 installation without initdb.

src/backend/access/transam/xlog.c
src/bin/pg_resetxlog/pg_resetxlog.c
src/include/access/xlog.h
src/include/access/xlog_internal.h

index e65c109f66517dfc48087610eb4dfa51541c62ca..e8f9ea20de7cab3e8850327f1e501f13c47e4058 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.166 2004/08/29 05:06:40 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.167 2004/08/29 16:34:47 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -409,6 +409,10 @@ static uint32 readOff = 0;
 /* Buffer for currently read page (BLCKSZ bytes) */
 static char *readBuf = NULL;
 
+/* Buffer for current ReadRecord result (expandable) */
+static char *readRecordBuf = NULL;
+static uint32 readRecordBufSize = 0;
+
 /* State information for XLOG reading */
 static XLogRecPtr ReadRecPtr;
 static XLogRecPtr EndRecPtr;
@@ -440,11 +444,9 @@ static bool RestoreArchivedFile(char *path, const char *xlogfname,
                    const char *recovername, off_t expectedSize);
 static void PreallocXlogFiles(XLogRecPtr endptr);
 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
-static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
-                    int whichChkpt,
-                    char *buffer);
+static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
 static List *readTimeLineHistory(TimeLineID targetTLI);
 static bool existsTimeLineHistory(TimeLineID probeTLI);
 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
@@ -627,7 +629,7 @@ begin:;
     * may not be true forever.  If you need to remove the len == 0 check,
     * also remove the check for xl_len == 0 in ReadRecord, below.
     */
-   if (len == 0 || len > MAXLOGRECSZ)
+   if (len == 0)
        elog(PANIC, "invalid xlog record length %u", len);
 
    START_CRIT_SECTION();
@@ -745,14 +747,6 @@ begin:;
    /* Insert record header */
 
    record->xl_prev = Insert->PrevRecord;
-   if (no_tran)
-   {
-       record->xl_xact_prev.xlogid = 0;
-       record->xl_xact_prev.xrecoff = 0;
-   }
-   else
-       record->xl_xact_prev = MyLastRecPtr;
-
    record->xl_xid = GetCurrentTransactionId();
    record->xl_len = len;       /* doesn't include backup blocks */
    record->xl_info = info;
@@ -2316,14 +2310,14 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
  * If no valid record is available, returns NULL, or fails if emode is PANIC.
  * (emode must be either PANIC or LOG.)
  *
- * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long.  It is needed
- * to reassemble a record that crosses block boundaries.  Note that on
- * successful return, the returned record pointer always points at buffer.
+ * The record is copied into readRecordBuf, so that on successful return,
+ * the returned record pointer always points there.
  */
 static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
+ReadRecord(XLogRecPtr *RecPtr, int emode)
 {
    XLogRecord *record;
+   char       *buffer;
    XLogRecPtr  tmpRecPtr = EndRecPtr;
    bool        randAccess = false;
    uint32      len,
@@ -2467,6 +2461,13 @@ got_record:;
                        RecPtr->xlogid, RecPtr->xrecoff)));
        goto next_record_is_invalid;
    }
+   if (record->xl_rmid > RM_MAX_ID)
+   {
+       ereport(emode,
+               (errmsg("invalid resource manager ID %u at %X/%X",
+                    record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
+       goto next_record_is_invalid;
+   }
 
    /*
     * Compute total length of record including any appended backup
@@ -2481,24 +2482,34 @@ got_record:;
    }
 
    /*
-    * Make sure it will fit in buffer (currently, it is mechanically
-    * impossible for this test to fail, but it seems like a good idea
-    * anyway).
+    * Allocate or enlarge readRecordBuf as needed.  To avoid useless
+    * small increases, round its size to a multiple of BLCKSZ, and make
+    * sure it's at least 4*BLCKSZ to start with.  (That is enough for
+    * all "normal" records, but very large commit or abort records might
+    * need more space.)
     */
-   if (total_len > _INTL_MAXLOGRECSZ)
-   {
-       ereport(emode,
-               (errmsg("record length %u at %X/%X too long",
-                       total_len, RecPtr->xlogid, RecPtr->xrecoff)));
-       goto next_record_is_invalid;
-   }
-   if (record->xl_rmid > RM_MAX_ID)
+   if (total_len > readRecordBufSize)
    {
-       ereport(emode,
-               (errmsg("invalid resource manager ID %u at %X/%X",
-                    record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
-       goto next_record_is_invalid;
+       uint32      newSize = total_len;
+
+       newSize += BLCKSZ - (newSize % BLCKSZ);
+       newSize = Max(newSize, 4 * BLCKSZ);
+       if (readRecordBuf)
+           free(readRecordBuf);
+       readRecordBuf = (char *) malloc(newSize);
+       if (!readRecordBuf)
+       {
+           readRecordBufSize = 0;
+           /* We treat this as a "bogus data" condition */
+           ereport(emode,
+                   (errmsg("record length %u at %X/%X too long",
+                           total_len, RecPtr->xlogid, RecPtr->xrecoff)));
+           goto next_record_is_invalid;
+       }
+       readRecordBufSize = newSize;
    }
+
+   buffer = readRecordBuf;
    nextRecord = NULL;
    len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
    if (total_len > len)
@@ -3481,8 +3492,6 @@ BootStrapXLOG(void)
    record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
    record->xl_prev.xlogid = 0;
    record->xl_prev.xrecoff = 0;
-   record->xl_xact_prev.xlogid = 0;
-   record->xl_xact_prev.xrecoff = 0;
    record->xl_xid = InvalidTransactionId;
    record->xl_len = sizeof(checkPoint);
    record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
@@ -3981,12 +3990,8 @@ StartupXLOG(void)
    uint32      endLogId;
    uint32      endLogSeg;
    XLogRecord *record;
-   char       *buffer;
    uint32      freespace;
 
-   /* Use malloc() to ensure record buffer is MAXALIGNED */
-   buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
-
    CritSectionCount++;
 
    /*
@@ -4063,7 +4068,7 @@ StartupXLOG(void)
         * from the checkpoint it identifies, rather than using
         * pg_control.
         */
-       record = ReadCheckpointRecord(checkPointLoc, 0, buffer);
+       record = ReadCheckpointRecord(checkPointLoc, 0);
        if (record != NULL)
        {
            ereport(LOG,
@@ -4085,7 +4090,7 @@ StartupXLOG(void)
         * according to pg_control is broken, try the next-to-last one.
         */
        checkPointLoc = ControlFile->checkPoint;
-       record = ReadCheckpointRecord(checkPointLoc, 1, buffer);
+       record = ReadCheckpointRecord(checkPointLoc, 1);
        if (record != NULL)
        {
            ereport(LOG,
@@ -4095,7 +4100,7 @@ StartupXLOG(void)
        else
        {
            checkPointLoc = ControlFile->prevCheckPoint;
-           record = ReadCheckpointRecord(checkPointLoc, 2, buffer);
+           record = ReadCheckpointRecord(checkPointLoc, 2);
            if (record != NULL)
            {
                ereport(LOG,
@@ -4198,12 +4203,12 @@ StartupXLOG(void)
        if (XLByteLT(checkPoint.redo, RecPtr))
        {
            /* back up to find the record */
-           record = ReadRecord(&(checkPoint.redo), PANIC, buffer);
+           record = ReadRecord(&(checkPoint.redo), PANIC);
        }
        else
        {
            /* just have to read next record after CheckPoint */
-           record = ReadRecord(NULL, LOG, buffer);
+           record = ReadRecord(NULL, LOG);
        }
 
        if (record != NULL)
@@ -4263,7 +4268,7 @@ StartupXLOG(void)
 
                LastRec = ReadRecPtr;
 
-               record = ReadRecord(NULL, LOG, buffer);
+               record = ReadRecord(NULL, LOG);
            } while (record != NULL && recoveryContinue);
 
            /*
@@ -4287,7 +4292,7 @@ StartupXLOG(void)
     * Re-fetch the last valid or last applied record, so we can identify
     * the exact endpoint of what we consider the valid portion of WAL.
     */
-   record = ReadRecord(&LastRec, PANIC, buffer);
+   record = ReadRecord(&LastRec, PANIC);
    EndOfLog = EndRecPtr;
    XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
 
@@ -4404,7 +4409,7 @@ StartupXLOG(void)
                            RecPtr.xlogid, RecPtr.xrecoff)));
            do
            {
-               record = ReadRecord(&RecPtr, PANIC, buffer);
+               record = ReadRecord(&RecPtr, PANIC);
                if (TransactionIdIsValid(record->xl_xid) &&
                    !TransactionIdDidCommit(record->xl_xid))
                    RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
@@ -4498,8 +4503,12 @@ StartupXLOG(void)
        free(readBuf);
        readBuf = NULL;
    }
-
-   free(buffer);
+   if (readRecordBuf)
+   {
+       free(readRecordBuf);
+       readRecordBuf = NULL;
+       readRecordBufSize = 0;
+   }
 }
 
 /*
@@ -4509,9 +4518,7 @@ StartupXLOG(void)
  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
  */
 static XLogRecord *
-ReadCheckpointRecord(XLogRecPtr RecPtr,
-                    int whichChkpt,
-                    char *buffer)
+ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
 {
    XLogRecord *record;
 
@@ -4535,7 +4542,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr,
        return NULL;
    }
 
-   record = ReadRecord(&RecPtr, LOG, buffer);
+   record = ReadRecord(&RecPtr, LOG);
 
    if (record == NULL)
    {
@@ -5080,9 +5087,8 @@ xlog_outrec(char *buf, XLogRecord *record)
    int         bkpb;
    int         i;
 
-   sprintf(buf + strlen(buf), "prev %X/%X; xprev %X/%X; xid %u",
+   sprintf(buf + strlen(buf), "prev %X/%X; xid %u",
            record->xl_prev.xlogid, record->xl_prev.xrecoff,
-           record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
            record->xl_xid);
 
    for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
index 936026230ec93e179c8c1117ca4abc1d89dee5a4..fdbd527edcddd09983fe91d747281f2892314698 100644 (file)
@@ -23,7 +23,7 @@
  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/bin/pg_resetxlog/pg_resetxlog.c,v 1.23 2004/08/29 05:06:54 momjian Exp $
+ * $PostgreSQL: pgsql/src/bin/pg_resetxlog/pg_resetxlog.c,v 1.24 2004/08/29 16:34:48 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -645,8 +645,6 @@ WriteEmptyXLOG(void)
    record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
    record->xl_prev.xlogid = 0;
    record->xl_prev.xrecoff = 0;
-   record->xl_xact_prev.xlogid = 0;
-   record->xl_xact_prev.xrecoff = 0;
    record->xl_xid = InvalidTransactionId;
    record->xl_len = sizeof(CheckPoint);
    record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
index 8d3d3cfb62d4a576db8db6c170a70ed1e9a9bea6..eeee24bc41afc194fd52c00684c1da65ab74549e 100644 (file)
@@ -6,7 +6,7 @@
  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.57 2004/08/29 05:06:55 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.58 2004/08/29 16:34:48 tgl Exp $
  */
 #ifndef XLOG_H
 #define XLOG_H
@@ -35,18 +35,18 @@ typedef struct XLogRecord
 {
    crc64       xl_crc;         /* CRC for this record */
    XLogRecPtr  xl_prev;        /* ptr to previous record in log */
-   XLogRecPtr  xl_xact_prev;   /* ptr to previous record of this xact */
    TransactionId xl_xid;       /* xact id */
-   uint16      xl_len;         /* total len of rmgr data */
+   uint32      xl_len;         /* total len of rmgr data */
    uint8       xl_info;        /* flag bits, see below */
    RmgrId      xl_rmid;        /* resource manager for this record */
 
+   /* Depending on MAXALIGN, there are either 2 or 6 wasted bytes here */
+
    /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
 
 } XLogRecord;
 
 #define SizeOfXLogRecord   MAXALIGN(sizeof(XLogRecord))
-#define MAXLOGRECSZ            65535       /* the most that'll fit in xl_len */
 
 #define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord)
 
index 8ce7fae15b64e117317124e3b1c19726b1bd642f..70b3f21d98608c2eeb650dde22cabdd03e04b41d 100644 (file)
@@ -11,7 +11,7 @@
  * Portions Copyright (c) 1996-2004, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/access/xlog_internal.h,v 1.4 2004/08/29 05:06:55 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/access/xlog_internal.h,v 1.5 2004/08/29 16:34:48 tgl Exp $
  */
 #ifndef XLOG_INTERNAL_H
 #define XLOG_INTERNAL_H
@@ -58,7 +58,7 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD05B /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD05C /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {
@@ -203,13 +203,6 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
 
 extern char XLogDir[MAXPGPATH];
 
-/*
- * _INTL_MAXLOGRECSZ: max space needed for a record including header and
- * any backup-block data.
- */
-#define _INTL_MAXLOGRECSZ  (SizeOfXLogRecord + MAXLOGRECSZ + \
-                            XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-
 
 /*
  * Method table for resource managers.