*
*
* IDENTIFICATION
- * $Id: nbtsort.c,v 1.37 1999/02/21 03:48:27 scrappy Exp $
+ * $Id: nbtsort.c,v 1.38 1999/05/09 00:53:19 tgl Exp $
*
* NOTES
*
#define MAXTAPES (7)
#define TAPEBLCKSZ (BLCKSZ << 2)
-#define TAPETEMP "pg_btsortXXXXXXX"
extern int NDirectFileRead;
extern int NDirectFileWrite;
-extern char *mktemp(char *template);
/*
* this is what we use to shovel BTItems in and out of memory. it's
typedef struct
{
int bttb_magic; /* magic number */
- int bttb_fd; /* file descriptor */
+ File bttb_fd; /* file descriptor */
int bttb_top; /* top of free space within bttb_data */
short bttb_ntup; /* number of tuples in this block */
short bttb_eor; /* End-Of-Run marker */
static void
_bt_taperewind(BTTapeBlock *tape)
{
- FileSeek(tape->bttb_fd, 0, SEEK_SET);
+ FileSeek(tape->bttb_fd, 0L, SEEK_SET);
}
/*
* as well as opening a physical tape file.
*/
static BTTapeBlock *
-_bt_tapecreate(char *fname)
+_bt_tapecreate(void)
{
BTTapeBlock *tape = (BTTapeBlock *) palloc(sizeof(BTTapeBlock));
tape->bttb_magic = BTTAPEMAGIC;
-#ifndef __CYGWIN32__
- tape->bttb_fd = FileNameOpenFile(fname, O_RDWR | O_CREAT | O_TRUNC, 0600);
-#else
- tape->bttb_fd = FileNameOpenFile(fname, O_RDWR | O_CREAT | O_TRUNC | O_BINARY, 0600);
-#endif
+ tape->bttb_fd = OpenTemporaryFile();
Assert(tape->bttb_fd >= 0);
/* initialize the buffer */
static int
_bt_taperead(BTTapeBlock *tape)
{
- int fd;
+ File fd;
int nread;
if (tape->bttb_eor)
{
BTSpool *btspool = (BTSpool *) palloc(sizeof(BTSpool));
int i;
- char *fname = (char *) palloc(sizeof(TAPETEMP) + 1);
- if (btspool == (BTSpool *) NULL || fname == (char *) NULL)
+ if (btspool == (BTSpool *) NULL)
elog(ERROR, "_bt_spoolinit: out of memory");
MemSet((char *) btspool, 0, sizeof(BTSpool));
btspool->bts_ntapes = ntapes;
for (i = 0; i < ntapes; ++i)
{
- btspool->bts_itape[i] = _bt_tapecreate(mktemp(strcpy(fname, TAPETEMP)));
- btspool->bts_otape[i] = _bt_tapecreate(mktemp(strcpy(fname, TAPETEMP)));
+ btspool->bts_itape[i] = _bt_tapecreate();
+ btspool->bts_otape[i] = _bt_tapecreate();
}
- pfree((void *) fname);
_bt_isortcmpinit(index, btspool);
* Copyright (c) 1994, Regents of the University of California
*
*
- * $Id: nodeHash.c,v 1.33 1999/05/06 00:30:46 tgl Exp $
+ * $Id: nodeHash.c,v 1.34 1999/05/09 00:53:20 tgl Exp $
*
*-------------------------------------------------------------------------
*/
extern int NBuffers;
-#define HJ_TEMP_NAMELEN 16 /* max length for mk_hj_temp file names */
-
-static void mk_hj_temp(char *tempname);
static int hashFunc(Datum key, int len, bool byVal);
static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
static void * absHashTableAlloc(int size, HashJoinTable hashtable);
RelativeAddr *batchPos;
int *batchSizes;
int i;
- RelativeAddr *innerbatchNames;
/* ----------------
* get state info from node
if (nbatch > 0)
{ /* if needs hash partition */
- innerbatchNames = (RelativeAddr *) ABSADDR(hashtable->innerbatchNames);
-
/* --------------
* allocate space for the file descriptors of batch files
* then open the batch files in the current processes.
batches = (File *) palloc(nbatch * sizeof(File));
for (i = 0; i < nbatch; i++)
{
-#ifndef __CYGWIN32__
- batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
- O_CREAT | O_RDWR, 0600);
-#else
- batches[i] = FileNameOpenFile(ABSADDR(innerbatchNames[i]),
- O_CREAT | O_RDWR | O_BINARY, 0600);
-#endif
+ batches[i] = OpenTemporaryFile();
}
hashstate->hashBatches = batches;
batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
* ----------------------------------------------------------------
*/
#define NTUP_PER_BUCKET 10
-#define FUDGE_FAC 1.5
+#define FUDGE_FAC 2.0
HashJoinTable
ExecHashTableCreate(Hash *node)
int totalbuckets;
int bucketsize;
int i;
- RelativeAddr *outerbatchNames;
RelativeAddr *outerbatchPos;
- RelativeAddr *innerbatchNames;
RelativeAddr *innerbatchPos;
int *innerbatchSizes;
- RelativeAddr tempname;
/* ----------------
* Get information about the size of the relation to be hashed
* allocate and initialize the outer batches
* ---------------
*/
- outerbatchNames = (RelativeAddr *)
- absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
outerbatchPos = (RelativeAddr *)
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
for (i = 0; i < nbatch; i++)
{
- tempname = hashTableAlloc(HJ_TEMP_NAMELEN, hashtable);
- mk_hj_temp(ABSADDR(tempname));
- outerbatchNames[i] = tempname;
outerbatchPos[i] = -1;
}
- hashtable->outerbatchNames = RELADDR(outerbatchNames);
hashtable->outerbatchPos = RELADDR(outerbatchPos);
/* ---------------
* allocate and initialize the inner batches
* ---------------
*/
- innerbatchNames = (RelativeAddr *)
- absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
innerbatchPos = (RelativeAddr *)
absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
innerbatchSizes = (int *)
absHashTableAlloc(nbatch * sizeof(int), hashtable);
for (i = 0; i < nbatch; i++)
{
- tempname = hashTableAlloc(HJ_TEMP_NAMELEN, hashtable);
- mk_hj_temp(ABSADDR(tempname));
- innerbatchNames[i] = tempname;
innerbatchPos[i] = -1;
innerbatchSizes[i] = 0;
}
- hashtable->innerbatchNames = RELADDR(innerbatchNames);
hashtable->innerbatchPos = RELADDR(innerbatchPos);
hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
}
else
{
- hashtable->outerbatchNames = (RelativeAddr) NULL;
hashtable->outerbatchPos = (RelativeAddr) NULL;
- hashtable->innerbatchNames = (RelativeAddr) NULL;
hashtable->innerbatchPos = (RelativeAddr) NULL;
hashtable->innerbatchSizes = (RelativeAddr) NULL;
}
hashtable->pcount = hashtable->nprocess;
}
-static void
-mk_hj_temp(char *tempname)
-{
- static int hjtmpcnt = 0;
-
- snprintf(tempname, HJ_TEMP_NAMELEN, "HJ%d.%d", (int) MyProcPid, hjtmpcnt);
- hjtmpcnt = (hjtmpcnt + 1) % 1000;
-}
-
void
ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
{
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: psort.c,v 1.50 1999/02/13 23:20:15 momjian Exp $
+ * $Id: psort.c,v 1.51 1999/05/09 00:53:22 tgl Exp $
*
* NOTES
* Sorts the first relation into the second relation.
#include "utils/rel.h"
static bool createfirstrun(Sort *node);
-static bool createrun(Sort *node, FILE *file);
-static void destroytape(FILE *file);
-static void dumptuples(FILE *file, Sort *node);
-static FILE *gettape(void);
+static bool createrun(Sort *node, BufFile *file);
+static void destroytape(BufFile *file);
+static void dumptuples(BufFile *file, Sort *node);
+static BufFile *gettape(void);
static void initialrun(Sort *node);
static void inittapes(Sort *node);
static void merge(Sort *node, struct tape * dest);
-static FILE *mergeruns(Sort *node);
+static BufFile *mergeruns(Sort *node);
static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
-
-#define TEMPDIR "./"
-
/*
* tlenzero used to delimit runs; both vars below must have
* the same size as HeapTuple->t_len
static unsigned int tlenzero = 0;
static unsigned int tlendummy;
+/* these are used by _psort_cmp, and are set just before calling qsort() */
static TupleDesc PsortTupDesc;
-static ScanKey PsortKeys; /* used by _psort_cmp */
+static ScanKey PsortKeys;
static int PsortNkeys;
/*
*
* struct leftist *Tuples; current tuples in memory
*
- * FILE *psort_grab_file; this holds tuples grabbed
+ * BufFile *psort_grab_file; this holds tuples grabbed
* from merged sort runs
* long psort_current; current file position
* long psort_saved; file position saved for
( \
(TUP)->t_len += HEAPTUPLESIZE, \
((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \
- fwrite((char *)TUP, (TUP)->t_len, 1, FP), \
- fwrite((char *)&((TUP)->t_len), sizeof (tlendummy), 1, FP), \
+ BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \
+ BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \
(TUP)->t_len -= HEAPTUPLESIZE \
)
-#define ENDRUN(FP) fwrite((char *)&tlenzero, sizeof (tlenzero), 1, FP)
-#define GETLEN(LEN, FP) fread((char *)&(LEN), sizeof (tlenzero), 1, FP)
+#define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
+#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
+#define FREE(x) pfree((char *) x)
#define GETTUP(NODE, TUP, LEN, FP) \
( \
IncrProcessed(), \
- ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof (tlenzero), \
- fread((char *)(TUP) + sizeof (tlenzero), (LEN) - sizeof (tlenzero), 1, FP), \
+ ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \
+ BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \
(TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \
- fread((char *)&tlendummy, sizeof (tlendummy), 1, FP) \
+ BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \
)
-#define SETTUPLEN(TUP, LEN) (TUP)->t_len = LEN - HEAPTUPLESIZE
+#define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
+
+#define rewind(FP) BufFileSeek(FP, 0L, SEEK_SET)
/*
* USEMEM - record use of memory FREEMEM - record
* - (replacement selection(R2-R3)--Knuth, Vol.3, p.257)
* - (polyphase merge Alg.D(D2-D4)--Knuth, Vol.3, p.271)
*
- * Explaination:
+ * Explanation:
* Tuples are distributed to the tapes as in Algorithm D.
* A "tuple" with t_size == 0 is used to mark the end of a run.
*
Assert(PS(node)->memtuples == NULL);
Assert(PS(node)->tupcount == 0);
if (LACKMEM(node))
- elog(FATAL, "psort: LACKMEM in createfirstrun");
+ elog(ERROR, "psort: LACKMEM in createfirstrun");
memtuples = palloc(t_free * sizeof(HeapTuple));
* Tuples contains the tuples for the following run upon exit
*/
static bool
-createrun(Sort *node, FILE *file)
+createrun(Sort *node, BufFile *file)
{
HeapTuple lasttuple;
HeapTuple tup;
* Returns:
* file of tuples in order
*/
-static FILE *
+static BufFile *
mergeruns(Sort *node)
{
struct tape *tp;
{
tp = tp->tp_prev;
rewind(tp->tp_file);
- /* resettape(tp->tp_file); -not sufficient */
merge(node, tp);
rewind(tp->tp_file);
}
struct tape *lasttp; /* (TAPE[P]) */
struct tape *tp;
struct leftist *tuples;
- FILE *destfile;
+ BufFile *destfile;
int times; /* runs left to merge */
int outdummy; /* complete dummy runs */
short fromtape;
* dumptuples - stores all the tuples in tree into file
*/
static void
-dumptuples(FILE *file, Sort *node)
+dumptuples(BufFile *file, Sort *node)
{
struct leftist *tp;
struct leftist *newp;
* psort_current is pointing to the zero tuplen at the end of
* file
*/
- fseek(PS(node)->psort_grab_file,
- PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
+ BufFileSeek(PS(node)->psort_grab_file,
+ PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
GETLEN(tuplen, PS(node)->psort_grab_file);
if (PS(node)->psort_current < tuplen)
- elog(FATAL, "psort_grabtuple: too big last tuple len in backward scan");
+ elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
PS(node)->all_fetched = false;
}
else
{
/* move to position of end tlen of prev tuple */
PS(node)->psort_current -= sizeof(tlendummy);
- fseek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET);
+ BufFileSeek(PS(node)->psort_grab_file,
+ PS(node)->psort_current, SEEK_SET);
GETLEN(tuplen, PS(node)->psort_grab_file); /* get tlen of prev
* tuple */
if (tuplen == 0)
- elog(FATAL, "psort_grabtuple: tuplen is 0 in backward scan");
+ elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
if (PS(node)->psort_current <= tuplen + sizeof(tlendummy))
{ /* prev tuple should be first one */
if (PS(node)->psort_current != tuplen)
- elog(FATAL, "psort_grabtuple: first tuple expected in backward scan");
+ elog(ERROR, "psort_grabtuple: first tuple expected in backward scan");
PS(node)->psort_current = 0;
- fseek(PS(node)->psort_grab_file, PS(node)->psort_current, SEEK_SET);
+ BufFileSeek(PS(node)->psort_grab_file,
+ PS(node)->psort_current, SEEK_SET);
return NULL;
}
*/
PS(node)->psort_current -= tuplen;
/* move to position of end tlen of prev tuple */
- fseek(PS(node)->psort_grab_file,
- PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
+ BufFileSeek(PS(node)->psort_grab_file,
+ PS(node)->psort_current - sizeof(tlendummy), SEEK_SET);
GETLEN(tuplen, PS(node)->psort_grab_file);
if (PS(node)->psort_current < tuplen + sizeof(tlendummy))
- elog(FATAL, "psort_grabtuple: too big tuple len in backward scan");
+ elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
}
/*
* move to prev (or last) tuple start position + sizeof(t_len)
*/
- fseek(PS(node)->psort_grab_file,
- PS(node)->psort_current - tuplen, SEEK_SET);
+ BufFileSeek(PS(node)->psort_grab_file,
+ PS(node)->psort_current - tuplen, SEEK_SET);
tup = ALLOCTUP(tuplen);
SETTUPLEN(tup, tuplen);
GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
Assert(PS(node) != (Psortstate *) NULL);
if (PS(node)->using_tape_files == true)
- fseek(PS(node)->psort_grab_file, PS(node)->psort_saved, SEEK_SET);
+ BufFileSeek(PS(node)->psort_grab_file,
+ PS(node)->psort_saved, SEEK_SET);
PS(node)->psort_current = PS(node)->psort_saved;
}
}
-/*
- * gettape - handles access temporary files in polyphase merging
- *
- * Optimizations:
- * If guarenteed that only one sort running/process,
- * can simplify the file generation--and need not store the
- * name for later unlink.
- */
-
-struct tapelst
-{
- char *tl_name;
- int tl_fd;
- struct tapelst *tl_next;
-};
-
-static struct tapelst *Tapes = NULL;
-
/*
* gettape - returns an open stream for writing/reading
*
* Returns:
* Open stream for writing/reading.
* NULL if unable to open temporary file.
+ *
+ * There used to be a lot of cruft here to try to ensure that we destroyed
+ * all the tape files; but it didn't really work. Now we rely on fd.c to
+ * clean up temp files if an error occurs.
*/
-static FILE *
+static BufFile *
gettape()
{
- struct tapelst *tp;
- FILE *file;
- static int tapeinit = 0;
- char *mktemp();
- static unsigned int uniqueFileId = 0;
- extern int errno;
- char uniqueName[MAXPGPATH];
-
- tp = (struct tapelst *) palloc((unsigned) sizeof(struct tapelst));
-
- snprintf(uniqueName, MAXPGPATH - 1, "%spg_psort.%d.%u",
- TEMPDIR, (int) MyProcPid, uniqueFileId++);
-
- tapeinit = 1;
-
- tp->tl_name = palloc((unsigned) sizeof(uniqueName));
-
- /*
- * now, copy template with final null into palloc'd space
- */
-
- StrNCpy(tp->tl_name, uniqueName, MAXPGPATH);
-
-#ifndef __CYGWIN32__
- file = AllocateFile(tp->tl_name, "w+");
-#else
- file = AllocateFile(tp->tl_name, "w+b");
-#endif
- if (file == NULL)
- elog(ERROR, "Open: %s in %s line %d, %s", tp->tl_name,
- __FILE__, __LINE__, strerror(errno));
-
- tp->tl_fd = fileno(file);
- tp->tl_next = Tapes;
- Tapes = tp;
- return file;
-}
-
-/*
- * resettape - resets the tape to size 0
- */
-#ifdef NOT_USED
-static void
-resettape(FILE *file)
-{
- struct tapelst *tp;
- int fd;
+ File tfile;
- Assert(PointerIsValid(file));
-
- fd = fileno(file);
- for (tp = Tapes; tp != NULL && tp->tl_fd != fd; tp = tp->tl_next)
- ;
- if (tp == NULL)
- elog(ERROR, "resettape: tape not found");
-
- file = freopen(tp->tl_name, "w+", file);
- if (file == NULL)
- elog(FATAL, "could not freopen temporary file");
+ tfile = OpenTemporaryFile();
+ Assert(tfile >= 0);
+ return BufFileCreate(tfile);
}
-#endif
-
/*
- * distroytape - unlinks the tape
- *
- * Efficiency note:
- * More efficient to destroy more recently allocated tapes first.
- *
- * Possible bugs:
- * Exits instead of returning status, if given invalid tape.
+ * destroytape - unlinks the tape
*/
static void
-destroytape(FILE *file)
+destroytape(BufFile *file)
{
- struct tapelst *tp,
- *tq;
- int fd;
-
- if ((tp = Tapes) == NULL)
- elog(FATAL, "destroytape: tape not found");
-
- if ((fd = fileno(file)) == tp->tl_fd)
- {
- Tapes = tp->tl_next;
- FreeFile(file);
- unlink(tp->tl_name);
- FREE(tp->tl_name);
- FREE(tp);
- }
- else
- for (;;)
- {
- if (tp->tl_next == NULL)
- elog(FATAL, "destroytape: tape not found");
- if (tp->tl_next->tl_fd == fd)
- {
- FreeFile(file);
- tq = tp->tl_next;
- tp->tl_next = tq->tl_next;
- unlink(tq->tl_name);
- FREE((tq->tl_name));
- FREE(tq);
- break;
- }
- tp = tp->tl_next;
- }
+ BufFileClose(file);
}
static int