diff options
author | Bruce Momjian | 1997-08-06 03:42:21 +0000 |
---|---|---|
committer | Bruce Momjian | 1997-08-06 03:42:21 +0000 |
commit | f5f366e18829c982273540d88a1ac81c9c85d401 (patch) | |
tree | 347fc45caa2420d381e963d6b95bd6109085a163 /src/backend/utils | |
parent | 3bea7b138bbdf6129b36fbdab158526544262980 (diff) |
Allow internal sorts to be stored in memory rather than in files.
Diffstat (limited to 'src/backend/utils')
-rw-r--r-- | src/backend/utils/sort/lselect.c | 135 | ||||
-rw-r--r-- | src/backend/utils/sort/psort.c | 581 |
2 files changed, 444 insertions, 272 deletions
diff --git a/src/backend/utils/sort/lselect.c b/src/backend/utils/sort/lselect.c index 8c6c27ef46e..26a3ca38576 100644 --- a/src/backend/utils/sort/lselect.c +++ b/src/backend/utils/sort/lselect.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/lselect.c,v 1.3 1997/05/20 11:35:48 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/lselect.c,v 1.4 1997/08/06 03:41:47 momjian Exp $ * *------------------------------------------------------------------------- */ @@ -26,37 +26,14 @@ #include "utils/psort.h" #include "utils/lselect.h" -extern Relation SortRdesc; /* later static */ - -/* - * PUTTUP - writes the next tuple - * ENDRUN - mark end of run - * GETLEN - reads the length of the next tuple - * ALLOCTUP - returns space for the new tuple - * SETTUPLEN - stores the length into the tuple - * GETTUP - reads the tuple - * - * Note: - * LEN field must be a short; FP is a stream - */ - #define PUTTUP(TUP, FP) fwrite((char *)TUP, (TUP)->t_len, 1, FP) -#define ENDRUN(FP) fwrite((char *)&shortzero, sizeof (shortzero), 1, FP) -#define GETLEN(LEN, FP) fread(&(LEN), sizeof (shortzero), 1, FP) -#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN)) -#define GETTUP(TUP, LEN, FP)\ - fread((char *)(TUP) + sizeof (shortzero), 1, (LEN) - sizeof (shortzero), FP) -#define SETTUPLEN(TUP, LEN) (TUP)->t_len = LEN /* * USEMEM - record use of memory * FREEMEM - record freeing of memory - * FULLMEM - 1 iff a tuple will fit */ - -#define USEMEM(AMT) SortMemory -= (AMT) -#define FREEMEM(AMT) SortMemory += (AMT) -#define LACKMEM() (SortMemory <= BLCKSZ) /* not accurate */ +#define USEMEM(context,AMT) context->sortMem -= (AMT) +#define FREEMEM(context,AMT) context->sortMem += (AMT) /* * lmerge - merges two leftist trees into one @@ -67,12 +44,12 @@ extern Relation SortRdesc; /* later static */ * speed up code significantly. */ struct leftist * -lmerge(struct leftist *pt, struct leftist *qt) +lmerge(struct leftist *pt, struct leftist *qt, LeftistContext context) { register struct leftist *root, *majorLeftist, *minorLeftist; int dist; - if (tuplecmp(pt->lt_tuple, qt->lt_tuple)) { + if (tuplecmp(pt->lt_tuple, qt->lt_tuple, context)) { root = pt; majorLeftist = qt; } else { @@ -83,7 +60,7 @@ lmerge(struct leftist *pt, struct leftist *qt) root->lt_left = majorLeftist; else { if ((minorLeftist = root->lt_right) != NULL) - majorLeftist = lmerge(majorLeftist, minorLeftist); + majorLeftist = lmerge(majorLeftist, minorLeftist, context); if ((dist = root->lt_left->lt_dist) < majorLeftist->lt_dist) { root->lt_dist = 1 + dist; root->lt_right = root->lt_left; @@ -97,11 +74,11 @@ lmerge(struct leftist *pt, struct leftist *qt) } static struct leftist * -linsert(struct leftist *root, struct leftist *new1) +linsert(struct leftist *root, struct leftist *new1, LeftistContext context) { register struct leftist *left, *right; - if (! tuplecmp(root->lt_tuple, new1->lt_tuple)) { + if (! tuplecmp(root->lt_tuple, new1->lt_tuple, context)) { new1->lt_left = root; return(new1); } @@ -116,7 +93,7 @@ linsert(struct leftist *root, struct leftist *new1) } return(root); } - right = linsert(right, new1); + right = linsert(right, new1, context); if (right->lt_dist < left->lt_dist) { root->lt_dist = 1 + left->lt_dist; root->lt_left = right; @@ -142,7 +119,8 @@ linsert(struct leftist *root, struct leftist *new1) */ HeapTuple gettuple(struct leftist **treep, - short *devnum) /* device from which tuple came */ + short *devnum, /* device from which tuple came */ + LeftistContext context) { register struct leftist *tp; HeapTuple tup; @@ -153,9 +131,9 @@ gettuple(struct leftist **treep, if (tp->lt_dist == 1) /* lt_left == NULL */ *treep = tp->lt_left; else - *treep = lmerge(tp->lt_left, tp->lt_right); + *treep = lmerge(tp->lt_left, tp->lt_right, context); - FREEMEM(sizeof (struct leftist)); + FREEMEM(context,sizeof (struct leftist)); FREE(tp); return(tup); } @@ -169,14 +147,17 @@ gettuple(struct leftist **treep, * Note: * Currently never returns NULL BUG */ -int -puttuple(struct leftist **treep, HeapTuple newtuple, int devnum) +void +puttuple(struct leftist **treep, + HeapTuple newtuple, + short devnum, + LeftistContext context) { register struct leftist *new1; register struct leftist *tp; new1 = (struct leftist *) palloc((unsigned) sizeof (struct leftist)); - USEMEM(sizeof (struct leftist)); + USEMEM(context,sizeof (struct leftist)); new1->lt_dist = 1; new1->lt_devnum = devnum; new1->lt_tuple = newtuple; @@ -185,39 +166,12 @@ puttuple(struct leftist **treep, HeapTuple newtuple, int devnum) if ((tp = *treep) == NULL) *treep = new1; else - *treep = linsert(tp, new1); - return(1); + *treep = linsert(tp, new1, context); + return; } /* - * dumptuples - stores all the tuples in tree into file - */ -void -dumptuples(FILE *file) -{ - register struct leftist *tp; - register struct leftist *newp; - HeapTuple tup; - - tp = Tuples; - while (tp != NULL) { - tup = tp->lt_tuple; - if (tp->lt_dist == 1) /* lt_right == NULL */ - newp = tp->lt_left; - else - newp = lmerge(tp->lt_left, tp->lt_right); - FREEMEM(sizeof (struct leftist)); - FREE(tp); - PUTTUP(tup, file); - FREEMEM(tup->t_len); - FREE(tup); - tp = newp; - } - Tuples = NULL; -} - -/* * tuplecmp - Compares two tuples with respect CmpList * * Returns: @@ -225,7 +179,7 @@ dumptuples(FILE *file) * Assumtions: */ int -tuplecmp(HeapTuple ltup, HeapTuple rtup) +tuplecmp(HeapTuple ltup, HeapTuple rtup, LeftistContext context) { register char *lattr, *rattr; int nkey = 0; @@ -238,24 +192,27 @@ tuplecmp(HeapTuple ltup, HeapTuple rtup) return(0); if (rtup == (HeapTuple)NULL) return(1); - while (nkey < Nkeys && !result) { + while (nkey < context->nKeys && !result) { lattr = heap_getattr(ltup, InvalidBuffer, - Key[nkey].sk_attno, - RelationGetTupleDescriptor(SortRdesc), - &isnull); + context->scanKeys[nkey].sk_attno, + context->tupDesc, &isnull); if (isnull) return(0); rattr = heap_getattr(rtup, InvalidBuffer, - Key[nkey].sk_attno, - RelationGetTupleDescriptor(SortRdesc), + context->scanKeys[nkey].sk_attno, + context->tupDesc, &isnull); if (isnull) return(1); - if (Key[nkey].sk_flags & SK_COMMUTE) { - if (!(result = (long) (*Key[nkey].sk_func) (rattr, lattr))) - result = -(long) (*Key[nkey].sk_func) (lattr, rattr); - } else if (!(result = (long) (*Key[nkey].sk_func) (lattr, rattr))) - result = -(long) (*Key[nkey].sk_func) (rattr, lattr); + if (context->scanKeys[nkey].sk_flags & SK_COMMUTE) { + if (!(result = + (long) (*context->scanKeys[nkey].sk_func) (rattr, lattr))) + result = + -(long) (*context->scanKeys[nkey].sk_func) (lattr, rattr); + } else if (!(result = + (long) (*context->scanKeys[nkey].sk_func) (lattr, rattr))) + result = + -(long) (*context->scanKeys[nkey].sk_func) (rattr, lattr); nkey++; } return (result == 1); @@ -263,7 +220,7 @@ tuplecmp(HeapTuple ltup, HeapTuple rtup) #ifdef EBUG void -checktree(struct leftist *tree) +checktree(struct leftist *tree, LeftistContext context) { int lnodes; int rnodes; @@ -272,8 +229,8 @@ checktree(struct leftist *tree) puts("Null tree."); return; } - lnodes = checktreer(tree->lt_left, 1); - rnodes = checktreer(tree->lt_right, 1); + lnodes = checktreer(tree->lt_left, 1, context); + rnodes = checktreer(tree->lt_right, 1, context); if (lnodes < 0) { lnodes = -lnodes; puts("0:\tBad left side."); @@ -297,24 +254,24 @@ checktree(struct leftist *tree) } else if (tree->lt_dist != 1+ tree->lt_right->lt_dist) puts("0:\tDistance incorrect."); if (lnodes > 0) - if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple)) + if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple, context)) printf("%d:\tLeft child < parent.\n"); if (rnodes > 0) - if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple)) + if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple, context)) printf("%d:\tRight child < parent.\n"); printf("Tree has %d nodes\n", 1 + lnodes + rnodes); } int -checktreer(struct leftist *tree, int level) +checktreer(struct leftist *tree, int level, LeftistContext context) { int lnodes, rnodes; int error = 0; if (tree == NULL) return(0); - lnodes = checktreer(tree->lt_left, level + 1); - rnodes = checktreer(tree->lt_right, level + 1); + lnodes = checktreer(tree->lt_left, level + 1, context); + rnodes = checktreer(tree->lt_right, level + 1, context); if (lnodes < 0) { error = 1; lnodes = -lnodes; @@ -349,12 +306,12 @@ checktreer(struct leftist *tree, int level) printf("%d:\tDistance incorrect.\n", level); } if (lnodes > 0) - if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple)) { + if (tuplecmp(tree->lt_left->lt_tuple, tree->lt_tuple, context)) { error = 1; printf("%d:\tLeft child < parent.\n"); } if (rnodes > 0) - if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple)) { + if (tuplecmp(tree->lt_right->lt_tuple, tree->lt_tuple, context)) { error = 1; printf("%d:\tRight child < parent.\n"); } diff --git a/src/backend/utils/sort/psort.c b/src/backend/utils/sort/psort.c index 7005666ce3f..5821905669f 100644 --- a/src/backend/utils/sort/psort.c +++ b/src/backend/utils/sort/psort.c @@ -7,11 +7,25 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.5 1997/07/24 20:18:07 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.6 1997/08/06 03:41:55 momjian Exp $ * * NOTES - * Sorts the first relation into the second relation. The sort may - * not be called twice simultaneously. + * Sorts the first relation into the second relation. + * + * The old psort.c's routines formed a temporary relation from the merged + * sort files. This version keeps the files around instead of generating the + * relation from them, and provides interface functions to the file so that + * you can grab tuples, mark a position in the file, restore a position in the + * file. You must now explicitly call an interface function to end the sort, + * psort_end, when you are done. + * Now most of the global variables are stuck in the Sort nodes, and + * accessed from there (they are passed to all the psort routines) so that + * each sort running has its own separate state. This is facilitated by having + * the Sort nodes passed in to all the interface functions. + * The one global variable that all the sorts still share is SortMemory. + * You should now be allowed to run two or more psorts concurrently, + * so long as the memory they eat up is not greater than SORTMEM, the initial + * value of SortMemory. -Rex 2.15.1995 * * Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future. * @@ -21,7 +35,6 @@ */ #include <stdio.h> #include <math.h> -#include <unistd.h> #include "postgres.h" @@ -35,120 +48,150 @@ #include "storage/buf.h" #include "storage/bufmgr.h" /* for BLCKSZ */ #include "utils/portal.h" /* for {Start,End}PortalAllocMode */ +#include "utils/elog.h" #include "utils/rel.h" -#include "utils/psort.h" +#include "nodes/execnodes.h" +#include "nodes/plannodes.h" +#include "executor/executor.h" + #include "utils/lselect.h" +#include "utils/psort.h" #include "storage/fd.h" -#ifndef HAVE_MEMMOVE -# include <regex/utils.h> -#else -# include <string.h> -#endif - #define TEMPDIR "./" -int Nkeys; -ScanKey Key; -int SortMemory; - -static int TapeRange; /* number of tapes - 1 (T) */ -static int Level; /* (l) */ -static int TotalDummy; /* summation of tp_dummy */ -static struct tape Tape[MAXTAPES]; -static long shortzero = 0; /* used to delimit runs */ -static struct tuple *LastTuple = NULL; /* last output */ +extern int SortMem; /* defined as postgres option */ +static long shortzero = 0; /* used to delimit runs */ -static int BytesRead; /* to keep track of # of IO */ -static int BytesWritten; +/* + * old psort global variables + * + * (These are the global variables from the old psort. They are still used, + * but are now accessed from Sort nodes using the PS macro. Note that while + * these variables will be accessed by PS(node)->whatever, they will still + * be called by their original names within the comments! -Rex 2.10.1995) + * + * LeftistContextData treeContext; + * + * static int TapeRange; // number of tapes - 1 (T) // + * static int Level; // (l) // + * static int TotalDummy; // summation of tp_dummy // + * static struct tape *Tape; + * + * static int BytesRead; // to keep track of # of IO // + * static int BytesWritten; + * + * struct leftist *Tuples; // current tuples in memory // + * + * FILE *psort_grab_file; // this holds tuples grabbed + * from merged sort runs // + * long psort_current; // current file position // + * long psort_saved; // file position saved for + * mark and restore // + */ -Relation SortRdesc; /* current tuples in memory */ -struct leftist *Tuples; /* current tuples in memory */ +/* + * PS - Macro to access and cast psortstate from a Sort node + */ +#define PS(N) ((Psortstate *)N->psortstate) /* - * psort - polyphase merge sort entry point + * psort_begin - polyphase merge sort entry point. Sorts the subplan + * into a temporary file psort_grab_file. After + * this is called, calling the interface function + * psort_grabtuple iteratively will get you the sorted + * tuples. psort_end then finishes the sort off, after + * all the tuples have been grabbed. + * + * Allocates and initializes sort node's psort state. */ -void -psort(Relation oldrel, Relation newrel, int nkeys, ScanKey key) +bool +psort_begin(Sort *node, int nkeys, ScanKey key) { + bool empty; /* to answer: is child node empty? */ + + node->psortstate = (struct Psortstate *)palloc(sizeof(struct Psortstate)); + if (node->psortstate == NULL) + return false; + AssertArg(nkeys >= 1); AssertArg(key[0].sk_attno != 0); AssertArg(key[0].sk_procedure != 0); - Nkeys = nkeys; - Key = key; - SortMemory = 0; - SortRdesc = oldrel; - BytesRead = 0; - BytesWritten = 0; - /* - * may not be the best place. - * - * Pass 0 for the "limit" as the argument is currently ignored. - * Previously, only one arg was passed. -mer 12 Nov. 1991 - */ - StartPortalAllocMode(StaticAllocMode, (Size)0); - initpsort(); - initialrun(oldrel); - /* call finalrun(newrel, mergerun()) instead */ - endpsort(newrel, mergeruns()); - EndPortalAllocMode(); - NDirectFileRead += (int)ceil((double)BytesRead / BLCKSZ); - NDirectFileWrite += (int)ceil((double)BytesWritten / BLCKSZ); -} + PS(node)->BytesRead = 0; + PS(node)->BytesWritten = 0; + PS(node)->treeContext.tupDesc = + ExecGetTupType(outerPlan((Plan *)node)); + PS(node)->treeContext.nKeys = nkeys; + PS(node)->treeContext.scanKeys = key; + PS(node)->treeContext.sortMem = SortMem; -/* - * TAPENO - number of tape in Tape - */ + PS(node)->Tuples = NULL; + PS(node)->tupcount = 0; + + PS(node)->using_tape_files = false; + PS(node)->memtuples = NULL; + + initialrun(node, &empty); + + if (empty) + return false; + + if (PS(node)->using_tape_files) + PS(node)->psort_grab_file = mergeruns(node); -#define TAPENO(NODE) (NODE - Tape) -#define TUPLENO(TUP) ((TUP == NULL) ? -1 : (int) TUP->t_iid) + PS(node)->psort_current = 0; + PS(node)->psort_saved = 0; + + return true; +} /* - * initpsort - initializes the tapes + * inittapes - initializes the tapes * - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270) * Returns: * number of allocated tapes */ void -initpsort() +inittapes(Sort *node) { register int i; register struct tape *tp; + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + /* ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, - "initpsort: Invalid number of tapes to initialize.\n"); + "inittapes: Invalid number of tapes to initialize.\n"); */ - tp = Tape; + tp = PS(node)->Tape; for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++) { tp->tp_dummy = 1; tp->tp_fib = 1; tp->tp_prev = tp - 1; tp++; } - TapeRange = --tp - Tape; + PS(node)->TapeRange = --tp - PS(node)->Tape; tp->tp_dummy = 0; tp->tp_fib = 0; - Tape[0].tp_prev = tp; - - if (TapeRange <= 1) - elog(WARN, "initpsort: Could only allocate %d < 3 tapes\n", - TapeRange + 1); + PS(node)->Tape[0].tp_prev = tp; - Level = 1; - TotalDummy = TapeRange; + if (PS(node)->TapeRange <= 1) + elog(WARN, "inittapes: Could only allocate %d < 3 tapes\n", + PS(node)->TapeRange + 1); - SortMemory = SORTMEM; - LastTuple = NULL; - Tuples = NULL; + PS(node)->Level = 1; + PS(node)->TotalDummy = PS(node)->TapeRange; + + PS(node)->using_tape_files = true; } /* - * resetpsort - resets (frees) malloc'd memory for an aborted Xaction + * resetpsort - resets (pfrees) palloc'd memory for an aborted Xaction * * Not implemented yet. */ @@ -170,16 +213,18 @@ resetpsort() * LEN field must be a short; FP is a stream */ -#define PUTTUP(TUP, FP)\ - BytesWritten += (TUP)->t_len; \ - fwrite((char *)TUP, (TUP)->t_len, 1, FP) + +#define PUTTUP(NODE, TUP, FP) if (1) {\ + ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len; \ + fwrite((char *)TUP, (TUP)->t_len, 1, FP);} else #define ENDRUN(FP) fwrite((char *)&shortzero, sizeof (shortzero), 1, FP) #define GETLEN(LEN, FP) fread((char *)&(LEN), sizeof (shortzero), 1, FP) #define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN)) -#define GETTUP(TUP, LEN, FP)\ +#define GETTUP(NODE, TUP, LEN, FP) if (1) {\ IncrProcessed(); \ - BytesRead += (LEN) - sizeof (shortzero); \ - fread((char *)(TUP) + sizeof (shortzero), (LEN) - sizeof (shortzero), 1, FP) + ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof (shortzero); \ + fread((char *)(TUP) + sizeof (shortzero), (LEN) - sizeof (shortzero), 1, FP);} \ + else #define SETTUPLEN(TUP, LEN) (TUP)->t_len = LEN /* @@ -188,9 +233,9 @@ resetpsort() * FULLMEM - 1 iff a tuple will fit */ -#define USEMEM(AMT) SortMemory -= (AMT) -#define FREEMEM(AMT) SortMemory += (AMT) -#define LACKMEM() (SortMemory <= BLCKSZ) /* not accurate */ +#define USEMEM(NODE,AMT) PS(node)->treeContext.sortMem -= (AMT) +#define FREEMEM(NODE,AMT) PS(node)->treeContext.sortMem += (AMT) +#define LACKMEM(NODE) (PS(node)->treeContext.sortMem <= MAXBLCKSZ) /* not accurate */ #define TRACEMEM(FUNC) #define TRACEOUT(FUNC, TUP) @@ -219,61 +264,66 @@ resetpsort() * Also, perhaps allocate tapes when needed. Split into 2 funcs. */ void -initialrun(Relation rdesc) +initialrun(Sort *node, bool *empty) { /* register struct tuple *tup; */ register struct tape *tp; - HeapScanDesc sdesc; int baseruns; /* D:(a) */ - int morepasses; /* EOF */ - - sdesc = heap_beginscan(rdesc, 0, NowTimeQual, 0, - (ScanKey)NULL); - tp = Tape; + int extrapasses; /* EOF */ + + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + + tp = PS(node)->Tape; - if ((bool)createrun(sdesc, tp->tp_file) != false) - morepasses = 0; + if ((bool)createrun(node, tp->tp_file, empty) != false) { + if (! PS(node)->using_tape_files) + inittapes(node); + extrapasses = 0; + } else - morepasses = 1 + (Tuples != NULL); /* (T != N) ? 2 : 1 */ + return; /* if rows fit in memory, we never access tape stuff */ for ( ; ; ) { tp->tp_dummy--; - TotalDummy--; + PS(node)->TotalDummy--; if (tp->tp_dummy < (tp + 1)->tp_dummy) tp++; else if (tp->tp_dummy != 0) - tp = Tape; + tp = PS(node)->Tape; else { - Level++; - baseruns = Tape[0].tp_fib; - for (tp = Tape; tp - Tape < TapeRange; tp++) { - TotalDummy += + PS(node)->Level++; + baseruns = PS(node)->Tape[0].tp_fib; + for (tp = PS(node)->Tape; + tp - PS(node)->Tape < PS(node)->TapeRange; tp++) { + PS(node)->TotalDummy += (tp->tp_dummy = baseruns + (tp + 1)->tp_fib - tp->tp_fib); tp->tp_fib = baseruns + (tp + 1)->tp_fib; } - tp = Tape; /* D4 */ + tp = PS(node)->Tape; /* D4 */ } /* D3 */ - if (morepasses) - if (--morepasses) { - dumptuples(tp->tp_file); + if (extrapasses) + if (--extrapasses) { + dumptuples(node); ENDRUN(tp->tp_file); continue; } else break; - if ((bool)createrun(sdesc, tp->tp_file) == false) - morepasses = 1 + (Tuples != NULL); + + if ((bool)createrun(node, tp->tp_file, empty) == false) + extrapasses = 1 + (PS(node)->Tuples != NULL); /* D2 */ } - for (tp = Tape + TapeRange; tp >= Tape; tp--) - rewind(tp->tp_file); /* D. */ - heap_endscan(sdesc); + for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--) + rewind(tp->tp_file); /* D. */ } /* - * createrun - places the next run on file + * createrun - places the next run on file, grabbing the tuples by + * executing the subplan passed in * * Uses: * Tuples, which should contain any tuples for this run @@ -283,7 +333,7 @@ initialrun(Relation rdesc) * Tuples contains the tuples for the following run upon exit */ bool -createrun(HeapScanDesc sdesc, FILE *file) +createrun(Sort *node, FILE *file, bool *empty) { register HeapTuple lasttuple; register HeapTuple btup, tup; @@ -291,47 +341,74 @@ createrun(HeapScanDesc sdesc, FILE *file) Buffer b; bool foundeor; short junk; - + + int cr_tuples = 0; /* Count tuples grabbed from plannode */ + TupleTableSlot *cr_slot; + + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + lasttuple = NULL; nextrun = NULL; foundeor = false; for ( ; ; ) { - while (LACKMEM() && Tuples != NULL) { + while (LACKMEM(node) && PS(node)->Tuples != NULL) { if (lasttuple != NULL) { - FREEMEM(lasttuple->t_len); + FREEMEM(node,lasttuple->t_len); FREE(lasttuple); TRACEMEM(createrun); } - lasttuple = tup = gettuple(&Tuples, &junk); - PUTTUP(tup, file); + lasttuple = tup = gettuple(&PS(node)->Tuples, &junk, + &PS(node)->treeContext); + if (! PS(node)->using_tape_files) + inittapes(node); + PUTTUP(node, tup, PS(node)->Tape->tp_file); TRACEOUT(createrun, tup); } - if (LACKMEM()) + if (LACKMEM(node)) break; - btup = heap_getnext(sdesc, 0, &b); - if (!HeapTupleIsValid(btup)) { + + /* About to call ExecProcNode, it can mess up the state if it + * eventually calls another Sort node. So must stow it away here for + * the meantime. -Rex 2.2.1995 + */ + + cr_slot = ExecProcNode(outerPlan((Plan *)node), (Plan *)node); + + if (TupIsNull(cr_slot)) { foundeor = true; break; } + else { + tup = tuplecopy(cr_slot->val); + ExecClearTuple(cr_slot); + PS(node)->tupcount++; + cr_tuples++; + } + IncrProcessed(); - tup = tuplecopy(btup, sdesc->rs_rd, b); - USEMEM(tup->t_len); + USEMEM(node,tup->t_len); TRACEMEM(createrun); - if (lasttuple != NULL && tuplecmp(tup, lasttuple)) - puttuple(&nextrun, tup, 0); + if (lasttuple != NULL && tuplecmp(tup, lasttuple, + &PS(node)->treeContext)) + puttuple(&nextrun, tup, 0, &PS(node)->treeContext); else - puttuple(&Tuples, tup, 0); - ReleaseBuffer(b); + puttuple(&PS(node)->Tuples, tup, 0, &PS(node)->treeContext); } if (lasttuple != NULL) { - FREEMEM(lasttuple->t_len); + FREEMEM(node,lasttuple->t_len); FREE(lasttuple); TRACEMEM(createrun); } - dumptuples(file); - ENDRUN(file); + dumptuples(node); + if (PS(node)->using_tape_files) + ENDRUN(file); /* delimit the end of the run */ - Tuples = nextrun; + PS(node)->Tuples = nextrun; + + /* if we did not see any tuples, mark empty */ + *empty = (cr_tuples > 0) ? false : true; + return((bool)! foundeor); /* XXX - works iff bool is {0,1} */ } @@ -339,10 +416,10 @@ createrun(HeapScanDesc sdesc, FILE *file) * tuplecopy - see also tuple.c:palloctup() * * This should eventually go there under that name? And this will - * then use malloc directly (see version -r1.2). + * then use palloc directly (see version -r1.2). */ HeapTuple -tuplecopy(HeapTuple tup, Relation rdesc, Buffer b) +tuplecopy(HeapTuple tup) { HeapTuple rettup; @@ -362,18 +439,22 @@ tuplecopy(HeapTuple tup, Relation rdesc, Buffer b) * file of tuples in order */ FILE * -mergeruns() +mergeruns(Sort *node) { - register struct tape *tp; - - tp = Tape + TapeRange; - merge(tp); + register struct tape *tp; + + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + Assert(PS(node)->using_tape_files == true); + + tp = PS(node)->Tape + PS(node)->TapeRange; + merge(node, tp); rewind(tp->tp_file); - while (--Level != 0) { + while (--PS(node)->Level != 0) { tp = tp->tp_prev; rewind(tp->tp_file); - /* resettape(tp->tp_file); -not sufficient */ - merge(tp); + /* resettape(tp->tp_file); -not sufficient */ + merge(node, tp); rewind(tp->tp_file); } return(tp->tp_file); @@ -384,7 +465,7 @@ mergeruns() * (polyphase merge Alg.D(D5)--Knuth, Vol.3, p271) */ void -merge(struct tape *dest) +merge(Sort *node, struct tape *dest) { register HeapTuple tup; register struct tape *lasttp; /* (TAPE[P]) */ @@ -396,6 +477,10 @@ merge(struct tape *dest) short fromtape; long tuplen; + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + Assert(PS(node)->using_tape_files == true); + lasttp = dest->tp_prev; times = lasttp->tp_fib; for (tp = lasttp ; tp != dest; tp = tp->tp_prev) @@ -403,95 +488,217 @@ merge(struct tape *dest) tp->tp_fib += times; /* Tape[].tp_fib (A[]) is set to proper exit values */ - if (TotalDummy < TapeRange) /* no complete dummy runs */ + if (PS(node)->TotalDummy < PS(node)->TapeRange)/* no complete dummy runs */ outdummy = 0; else { - outdummy = TotalDummy; /* a large positive number */ + outdummy = PS(node)->TotalDummy; /* a large positive number */ for (tp = lasttp; tp != dest; tp = tp->tp_prev) if (outdummy > tp->tp_dummy) outdummy = tp->tp_dummy; for (tp = lasttp; tp != dest; tp = tp->tp_prev) tp->tp_dummy -= outdummy; tp->tp_dummy += outdummy; - TotalDummy -= outdummy * TapeRange; + PS(node)->TotalDummy -= outdummy * PS(node)->TapeRange; /* do not add the outdummy runs yet */ times -= outdummy; } destfile = dest->tp_file; - while (times-- != 0) { /* merge one run */ + while (times-- != 0) { /* merge one run */ tuples = NULL; - if (TotalDummy == 0) + if (PS(node)->TotalDummy == 0) for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) { GETLEN(tuplen, tp->tp_file); tup = ALLOCTUP(tuplen); - USEMEM(tuplen); + USEMEM(node,tuplen); TRACEMEM(merge); SETTUPLEN(tup, tuplen); - GETTUP(tup, tuplen, tp->tp_file); - puttuple(&tuples, tup, TAPENO(tp)); + GETTUP(node, tup, tuplen, tp->tp_file); + puttuple(&tuples, tup, tp - PS(node)->Tape, + &PS(node)->treeContext); } else { for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev) { if (tp->tp_dummy != 0) { tp->tp_dummy--; - TotalDummy--; + PS(node)->TotalDummy--; } else { GETLEN(tuplen, tp->tp_file); tup = ALLOCTUP(tuplen); - USEMEM(tuplen); + USEMEM(node,tuplen); TRACEMEM(merge); SETTUPLEN(tup, tuplen); - GETTUP(tup, tuplen, tp->tp_file); - puttuple(&tuples, tup, TAPENO(tp)); + GETTUP(node, tup, tuplen, tp->tp_file); + puttuple(&tuples, tup, tp - PS(node)->Tape, + &PS(node)->treeContext); } } } while (tuples != NULL) { /* possible optimization by using count in tuples */ - tup = gettuple(&tuples, &fromtape); - PUTTUP(tup, destfile); - FREEMEM(tup->t_len); + tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext); + PUTTUP(node, tup, destfile); + FREEMEM(node,tup->t_len); FREE(tup); TRACEMEM(merge); - GETLEN(tuplen, Tape[fromtape].tp_file); + GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file); if (tuplen == 0) ; else { tup = ALLOCTUP(tuplen); - USEMEM(tuplen); + USEMEM(node,tuplen); TRACEMEM(merge); SETTUPLEN(tup, tuplen); - GETTUP(tup, tuplen, Tape[fromtape].tp_file); - puttuple(&tuples, tup, fromtape); + GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file); + puttuple(&tuples, tup, fromtape, &PS(node)->treeContext); } - } + } ENDRUN(destfile); } - TotalDummy += outdummy; + PS(node)->TotalDummy += outdummy; } /* - * endpsort - creates the new relation and unlinks the tape files + * dumptuples - stores all the tuples in tree into file */ void -endpsort(Relation rdesc, FILE *file) +dumptuples(Sort *node) { - register struct tape *tp; - register HeapTuple tup; - long tuplen; + register struct leftist *tp; + register struct leftist *newp; + struct leftist **treep = &PS(node)->Tuples; + LeftistContext context = &PS(node)->treeContext; + HeapTuple tup; + int memtupindex = 0; + + if (! PS(node)->using_tape_files) { + Assert(PS(node)->memtuples == NULL); + PS(node)->memtuples = palloc(PS(node)->tupcount * sizeof(HeapTuple)); + } - if (! feof(file)) - while (GETLEN(tuplen, file) && tuplen != 0) { - tup = ALLOCTUP(tuplen); - SortMemory += tuplen; - SETTUPLEN(tup, tuplen); - GETTUP(tup, tuplen, file); - heap_insert(rdesc, tup); + tp = *treep; + while (tp != NULL) { + tup = tp->lt_tuple; + if (tp->lt_dist == 1) /* lt_right == NULL */ + newp = tp->lt_left; + else + newp = lmerge(tp->lt_left, tp->lt_right, context); + FREEMEM(node,sizeof (struct leftist)); + FREE(tp); + if (PS(node)->using_tape_files) { + PUTTUP(node, tup, PS(node)->Tape->tp_file); + FREEMEM(node,tup->t_len); FREE(tup); - SortMemory -= tuplen; } - for (tp = Tape + TapeRange; tp >= Tape; tp--) - destroytape(tp->tp_file); + else + PS(node)->memtuples[memtupindex++] = tup; + + tp = newp; + } + *treep = NULL; +} + +/* + * psort_grabtuple - gets a tuple from the sorted file and returns it. + * If there are no tuples left, returns NULL. + * Should not call psort_end unless this has returned + * a NULL indicating the last tuple has been processed. + */ +HeapTuple +psort_grabtuple(Sort *node) +{ + register HeapTuple tup; + long tuplen; + + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + + if (PS(node)->using_tape_files == true) { + if (!feof(PS(node)->psort_grab_file)) { + if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0) { + tup = (HeapTuple)palloc((unsigned)tuplen); + SETTUPLEN(tup, tuplen); + GETTUP(node, tup, tuplen, PS(node)->psort_grab_file); + + /* Update current merged sort file position */ + PS(node)->psort_current += tuplen; + + return tup; + } + else + return NULL; + } + else + return NULL; + } + else { + if (PS(node)->psort_current < PS(node)->tupcount) + return PS(node)->memtuples[PS(node)->psort_current++]; + else + return NULL; + } +} + +/* + * psort_markpos - saves current position in the merged sort file + */ +void +psort_markpos(Sort *node) +{ + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + + PS(node)->psort_saved = PS(node)->psort_current; +} + +/* + * psort_restorepos- restores current position in merged sort file to + * last saved position + */ +void +psort_restorepos(Sort *node) +{ + Assert(node != (Sort *) NULL); + Assert(PS(node) != (Psortstate *) NULL); + + if (PS(node)->using_tape_files == true) + fseek(PS(node)->psort_grab_file, PS(node)->psort_saved, SEEK_SET); + PS(node)->psort_current = PS(node)->psort_saved; +} + +/* + * psort_end - unlinks the tape files, and cleans up. Should not be + * called unless psort_grabtuple has returned a NULL. + */ +void +psort_end(Sort *node) +{ + register struct tape *tp; + + if (!node->cleaned) { + Assert(node != (Sort *) NULL); +/* Assert(PS(node) != (Psortstate *) NULL); */ + + /* + * I'm changing this because if we are sorting a relation + * with no tuples, psortstate is NULL. + */ + if (PS(node) != (Psortstate *) NULL) { + if (PS(node)->using_tape_files == true) + for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--) + destroytape(tp->tp_file); + else if (PS(node)->memtuples) + pfree(PS(node)->memtuples); + + NDirectFileRead += + (int)ceil((double)PS(node)->BytesRead / BLCKSZ); + NDirectFileWrite += + (int)ceil((double)PS(node)->BytesWritten / BLCKSZ); + + pfree((void *)node->psortstate); + + node->cleaned = TRUE; + } + } } /* @@ -522,26 +729,34 @@ static char Tempfile[MAXPGPATH] = TEMPDIR; FILE * gettape() { - register struct tapelst *tp; - FILE *file; - static int tapeinit = 0; + register struct tapelst *tp; + FILE *file; + static int tapeinit = 0; + char *mktemp(); + static unsigned int uniqueFileId = 0; + extern int errno; + char uniqueName[MAXPGPATH]; tp = (struct tapelst *)palloc((unsigned)sizeof (struct tapelst)); - if (!tapeinit) { - Tempfile[sizeof (TEMPDIR) - 1] = '/'; - memmove(Tempfile + sizeof(TEMPDIR), TAPEEXT, sizeof (TAPEEXT)); - tapeinit = 1; - } - tp->tl_name = palloc((unsigned)sizeof(Tempfile)); + + sprintf(uniqueName, "%spg_psort.%d.%d", TEMPDIR, getpid(), uniqueFileId); + uniqueFileId++; + + tapeinit = 1; + + tp->tl_name = palloc((unsigned)sizeof(uniqueName)); + /* - * now, copy template with final null into malloc'd space + * now, copy template with final null into palloc'd space */ - memmove(tp->tl_name, Tempfile, sizeof (TEMPDIR) + sizeof (TAPEEXT)); - mktemp(tp->tl_name); + + memmove(tp->tl_name, uniqueName, strlen(uniqueName)); + AllocateFile(); file = fopen(tp->tl_name, "w+"); if (file == NULL) { + elog(NOTICE, "psort: gettape: fopen returned error code %i", errno); /* XXX this should not happen */ FreeFile(); FREE(tp->tl_name); |