SP-GiST support of the range adjacent operator -|-
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 8 Mar 2013 13:03:19 +0000 (15:03 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 8 Mar 2013 13:03:19 +0000 (15:03 +0200)
Alexander Korotkov, reviewed by Jeff Davis.

src/backend/utils/adt/rangetypes.c
src/backend/utils/adt/rangetypes_spgist.c
src/include/catalog/pg_amop.h
src/include/utils/rangetypes.h
src/test/regress/expected/opr_sanity.out

index c440847916b9ccb70a5755ccc82d0c44e957364a..84a4aca16c097a69af89898b476bda0e19b2d698 100644 (file)
@@ -709,6 +709,64 @@ range_after(PG_FUNCTION_ARGS)
    PG_RETURN_BOOL(range_after_internal(typcache, r1, r2));
 }
 
+/*
+ * Check if two bounds A and B are "adjacent", where A is an upper bound and B
+ * is a lower bound. For the bounds to be adjacent, each subtype value must
+ * satisfy strictly one of the bounds: there are no values which satisfy both
+ * bounds (i.e. less than A and greater than B); and there are no values which
+ * satisfy neither bound (i.e. greater than A and less than B).
+ *
+ * For discrete ranges, we rely on the canonicalization function to see if A..B
+ * normalizes to empty. (If there is no canonicalization function, it's
+ * impossible for such a range to normalize to empty, so we needn't bother to
+ * try.)
+ *
+ * If A == B, the ranges are adjacent only if the bounds have different
+ * inclusive flags (i.e., exactly one of the ranges includes the common
+ * boundary point).
+ *
+ * And if A > B then the ranges are not adjacent in this order.
+ */
+bool
+bounds_adjacent(TypeCacheEntry *typcache, RangeBound boundA, RangeBound boundB)
+{
+   int         cmp;
+
+   Assert(!boundA.lower && boundB.lower);
+
+   cmp = range_cmp_bound_values(typcache, &boundA, &boundB);
+   if (cmp < 0)
+   {
+       RangeType *r;
+
+       /*
+        * Bounds do not overlap; see if there are points in between.
+        */
+
+       /* in a continuous subtype, there are assumed to be points between */
+       if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
+           return false;
+
+       /*
+        * The bounds are of a discrete range type; so make a range A..B and
+        * see if it's empty.
+        */
+
+       /* flip the inclusion flags */
+       boundA.inclusive = !boundA.inclusive;
+       boundB.inclusive = !boundB.inclusive;
+       /* change upper/lower labels to avoid Assert failures */
+       boundA.lower = true;
+       boundB.lower = false;
+       r = make_range(typcache, &boundA, &boundB, false);
+       return RangeIsEmpty(r);
+   }
+   else if (cmp == 0)
+       return boundA.inclusive != boundB.inclusive;
+   else
+       return false;       /* bounds overlap */
+}
+
 /* adjacent to (but not overlapping)? (internal version) */
 bool
 range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
@@ -719,8 +777,6 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
                upper2;
    bool        empty1,
                empty2;
-   RangeType  *r3;
-   int         cmp;
 
    /* Different types should be prevented by ANYRANGE matching rules */
    if (RangeTypeGetOid(r1) != RangeTypeGetOid(r2))
@@ -734,62 +790,11 @@ range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
        return false;
 
    /*
-    * Given two ranges A..B and C..D, where B < C, the ranges are adjacent if
-    * and only if the range B..C is empty, where inclusivity of these two
-    * bounds is inverted compared to the original bounds.  For discrete
-    * ranges, we have to rely on the canonicalization function to normalize
-    * B..C to empty if it contains no elements of the subtype.  (If there is
-    * no canonicalization function, it's impossible for such a range to
-    * normalize to empty, so we needn't bother to try.)
-    *
-    * If B == C, the ranges are adjacent only if these bounds have different
-    * inclusive flags (i.e., exactly one of the ranges includes the common
-    * boundary point).
-    *
-    * And if B > C then the ranges cannot be adjacent in this order, but we
-    * must consider the other order (i.e., check D <= A).
+    * Given two ranges A..B and C..D, the ranges are adjacent if and only if
+    * B is adjacent to C, or D is adjacent to A.
     */
-   cmp = range_cmp_bound_values(typcache, &upper1, &lower2);
-   if (cmp < 0)
-   {
-       /* in a continuous subtype, there are assumed to be points between */
-       if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
-           return (false);
-       /* flip the inclusion flags */
-       upper1.inclusive = !upper1.inclusive;
-       lower2.inclusive = !lower2.inclusive;
-       /* change upper/lower labels to avoid Assert failures */
-       upper1.lower = true;
-       lower2.lower = false;
-       r3 = make_range(typcache, &upper1, &lower2, false);
-       return RangeIsEmpty(r3);
-   }
-   if (cmp == 0)
-   {
-       return (upper1.inclusive != lower2.inclusive);
-   }
-
-   cmp = range_cmp_bound_values(typcache, &upper2, &lower1);
-   if (cmp < 0)
-   {
-       /* in a continuous subtype, there are assumed to be points between */
-       if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
-           return (false);
-       /* flip the inclusion flags */
-       upper2.inclusive = !upper2.inclusive;
-       lower1.inclusive = !lower1.inclusive;
-       /* change upper/lower labels to avoid Assert failures */
-       upper2.lower = true;
-       lower1.lower = false;
-       r3 = make_range(typcache, &upper2, &lower1, false);
-       return RangeIsEmpty(r3);
-   }
-   if (cmp == 0)
-   {
-       return (upper2.inclusive != lower1.inclusive);
-   }
-
-   return false;
+   return (bounds_adjacent(typcache, upper1, lower2) ||
+           bounds_adjacent(typcache, upper2, lower1));
 }
 
 /* adjacent to (but not overlapping)? */
index d395ef47180d1f5032b001392337cce42cbb2ad9..9a7f20d9f37ec00b77c199004f7926a92c1d0319 100644 (file)
@@ -305,6 +305,16 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
    int         which;
    int         i;
 
+   /*
+    * For adjacent search we need also previous centroid (if any) to improve
+    * the precision of the consistent check. In this case needPrevious flag is
+    * set and centroid is passed into reconstructedValues. This is not the
+    * intended purpose of reconstructedValues (because we already have the
+    * full value available at the leaf), but it's a convenient place to store
+    * state while traversing the tree.
+    */
+   bool        needPrevious = false;
+
    if (in->allTheSame)
    {
        /* Report that all nodes should be visited */
@@ -351,6 +361,7 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
                case RANGESTRAT_OVERLAPS:
                case RANGESTRAT_OVERRIGHT:
                case RANGESTRAT_AFTER:
+               case RANGESTRAT_ADJACENT:
                    /* These strategies return false if any argument is empty */
                    if (empty)
                        which = 0;
@@ -435,6 +446,9 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
            /* Are the restrictions on range bounds inclusive? */
            bool        inclusive = true;
            bool        strictEmpty = true;
+           int         cmp,
+                       which1,
+                       which2;
 
            strategy = in->scankeys[i].sk_strategy;
 
@@ -522,6 +536,118 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
                    inclusive = false;
                    break;
 
+               case RANGESTRAT_ADJACENT:
+                   if (empty)
+                       break;              /* Skip to strictEmpty check. */
+
+                   /*
+                    * which1 is bitmask for possibility to be adjacent with
+                    * lower bound of argument. which2 is bitmask for
+                    * possibility to be adjacent with upper bound of argument.
+                    */
+                   which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
+
+                   /*
+                    * Previously selected quadrant could exclude possibility
+                    * for lower or upper bounds to be adjacent. Deserialize
+                    * previous centroid range if present for checking this.
+                    */
+                   if (in->reconstructedValue != (Datum) 0)
+                   {
+                       RangeType  *prevCentroid;
+                       RangeBound  prevLower,
+                                   prevUpper;
+                       bool        prevEmpty;
+                       int         cmp1,
+                                   cmp2;
+
+                       prevCentroid = DatumGetRangeType(in->reconstructedValue);
+                       range_deserialize(typcache, prevCentroid,
+                                         &prevLower, &prevUpper, &prevEmpty);
+
+                       /*
+                        * Check if lower bound of argument is not in a
+                        * quadrant we visited in the previous step.
+                        */
+                       cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper);
+                       cmp2 = range_cmp_bounds(typcache, &centroidUpper,
+                                               &prevUpper);
+                       if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+                           which1 = 0;
+
+                       /*
+                        * Check if upper bound of argument is not in a
+                        * quadrant we visited in the previous step.
+                        */
+                       cmp1 = range_cmp_bounds(typcache, &upper, &prevLower);
+                       cmp2 = range_cmp_bounds(typcache, &centroidLower,
+                                               &prevLower);
+                       if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+                           which2 = 0;
+                   }
+
+                   if (which1)
+                   {
+                       /*
+                        * For a range's upper bound to be adjacent to the
+                        * argument's lower bound, it will be found along the
+                        * line adjacent to (and just below) Y=lower.
+                        * Therefore, if the argument's lower bound is less
+                        * than the centroid's upper bound, the line falls in
+                        * quadrants 2 and 3; if greater, the line falls in
+                        * quadrants 1 and 4.
+                        *
+                        * The above is true even when the argument's lower
+                        * bound is greater and adjacent to the centroid's
+                        * upper bound. If the argument's lower bound is
+                        * greater than the centroid's upper bound, then the
+                        * lowest value that an adjacent range could have is
+                        * that of the centroid's upper bound, which still
+                        * falls in quadrants 1 and 4.
+                        *
+                        * In the edge case, where the argument's lower bound
+                        * is equal to the cetroid's upper bound, there may be
+                        * adjacent ranges in any quadrant.
+                        */
+                       cmp = range_cmp_bounds(typcache, &lower,
+                                              &centroidUpper);
+                       if (cmp < 0)
+                           which1 &= (1 << 2) | (1 << 3);
+                       else if (cmp > 0)
+                           which1 &= (1 << 1) | (1 << 4);
+                   }
+
+                   if (which2)
+                   {
+                       /*
+                        * For a range's lower bound to be adjacent to the
+                        * argument's upper bound, it will be found along the
+                        * line adjacent to (and just right of)
+                        * X=upper. Therefore, if the argument's upper bound is
+                        * less than (and not adjacent to) the centroid's upper
+                        * bound, the line falls in quadrants 3 and 4; if
+                        * greater or equal to, the line falls in quadrants 1
+                        * and 2.
+                        *
+                        * The edge case is when the argument's upper bound is
+                        * less than and adjacent to the centroid's lower
+                        * bound. In that case, adjacent ranges may be in any
+                        * quadrant.
+                        */
+                       cmp = range_cmp_bounds(typcache, &lower,
+                                              &centroidUpper);
+                       if (cmp < 0 &&
+                           !bounds_adjacent(typcache, upper, centroidLower))
+                           which1 &= (1 << 3) | (1 << 4);
+                       else if (cmp > 0)
+                           which1 &= (1 << 1) | (1 << 2);
+                   }
+
+                   which &= which1 | which2;
+
+                   needPrevious = true;
+                   break;
+
                case RANGESTRAT_CONTAINS:
                    /*
                     * Non-empty range A contains non-empty range B if lower
@@ -652,11 +778,18 @@ spg_range_quad_inner_consistent(PG_FUNCTION_ARGS)
 
    /* We must descend into the quadrant(s) identified by 'which' */
    out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
+   if (needPrevious)
+       out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
    out->nNodes = 0;
    for (i = 1; i <= in->nNodes; i++)
    {
        if (which & (1 << i))
+       {
+           /* Save previous prefix if needed */
+           if (needPrevious)
+               out->reconstructedValues[out->nNodes] = in->prefixDatum;
            out->nodeNumbers[out->nNodes++] = i - 1;
+       }
    }
 
    PG_RETURN_VOID();
@@ -713,6 +846,10 @@ spg_range_quad_leaf_consistent(PG_FUNCTION_ARGS)
                res = range_after_internal(typcache, leafRange,
                                           DatumGetRangeType(keyDatum));
                break;
+           case RANGESTRAT_ADJACENT:
+               res = range_adjacent_internal(typcache, leafRange,
+                                          DatumGetRangeType(keyDatum));
+               break;
            case RANGESTRAT_CONTAINS:
                res = range_contains_internal(typcache, leafRange,
                                              DatumGetRangeType(keyDatum));
index 5d451e36c11b9a61f77e1d80cf224889deacafd2..d2003487472a0c1ba67d88cb3e9b70f2857e6a4b 100644 (file)
@@ -775,6 +775,7 @@ DATA(insert (   3474   3831 3831 2 s    3895 4000 0 ));
 DATA(insert (  3474   3831 3831 3 s    3888 4000 0 ));
 DATA(insert (  3474   3831 3831 4 s    3896 4000 0 ));
 DATA(insert (  3474   3831 3831 5 s    3894 4000 0 ));
+DATA(insert (  3474   3831 3831 6 s    3897 4000 0 ));
 DATA(insert (  3474   3831 3831 7 s    3890 4000 0 ));
 DATA(insert (  3474   3831 3831 8 s    3892 4000 0 ));
 DATA(insert (  3474   3831 2283 16 s   3889 4000 0 ));
index 67b1a7ce61101cc9fb4749fb977b52ba8b6471d3..f6b941d3acc7bcaaf890e69c93c8cdfe29896d0f 100644 (file)
@@ -201,6 +201,8 @@ extern int range_cmp_bounds(TypeCacheEntry *typcache, RangeBound *b1,
                 RangeBound *b2);
 extern int range_cmp_bound_values(TypeCacheEntry *typcache, RangeBound *b1,
                       RangeBound *b2);
+extern bool bounds_adjacent(TypeCacheEntry *typcache, RangeBound bound1,
+               RangeBound bound2);
 extern RangeType *make_empty_range(TypeCacheEntry *typcache);
 
 /* GiST support (in rangetypes_gist.c) */
index 6faaf4272f8fe2f64b85dd98fa20d260c79d4cbf..256b7196d0d67ae0406af84c353483b6765e7538 100644 (file)
@@ -1076,6 +1076,7 @@ ORDER BY 1, 2, 3;
        4000 |            4 | ~>=~
        4000 |            5 | >>
        4000 |            5 | ~>~
+       4000 |            6 | -|-
        4000 |            6 | ~=
        4000 |            7 | @>
        4000 |            8 | <@
@@ -1087,7 +1088,7 @@ ORDER BY 1, 2, 3;
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
-(61 rows)
+(62 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing