Replace the "New Linear" GiST split algorithm for boxes and points with a
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 6 Oct 2011 07:03:46 +0000 (10:03 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Thu, 6 Oct 2011 07:03:46 +0000 (10:03 +0300)
new double-sorting algorithm. The new algorithm produces better quality
trees, making searches faster.

Alexander Korotkov

src/backend/access/gist/gistproc.c

index 43c4b1251b1301b0da1eec863fcc7b4160653179..f7eb9412f90fb53056683cd3dd04d24652b940ee 100644 (file)
@@ -27,6 +27,9 @@ static double size_box(Datum dbox);
 static bool rtree_internal_consistent(BOX *key, BOX *query,
                          StrategyNumber strategy);
 
+/* Minimum accepted ratio of split */
+#define LIMIT_RATIO 0.3
+
 
 /**************************************************
  * Box ops
@@ -49,30 +52,6 @@ rt_box_union(PG_FUNCTION_ARGS)
    PG_RETURN_BOX_P(n);
 }
 
-static Datum
-rt_box_inter(PG_FUNCTION_ARGS)
-{
-   BOX        *a = PG_GETARG_BOX_P(0);
-   BOX        *b = PG_GETARG_BOX_P(1);
-   BOX        *n;
-
-   n = (BOX *) palloc(sizeof(BOX));
-
-   n->high.x = Min(a->high.x, b->high.x);
-   n->high.y = Min(a->high.y, b->high.y);
-   n->low.x = Max(a->low.x, b->low.x);
-   n->low.y = Max(a->low.y, b->low.y);
-
-   if (n->high.x < n->low.x || n->high.y < n->low.y)
-   {
-       pfree(n);
-       /* Indicate "no intersection" by returning NULL pointer */
-       n = NULL;
-   }
-
-   PG_RETURN_BOX_P(n);
-}
-
 /*
  * The GiST Consistent method for boxes
  *
@@ -194,86 +173,6 @@ gist_box_penalty(PG_FUNCTION_ARGS)
    PG_RETURN_POINTER(result);
 }
 
-static void
-chooseLR(GIST_SPLITVEC *v,
-        OffsetNumber *list1, int nlist1, BOX *union1,
-        OffsetNumber *list2, int nlist2, BOX *union2)
-{
-   bool        firstToLeft = true;
-
-   if (v->spl_ldatum_exists || v->spl_rdatum_exists)
-   {
-       if (v->spl_ldatum_exists && v->spl_rdatum_exists)
-       {
-           BOX         LRl = *union1,
-                       LRr = *union2;
-           BOX         RLl = *union2,
-                       RLr = *union1;
-           double      sizeLR,
-                       sizeRL;
-
-           adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
-           adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
-           adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
-           adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
-
-           sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
-           sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
-
-           if (sizeLR > sizeRL)
-               firstToLeft = false;
-
-       }
-       else
-       {
-           float       p1,
-                       p2;
-           GISTENTRY   oldUnion,
-                       addon;
-
-           gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
-                         NULL, NULL, InvalidOffsetNumber, FALSE);
-
-           gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
-           DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
-           gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
-           DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
-
-           if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
-               firstToLeft = false;
-       }
-   }
-
-   if (firstToLeft)
-   {
-       v->spl_left = list1;
-       v->spl_right = list2;
-       v->spl_nleft = nlist1;
-       v->spl_nright = nlist2;
-       if (v->spl_ldatum_exists)
-           adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
-       v->spl_ldatum = BoxPGetDatum(union1);
-       if (v->spl_rdatum_exists)
-           adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
-       v->spl_rdatum = BoxPGetDatum(union2);
-   }
-   else
-   {
-       v->spl_left = list2;
-       v->spl_right = list1;
-       v->spl_nleft = nlist2;
-       v->spl_nright = nlist1;
-       if (v->spl_ldatum_exists)
-           adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
-       v->spl_ldatum = BoxPGetDatum(union2);
-       if (v->spl_rdatum_exists)
-           adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
-       v->spl_rdatum = BoxPGetDatum(union1);
-   }
-
-   v->spl_ldatum_exists = v->spl_rdatum_exists = false;
-}
-
 /*
  * Trivial split: half of entries will be placed on one page
  * and another half - to another
@@ -338,199 +237,603 @@ fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
 }
 
 /*
- * The GiST PickSplit method
+ * Represents information about an entry that can be placed to either group
+ * without affecting overlap over selected axis ("common entry").
+ */
+typedef struct
+{
+   /* Index of entry in the initial array */
+   int         index;
+   /* Delta between penalties of entry insertion into different groups */
+   double      delta;
+}  CommonEntry;
+
+/*
+ * Context for g_box_consider_split. Contains information about currently
+ * selected split and some general information.
+ */
+typedef struct
+{
+   int         entriesCount;   /* total number of entries being split */
+   BOX         boundingBox;    /* minimum bounding box across all entries */
+
+   /* Information about currently selected split follows */
+
+   bool        first;          /* true if no split was selected yet */
+
+   double      leftUpper;      /* upper bound of left interval */
+   double      rightLower;     /* lower bound of right interval */
+
+   float4      ratio;
+   float4      overlap;
+   int         dim;            /* axis of this split */
+   double      range;          /* width of general MBR projection to the
+                                * selected axis */
+}  ConsiderSplitContext;
+
+/*
+ * Interval represents projection of box to axis.
+ */
+typedef struct
+{
+   double      lower,
+               upper;
+}  SplitInterval;
+
+/*
+ * Interval comparison function by lower bound of the interval;
+ */
+static int
+interval_cmp_lower(const void *i1, const void *i2)
+{
+   double      lower1 = ((SplitInterval *) i1)->lower,
+               lower2 = ((SplitInterval *) i2)->lower;
+
+   if (lower1 < lower2)
+       return -1;
+   else if (lower1 > lower2)
+       return 1;
+   else
+       return 0;
+}
+
+/*
+ * Interval comparison function by upper bound of the interval;
+ */
+static int
+interval_cmp_upper(const void *i1, const void *i2)
+{
+   double      upper1 = ((SplitInterval *) i1)->upper,
+               upper2 = ((SplitInterval *) i2)->upper;
+
+   if (upper1 < upper2)
+       return -1;
+   else if (upper1 > upper2)
+       return 1;
+   else
+       return 0;
+}
+
+/*
+ * Replace negative value with zero.
+ */
+static inline float
+non_negative(float val)
+{
+   if (val >= 0.0f)
+       return val;
+   else
+       return 0.0f;
+}
+
+/*
+ * Consider replacement of currently selected split with the better one.
+ */
+static void inline
+g_box_consider_split(ConsiderSplitContext *context, int dimNum,
+                    double rightLower, int minLeftCount,
+                    double leftUpper, int maxLeftCount)
+{
+   int         leftCount,
+               rightCount;
+   float4      ratio,
+               overlap;
+   double      range;
+
+   /*
+    * Calculate entries distribution ratio assuming most uniform distribution
+    * of common entries.
+    */
+   if (minLeftCount >= (context->entriesCount + 1) / 2)
+   {
+       leftCount = minLeftCount;
+   }
+   else
+   {
+       if (maxLeftCount <= context->entriesCount / 2)
+           leftCount = maxLeftCount;
+       else
+           leftCount = context->entriesCount / 2;
+   }
+   rightCount = context->entriesCount - leftCount;
+
+   /*
+    * Ratio of split - quotient between size of lesser group and total
+    * entries count.
+    */
+   ratio = ((float4) Min(leftCount, rightCount)) /
+       ((float4) context->entriesCount);
+
+   if (ratio > LIMIT_RATIO)
+   {
+       bool        selectthis = false;
+
+       /*
+        * The ratio is acceptable, so compare current split with previously
+        * selected one. Between splits of one dimension we search for minimal
+        * overlap (allowing negative values) and minimal ration (between same
+        * overlaps. We switch dimension if find less overlap (non-negative)
+        * or less range with same overlap.
+        */
+       if (dimNum == 0)
+           range = context->boundingBox.high.x - context->boundingBox.low.x;
+       else
+           range = context->boundingBox.high.y - context->boundingBox.low.y;
+
+       overlap = (leftUpper - rightLower) / range;
+
+       /* If there is no previous selection, select this */
+       if (context->first)
+           selectthis = true;
+       else if (context->dim == dimNum)
+       {
+           /*
+            * Within the same dimension, choose the new split if it has a
+            * smaller overlap, or same overlap but better ratio.
+            */
+           if (overlap < context->overlap ||
+               (overlap == context->overlap && ratio > context->ratio))
+               selectthis = true;
+       }
+       else
+       {
+           /*
+            * Across dimensions, choose the new split if it has a smaller
+            * *non-negative* overlap, or same *non-negative* overlap but
+            * bigger range. This condition differs from the one described in
+            * the article. On the datasets where leaf MBRs don't overlap
+            * themselves, non-overlapping splits (i.e. splits which have zero
+            * *non-negative* overlap) are frequently possible. In this case
+            * splits tends to be along one dimension, because most distant
+            * non-overlapping splits (i.e. having lowest negative overlap)
+            * appears to be in the same dimension as in the previous split.
+            * Therefore MBRs appear to be very prolonged along another
+            * dimension, which leads to bad search performance. Using range
+            * as the second split criteria makes MBRs more quadratic. Using
+            * *non-negative* overlap instead of overlap as the first split
+            * criteria gives to range criteria a chance to matter, because
+            * non-overlapping splits are equivalent in this criteria.
+            */
+           if (non_negative(overlap) < non_negative(context->overlap) ||
+               (range > context->range &&
+                non_negative(overlap) <= non_negative(context->overlap)))
+               selectthis = true;
+       }
+
+       if (selectthis)
+       {
+           /* save information about selected split */
+           context->first = false;
+           context->ratio = ratio;
+           context->range = range;
+           context->overlap = overlap;
+           context->rightLower = rightLower;
+           context->leftUpper = leftUpper;
+           context->dim = dimNum;
+       }
+   }
+}
+
+/*
+ * Return increase of original BOX area by new BOX area insertion.
+ */
+static double
+box_penalty(BOX *original, BOX *new)
+{
+   double      union_width,
+               union_height;
+
+   union_width = Max(original->high.x, new->high.x) -
+       Min(original->low.x, new->low.x);
+   union_height = Max(original->high.y, new->high.y) -
+       Min(original->low.y, new->low.y);
+   return union_width * union_height - (original->high.x - original->low.x) *
+       (original->high.y - original->low.y);
+}
+
+/*
+ * Compare common entries by their deltas.
+ */
+static int
+common_entry_cmp(const void *i1, const void *i2)
+{
+   double      delta1 = ((CommonEntry *) i1)->delta,
+               delta2 = ((CommonEntry *) i2)->delta;
+
+   if (delta1 < delta2)
+       return -1;
+   else if (delta1 > delta2)
+       return 1;
+   else
+       return 0;
+}
+
+/*
+ * --------------------------------------------------------------------------
+ * Double sorting split algorithm. This is used for both boxes and points.
  *
- * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
- * C.H.Ang and T.C.Tan
+ * The algorithm finds split of boxes by considering splits along each axis.
+ * Each entry is first projected as an interval on the X-axis, and different
+ * ways to split the intervals into two groups are considered, trying to
+ * minimize the overlap of the groups. Then the same is repeated for the
+ * Y-axis, and the overall best split is chosen. The quality of a split is
+ * determined by overlap along that axis and some other criteria (see
+ * g_box_consider_split).
  *
- * This is used for both boxes and points.
+ * After that, all the entries are divided into three groups:
+ *
+ * 1) Entries which should be placed to the left group
+ * 2) Entries which should be placed to the right group
+ * 3) "Common entries" which can be placed to any of groups without affecting
+ *   of overlap along selected axis.
+ *
+ * The common entries are distributed by minimizing penalty.
+ *
+ * For details see:
+ * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
+ * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
+ * --------------------------------------------------------------------------
  */
 Datum
 gist_box_picksplit(PG_FUNCTION_ARGS)
 {
    GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
    GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
-   OffsetNumber i;
-   OffsetNumber *listL,
-              *listR,
-              *listB,
-              *listT;
-   BOX        *unionL,
-              *unionR,
-              *unionB,
-              *unionT;
-   int         posL,
-               posR,
-               posB,
-               posT;
-   BOX         pageunion;
-   BOX        *cur;
-   char        direction = ' ';
-   bool        allisequal = true;
-   OffsetNumber maxoff;
-   int         nbytes;
+   OffsetNumber i,
+               maxoff;
+   ConsiderSplitContext context;
+   BOX        *box,
+              *leftBox,
+              *rightBox;
+   int         dim,
+               commonEntriesCount;
+   SplitInterval *intervalsLower,
+              *intervalsUpper;
+   CommonEntry *commonEntries;
+   int         nentries;
+
+   memset(&context, 0, sizeof(ConsiderSplitContext));
 
-   posL = posR = posB = posT = 0;
    maxoff = entryvec->n - 1;
+   nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
 
-   cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
-   memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
+   /* Allocate arrays for intervals along axes */
+   intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
+   intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
 
-   /* find MBR */
-   for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
+   /*
+    * Calculate the overall minimum bounding box over all the entries.
+    */
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
-       cur = DatumGetBoxP(entryvec->vector[i].key);
-       if (allisequal && (
-                          pageunion.high.x != cur->high.x ||
-                          pageunion.high.y != cur->high.y ||
-                          pageunion.low.x != cur->low.x ||
-                          pageunion.low.y != cur->low.y
-                          ))
-           allisequal = false;
-
-       adjustBox(&pageunion, cur);
+       box = DatumGetBoxP(entryvec->vector[i].key);
+       if (i == FirstOffsetNumber)
+           context.boundingBox = *box;
+       else
+           adjustBox(&context.boundingBox, box);
    }
 
-   if (allisequal)
+   /*
+    * Iterate over axes for optimal split searching.
+    */
+   context.first = true;       /* nothing selected yet */
+   for (dim = 0; dim < 2; dim++)
    {
+       double      leftUpper,
+                   rightLower;
+       int         i1,
+                   i2;
+
+       /* Project each entry as an interval on the selected axis. */
+       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       {
+           box = DatumGetBoxP(entryvec->vector[i].key);
+           if (dim == 0)
+           {
+               intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+               intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+           }
+           else
+           {
+               intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+               intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+           }
+       }
+
+       /*
+        * Make two arrays of intervals: one sorted by lower bound and another
+        * sorted by upper bound.
+        */
+       memcpy(intervalsUpper, intervalsLower,
+              sizeof(SplitInterval) * nentries);
+       qsort(intervalsLower, nentries, sizeof(SplitInterval),
+             interval_cmp_lower);
+       qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+             interval_cmp_upper);
+
+       /*----
+        * The goal is to form a left and right interval, so that every entry
+        * interval is contained by either left or right interval (or both).
+        *
+        * For example, with the intervals (0,1), (1,3), (2,3), (2,4):
+        *
+        * 0 1 2 3 4
+        * +-+
+        *   +---+
+        *     +-+
+        *     +---+
+        *
+        * The left and right intervals are of the form (0,a) and (b,4).
+        * We first consider splits where b is the lower bound of an entry.
+        * We iterate through all entries, and for each b, calculate the
+        * smallest possible a. Then we consider splits where a is the
+        * uppper bound of an entry, and for each a, calculate the greatest
+        * possible b.
+        *
+        * In the above example, the first loop would consider splits:
+        * b=0: (0,1)-(0,4)
+        * b=1: (0,1)-(1,4)
+        * b=2: (0,3)-(2,4)
+        *
+        * And the second loop:
+        * a=1: (0,1)-(1,4)
+        * a=3: (0,3)-(2,4)
+        * a=4: (0,4)-(2,4)
+        */
+
+       /*
+        * Iterate over lower bound of right group, finding smallest possible
+        * upper bound of left group.
+        */
+       i1 = 0;
+       i2 = 0;
+       rightLower = intervalsLower[i1].lower;
+       leftUpper = intervalsUpper[i2].lower;
+       while (true)
+       {
+           /*
+            * Find next lower bound of right group.
+            */
+           while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+           {
+               leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+               i1++;
+           }
+           if (i1 >= nentries)
+               break;
+           rightLower = intervalsLower[i1].lower;
+
+           /*
+            * Find count of intervals which anyway should be placed to the
+            * left group.
+            */
+           while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+               i2++;
+
+           /*
+            * Consider found split.
+            */
+           g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+       }
+
        /*
-        * All entries are the same
+        * Iterate over upper bound of left group finding greates possible
+        * lower bound of right group.
         */
+       i1 = nentries - 1;
+       i2 = nentries - 1;
+       rightLower = intervalsLower[i1].upper;
+       leftUpper = intervalsUpper[i2].upper;
+       while (true)
+       {
+           /*
+            * Find next upper bound of left group.
+            */
+           while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+           {
+               rightLower = Min(rightLower, intervalsUpper[i2].lower);
+               i2--;
+           }
+           if (i2 < 0)
+               break;
+           leftUpper = intervalsUpper[i2].upper;
+
+           /*
+            * Find count of intervals which anyway should be placed to the
+            * right group.
+            */
+           while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+               i1--;
+
+           /*
+            * Consider found split.
+            */
+           g_box_consider_split(&context, dim,
+                                rightLower, i1 + 1, leftUpper, i2 + 1);
+       }
+   }
+
+   /*
+    * If we failed to find any acceptable splits, use trivial split.
+    */
+   if (context.first)
+   {
        fallbackSplit(entryvec, v);
        PG_RETURN_POINTER(v);
    }
 
-   nbytes = (maxoff + 2) * sizeof(OffsetNumber);
-   listL = (OffsetNumber *) palloc(nbytes);
-   listR = (OffsetNumber *) palloc(nbytes);
-   listB = (OffsetNumber *) palloc(nbytes);
-   listT = (OffsetNumber *) palloc(nbytes);
-   unionL = (BOX *) palloc(sizeof(BOX));
-   unionR = (BOX *) palloc(sizeof(BOX));
-   unionB = (BOX *) palloc(sizeof(BOX));
-   unionT = (BOX *) palloc(sizeof(BOX));
-
-#define ADDLIST( list, unionD, pos, num ) do { \
-   if ( pos ) { \
-       if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x  = cur->high.x; \
-       if ( (unionD)->low.x  > cur->low.x  ) (unionD)->low.x   = cur->low.x; \
-       if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y  = cur->high.y; \
-       if ( (unionD)->low.y  > cur->low.y  ) (unionD)->low.y   = cur->low.y; \
-   } else { \
-           memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) );  \
-   } \
-   (list)[pos] = num; \
-   (pos)++; \
-} while(0)
+   /*
+    * Ok, we have now selected the split across one axis.
+    *
+    * While considering the splits, we already determined that there will be
+    * enough entries in both groups to reach the desired ratio, but we did
+    * not memorize which entries go to which group. So determine that now.
+    */
 
-   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
-   {
-       cur = DatumGetBoxP(entryvec->vector[i].key);
-       if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
-           ADDLIST(listL, unionL, posL, i);
-       else
-           ADDLIST(listR, unionR, posR, i);
-       if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
-           ADDLIST(listB, unionB, posB, i);
-       else
-           ADDLIST(listT, unionT, posT, i);
-   }
+   /* Allocate vectors for results */
+   v->spl_left = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+   v->spl_right = (OffsetNumber *) palloc(nentries * sizeof(OffsetNumber));
+   v->spl_nleft = 0;
+   v->spl_nright = 0;
+
+   /* Allocate bounding boxes of left and right groups */
+   leftBox = palloc0(sizeof(BOX));
+   rightBox = palloc0(sizeof(BOX));
 
-#define LIMIT_RATIO 0.1
-#define _IS_BADRATIO(x,y)  ( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
-#define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
-   /* bad disposition, try to split by centers of boxes  */
-   if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+   /*
+    * Allocate an array for "common entries" - entries which can be placed to
+    * either group without affecting overlap along selected axis.
+    */
+   commonEntriesCount = 0;
+   commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
+
+   /* Helper macros to place an entry in the left or right group */
+#define PLACE_LEFT(box, off)                   \
+   do {                                        \
+       if (v->spl_nleft > 0)                   \
+           adjustBox(leftBox, box);            \
+       else                                    \
+           *leftBox = *(box);                  \
+       v->spl_left[v->spl_nleft++] = off;      \
+   } while(0)
+
+#define PLACE_RIGHT(box, off)                  \
+   do {                                        \
+       if (v->spl_nright > 0)                  \
+           adjustBox(rightBox, box);           \
+       else                                    \
+           *rightBox = *(box);                 \
+       v->spl_right[v->spl_nright++] = off;    \
+   } while(0)
+
+   /*
+    * Distribute entries which can be distributed unambiguously, and collect
+    * common entries.
+    */
+   for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
    {
-       double      avgCenterX = 0.0,
-                   avgCenterY = 0.0;
-       double      CenterX,
-                   CenterY;
+       double      lower,
+                   upper;
 
-       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       /*
+        * Get upper and lower bounds along selected axis.
+        */
+       box = DatumGetBoxP(entryvec->vector[i].key);
+       if (context.dim == 0)
        {
-           cur = DatumGetBoxP(entryvec->vector[i].key);
-           avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
-           avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
+           lower = box->low.x;
+           upper = box->high.x;
        }
-
-       avgCenterX /= maxoff;
-       avgCenterY /= maxoff;
-
-       posL = posR = posB = posT = 0;
-       for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+       else
        {
-           cur = DatumGetBoxP(entryvec->vector[i].key);
-
-           CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
-           CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
+           lower = box->low.y;
+           upper = box->high.y;
+       }
 
-           if (CenterX < avgCenterX)
-               ADDLIST(listL, unionL, posL, i);
-           else if (CenterX == avgCenterX)
+       if (upper <= context.leftUpper)
+       {
+           /* Fits to the left group */
+           if (lower >= context.rightLower)
            {
-               if (posL > posR)
-                   ADDLIST(listR, unionR, posR, i);
-               else
-                   ADDLIST(listL, unionL, posL, i);
+               /* Fits also to the right group, so "common entry" */
+               commonEntries[commonEntriesCount++].index = i;
            }
            else
-               ADDLIST(listR, unionR, posR, i);
-
-           if (CenterY < avgCenterY)
-               ADDLIST(listB, unionB, posB, i);
-           else if (CenterY == avgCenterY)
            {
-               if (posB > posT)
-                   ADDLIST(listT, unionT, posT, i);
-               else
-                   ADDLIST(listB, unionB, posB, i);
+               /* Doesn't fit to the right group, so join to the left group */
+               PLACE_LEFT(box, i);
            }
-           else
-               ADDLIST(listT, unionT, posT, i);
        }
-
-       if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
+       else
        {
-           fallbackSplit(entryvec, v);
-           PG_RETURN_POINTER(v);
+           /*
+            * Each entry should fit on either left or right group. Since this
+            * entry didn't fit on the left group, it better fit in the right
+            * group.
+            */
+           Assert(lower >= context.rightLower);
+
+           /* Doesn't fit to the left group, so join to the right group */
+           PLACE_RIGHT(box, i);
        }
    }
 
-   /* which split more optimal? */
-   if (Max(posL, posR) < Max(posB, posT))
-       direction = 'x';
-   else if (Max(posL, posR) > Max(posB, posT))
-       direction = 'y';
-   else
+   /*
+    * Distribute "common entries", if any.
+    */
+   if (commonEntriesCount > 0)
    {
-       Datum       interLR = DirectFunctionCall2(rt_box_inter,
-                                                 BoxPGetDatum(unionL),
-                                                 BoxPGetDatum(unionR));
-       Datum       interBT = DirectFunctionCall2(rt_box_inter,
-                                                 BoxPGetDatum(unionB),
-                                                 BoxPGetDatum(unionT));
-       double      sizeLR,
-                   sizeBT;
-
-       sizeLR = size_box(interLR);
-       sizeBT = size_box(interBT);
-
-       if (sizeLR < sizeBT)
-           direction = 'x';
-       else
-           direction = 'y';
-   }
+       /*
+        * Calculate minimum number of entries that must be placed in both
+        * groups, to reach LIMIT_RATIO.
+        */
+       int         m = ceil(LIMIT_RATIO * (double) nentries);
 
-   if (direction == 'x')
-       chooseLR(v,
-                listL, posL, unionL,
-                listR, posR, unionR);
-   else
-       chooseLR(v,
-                listB, posB, unionB,
-                listT, posT, unionT);
+       /*
+        * Calculate delta between penalties of join "common entries" to
+        * different groups.
+        */
+       for (i = 0; i < commonEntriesCount; i++)
+       {
+           box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+           commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
+                                        box_penalty(rightBox, box));
+       }
+
+       /*
+        * Sort "common entries" by calculated deltas in order to distribute
+        * the most ambiguous entries first.
+        */
+       qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
+
+       /*
+        * Distribute "common entries" between groups.
+        */
+       for (i = 0; i < commonEntriesCount; i++)
+       {
+           box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
+
+           /*
+            * Check if we have to place this entry in either group to achieve
+            * LIMIT_RATIO.
+            */
+           if (v->spl_nleft + (commonEntriesCount - i) <= m)
+               PLACE_LEFT(box, commonEntries[i].index);
+           else if (v->spl_nright + (commonEntriesCount - i) <= m)
+               PLACE_RIGHT(box, commonEntries[i].index);
+           else
+           {
+               /* Otherwise select the group by minimal penalty */
+               if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
+                   PLACE_LEFT(box, commonEntries[i].index);
+               else
+                   PLACE_RIGHT(box, commonEntries[i].index);
+           }
+       }
+   }
 
+   v->spl_ldatum = PointerGetDatum(leftBox);
+   v->spl_rdatum = PointerGetDatum(rightBox);
    PG_RETURN_POINTER(v);
 }