Improve performance of our private version of qsort. Per recent testing,
authorTom Lane <tgl@sss.pgh.pa.us>
Tue, 21 Mar 2006 19:49:15 +0000 (19:49 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Tue, 21 Mar 2006 19:49:15 +0000 (19:49 +0000)
the logic it contained to switch to insertion sort for near-sorted input was
in fact a big loss, because it could fairly easily be fooled into applying
insertion sort to large subfiles that weren't all that well ordered.  Remove
that, and instead add a simple check for already-perfectly-sorted input, as
per suggestion from Dann Corbit.  This adds at worst O(N*lgN) overhead, and
usually far less, while sometimes allowing a subfile sort to finish in O(N)
time.  Preliminary testing says this is an improvement over the basic
Bentley & McIlroy code for many nonrandom inputs, and it costs almost
nothing when the input is random.

src/port/qsort.c

index 71c1f155ddd388f72ca134c1763fbf118b1f842d..a7aa8cea2a9fa38c221e79e57f6a421f4b06ebaa 100644 (file)
@@ -2,8 +2,10 @@
  * Modifications from vanilla NetBSD source:
  *   Add do ... while() macro fix
  *   Remove __inline, _DIAGASSERTs, __P
+ *   Remove ill-considered "swap_cnt" switch to insertion sort,
+ *   in favor of a simple check for presorted input.
  *
- * $PostgreSQL: pgsql/src/port/qsort.c,v 1.8 2005/10/15 02:49:51 momjian Exp $
+ * $PostgreSQL: pgsql/src/port/qsort.c,v 1.9 2006/03/21 19:49:15 tgl Exp $
  */
 
 /* $NetBSD: qsort.c,v 1.13 2003/08/07 16:43:42 agc Exp $   */
@@ -47,7 +49,11 @@ static void swapfunc(char *, char *, size_t, int);
 #define min(a, b)  ((a) < (b) ? (a) : (b))
 
 /*
- * Qsort routine from Bentley & McIlroy's "Engineering a Sort Function".
+ * Qsort routine based on J. L. Bentley and M. D. McIlroy,
+ * "Engineering a sort function",
+ * Software--Practice and Experience 23 (1993) 1249-1265.
+ * We have modified their original by adding a check for already-sorted input,
+ * which seems to be a win per discussions on pgsql-hackers around 2006-03-21.
  */
 #define swapcode(TYPE, parmi, parmj, n) \
 do {       \
@@ -116,10 +122,9 @@ int            (*cmp) (const void *, const void *);
    int         d,
                r,
                swaptype,
-               swap_cnt;
+               presorted;
 
 loop:SWAPINIT(a, es);
-   swap_cnt = 0;
    if (n < 7)
    {
        for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
@@ -128,6 +133,17 @@ loop:SWAPINIT(a, es);
                swap(pl, pl - es);
        return;
    }
+   presorted = 1;
+   for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
+   {
+       if (cmp(pm - es, pm) > 0)
+       {
+           presorted = 0;
+           break;
+       }
+   }
+   if (presorted)
+       return;
    pm = (char *) a + (n / 2) * es;
    if (n > 7)
    {
@@ -144,7 +160,6 @@ loop:SWAPINIT(a, es);
    }
    swap(a, pm);
    pa = pb = (char *) a + es;
-
    pc = pd = (char *) a + (n - 1) * es;
    for (;;)
    {
@@ -152,7 +167,6 @@ loop:SWAPINIT(a, es);
        {
            if (r == 0)
            {
-               swap_cnt = 1;
                swap(pa, pb);
                pa += es;
            }
@@ -162,7 +176,6 @@ loop:SWAPINIT(a, es);
        {
            if (r == 0)
            {
-               swap_cnt = 1;
                swap(pc, pd);
                pd -= es;
            }
@@ -171,19 +184,9 @@ loop:SWAPINIT(a, es);
        if (pb > pc)
            break;
        swap(pb, pc);
-       swap_cnt = 1;
        pb += es;
        pc -= es;
    }
-   if (swap_cnt == 0)
-   {                           /* Switch to insertion sort */
-       for (pm = (char *) a + es; pm < (char *) a + n * es; pm += es)
-           for (pl = pm; pl > (char *) a && cmp(pl - es, pl) > 0;
-                pl -= es)
-               swap(pl, pl - es);
-       return;
-   }
-
    pn = (char *) a + n * es;
    r = min(pa - (char *) a, pb - pa);
    vecswap(a, pb - r, r);