PG_RETURN_BOOL(range_after_internal(typcache, r1, r2));
}
+/*
+ * Check if two bounds A and B are "adjacent", where A is an upper bound and B
+ * is a lower bound. For the bounds to be adjacent, each subtype value must
+ * satisfy strictly one of the bounds: there are no values which satisfy both
+ * bounds (i.e. less than A and greater than B); and there are no values which
+ * satisfy neither bound (i.e. greater than A and less than B).
+ *
+ * For discrete ranges, we rely on the canonicalization function to see if A..B
+ * normalizes to empty. (If there is no canonicalization function, it's
+ * impossible for such a range to normalize to empty, so we needn't bother to
+ * try.)
+ *
+ * If A == B, the ranges are adjacent only if the bounds have different
+ * inclusive flags (i.e., exactly one of the ranges includes the common
+ * boundary point).
+ *
+ * And if A > B then the ranges are not adjacent in this order.
+ */
+bool
+bounds_adjacent(TypeCacheEntry *typcache, RangeBound boundA, RangeBound boundB)
+{
+ int cmp;
+
+ Assert(!boundA.lower && boundB.lower);
+
+ cmp = range_cmp_bound_values(typcache, &boundA, &boundB);
+ if (cmp < 0)
+ {
+ RangeType *r;
+
+ /*
+ * Bounds do not overlap; see if there are points in between.
+ */
+
+ /* in a continuous subtype, there are assumed to be points between */
+ if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
+ return false;
+
+ /*
+ * The bounds are of a discrete range type; so make a range A..B and
+ * see if it's empty.
+ */
+
+ /* flip the inclusion flags */
+ boundA.inclusive = !boundA.inclusive;
+ boundB.inclusive = !boundB.inclusive;
+ /* change upper/lower labels to avoid Assert failures */
+ boundA.lower = true;
+ boundB.lower = false;
+ r = make_range(typcache, &boundA, &boundB, false);
+ return RangeIsEmpty(r);
+ }
+ else if (cmp == 0)
+ return boundA.inclusive != boundB.inclusive;
+ else
+ return false; /* bounds overlap */
+}
+
/* adjacent to (but not overlapping)? (internal version) */
bool
range_adjacent_internal(TypeCacheEntry *typcache, RangeType *r1, RangeType *r2)
upper2;
bool empty1,
empty2;
- RangeType *r3;
- int cmp;
/* Different types should be prevented by ANYRANGE matching rules */
if (RangeTypeGetOid(r1) != RangeTypeGetOid(r2))
return false;
/*
- * Given two ranges A..B and C..D, where B < C, the ranges are adjacent if
- * and only if the range B..C is empty, where inclusivity of these two
- * bounds is inverted compared to the original bounds. For discrete
- * ranges, we have to rely on the canonicalization function to normalize
- * B..C to empty if it contains no elements of the subtype. (If there is
- * no canonicalization function, it's impossible for such a range to
- * normalize to empty, so we needn't bother to try.)
- *
- * If B == C, the ranges are adjacent only if these bounds have different
- * inclusive flags (i.e., exactly one of the ranges includes the common
- * boundary point).
- *
- * And if B > C then the ranges cannot be adjacent in this order, but we
- * must consider the other order (i.e., check D <= A).
+ * Given two ranges A..B and C..D, the ranges are adjacent if and only if
+ * B is adjacent to C, or D is adjacent to A.
*/
- cmp = range_cmp_bound_values(typcache, &upper1, &lower2);
- if (cmp < 0)
- {
- /* in a continuous subtype, there are assumed to be points between */
- if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
- return (false);
- /* flip the inclusion flags */
- upper1.inclusive = !upper1.inclusive;
- lower2.inclusive = !lower2.inclusive;
- /* change upper/lower labels to avoid Assert failures */
- upper1.lower = true;
- lower2.lower = false;
- r3 = make_range(typcache, &upper1, &lower2, false);
- return RangeIsEmpty(r3);
- }
- if (cmp == 0)
- {
- return (upper1.inclusive != lower2.inclusive);
- }
-
- cmp = range_cmp_bound_values(typcache, &upper2, &lower1);
- if (cmp < 0)
- {
- /* in a continuous subtype, there are assumed to be points between */
- if (!OidIsValid(typcache->rng_canonical_finfo.fn_oid))
- return (false);
- /* flip the inclusion flags */
- upper2.inclusive = !upper2.inclusive;
- lower1.inclusive = !lower1.inclusive;
- /* change upper/lower labels to avoid Assert failures */
- upper2.lower = true;
- lower1.lower = false;
- r3 = make_range(typcache, &upper2, &lower1, false);
- return RangeIsEmpty(r3);
- }
- if (cmp == 0)
- {
- return (upper2.inclusive != lower1.inclusive);
- }
-
- return false;
+ return (bounds_adjacent(typcache, upper1, lower2) ||
+ bounds_adjacent(typcache, upper2, lower1));
}
/* adjacent to (but not overlapping)? */
int which;
int i;
+ /*
+ * For adjacent search we need also previous centroid (if any) to improve
+ * the precision of the consistent check. In this case needPrevious flag is
+ * set and centroid is passed into reconstructedValues. This is not the
+ * intended purpose of reconstructedValues (because we already have the
+ * full value available at the leaf), but it's a convenient place to store
+ * state while traversing the tree.
+ */
+ bool needPrevious = false;
+
if (in->allTheSame)
{
/* Report that all nodes should be visited */
case RANGESTRAT_OVERLAPS:
case RANGESTRAT_OVERRIGHT:
case RANGESTRAT_AFTER:
+ case RANGESTRAT_ADJACENT:
/* These strategies return false if any argument is empty */
if (empty)
which = 0;
/* Are the restrictions on range bounds inclusive? */
bool inclusive = true;
bool strictEmpty = true;
+ int cmp,
+ which1,
+ which2;
strategy = in->scankeys[i].sk_strategy;
inclusive = false;
break;
+ case RANGESTRAT_ADJACENT:
+ if (empty)
+ break; /* Skip to strictEmpty check. */
+
+ /*
+ * which1 is bitmask for possibility to be adjacent with
+ * lower bound of argument. which2 is bitmask for
+ * possibility to be adjacent with upper bound of argument.
+ */
+ which1 = which2 = (1 << 1) | (1 << 2) | (1 << 3) | (1 << 4);
+
+ /*
+ * Previously selected quadrant could exclude possibility
+ * for lower or upper bounds to be adjacent. Deserialize
+ * previous centroid range if present for checking this.
+ */
+ if (in->reconstructedValue != (Datum) 0)
+ {
+ RangeType *prevCentroid;
+ RangeBound prevLower,
+ prevUpper;
+ bool prevEmpty;
+ int cmp1,
+ cmp2;
+
+ prevCentroid = DatumGetRangeType(in->reconstructedValue);
+ range_deserialize(typcache, prevCentroid,
+ &prevLower, &prevUpper, &prevEmpty);
+
+ /*
+ * Check if lower bound of argument is not in a
+ * quadrant we visited in the previous step.
+ */
+ cmp1 = range_cmp_bounds(typcache, &lower, &prevUpper);
+ cmp2 = range_cmp_bounds(typcache, ¢roidUpper,
+ &prevUpper);
+ if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+ which1 = 0;
+
+ /*
+ * Check if upper bound of argument is not in a
+ * quadrant we visited in the previous step.
+ */
+ cmp1 = range_cmp_bounds(typcache, &upper, &prevLower);
+ cmp2 = range_cmp_bounds(typcache, ¢roidLower,
+ &prevLower);
+ if ((cmp2 < 0 && cmp1 > 0) || (cmp2 > 0 && cmp1 < 0))
+ which2 = 0;
+ }
+
+ if (which1)
+ {
+ /*
+ * For a range's upper bound to be adjacent to the
+ * argument's lower bound, it will be found along the
+ * line adjacent to (and just below) Y=lower.
+ * Therefore, if the argument's lower bound is less
+ * than the centroid's upper bound, the line falls in
+ * quadrants 2 and 3; if greater, the line falls in
+ * quadrants 1 and 4.
+ *
+ * The above is true even when the argument's lower
+ * bound is greater and adjacent to the centroid's
+ * upper bound. If the argument's lower bound is
+ * greater than the centroid's upper bound, then the
+ * lowest value that an adjacent range could have is
+ * that of the centroid's upper bound, which still
+ * falls in quadrants 1 and 4.
+ *
+ * In the edge case, where the argument's lower bound
+ * is equal to the cetroid's upper bound, there may be
+ * adjacent ranges in any quadrant.
+ */
+ cmp = range_cmp_bounds(typcache, &lower,
+ ¢roidUpper);
+ if (cmp < 0)
+ which1 &= (1 << 2) | (1 << 3);
+ else if (cmp > 0)
+ which1 &= (1 << 1) | (1 << 4);
+ }
+
+ if (which2)
+ {
+ /*
+ * For a range's lower bound to be adjacent to the
+ * argument's upper bound, it will be found along the
+ * line adjacent to (and just right of)
+ * X=upper. Therefore, if the argument's upper bound is
+ * less than (and not adjacent to) the centroid's upper
+ * bound, the line falls in quadrants 3 and 4; if
+ * greater or equal to, the line falls in quadrants 1
+ * and 2.
+ *
+ * The edge case is when the argument's upper bound is
+ * less than and adjacent to the centroid's lower
+ * bound. In that case, adjacent ranges may be in any
+ * quadrant.
+ */
+ cmp = range_cmp_bounds(typcache, &lower,
+ ¢roidUpper);
+ if (cmp < 0 &&
+ !bounds_adjacent(typcache, upper, centroidLower))
+ which1 &= (1 << 3) | (1 << 4);
+ else if (cmp > 0)
+ which1 &= (1 << 1) | (1 << 2);
+ }
+
+ which &= which1 | which2;
+
+ needPrevious = true;
+ break;
+
case RANGESTRAT_CONTAINS:
/*
* Non-empty range A contains non-empty range B if lower
/* We must descend into the quadrant(s) identified by 'which' */
out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
+ if (needPrevious)
+ out->reconstructedValues = (Datum *) palloc(sizeof(Datum) * in->nNodes);
out->nNodes = 0;
for (i = 1; i <= in->nNodes; i++)
{
if (which & (1 << i))
+ {
+ /* Save previous prefix if needed */
+ if (needPrevious)
+ out->reconstructedValues[out->nNodes] = in->prefixDatum;
out->nodeNumbers[out->nNodes++] = i - 1;
+ }
}
PG_RETURN_VOID();
res = range_after_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));
break;
+ case RANGESTRAT_ADJACENT:
+ res = range_adjacent_internal(typcache, leafRange,
+ DatumGetRangeType(keyDatum));
+ break;
case RANGESTRAT_CONTAINS:
res = range_contains_internal(typcache, leafRange,
DatumGetRangeType(keyDatum));