Second phase of psort reconstruction project: add bookkeeping logic to
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 16 Oct 1999 19:49:28 +0000 (19:49 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 16 Oct 1999 19:49:28 +0000 (19:49 +0000)
recycle storage within sort temp file on a block-by-block basis.  This
reduces peak disk usage to essentially just the volume of data being
sorted, whereas it had been about 4x the data volume before.

src/backend/storage/file/buffile.c
src/backend/utils/sort/Makefile
src/backend/utils/sort/logtape.c [new file with mode: 0644]
src/backend/utils/sort/psort.c
src/include/storage/buffile.h
src/include/utils/logtape.h [new file with mode: 0644]
src/include/utils/psort.h

index cd7da900b4f08afcdcf6b71c08325dceaad6a203..452e3a187dfdd612c01b4accb8e42e719aa93f99 100644 (file)
@@ -6,7 +6,7 @@
  * Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.1 1999/10/13 15:02:29 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.2 1999/10/16 19:49:26 tgl Exp $
  *
  * NOTES:
  *
  *
  * BufFile also supports temporary files that exceed the OS file size limit
  * (by opening multiple fd.c temporary files).  This is an essential feature
- * for sorts and hashjoins on large amounts of data.  It is possible to have
- * more than one BufFile reading/writing the same temp file, although the
- * caller is responsible for avoiding ill effects from buffer overlap when
- * this is done.
+ * for sorts and hashjoins on large amounts of data.
  *-------------------------------------------------------------------------
  */
 
 #define MAX_PHYSICAL_FILESIZE  (RELSEG_SIZE * BLCKSZ)
 
 /*
- * To handle multiple BufFiles on a single logical temp file, we use this
- * data structure representing a logical file (which can be made up of
- * multiple physical files to get around the OS file size limit).
+ * This data structure represents a buffered file that consists of one or
+ * more physical files (each accessed through a virtual file descriptor
+ * managed by fd.c).
  */
-typedef struct LogicalFile
+struct BufFile
 {
-   int         refCount;       /* number of BufFiles using me */
-   bool        isTemp;         /* can only add files if this is TRUE */
    int         numFiles;       /* number of physical files in set */
    /* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
-
    File       *files;          /* palloc'd array with numFiles entries */
    long       *offsets;        /* palloc'd array with numFiles entries */
    /* offsets[i] is the current seek position of files[i].  We use this
     * to avoid making redundant FileSeek calls.
     */
-} LogicalFile;
 
-/*
- * A single file buffer looks like this.
- */
-struct BufFile
-{
-   LogicalFile *logFile;       /* the underlying LogicalFile */
+   bool        isTemp;         /* can only add files if this is TRUE */
    bool        dirty;          /* does buffer need to be written? */
    /*
-    * "current pos" is position of start of buffer within LogicalFile.
+    * "current pos" is position of start of buffer within the logical file.
     * Position as seen by user of BufFile is (curFile, curOffset + pos).
     */
    int         curFile;        /* file index (0..n) part of current pos */
@@ -84,30 +72,33 @@ struct BufFile
    char        buffer[BLCKSZ];
 };
 
-static LogicalFile *makeLogicalFile(File firstfile);
-static void extendLogicalFile(LogicalFile *file);
-static void deleteLogicalFile(LogicalFile *file);
+static BufFile *makeBufFile(File firstfile);
+static void extendBufFile(BufFile *file);
 static void BufFileLoadBuffer(BufFile *file);
 static void BufFileDumpBuffer(BufFile *file);
 static int BufFileFlush(BufFile *file);
 
 
 /*
- * Create a LogicalFile with one component file and refcount 1.
+ * Create a BufFile given the first underlying physical file.
  * NOTE: caller must set isTemp true if appropriate.
  */
-static LogicalFile *
-makeLogicalFile(File firstfile)
+static BufFile *
+makeBufFile(File firstfile)
 {
-   LogicalFile *file = (LogicalFile *) palloc(sizeof(LogicalFile));
+   BufFile    *file = (BufFile *) palloc(sizeof(BufFile));
 
-   file->refCount = 1;
-   file->isTemp = false;
    file->numFiles = 1;
    file->files = (File *) palloc(sizeof(File));
    file->files[0] = firstfile;
    file->offsets = (long *) palloc(sizeof(long));
    file->offsets[0] = 0L;
+   file->isTemp = false;
+   file->dirty = false;
+   file->curFile = 0;
+   file->curOffset = 0L;
+   file->pos = 0;
+   file->nbytes = 0;
 
    return file;
 }
@@ -116,7 +107,7 @@ makeLogicalFile(File firstfile)
  * Add another component temp file.
  */
 static void
-extendLogicalFile(LogicalFile *file)
+extendBufFile(BufFile *file)
 {
    File        pfile;
 
@@ -133,21 +124,6 @@ extendLogicalFile(LogicalFile *file)
    file->numFiles++;
 }
 
-/*
- * Close and delete a LogicalFile when its refCount has gone to zero.
- */
-static void
-deleteLogicalFile(LogicalFile *file)
-{
-   int i;
-
-   for (i = 0; i < file->numFiles; i++)
-       FileClose(file->files[i]);
-   pfree(file->files);
-   pfree(file->offsets);
-   pfree(file);
-}
-
 /*
  * Create a BufFile for a new temporary file (which will expand to become
  * multiple temporary files if more than MAX_PHYSICAL_FILESIZE bytes are
@@ -156,24 +132,16 @@ deleteLogicalFile(LogicalFile *file)
 BufFile *
 BufFileCreateTemp(void)
 {
-   BufFile    *bfile = (BufFile *) palloc(sizeof(BufFile));
+   BufFile    *file;
    File        pfile;
-   LogicalFile *lfile;
 
    pfile = OpenTemporaryFile();
    Assert(pfile >= 0);
 
-   lfile = makeLogicalFile(pfile);
-   lfile->isTemp = true;
-
-   bfile->logFile = lfile;
-   bfile->dirty = false;
-   bfile->curFile = 0;
-   bfile->curOffset = 0L;
-   bfile->pos = 0;
-   bfile->nbytes = 0;
+   file = makeBufFile(pfile);
+   file->isTemp = true;
 
-   return bfile;
+   return file;
 }
 
 /*
@@ -186,42 +154,7 @@ BufFileCreateTemp(void)
 BufFile *
 BufFileCreate(File file)
 {
-   BufFile    *bfile = (BufFile *) palloc(sizeof(BufFile));
-   LogicalFile *lfile;
-
-   lfile = makeLogicalFile(file);
-
-   bfile->logFile = lfile;
-   bfile->dirty = false;
-   bfile->curFile = 0;
-   bfile->curOffset = 0L;
-   bfile->pos = 0;
-   bfile->nbytes = 0;
-
-   return bfile;
-}
-
-/*
- * Create an additional BufFile accessing the same underlying file as an
- * existing BufFile.  This is useful for having multiple read/write access
- * positions in a single temporary file.  Note the caller is responsible
- * for avoiding trouble due to overlapping buffer positions!  (Caller may
- * assume that buffer size is BLCKSZ...)
- */
-BufFile *
-BufFileReaccess(BufFile *file)
-{
-   BufFile    *bfile = (BufFile *) palloc(sizeof(BufFile));
-
-   bfile->logFile = file->logFile;
-   bfile->logFile->refCount++;
-   bfile->dirty = false;
-   bfile->curFile = 0;
-   bfile->curOffset = 0L;
-   bfile->pos = 0;
-   bfile->nbytes = 0;
-
-   return bfile;
+   return makeBufFile(file);
 }
 
 /*
@@ -232,16 +165,21 @@ BufFileReaccess(BufFile *file)
 void
 BufFileClose(BufFile *file)
 {
+   int     i;
+
    /* flush any unwritten data */
    BufFileFlush(file);
-   /* close the underlying (with delete if it's a temp file) */
-   if (--(file->logFile->refCount) <= 0)
-       deleteLogicalFile(file->logFile);
+   /* close the underlying file(s) (with delete if it's a temp file) */
+   for (i = 0; i < file->numFiles; i++)
+       FileClose(file->files[i]);
    /* release the buffer space */
+   pfree(file->files);
+   pfree(file->offsets);
    pfree(file);
 }
 
-/* BufFileLoadBuffer
+/*
+ * BufFileLoadBuffer
  *
  * Load some data into buffer, if possible, starting from curOffset.
  * At call, must have dirty = false, pos and nbytes = 0.
@@ -250,7 +188,6 @@ BufFileClose(BufFile *file)
 static void
 BufFileLoadBuffer(BufFile *file)
 {
-   LogicalFile *lfile = file->logFile;
    File    thisfile;
 
    /*
@@ -261,30 +198,33 @@ BufFileLoadBuffer(BufFile *file)
     * MAX_PHYSICAL_FILESIZE.
     */
    if (file->curOffset >= MAX_PHYSICAL_FILESIZE &&
-       file->curFile+1 < lfile->numFiles)
+       file->curFile+1 < file->numFiles)
    {
        file->curFile++;
        file->curOffset = 0L;
    }
-   thisfile = lfile->files[file->curFile];
    /*
-    * May need to reposition physical file, if more than one BufFile
-    * is using it.
+    * May need to reposition physical file.
     */
-   if (file->curOffset != lfile->offsets[file->curFile])
+   thisfile = file->files[file->curFile];
+   if (file->curOffset != file->offsets[file->curFile])
    {
        if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
            return;             /* seek failed, read nothing */
-       lfile->offsets[file->curFile] = file->curOffset;
+       file->offsets[file->curFile] = file->curOffset;
    }
+   /*
+    * Read whatever we can get, up to a full bufferload.
+    */
    file->nbytes = FileRead(thisfile, file->buffer, sizeof(file->buffer));
    if (file->nbytes < 0)
        file->nbytes = 0;
-   lfile->offsets[file->curFile] += file->nbytes;
+   file->offsets[file->curFile] += file->nbytes;
    /* we choose not to advance curOffset here */
 }
 
-/* BufFileDumpBuffer
+/*
+ * BufFileDumpBuffer
  *
  * Dump buffer contents starting at curOffset.
  * At call, should have dirty = true, nbytes > 0.
@@ -293,7 +233,6 @@ BufFileLoadBuffer(BufFile *file)
 static void
 BufFileDumpBuffer(BufFile *file)
 {
-   LogicalFile *lfile = file->logFile;
    int         wpos = 0;
    int         bytestowrite;
    File        thisfile;
@@ -307,10 +246,10 @@ BufFileDumpBuffer(BufFile *file)
        /*
         * Advance to next component file if necessary and possible.
         */
-       if (file->curOffset >= MAX_PHYSICAL_FILESIZE && lfile->isTemp)
+       if (file->curOffset >= MAX_PHYSICAL_FILESIZE && file->isTemp)
        {
-           while (file->curFile+1 >= lfile->numFiles)
-               extendLogicalFile(lfile);
+           while (file->curFile+1 >= file->numFiles)
+               extendBufFile(file);
            file->curFile++;
            file->curOffset = 0L;
        }
@@ -319,28 +258,27 @@ BufFileDumpBuffer(BufFile *file)
         * to write as much as asked...
         */
        bytestowrite = file->nbytes - wpos;
-       if (lfile->isTemp)
+       if (file->isTemp)
        {
            long    availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset;
 
            if ((long) bytestowrite > availbytes)
                bytestowrite = (int) availbytes;
        }
-       thisfile = lfile->files[file->curFile];
        /*
-        * May need to reposition physical file, if more than one BufFile
-        * is using it.
+        * May need to reposition physical file.
         */
-       if (file->curOffset != lfile->offsets[file->curFile])
+       thisfile = file->files[file->curFile];
+       if (file->curOffset != file->offsets[file->curFile])
        {
            if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
                return;         /* seek failed, give up */
-           lfile->offsets[file->curFile] = file->curOffset;
+           file->offsets[file->curFile] = file->curOffset;
        }
        bytestowrite = FileWrite(thisfile, file->buffer, bytestowrite);
        if (bytestowrite <= 0)
            return;             /* failed to write */
-       lfile->offsets[file->curFile] += bytestowrite;
+       file->offsets[file->curFile] += bytestowrite;
        file->curOffset += bytestowrite;
        wpos += bytestowrite;
    }
@@ -363,7 +301,8 @@ BufFileDumpBuffer(BufFile *file)
    file->nbytes = 0;
 }
 
-/* BufFileRead
+/*
+ * BufFileRead
  *
  * Like fread() except we assume 1-byte element size.
  */
@@ -409,7 +348,8 @@ BufFileRead(BufFile *file, void *ptr, size_t size)
    return nread;
 }
 
-/* BufFileWrite
+/*
+ * BufFileWrite
  *
  * Like fwrite() except we assume 1-byte element size.
  */
@@ -458,7 +398,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size)
    return nwritten;
 }
 
-/* BufFileFlush
+/*
+ * BufFileFlush
  *
  * Like fflush()
  */
@@ -475,9 +416,15 @@ BufFileFlush(BufFile *file)
    return 0;
 }
 
-/* BufFileSeek
+/*
+ * BufFileSeek
  *
- * Like fseek().  Result is 0 if OK, EOF if not.
+ * Like fseek(), except that target position needs two values in order to
+ * work when logical filesize exceeds maximum value representable by long.
+ * We do not support relative seeks across more than LONG_MAX, however.
+ *
+ * Result is 0 if OK, EOF if not.  Logical position is not moved if an
+ * impossible seek is attempted.
  */
 int
 BufFileSeek(BufFile *file, int fileno, long offset, int whence)
@@ -487,7 +434,7 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence)
    switch (whence)
    {
        case SEEK_SET:
-           if (fileno < 0 || fileno >= file->logFile->numFiles ||
+           if (fileno < 0 || fileno >= file->numFiles ||
                offset < 0)
                return EOF;
            newFile = fileno;
@@ -516,11 +463,11 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence)
            return EOF;
        newOffset += MAX_PHYSICAL_FILESIZE;
    }
-   if (file->logFile->isTemp)
+   if (file->isTemp)
    {
        while (newOffset > MAX_PHYSICAL_FILESIZE)
        {
-           if (++newFile >= file->logFile->numFiles)
+           if (++newFile >= file->numFiles)
                return EOF;
            newOffset -= MAX_PHYSICAL_FILESIZE;
        }
@@ -548,9 +495,44 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence)
    return 0;
 }
 
-extern void
+void
 BufFileTell(BufFile *file, int *fileno, long *offset)
 {
    *fileno = file->curFile;
    *offset = file->curOffset + file->pos;
 }
+
+/*
+ * BufFileSeekBlock --- block-oriented seek
+ *
+ * Performs absolute seek to the start of the n'th BLCKSZ-sized block of
+ * the file.  Note that users of this interface will fail if their files
+ * exceed BLCKSZ * LONG_MAX bytes, but that is quite a lot; we don't work
+ * with tables bigger than that, either...
+ *
+ * Result is 0 if OK, EOF if not.  Logical position is not moved if an
+ * impossible seek is attempted.
+ */
+int
+BufFileSeekBlock(BufFile *file, long blknum)
+{
+   return BufFileSeek(file,
+                      (int) (blknum / RELSEG_SIZE),
+                      (blknum % RELSEG_SIZE) * BLCKSZ,
+                      SEEK_SET);
+}
+
+/*
+ * BufFileTellBlock --- block-oriented tell
+ *
+ * Any fractional part of a block in the current seek position is ignored.
+ */
+long
+BufFileTellBlock(BufFile *file)
+{
+   long    blknum;
+
+   blknum = (file->curOffset + file->pos) / BLCKSZ;
+   blknum += file->curFile * RELSEG_SIZE;
+   return blknum;
+}
index f2fb18dc6ce3905ea807e20b83cb4c03b3f7b9fd..d411a89c735a3b1e667a67c20a6b6ebedb8fb6ee 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for utils/sort
 #
 # IDENTIFICATION
-#    $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.5 1998/04/06 00:27:37 momjian Exp $
+#    $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.6 1999/10/16 19:49:27 tgl Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -13,7 +13,7 @@ include ../../../Makefile.global
 
 CFLAGS += -I../..
 
-OBJS = lselect.o psort.o
+OBJS = logtape.o lselect.o psort.o
 
 all: SUBSYS.o
 
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
new file mode 100644 (file)
index 0000000..8d5d34c
--- /dev/null
@@ -0,0 +1,903 @@
+/*-------------------------------------------------------------------------
+ *
+ * logtape.c
+ *   Management of "logical tapes" within temporary files.
+ *
+ * This module exists to support sorting via multiple merge passes (see
+ * psort.c).  Merging is an ideal algorithm for tape devices, but if we
+ * implement it on disk by creating a separate file for each "tape",
+ * there is an annoying problem: the peak space usage is at least twice
+ * the volume of actual data to be sorted.  (This must be so because each
+ * datum will appear in both the input and output tapes of the final
+ * merge pass.  For seven-tape polyphase merge, which is otherwise a
+ * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ *
+ * We can work around this problem by recognizing that any one tape
+ * dataset (with the possible exception of the final output) is written
+ * and read exactly once in a perfectly sequential manner.  Therefore,
+ * a datum once read will not be required again, and we can recycle its
+ * space for use by the new tape dataset(s) being generated.  In this way,
+ * the total space usage is essentially just the actual data volume, plus
+ * insignificant bookkeeping and start/stop overhead.
+ *
+ * Few OSes allow arbitrary parts of a file to be released back to the OS,
+ * so we have to implement this space-recycling ourselves within a single
+ * logical file.  logtape.c exists to perform this bookkeeping and provide
+ * the illusion of N independent tape devices to psort.c.  Note that
+ * logtape.c itself depends on buffile.c to provide a "logical file" of
+ * larger size than the underlying OS may support.
+ *
+ * For simplicity, we allocate and release space in the underlying file
+ * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
+ * of which blocks in the underlying file belong to which logical tape,
+ * plus any blocks that are free (recycled and not yet reused).  Normally
+ * there are not very many free blocks, so we just keep those in a list.
+ * The blocks in each logical tape are remembered using a method borrowed
+ * from the Unix HFS filesystem: we store data block numbers in an
+ * "indirect block".  If an indirect block fills up, we write it out to
+ * the underlying file and remember its location in a second-level indirect
+ * block.  In the same way second-level blocks are remembered in third-
+ * level blocks, and so on if necessary (of course we're talking huge
+ * amounts of data here).  The topmost indirect block of a given logical
+ * tape is never actually written out to the physical file, but all lower-
+ * level indirect blocks will be.
+ *
+ * The initial write pass is guaranteed to fill the underlying file
+ * perfectly sequentially, no matter how data is divided into logical tapes.
+ * Once we begin merge passes, the access pattern becomes considerably
+ * less predictable --- but the seeking involved should be comparable to
+ * what would happen if we kept each logical tape in a separate file,
+ * so there's no serious performance penalty paid to obtain the space
+ * savings of recycling.  We try to localize the write accesses by always
+ * writing to the lowest-numbered free block when we have a choice; it's
+ * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
+ * policy for free blocks would be better?)
+ *
+ * Since all the bookkeeping and buffer memory is allocated with palloc(),
+ * and the underlying file(s) are made with OpenTemporaryFile, all resources
+ * for a logical tape set are certain to be cleaned up even if processing
+ * is aborted by elog(ERROR).  To avoid confusion, the caller should take
+ * care that all calls for a single LogicalTapeSet are made in the same
+ * palloc context.
+ * 
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/utils/sort/logtape.c,v 1.1 1999/10/16 19:49:27 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/buffile.h"
+#include "utils/logtape.h"
+
+/*
+ * Block indexes are "long"s, so we can fit this many per indirect block.
+ * NB: we assume this is an exact fit!
+ */
+#define BLOCKS_PER_INDIR_BLOCK  (BLCKSZ / sizeof(long))
+
+/*
+ * We use a struct like this for each active indirection level of each
+ * logical tape.  If the indirect block is not the highest level of its
+ * tape, the "nextup" link points to the next higher level.  Only the
+ * "ptrs" array is written out if we have to dump the indirect block to
+ * disk.  If "ptrs" is not completely full, we store -1L in the first
+ * unused slot at completion of the write phase for the logical tape.
+ */
+typedef struct IndirectBlock
+{
+   int         nextSlot;       /* next pointer slot to write or read */
+   struct IndirectBlock *nextup; /* parent indirect level, or NULL if top */
+   long        ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */
+} IndirectBlock;
+
+/*
+ * This data structure represents a single "logical tape" within the set
+ * of logical tapes stored in the same file.  We must keep track of the
+ * current partially-read-or-written data block as well as the active
+ * indirect block level(s).
+ */
+typedef struct LogicalTape
+{
+   IndirectBlock *indirect;    /* bottom of my indirect-block hierarchy */
+   bool        writing;        /* T while in write phase */
+   bool        frozen;         /* T if blocks should not be freed when read */
+   bool        dirty;          /* does buffer need to be written? */
+   /*
+    * The total data volume in the logical tape is numFullBlocks * BLCKSZ
+    * + lastBlockBytes.  BUT: we do not update lastBlockBytes during writing,
+    * only at completion of a write phase.
+    */
+   long        numFullBlocks;  /* number of complete blocks in log tape */
+   int         lastBlockBytes; /* valid bytes in last (incomplete) block */
+   /*
+    * Buffer for current data block.  Note we don't bother to store the
+    * actual file block number of the data block (during the write phase
+    * it hasn't been assigned yet, and during read we don't care anymore).
+    * But we do need the relative block number so we can detect end-of-tape
+    * while reading.
+    */
+   long        curBlockNumber; /* this block's logical blk# within tape */
+   int         pos;            /* next read/write position in buffer */
+   int         nbytes;         /* total # of valid bytes in buffer */
+   char        buffer[BLCKSZ];
+} LogicalTape;
+
+/*
+ * This data structure represents a set of related "logical tapes" sharing
+ * space in a single underlying file.  (But that "file" may be multiple files
+ * if needed to escape OS limits on file size; buffile.c handles that for us.)
+ * The number of tapes is fixed at creation.
+ */
+struct LogicalTapeSet
+{
+   BufFile    *pfile;          /* underlying file for whole tape set */
+   long        nFileBlocks;    /* # of blocks used in underlying file */
+   /*
+    * We store the numbers of recycled-and-available blocks in freeBlocks[].
+    * When there are no such blocks, we extend the underlying file.  Note
+    * that the block numbers in freeBlocks are always in *decreasing* order,
+    * so that removing the last entry gives us the lowest free block.
+    */
+   long       *freeBlocks;     /* resizable array */
+   int         nFreeBlocks;    /* # of currently free blocks */
+   int         freeBlocksLen;  /* current allocated length of freeBlocks[] */
+   /*
+    * tapes[] is declared size 1 since C wants a fixed size, but actually
+    * it is of length nTapes.
+    */
+   int         nTapes;         /* # of logical tapes in set */
+   LogicalTape *tapes[1];      /* must be last in struct! */
+};
+
+static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
+static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
+static long ltsGetFreeBlock(LogicalTapeSet *lts);
+static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
+static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
+                             long blocknum);
+static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
+                                  IndirectBlock *indirect,
+                                  bool freezing);
+static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
+                                        IndirectBlock *indirect);
+static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
+                                 IndirectBlock *indirect,
+                                 bool frozen);
+static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
+                                 IndirectBlock *indirect);
+static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+
+
+/*
+ * Write a block-sized buffer to the specified block of the underlying file.
+ *
+ * NB: should not attempt to write beyond current end of file (ie, create
+ * "holes" in file), since BufFile doesn't allow that.  The first write pass
+ * must write blocks sequentially.
+ *
+ * No need for an error return convention; we elog() on any error.
+ */
+static void
+ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
+{
+   if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
+       BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
+       elog(ERROR, "ltsWriteBlock: failed to write block %ld of temporary file\n\t\tPerhaps out of disk space?",
+            blocknum);
+}
+
+/*
+ * Read a block-sized buffer from the specified block of the underlying file.
+ *
+ * No need for an error return convention; we elog() on any error.  This
+ * module should never attempt to read a block it doesn't know is there.
+ */
+static void
+ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
+{
+   if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
+       BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
+       elog(ERROR, "ltsReadBlock: failed to read block %ld of temporary file",
+            blocknum);
+}
+
+/*
+ * Select a currently unused block for writing to.
+ *
+ * NB: should only be called when writer is ready to write immediately,
+ * to ensure that first write pass is sequential.
+ */
+static long
+ltsGetFreeBlock(LogicalTapeSet *lts)
+{
+   /* If there are multiple free blocks, we select the one appearing last
+    * in freeBlocks[].  If there are none, assign the next block at the end
+    * of the file.
+    */
+   if (lts->nFreeBlocks > 0)
+       return lts->freeBlocks[--lts->nFreeBlocks];
+   else
+       return lts->nFileBlocks++;
+}
+
+/*
+ * Return a block# to the freelist.
+ */
+static void
+ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
+{
+   int     ndx;
+   long   *ptr;
+
+   /*
+    * Enlarge freeBlocks array if full.
+    */
+   if (lts->nFreeBlocks >= lts->freeBlocksLen)
+   {
+       lts->freeBlocksLen *= 2;
+       lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
+                                           lts->freeBlocksLen * sizeof(long));
+   }
+   /*
+    * Insert blocknum into array, preserving decreasing order (so that
+    * ltsGetFreeBlock returns the lowest available block number).
+    * This could get fairly slow if there were many free blocks, but
+    * we don't expect there to be very many at one time.
+    */
+   ndx = lts->nFreeBlocks++;
+   ptr = lts->freeBlocks + ndx;
+   while (ndx > 0 && ptr[-1] < blocknum)
+   {
+       ptr[0] = ptr[-1];
+       ndx--, ptr--;
+   }
+   ptr[0] = blocknum;
+}
+
+/*
+ * These routines manipulate indirect-block hierarchies.  All are recursive
+ * so that they don't have any specific limit on the depth of hierarchy.
+ */
+
+/*
+ * Record a data block number in a logical tape's lowest indirect block,
+ * or record an indirect block's number in the next higher indirect level.
+ */
+static void
+ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
+                 long blocknum)
+{
+   if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
+   {
+       /*
+        * This indirect block is full, so dump it out and recursively
+        * save its address in the next indirection level.  Create a
+        * new indirection level if there wasn't one before.
+        */
+       long    indirblock = ltsGetFreeBlock(lts);
+
+       ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
+       if (indirect->nextup == NULL)
+       {
+           indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
+           indirect->nextup->nextSlot = 0;
+           indirect->nextup->nextup = NULL;
+       }
+       ltsRecordBlockNum(lts, indirect->nextup, indirblock);
+       /*
+        * Reset to fill another indirect block at this level.
+        */
+       indirect->nextSlot = 0;
+   }
+   indirect->ptrs[indirect->nextSlot++] = blocknum;
+}
+
+/*
+ * Reset a logical tape's indirect-block hierarchy after a write pass
+ * to prepare for reading.  We dump out partly-filled blocks except
+ * at the top of the hierarchy, and we rewind each level to the start.
+ * This call returns the first data block number, or -1L if the tape
+ * is empty.
+ *
+ * Unless 'freezing' is true, release indirect blocks to the free pool after
+ * reading them.
+ */
+static long
+ltsRewindIndirectBlock(LogicalTapeSet *lts,
+                      IndirectBlock *indirect,
+                      bool freezing)
+{
+   /* Insert sentinel if block is not full */
+   if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
+       indirect->ptrs[indirect->nextSlot] = -1L;
+   /*
+    * If block is not topmost, write it out, and recurse to obtain
+    * address of first block in this hierarchy level.  Read that one in.
+    */
+   if (indirect->nextup != NULL)
+   {
+       long    indirblock = ltsGetFreeBlock(lts);
+
+       ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
+       ltsRecordBlockNum(lts, indirect->nextup, indirblock);
+       indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
+       Assert(indirblock != -1L);
+       ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+       if (! freezing)
+           ltsReleaseBlock(lts, indirblock);
+   }
+   /*
+    * Reset my next-block pointer, and then fetch a block number if any.
+    */
+   indirect->nextSlot = 0;
+   if (indirect->ptrs[0] == -1L)
+       return -1L;
+   return indirect->ptrs[indirect->nextSlot++];
+}
+
+/*
+ * Rewind a previously-frozen indirect-block hierarchy for another read pass.
+ * This call returns the first data block number, or -1L if the tape
+ * is empty.
+ */
+static long
+ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
+                            IndirectBlock *indirect)
+{
+   /*
+    * If block is not topmost, recurse to obtain
+    * address of first block in this hierarchy level.  Read that one in.
+    */
+   if (indirect->nextup != NULL)
+   {
+       long    indirblock;
+
+       indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
+       Assert(indirblock != -1L);
+       ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+   }
+   /*
+    * Reset my next-block pointer, and then fetch a block number if any.
+    */
+   indirect->nextSlot = 0;
+   if (indirect->ptrs[0] == -1L)
+       return -1L;
+   return indirect->ptrs[indirect->nextSlot++];
+}
+
+/*
+ * Obtain next data block number in the forward direction, or -1L if no more.
+ *
+ * Unless 'frozen' is true, release indirect blocks to the free pool after
+ * reading them.
+ */
+static long
+ltsRecallNextBlockNum(LogicalTapeSet *lts,
+                     IndirectBlock *indirect,
+                     bool frozen)
+{
+   if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
+       indirect->ptrs[indirect->nextSlot] == -1L)
+   {
+       long    indirblock;
+
+       if (indirect->nextup == NULL)
+           return -1L;         /* nothing left at this level */
+       indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
+       if (indirblock == -1L)
+           return -1L;         /* nothing left at this level */
+       ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+       if (! frozen)
+           ltsReleaseBlock(lts, indirblock);
+       indirect->nextSlot = 0;
+   }
+   if (indirect->ptrs[indirect->nextSlot] == -1L)
+       return -1L;
+   return indirect->ptrs[indirect->nextSlot++];
+}
+
+/*
+ * Obtain next data block number in the reverse direction, or -1L if no more.
+ *
+ * Note this fetches the block# before the one last returned, no matter which
+ * direction of call returned that one.  If we fail, no change in state.
+ *
+ * This routine can only be used in 'frozen' state, so there's no need to
+ * pass a parameter telling whether to release blocks ... we never do.
+ */
+static long
+ltsRecallPrevBlockNum(LogicalTapeSet *lts,
+                     IndirectBlock *indirect)
+{
+   if (indirect->nextSlot <= 1)
+   {
+       long    indirblock;
+
+       if (indirect->nextup == NULL)
+           return -1L;         /* nothing left at this level */
+       indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
+       if (indirblock == -1L)
+           return -1L;         /* nothing left at this level */
+       ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+       /* The previous block would only have been written out if full,
+        * so we need not search it for a -1 sentinel.
+        */
+       indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK+1;
+   }
+   indirect->nextSlot--;
+   return indirect->ptrs[indirect->nextSlot-1];
+}
+
+
+/*
+ * Create a set of logical tapes in a temporary underlying file.
+ *
+ * Each tape is initialized in write state.
+ */
+LogicalTapeSet *
+LogicalTapeSetCreate(int ntapes)
+{
+   LogicalTapeSet *lts;
+   LogicalTape    *lt;
+   int             i;
+
+   /*
+    * Create top-level struct.  First LogicalTape pointer is already
+    * counted in sizeof(LogicalTapeSet).
+    */
+   Assert(ntapes > 0);
+   lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
+                                   (ntapes-1) * sizeof(LogicalTape *));
+   lts->pfile = BufFileCreateTemp();
+   lts->nFileBlocks = 0L;
+   lts->freeBlocksLen = 32;    /* reasonable initial guess */
+   lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
+   lts->nFreeBlocks = 0;
+   lts->nTapes = ntapes;
+   /*
+    * Create per-tape structs, including first-level indirect blocks.
+    */
+   for (i = 0; i < ntapes; i++)
+   {
+       lt = (LogicalTape *) palloc(sizeof(LogicalTape));
+       lts->tapes[i] = lt;
+       lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
+       lt->indirect->nextSlot = 0;
+       lt->indirect->nextup = NULL;
+       lt->writing = true;
+       lt->frozen = false;
+       lt->dirty = false;
+       lt->numFullBlocks = 0L;
+       lt->lastBlockBytes = 0;
+       lt->curBlockNumber = 0L;
+       lt->pos = 0;
+       lt->nbytes = 0;
+   }
+   return lts;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ */
+void LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+   LogicalTape    *lt;
+   IndirectBlock  *ib,
+                  *nextib;
+   int             i;
+
+   BufFileClose(lts->pfile);
+   for (i = 0; i < lts->nTapes; i++)
+   {
+       lt = lts->tapes[i];
+       for (ib = lt->indirect; ib != NULL; ib = nextib)
+       {
+           nextib = ib->nextup;
+           pfree(ib);
+       }
+       pfree(lt);
+   }
+   pfree(lts->freeBlocks);
+   pfree(lts);
+}
+
+/*
+ * Dump the dirty buffer of a logical tape.
+ */
+static void
+ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+{
+   long    datablock = ltsGetFreeBlock(lts);
+
+   Assert(lt->dirty);
+   ltsWriteBlock(lts, datablock, (void *) lt->buffer);
+   ltsRecordBlockNum(lts, lt->indirect, datablock);
+   lt->dirty = false;
+   /* Caller must do other state update as needed */
+}
+
+/*
+ * Write to a logical tape.
+ *
+ * There are no error returns; we elog() on failure.
+ */
+void
+LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
+                void *ptr, size_t size)
+{
+   LogicalTape    *lt;
+   size_t          nthistime;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+   Assert(lt->writing);
+
+   while (size > 0)
+   {
+       if (lt->pos >= BLCKSZ)
+       {
+           /* Buffer full, dump it out */
+           if (lt->dirty)
+           {
+               ltsDumpBuffer(lts, lt);
+           }
+           else
+           {
+               /* Hmm, went directly from reading to writing? */
+               elog(ERROR, "LogicalTapeWrite: impossible state");
+           }
+           lt->numFullBlocks++;
+           lt->curBlockNumber++;
+           lt->pos = 0;
+           lt->nbytes = 0;
+       }
+
+       nthistime = BLCKSZ - lt->pos;
+       if (nthistime > size)
+           nthistime = size;
+       Assert(nthistime > 0);
+
+       memcpy(lt->buffer + lt->pos, ptr, nthistime);
+
+       lt->dirty = true;
+       lt->pos += nthistime;
+       if (lt->nbytes < lt->pos)
+           lt->nbytes = lt->pos;
+       ptr = (void *) ((char *) ptr + nthistime);
+       size -= nthistime;
+   }
+}
+
+/*
+ * Rewind logical tape and switch from writing to reading or vice versa.
+ *
+ * Unless the tape has been "frozen" in read state, forWrite must be the
+ * opposite of the previous tape state.
+ */
+void
+LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
+{
+   LogicalTape    *lt;
+   long            datablocknum;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+
+   if (! forWrite)
+   {
+       if (lt->writing)
+       {
+           /*
+            * Completion of a write phase.  Flush last partial data
+            * block, flush any partial indirect blocks, rewind for
+            * normal (destructive) read.
+            */
+           if (lt->dirty)
+               ltsDumpBuffer(lts, lt);
+           lt->lastBlockBytes = lt->nbytes;
+           lt->writing = false;
+           datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
+       }
+       else
+       {
+           /*
+            * This is only OK if tape is frozen; we rewind for (another)
+            * read pass.
+            */
+           Assert(lt->frozen);
+           datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
+       }
+       /* Read the first block, or reset if tape is empty */
+       lt->curBlockNumber = 0L;
+       lt->pos = 0;
+       lt->nbytes = 0;
+       if (datablocknum != -1L)
+       {
+           ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+           if (! lt->frozen)
+               ltsReleaseBlock(lts, datablocknum);
+           lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+               BLCKSZ : lt->lastBlockBytes;
+       }
+   }
+   else
+   {
+       /*
+        * Completion of a read phase.  Rewind and prepare for write.
+        *
+        * NOTE: we assume the caller has read the tape to the end;
+        * otherwise untouched data and indirect blocks will not have
+        * been freed.  We could add more code to free any unread blocks,
+        * but in current usage of this module it'd be useless code.
+        */
+       IndirectBlock  *ib,
+                      *nextib;
+
+       Assert(! lt->writing && ! lt->frozen);
+       /* Must truncate the indirect-block hierarchy down to one level. */
+       for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
+       {
+           nextib = ib->nextup;
+           pfree(ib);
+       }
+       lt->indirect->nextSlot = 0;
+       lt->indirect->nextup = NULL;
+       lt->writing = true;
+       lt->dirty = false;
+       lt->numFullBlocks = 0L;
+       lt->lastBlockBytes = 0;
+       lt->curBlockNumber = 0L;
+       lt->pos = 0;
+       lt->nbytes = 0;
+   }
+}
+
+/*
+ * Read from a logical tape.
+ *
+ * Early EOF is indicated by return value less than #bytes requested.
+ */
+size_t
+LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
+               void *ptr, size_t size)
+{
+   LogicalTape    *lt;
+   size_t          nread = 0;
+   size_t          nthistime;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+   Assert(! lt->writing);
+
+   while (size > 0)
+   {
+       if (lt->pos >= lt->nbytes)
+       {
+           /* Try to load more data into buffer. */
+           long    datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
+                                                        lt->frozen);
+
+           if (datablocknum == -1L)
+               break;          /* EOF */
+           lt->curBlockNumber++;
+           lt->pos = 0;
+           ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+           if (! lt->frozen)
+               ltsReleaseBlock(lts, datablocknum);
+           lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+               BLCKSZ : lt->lastBlockBytes;
+           if (lt->nbytes <= 0)
+               break;          /* EOF (possible here?) */
+       }
+
+       nthistime = lt->nbytes - lt->pos;
+       if (nthistime > size)
+           nthistime = size;
+       Assert(nthistime > 0);
+
+       memcpy(ptr, lt->buffer + lt->pos, nthistime);
+
+       lt->pos += nthistime;
+       ptr = (void *) ((char *) ptr + nthistime);
+       size -= nthistime;
+       nread += nthistime;
+   }
+
+   return nread;
+}
+
+/*
+ * "Freeze" the contents of a tape so that it can be read multiple times
+ * and/or read backwards.  Once a tape is frozen, its contents will not
+ * be released until the LogicalTapeSet is destroyed.  This is expected
+ * to be used only for the final output pass of a merge.
+ *
+ * This *must* be called just at the end of a write pass, before the
+ * tape is rewound (after rewind is too late!).  It performs a rewind
+ * and switch to read mode "for free".  An immediately following rewind-
+ * for-read call is OK but not necessary.
+ */
+void
+LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
+{
+   LogicalTape    *lt;
+   long            datablocknum;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+   Assert(lt->writing);
+
+   /*
+    * Completion of a write phase.  Flush last partial data
+    * block, flush any partial indirect blocks, rewind for
+    * nondestructive read.
+    */
+   if (lt->dirty)
+       ltsDumpBuffer(lts, lt);
+   lt->lastBlockBytes = lt->nbytes;
+   lt->writing = false;
+   lt->frozen = true;
+   datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+   /* Read the first block, or reset if tape is empty */
+   lt->curBlockNumber = 0L;
+   lt->pos = 0;
+   lt->nbytes = 0;
+   if (datablocknum != -1L)
+   {
+       ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+       lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+           BLCKSZ : lt->lastBlockBytes;
+   }
+}
+
+/*
+ * Backspace the tape a given number of bytes.  (We also support a more
+ * general seek interface, see below.)
+ *
+ * *Only* a frozen-for-read tape can be backed up; we don't support
+ * random access during write, and an unfrozen read tape may have
+ * already discarded the desired data!
+ *
+ * Return value is TRUE if seek successful, FALSE if there isn't that much
+ * data before the current point (in which case there's no state change).
+ */
+bool
+LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+{
+   LogicalTape    *lt;
+   long            nblocks;
+   int             newpos;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+   Assert(lt->frozen);
+
+   /*
+    * Easy case for seek within current block.
+    */
+   if (size <= (size_t) lt->pos)
+   {
+       lt->pos -= (int) size;
+       return true;
+   }
+   /*
+    * Not-so-easy case.  Figure out whether it's possible at all.
+    */
+   size -= (size_t) lt->pos;   /* part within this block */
+   nblocks = size / BLCKSZ;
+   size = size % BLCKSZ;
+   if (size)
+   {
+       nblocks++;
+       newpos = (int) (BLCKSZ - size);
+   }
+   else
+       newpos = 0;
+   if (nblocks > lt->curBlockNumber)
+       return false;           /* a seek too far... */
+   /*
+    * OK, we need to back up nblocks blocks.  This implementation
+    * would be pretty inefficient for long seeks, but we really
+    * aren't expecting that (a seek over one tuple is typical).
+    */
+   while (nblocks-- > 0)
+   {
+       long    datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+
+       if (datablocknum == -1L)
+           elog(ERROR, "LogicalTapeBackspace: unexpected end of tape");
+       lt->curBlockNumber--;
+       if (nblocks == 0)
+       {
+           ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+           lt->nbytes = BLCKSZ;
+       }
+   }
+   lt->pos = newpos;
+   return true;
+}
+
+/*
+ * Seek to an arbitrary position in a logical tape.
+ *
+ * *Only* a frozen-for-read tape can be seeked.
+ *
+ * Return value is TRUE if seek successful, FALSE if there isn't that much
+ * data in the tape (in which case there's no state change).
+ */
+bool
+LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
+               long blocknum, int offset)
+{
+   LogicalTape    *lt;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+   Assert(lt->frozen);
+   Assert(offset >= 0 && offset <= BLCKSZ);
+
+   /*
+    * Easy case for seek within current block.
+    */
+   if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
+   {
+       lt->pos = offset;
+       return true;
+   }
+   /*
+    * Not-so-easy case.  Figure out whether it's possible at all.
+    */
+   if (blocknum < 0 || blocknum > lt->numFullBlocks ||
+       (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
+       return false;
+   /*
+    * OK, advance or back up to the target block.  This implementation
+    * would be pretty inefficient for long seeks, but we really
+    * aren't expecting that (a seek over one tuple is typical).
+    */
+   while (lt->curBlockNumber > blocknum)
+   {
+       long    datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+
+       if (datablocknum == -1L)
+           elog(ERROR, "LogicalTapeSeek: unexpected end of tape");
+       if (--lt->curBlockNumber == blocknum)
+           ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+   }
+   while (lt->curBlockNumber < blocknum)
+   {
+       long    datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
+                                                    lt->frozen);
+
+       if (datablocknum == -1L)
+           elog(ERROR, "LogicalTapeSeek: unexpected end of tape");
+       if (++lt->curBlockNumber == blocknum)
+           ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+   }
+   lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+       BLCKSZ : lt->lastBlockBytes;
+   lt->pos = offset;
+   return true;
+}
+
+/*
+ * Obtain current position in a form suitable for a later LogicalTapeSeek.
+ *
+ * NOTE: it'd be OK to do this during write phase with intention of using
+ * the position for a seek after freezing.  Not clear if anyone needs that.
+ */
+void
+LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
+               long *blocknum, int *offset)
+{
+   LogicalTape    *lt;
+
+   Assert(tapenum >= 0 && tapenum < lts->nTapes);
+   lt = lts->tapes[tapenum];
+   *blocknum = lt->curBlockNumber;
+   *offset = lt->pos;
+}
index 14db10c119837607cdce1bfa043faf793697bbb9..67cdfc292e873c992f58563b0dad1d7c7bc2e37d 100644 (file)
-/*
+/*-------------------------------------------------------------------------
+ *
  * psort.c
  *   Polyphase merge sort.
  *
- * Copyright (c) 1994, Regents of the University of California
- *
- *   $Id: psort.c,v 1.57 1999/10/13 15:02:31 tgl Exp $
+ * See Knuth, volume 3, for more than you want to know about this algorithm.
  *
  * NOTES
- *     Sorts the first relation into the second relation.
  *
- *     The old psort.c's routines formed a temporary relation from the merged
- * sort files. This version keeps the files around instead of generating the
- * relation from them, and provides interface functions to the file so that
- * you can grab tuples, mark a position in the file, restore a position in the
- * file. You must now explicitly call an interface function to end the sort,
- * psort_end, when you are done.
- *     Now most of the global variables are stuck in the Sort nodes, and
- * accessed from there (they are passed to all the psort routines) so that
- * each sort running has its own separate state. This is facilitated by having
- * the Sort nodes passed in to all the interface functions.
- *     The one global variable that all the sorts still share is SortMemory.
- *     You should now be allowed to run two or more psorts concurrently,
- * so long as the memory they eat up is not greater than SORTMEM, the initial
- * value of SortMemory.                                            -Rex 2.15.1995
+ * This needs to be generalized to handle index tuples as well as heap tuples,
+ * so that the near-duplicate code in nbtsort.c can be eliminated.  Also,
+ * I think it's got memory leak problems.
  *
- *   Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future.
+ * Copyright (c) 1994, Regents of the University of California
  *
- *     Arguments? Variables?
- *             MAXMERGE, MAXTAPES
+ * IDENTIFICATION
+ *   $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.58 1999/10/16 19:49:27 tgl Exp $
  *
+ *-------------------------------------------------------------------------
  */
+
 #include <math.h>
-#include <sys/types.h>
-#include <unistd.h>
 
 #include "postgres.h"
 
 #include "access/heapam.h"
+#include "access/relscan.h"
 #include "executor/execdebug.h"
 #include "executor/executor.h"
 #include "miscadmin.h"
+#include "utils/logtape.h"
+#include "utils/lselect.h"
 #include "utils/psort.h"
 
+#define MAXTAPES       7       /* See Knuth Fig. 70, p273 */
+
+struct tape
+{
+   int         tp_dummy;       /* (D) */
+   int         tp_fib;         /* (A) */
+   int         tp_tapenum;     /* (TAPE) */
+   struct tape *tp_prev;
+};
+
+/*
+ * Private state of a Psort operation.  The "psortstate" field in a Sort node
+ * points to one of these.  This replaces a lot of global variables that used
+ * to be here...
+ */
+typedef struct Psortstate
+{
+   LeftistContextData treeContext;
+
+   int         TapeRange;      /* number of tapes less 1 (T) */
+   int         Level;          /* Knuth's l */
+   int         TotalDummy;     /* sum of tp_dummy across all tapes */
+   struct tape Tape[MAXTAPES];
+
+   LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
+
+   int         BytesRead;      /* I/O statistics (useless) */
+   int         BytesWritten;
+   int         tupcount;
+
+   struct leftist *Tuples;     /* current tuple tree */
+
+   int         psort_grab_tape; /* tape number of finished output data */
+   long        psort_current;  /* array index (only used if not tape) */
+   /* psort_saved(_offset) holds marked position for mark and restore */
+   long        psort_saved;    /* could be tape block#, or array index */
+   int         psort_saved_offset; /* lower bits of psort_saved, if tape */
+   bool        using_tape_files;
+   bool        all_fetched;    /* this is for cursors */
+
+   HeapTuple  *memtuples;
+} Psortstate;
+
+/*
+ * PS - Macro to access and cast psortstate from a Sort node
+ */
+#define PS(N) ((Psortstate *)(N)->psortstate)
+
 static bool createfirstrun(Sort *node);
-static bool createrun(Sort *node, BufFile *file);
-static void destroytape(BufFile *file);
-static void dumptuples(BufFile *file, Sort *node);
-static BufFile *gettape(void);
+static bool createrun(Sort *node, int desttapenum);
+static void dumptuples(Sort *node, int desttapenum);
 static void initialrun(Sort *node);
 static void inittapes(Sort *node);
 static void merge(Sort *node, struct tape * dest);
-static BufFile *mergeruns(Sort *node);
+static int mergeruns(Sort *node);
 static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
 
-
-/*
- * tlenzero used to delimit runs; both vars below must have
- * the same size as HeapTuple->t_len
- */
-static unsigned int tlenzero = 0;
-static unsigned int tlendummy;
-
 /* these are used by _psort_cmp, and are set just before calling qsort() */
 static TupleDesc PsortTupDesc;
 static ScanKey PsortKeys;
 static int PsortNkeys;
 
 /*
- * old psort global variables
- *
- * (These are the global variables from the old psort. They are still used,
- * but are now accessed from Sort nodes using the PS macro. Note that while
- * these variables will be accessed by PS(node)->whatever, they will still
- * be called by their original names within the comments!      -Rex 2.10.1995)
+ * tlenzero is used to write a zero to delimit runs, tlendummy is used
+ * to read in length words that we don't care about.
  *
- * LeftistContextData  treeContext;
- *
- * static      int     TapeRange;              number of tapes - 1 (T)
- * static      int     Level;                  (l)
- * static      int     TotalDummy;             summation of tp_dummy
- * static struct tape  *Tape;
- *
- * static      int     BytesRead;              to keep track of # of IO
- * static      int     BytesWritten;
- *
- * struct leftist      *Tuples;                current tuples in memory
- *
- * BufFile             *psort_grab_file;       this holds tuples grabbed
- *                                                from merged sort runs
- * long                    psort_current;          current file position
- * long                    psort_saved;            file position saved for
- *                                                mark and restore
+ * both vars must have the same size as HeapTuple->t_len
  */
+static unsigned int tlenzero = 0;
+static unsigned int tlendummy;
 
-/*
- * PS - Macro to access and cast psortstate from a Sort node
- */
-#define PS(N) ((Psortstate *)N->psortstate)
 
 /*
- *     psort_begin     - polyphase merge sort entry point. Sorts the subplan
- *                       into a temporary file psort_grab_file. After
- *                       this is called, calling the interface function
- *                       psort_grabtuple iteratively will get you the sorted
- *                       tuples. psort_end then finishes the sort off, after
- *                       all the tuples have been grabbed.
+ *     psort_begin
  *
- *                       Allocates and initializes sort node's psort state.
+ * polyphase merge sort entry point. Sorts the subplan
+ * into memory or a temporary file. After
+ * this is called, calling the interface function
+ * psort_grabtuple iteratively will get you the sorted
+ * tuples. psort_end releases storage when done.
+ *
+ * Allocates and initializes sort node's psort state.
  */
 bool
 psort_begin(Sort *node, int nkeys, ScanKey key)
 {
-
-   node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate));
-
    AssertArg(nkeys >= 1);
    AssertArg(key[0].sk_attno != 0);
    AssertArg(key[0].sk_procedure != 0);
 
-   PS(node)->BytesRead = 0;
-   PS(node)->BytesWritten = 0;
+   node->psortstate = (void *) palloc(sizeof(struct Psortstate));
+
    PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node));
    PS(node)->treeContext.nKeys = nkeys;
    PS(node)->treeContext.scanKeys = key;
    PS(node)->treeContext.sortMem = SortMem * 1024;
 
-   PS(node)->Tuples = NULL;
+   PS(node)->tapeset = NULL;
+
+   PS(node)->BytesRead = 0;
+   PS(node)->BytesWritten = 0;
    PS(node)->tupcount = 0;
 
+   PS(node)->Tuples = NULL;
+
    PS(node)->using_tape_files = false;
    PS(node)->all_fetched = false;
-   PS(node)->psort_grab_file = NULL;
+   PS(node)->psort_grab_tape = -1;
+
    PS(node)->memtuples = NULL;
 
    initialrun(node);
@@ -138,12 +148,12 @@ psort_begin(Sort *node, int nkeys, ScanKey key)
    if (PS(node)->tupcount == 0)
        return false;
 
-   if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL)
-       PS(node)->psort_grab_file = mergeruns(node);
+   if (PS(node)->using_tape_files && PS(node)->psort_grab_tape == -1)
+       PS(node)->psort_grab_tape = mergeruns(node);
 
-   PS(node)->psort_current = 0;
-   PS(node)->psort_saved_fileno = 0;
+   PS(node)->psort_current = 0L;
    PS(node)->psort_saved = 0L;
+   PS(node)->psort_saved_offset = 0;
 
    return true;
 }
@@ -151,8 +161,8 @@ psort_begin(Sort *node, int nkeys, ScanKey key)
 /*
  *     inittapes       - initializes the tapes
  *                     - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270)
- *     Returns:
- *             number of allocated tapes
+ *
+ * This is called only if we have found we don't have room to sort in memory.
  */
 static void
 inittapes(Sort *node)
@@ -163,16 +173,14 @@ inittapes(Sort *node)
    Assert(node != (Sort *) NULL);
    Assert(PS(node) != (Psortstate *) NULL);
 
-   /*
-    * ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid
-    * number of tapes to initialize.\n");
-    */
+   PS(node)->tapeset = LogicalTapeSetCreate(MAXTAPES);
 
    tp = PS(node)->Tape;
-   for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++)
+   for (i = 0; i < MAXTAPES; i++)
    {
        tp->tp_dummy = 1;
        tp->tp_fib = 1;
+       tp->tp_tapenum = i;
        tp->tp_prev = tp - 1;
        tp++;
    }
@@ -181,10 +189,6 @@ inittapes(Sort *node)
    tp->tp_fib = 0;
    PS(node)->Tape[0].tp_prev = tp;
 
-   if (PS(node)->TapeRange <= 1)
-       elog(ERROR, "inittapes: Could only allocate %d < 3 tapes\n",
-            PS(node)->TapeRange + 1);
-
    PS(node)->Level = 1;
    PS(node)->TotalDummy = PS(node)->TapeRange;
 
@@ -194,9 +198,9 @@ inittapes(Sort *node)
 /*
  *     PUTTUP          - writes the next tuple
  *     ENDRUN          - mark end of run
- *     GETLEN          - reads the length of the next tuple
+ *     TRYGETLEN       - reads the length of the next tuple, if any
+ *     GETLEN          - reads the length of the next tuple, must be one
  *     ALLOCTUP        - returns space for the new tuple
- *     SETTUPLEN       - stores the length into the tuple
  *     GETTUP          - reads the tuple
  *
  *     Note:
@@ -204,31 +208,47 @@ inittapes(Sort *node)
  */
 
 
-#define PUTTUP(NODE, TUP, FP) \
+#define PUTTUP(NODE, TUP, TAPE) \
 ( \
    (TUP)->t_len += HEAPTUPLESIZE, \
-   ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \
-   BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \
-   BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \
+   PS(NODE)->BytesWritten += (TUP)->t_len, \
+   LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)(TUP), (TUP)->t_len), \
+   LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)&((TUP)->t_len), sizeof(tlendummy)), \
    (TUP)->t_len -= HEAPTUPLESIZE \
 )
 
-#define ENDRUN(FP)     BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
-#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
-#define ALLOCTUP(LEN)  ((HeapTuple)palloc((unsigned)LEN))
-#define FREE(x)            pfree((char *) x)
-#define GETTUP(NODE, TUP, LEN, FP) \
-( \
-   IncrProcessed(), \
-   ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \
-   BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \
-   (TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \
-   BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \
-)
+#define ENDRUN(NODE, TAPE) \
+   LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void *)&tlenzero, sizeof(tlenzero))
+
+#define TRYGETLEN(NODE, LEN, TAPE) \
+   (LogicalTapeRead(PS(NODE)->tapeset, (TAPE), \
+                    (void *) &(LEN), sizeof(tlenzero)) == sizeof(tlenzero) \
+    && (LEN) != 0)
 
-#define SETTUPLEN(TUP, LEN)        ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
+#define GETLEN(NODE, LEN, TAPE) \
+   do { \
+       if (! TRYGETLEN(NODE, LEN, TAPE)) \
+           elog(ERROR, "psort: unexpected end of data"); \
+   } while(0)
 
-#define rewind(FP)     BufFileSeek(FP, 0, 0L, SEEK_SET)
+static void GETTUP(Sort *node, HeapTuple tup, unsigned int len, int tape)
+{
+   IncrProcessed();
+   PS(node)->BytesRead += len;
+   if (LogicalTapeRead(PS(node)->tapeset, tape,
+                       ((char *) tup) + sizeof(tlenzero),
+                       len - sizeof(tlenzero)) != len - sizeof(tlenzero))
+       elog(ERROR, "psort: unexpected end of data");
+   tup->t_len = len - HEAPTUPLESIZE;
+   tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
+   if (LogicalTapeRead(PS(node)->tapeset, tape,
+                       (void *) &tlendummy,
+                       sizeof(tlendummy)) != sizeof(tlendummy))
+       elog(ERROR, "psort: unexpected end of data");
+}
+
+#define ALLOCTUP(LEN)  ((HeapTuple) palloc(LEN))
+#define FREE(x)            pfree((char *) (x))
 
  /*
   * USEMEM         - record use of memory FREEMEM         - record
@@ -268,10 +288,10 @@ inittapes(Sort *node)
 static void
 initialrun(Sort *node)
 {
-   /* struct tuple   *tup; */
    struct tape *tp;
    int         baseruns;       /* D:(a) */
    int         extrapasses;    /* EOF */
+   int         tapenum;
 
    Assert(node != (Sort *) NULL);
    Assert(PS(node) != (Psortstate *) NULL);
@@ -284,8 +304,8 @@ initialrun(Sort *node)
        extrapasses = 0;
    }
    else
-/* all tuples fetched */
    {
+       /* all tuples fetched */
        if (!PS(node)->using_tape_files)        /* empty or sorted in
                                                 * memory */
            return;
@@ -297,8 +317,9 @@ initialrun(Sort *node)
         */
        if (PS(node)->Tuples == NULL)
        {
-           PS(node)->psort_grab_file = PS(node)->Tape->tp_file;
-           rewind(PS(node)->psort_grab_file);
+           PS(node)->psort_grab_tape = PS(node)->Tape[0].tp_tapenum;
+           /* freeze and rewind the finished output tape */
+           LogicalTapeFreeze(PS(node)->tapeset, PS(node)->psort_grab_tape);
            return;
        }
        extrapasses = 2;
@@ -334,19 +355,20 @@ initialrun(Sort *node)
        {
            if (--extrapasses)
            {
-               dumptuples(tp->tp_file, node);
-               ENDRUN(tp->tp_file);
+               dumptuples(node, tp->tp_tapenum);
+               ENDRUN(node, tp->tp_tapenum);
                continue;
            }
            else
                break;
        }
-       if ((bool) createrun(node, tp->tp_file) == false)
+       if (createrun(node, tp->tp_tapenum) == false)
            extrapasses = 1 + (PS(node)->Tuples != NULL);
        /* D2 */
    }
-   for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
-       rewind(tp->tp_file);    /* D. */
+   /* End of step D2: rewind all output tapes to prepare for merging */
+   for (tapenum = 0; tapenum < PS(node)->TapeRange; tapenum++)
+       LogicalTapeRewind(PS(node)->tapeset, tapenum, false);
 }
 
 /*
@@ -374,7 +396,7 @@ createfirstrun(Sort *node)
    Assert(PS(node)->memtuples == NULL);
    Assert(PS(node)->tupcount == 0);
    if (LACKMEM(node))
-       elog(ERROR, "psort: LACKMEM in createfirstrun");
+       elog(ERROR, "psort: LACKMEM before createfirstrun");
 
    memtuples = palloc(t_free * sizeof(HeapTuple));
 
@@ -439,7 +461,7 @@ createfirstrun(Sort *node)
        for (t = t_last - 1; t >= 0; t--)
            puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
        pfree(memtuples);
-       foundeor = !createrun(node, PS(node)->Tape->tp_file);
+       foundeor = ! createrun(node, PS(node)->Tape->tp_tapenum);
    }
    else
    {
@@ -451,8 +473,10 @@ createfirstrun(Sort *node)
 }
 
 /*
- *     createrun       - places the next run on file, grabbing the tuples by
- *                     executing the subplan passed in
+ *     createrun
+ *
+ * Create the next run and write it to desttapenum, grabbing the tuples by
+ * executing the subplan passed in
  *
  *     Uses:
  *             Tuples, which should contain any tuples for this run
@@ -462,7 +486,7 @@ createfirstrun(Sort *node)
  *             Tuples contains the tuples for the following run upon exit
  */
 static bool
-createrun(Sort *node, BufFile *file)
+createrun(Sort *node, int desttapenum)
 {
    HeapTuple   lasttuple;
    HeapTuple   tup;
@@ -492,7 +516,7 @@ createrun(Sort *node, BufFile *file)
            }
            lasttuple = gettuple(&PS(node)->Tuples, &junk,
                                 &PS(node)->treeContext);
-           PUTTUP(node, lasttuple, file);
+           PUTTUP(node, lasttuple, desttapenum);
            TRACEOUT(createrun, lasttuple);
        }
 
@@ -545,8 +569,8 @@ createrun(Sort *node, BufFile *file)
        FREE(lasttuple);
        TRACEMEM(createrun);
    }
-   dumptuples(file, node);
-   ENDRUN(file);               /* delimit the end of the run */
+   dumptuples(node, desttapenum);
+   ENDRUN(node, desttapenum);      /* delimit the end of the run */
 
    t_last++;
    /* put tuples for the next run into leftist tree */
@@ -573,28 +597,31 @@ createrun(Sort *node, BufFile *file)
  *                       (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271)
  *
  *     Returns:
- *             file of tuples in order
+ *             tape number of finished tape containing all tuples in order
  */
-static BufFile *
+static int
 mergeruns(Sort *node)
 {
    struct tape *tp;
 
    Assert(node != (Sort *) NULL);
    Assert(PS(node) != (Psortstate *) NULL);
-   Assert(PS(node)->using_tape_files == true);
+   Assert(PS(node)->using_tape_files);
 
    tp = PS(node)->Tape + PS(node)->TapeRange;
    merge(node, tp);
-   rewind(tp->tp_file);
    while (--PS(node)->Level != 0)
    {
+       /* rewind output tape to use as new input */
+       LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, false);
        tp = tp->tp_prev;
-       rewind(tp->tp_file);
+       /* rewind new output tape and prepare it for write pass */
+       LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, true);
        merge(node, tp);
-       rewind(tp->tp_file);
    }
-   return tp->tp_file;
+   /* freeze and rewind the final output tape */
+   LogicalTapeFreeze(PS(node)->tapeset, tp->tp_tapenum);
+   return tp->tp_tapenum;
 }
 
 /*
@@ -608,7 +635,7 @@ merge(Sort *node, struct tape * dest)
    struct tape *lasttp;        /* (TAPE[P]) */
    struct tape *tp;
    struct leftist *tuples;
-   BufFile    *destfile;
+   int         desttapenum;
    int         times;          /* runs left to merge */
    int         outdummy;       /* complete dummy runs */
    short       fromtape;
@@ -616,7 +643,7 @@ merge(Sort *node, struct tape * dest)
 
    Assert(node != (Sort *) NULL);
    Assert(PS(node) != (Psortstate *) NULL);
-   Assert(PS(node)->using_tape_files == true);
+   Assert(PS(node)->using_tape_files);
 
    lasttp = dest->tp_prev;
    times = lasttp->tp_fib;
@@ -641,19 +668,18 @@ merge(Sort *node, struct tape * dest)
        /* do not add the outdummy runs yet */
        times -= outdummy;
    }
-   destfile = dest->tp_file;
+   desttapenum = dest->tp_tapenum;
    while (times-- != 0)
    {                           /* merge one run */
        tuples = NULL;
        if (PS(node)->TotalDummy == 0)
            for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
            {
-               GETLEN(tuplen, tp->tp_file);
+               GETLEN(node, tuplen, tp->tp_tapenum);
                tup = ALLOCTUP(tuplen);
                USEMEM(node, tuplen);
                TRACEMEM(merge);
-               SETTUPLEN(tup, tuplen);
-               GETTUP(node, tup, tuplen, tp->tp_file);
+               GETTUP(node, tup, tuplen, tp->tp_tapenum);
                puttuple(&tuples, tup, tp - PS(node)->Tape,
                         &PS(node)->treeContext);
            }
@@ -668,12 +694,11 @@ merge(Sort *node, struct tape * dest)
                }
                else
                {
-                   GETLEN(tuplen, tp->tp_file);
+                   GETLEN(node, tuplen, tp->tp_tapenum);
                    tup = ALLOCTUP(tuplen);
                    USEMEM(node, tuplen);
                    TRACEMEM(merge);
-                   SETTUPLEN(tup, tuplen);
-                   GETTUP(node, tup, tuplen, tp->tp_file);
+                   GETTUP(node, tup, tuplen, tp->tp_tapenum);
                    puttuple(&tuples, tup, tp - PS(node)->Tape,
                             &PS(node)->treeContext);
                }
@@ -683,38 +708,34 @@ merge(Sort *node, struct tape * dest)
        {
            /* possible optimization by using count in tuples */
            tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext);
-           PUTTUP(node, tup, destfile);
+           PUTTUP(node, tup, desttapenum);
            FREEMEM(node, tup->t_len);
            FREE(tup);
            TRACEMEM(merge);
-           GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file);
-           if (tuplen == 0)
-               ;
-           else
+           if (TRYGETLEN(node, tuplen, PS(node)->Tape[fromtape].tp_tapenum))
            {
                tup = ALLOCTUP(tuplen);
                USEMEM(node, tuplen);
                TRACEMEM(merge);
-               SETTUPLEN(tup, tuplen);
-               GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file);
+               GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_tapenum);
                puttuple(&tuples, tup, fromtape, &PS(node)->treeContext);
            }
        }
-       ENDRUN(destfile);
+       ENDRUN(node, desttapenum);
    }
    PS(node)->TotalDummy += outdummy;
 }
 
 /*
- * dumptuples  - stores all the tuples in tree into file
+ * dumptuples  - stores all the tuples remaining in tree to dest tape
  */
 static void
-dumptuples(BufFile *file, Sort *node)
+dumptuples(Sort *node, int desttapenum)
 {
+   LeftistContext context = &PS(node)->treeContext;
+   struct leftist **treep = &PS(node)->Tuples;
    struct leftist *tp;
    struct leftist *newp;
-   struct leftist **treep = &PS(node)->Tuples;
-   LeftistContext context = &PS(node)->treeContext;
    HeapTuple   tup;
 
    Assert(PS(node)->using_tape_files);
@@ -728,7 +749,7 @@ dumptuples(BufFile *file, Sort *node)
        else
            newp = lmerge(tp->lt_left, tp->lt_right, context);
        pfree(tp);
-       PUTTUP(node, tup, file);
+       PUTTUP(node, tup, desttapenum);
        FREEMEM(node, tup->t_len);
        FREE(tup);
 
@@ -760,11 +781,10 @@ psort_grabtuple(Sort *node, bool *should_free)
        {
            if (PS(node)->all_fetched)
                return NULL;
-           if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0)
+           if (TRYGETLEN(node, tuplen, PS(node)->psort_grab_tape))
            {
                tup = ALLOCTUP(tuplen);
-               SETTUPLEN(tup, tuplen);
-               GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
+               GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape);
                return tup;
            }
            else
@@ -786,10 +806,11 @@ psort_grabtuple(Sort *node, bool *should_free)
             * length word.  If seek fails we must have a completely empty
             * file.
             */
-           if (BufFileSeek(PS(node)->psort_grab_file, 0,
-                           - (long) (2 * sizeof(tlendummy)), SEEK_CUR))
+           if (! LogicalTapeBackspace(PS(node)->tapeset,
+                                      PS(node)->psort_grab_tape,
+                                      2 * sizeof(tlendummy)))
                return NULL;
-           GETLEN(tuplen, PS(node)->psort_grab_file);
+           GETLEN(node, tuplen, PS(node)->psort_grab_tape);
            PS(node)->all_fetched = false;
        }
        else
@@ -798,28 +819,29 @@ psort_grabtuple(Sort *node, bool *should_free)
             * Back up and fetch prev tuple's ending length word.
             * If seek fails, assume we are at start of file.
             */
-           if (BufFileSeek(PS(node)->psort_grab_file, 0,
-                           - (long) sizeof(tlendummy), SEEK_CUR))
+           if (! LogicalTapeBackspace(PS(node)->tapeset,
+                                      PS(node)->psort_grab_tape,
+                                      sizeof(tlendummy)))
                return NULL;
-           GETLEN(tuplen, PS(node)->psort_grab_file);
-           if (tuplen == 0)
-               elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
+           GETLEN(node, tuplen, PS(node)->psort_grab_tape);
            /*
             * Back up to get ending length word of tuple before it.
             */
-           if (BufFileSeek(PS(node)->psort_grab_file, 0,
-                           - (long) (tuplen + 2*sizeof(tlendummy)), SEEK_CUR))
+           if (! LogicalTapeBackspace(PS(node)->tapeset,
+                                      PS(node)->psort_grab_tape,
+                                      tuplen + 2*sizeof(tlendummy)))
            {
                /* If fail, presumably the prev tuple is the first in the file.
                 * Back up so that it becomes next to read in forward direction
                 * (not obviously right, but that is what in-memory case does)
                 */
-               if (BufFileSeek(PS(node)->psort_grab_file, 0,
-                               - (long) (tuplen + sizeof(tlendummy)), SEEK_CUR))
+               if (! LogicalTapeBackspace(PS(node)->tapeset,
+                                          PS(node)->psort_grab_tape,
+                                          tuplen + sizeof(tlendummy)))
                    elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
                return NULL;
            }
-           GETLEN(tuplen, PS(node)->psort_grab_file);
+           GETLEN(node, tuplen, PS(node)->psort_grab_tape);
        }
 
        /*
@@ -827,12 +849,12 @@ psort_grabtuple(Sort *node, bool *should_free)
         * Note: GETTUP expects we are positioned after the initial length
         * word of the tuple, so back up to that point.
         */
-       if (BufFileSeek(PS(node)->psort_grab_file, 0,
-                       - (long) tuplen, SEEK_CUR))
+       if (! LogicalTapeBackspace(PS(node)->tapeset,
+                                  PS(node)->psort_grab_tape,
+                                  tuplen))
            elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
        tup = ALLOCTUP(tuplen);
-       SETTUPLEN(tup, tuplen);
-       GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
+       GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape);
        return tup;
    }
    else
@@ -880,9 +902,10 @@ psort_markpos(Sort *node)
    Assert(PS(node) != (Psortstate *) NULL);
 
    if (PS(node)->using_tape_files == true)
-       BufFileTell(PS(node)->psort_grab_file,
-                   & PS(node)->psort_saved_fileno,
-                   & PS(node)->psort_saved);
+       LogicalTapeTell(PS(node)->tapeset,
+                       PS(node)->psort_grab_tape,
+                       & PS(node)->psort_saved,
+                       & PS(node)->psort_saved_offset);
    else
        PS(node)->psort_saved = PS(node)->psort_current;
 }
@@ -898,46 +921,41 @@ psort_restorepos(Sort *node)
    Assert(PS(node) != (Psortstate *) NULL);
 
    if (PS(node)->using_tape_files == true)
-       BufFileSeek(PS(node)->psort_grab_file,
-                   PS(node)->psort_saved_fileno,
-                   PS(node)->psort_saved,
-                   SEEK_SET);
+   {
+       if (! LogicalTapeSeek(PS(node)->tapeset,
+                             PS(node)->psort_grab_tape,
+                             PS(node)->psort_saved,
+                             PS(node)->psort_saved_offset))
+           elog(ERROR, "psort_restorepos failed");
+   }
    else
        PS(node)->psort_current = PS(node)->psort_saved;
 }
 
 /*
- *     psort_end       - unlinks the tape files, and cleans up. Should not be
- *                       called unless psort_grabtuple has returned a NULL.
+ * psort_end
+ *
+ * Release resources and clean up.
  */
 void
 psort_end(Sort *node)
 {
-   struct tape *tp;
-
-   if (!node->cleaned)
+   /* node->cleaned is probably redundant? */
+   if (!node->cleaned && PS(node) != (Psortstate *) NULL)
    {
+       if (PS(node)->tapeset)
+           LogicalTapeSetClose(PS(node)->tapeset);
+       if (PS(node)->memtuples)
+           pfree(PS(node)->memtuples);
 
-       /*
-        * I'm changing this because if we are sorting a relation with no
-        * tuples, psortstate is NULL.
-        */
-       if (PS(node) != (Psortstate *) NULL)
-       {
-           if (PS(node)->using_tape_files == true)
-               for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
-                   destroytape(tp->tp_file);
-           else if (PS(node)->memtuples)
-               pfree(PS(node)->memtuples);
-
-           NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
-           NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
+       /* XXX what about freeing leftist tree and tuples in memory? */
 
-           pfree((void *) node->psortstate);
-           node->psortstate = NULL;
+       NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
+       NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
 
-           node->cleaned = TRUE;
-       }
+       pfree((void *) node->psortstate);
+       node->psortstate = NULL;
+       node->cleaned = TRUE;
    }
 }
 
@@ -951,46 +969,22 @@ psort_rescan(Sort *node)
    if (((Plan *) node)->lefttree->chgParam != NULL)
    {
        psort_end(node);
-       node->cleaned = false;
+       node->cleaned = false;  /* huh? */
    }
    else if (PS(node) != (Psortstate *) NULL)
    {
        PS(node)->all_fetched = false;
        PS(node)->psort_current = 0;
-       PS(node)->psort_saved_fileno = 0;
        PS(node)->psort_saved = 0L;
+       PS(node)->psort_saved_offset = 0;
        if (PS(node)->using_tape_files == true)
-           rewind(PS(node)->psort_grab_file);
+           LogicalTapeRewind(PS(node)->tapeset,
+                             PS(node)->psort_grab_tape,
+                             false);
    }
 
 }
 
-/*
- *     gettape         - returns an open stream for writing/reading
- *
- *     Returns:
- *             Open stream for writing/reading.
- *             NULL if unable to open temporary file.
- *
- * There used to be a lot of cruft here to try to ensure that we destroyed
- * all the tape files; but it didn't really work.  Now we rely on fd.c to
- * clean up temp files if an error occurs.
- */
-static BufFile *
-gettape()
-{
-   return BufFileCreateTemp();
-}
-
-/*
- *     destroytape     - unlinks the tape
- */
-static void
-destroytape(BufFile *file)
-{
-   BufFileClose(file);
-}
-
 static int
 _psort_cmp(HeapTuple *ltup, HeapTuple *rtup)
 {
index 2416d645cfe56e8b7f8eb040a4e49d0395b4cdcc..bc472b9d14ebbeec6c9ed58eeea2b13cb773f75c 100644 (file)
@@ -17,7 +17,7 @@
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: buffile.h,v 1.1 1999/10/13 15:02:32 tgl Exp $
+ * $Id: buffile.h,v 1.2 1999/10/16 19:49:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -37,11 +37,12 @@ typedef struct BufFile BufFile;
 
 extern BufFile *BufFileCreateTemp(void);
 extern BufFile *BufFileCreate(File file);
-extern BufFile *BufFileReaccess(BufFile *file);
 extern void BufFileClose(BufFile *file);
 extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
 extern int BufFileSeek(BufFile *file, int fileno, long offset, int whence);
 extern void BufFileTell(BufFile *file, int *fileno, long *offset);
+extern int BufFileSeekBlock(BufFile *file, long blknum);
+extern long BufFileTellBlock(BufFile *file);
 
 #endif  /* BUFFILE_H */
diff --git a/src/include/utils/logtape.h b/src/include/utils/logtape.h
new file mode 100644 (file)
index 0000000..16fc2c1
--- /dev/null
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * logtape.h
+ *   Management of "logical tapes" within temporary files.
+ *
+ * See logtape.c for explanations.
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: logtape.h,v 1.1 1999/10/16 19:49:28 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef LOGTAPE_H
+#define LOGTAPE_H
+
+/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
+
+typedef struct LogicalTapeSet LogicalTapeSet;
+
+/*
+ * prototypes for functions in logtape.c
+ */
+
+extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
+                             void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
+                            void *ptr, size_t size);
+extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
+extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
+extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
+                                size_t size);
+extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
+                           long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
+                           long *blocknum, int *offset);
+
+#endif  /* LOGTAPE_H */
index 9a100bad0d8b6ae5a5c38620ebe60bb0fe39e8cb..5f7a638442deb3c6c19141b34bdd1fff782d05ae 100644 (file)
 /*-------------------------------------------------------------------------
  *
  * psort.h
- *
- *
+ *   Polyphase merge sort.
  *
  * Copyright (c) 1994, Regents of the University of California
  *
- * $Id: psort.h,v 1.22 1999/10/13 15:02:28 tgl Exp $
+ * $Id: psort.h,v 1.23 1999/10/16 19:49:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef PSORT_H
 #define PSORT_H
 
-#include "access/relscan.h"
+#include "access/htup.h"
+#include "access/skey.h"
 #include "nodes/plannodes.h"
-#include "storage/buffile.h"
-#include "utils/lselect.h"
-
-#define MAXTAPES       7       /* See Knuth Fig. 70, p273 */
-
-struct tape
-{
-   int         tp_dummy;       /* (D) */
-   int         tp_fib;         /* (A) */
-   BufFile    *tp_file;        /* (TAPE) */
-   struct tape *tp_prev;
-};
-
-struct cmplist
-{
-   int         cp_attn;        /* attribute number */
-   int         cp_num;         /* comparison function code */
-   int         cp_rev;         /* invert comparison flag */
-   struct cmplist *cp_next;    /* next in chain */
-};
-
-/* This structure preserves the state of psort between calls from different
- * nodes to its interface functions. Basically, it includes all of the global
- * variables in psort. In case you were wondering, pointers to these structures
- * are included in Sort node structures.                       -Rex 2.6.1995
- */
-typedef struct Psortstate
-{
-   LeftistContextData treeContext;
-
-   int         TapeRange;
-   int         Level;
-   int         TotalDummy;
-   struct tape Tape[MAXTAPES];
-
-   int         BytesRead;
-   int         BytesWritten;
-   int         tupcount;
-
-   struct leftist *Tuples;
-
-   BufFile    *psort_grab_file;
-   long        psort_current;  /* array index (only used if not tape) */
-   int         psort_saved_fileno; /* upper bits of psort_saved, if tape */
-   long        psort_saved;    /* could be file offset, or array index */
-   bool        using_tape_files;
-   bool        all_fetched;    /* this is for cursors */
-
-   HeapTuple  *memtuples;
-} Psortstate;
-
-#ifdef EBUG
-#include "storage/buf.h"
-#include "storage/bufmgr.h"
-
-#define PDEBUG(PROC, S1)\
-elog(DEBUG, "%s:%d>> PROC: %s.", __FILE__, __LINE__, S1)
-
-#define PDEBUG2(PROC, S1, D1)\
-elog(DEBUG, "%s:%d>> PROC: %s %d.", __FILE__, __LINE__, S1, D1)
-
-#define PDEBUG4(PROC, S1, D1, S2, D2)\
-elog(DEBUG, "%s:%d>> PROC: %s %d, %s %d.", __FILE__, __LINE__, S1, D1, S2, D2)
-
-#define VDEBUG(VAR, FMT)\
-elog(DEBUG, "%s:%d>> VAR =FMT", __FILE__, __LINE__, VAR)
-
-#define ASSERT(EXPR, STR)\
-if (!(EXPR)) elog(FATAL, "%s:%d>> %s", __FILE__, __LINE__, STR)
-
-#define TRACE(VAL, CODE)\
-if (1) CODE; else
-
-#else
-#define PDEBUG(MSG)
-#define VDEBUG(VAR, FMT)
-#define ASSERT(EXPR, MSG)
-#define TRACE(VAL, CODE)
-#endif
 
-/* psort.c */
 extern bool psort_begin(Sort *node, int nkeys, ScanKey key);
 extern HeapTuple psort_grabtuple(Sort *node, bool *should_free);
 extern void psort_markpos(Sort *node);