Add support for nearest-neighbor (KNN) searches to SP-GiST
authorAlexander Korotkov <akorotkov@postgresql.org>
Tue, 18 Sep 2018 22:54:10 +0000 (01:54 +0300)
committerAlexander Korotkov <akorotkov@postgresql.org>
Tue, 18 Sep 2018 22:54:10 +0000 (01:54 +0300)
Currently, KNN searches were supported only by GiST.  SP-GiST also capable to
support them.  This commit implements that support.  SP-GiST scan stack is
replaced with queue, which serves as stack if no ordering is specified.  KNN
support is provided for three SP-GIST opclasses: quad_point_ops, kd_point_ops
and poly_ops (catversion is bumped).  Some common parts between GiST and SP-GiST
KNNs are extracted into separate functions.

Discussion: https://postgr.es/m/570825e8-47d0-4732-2bf6-88d67d2d51c8%40postgrespro.ru
Author: Nikita Glukhov, Alexander Korotkov based on GSoC work by Vlad Sterzhanov
Review: Andrey Borodin, Alexander Korotkov

29 files changed:
doc/src/sgml/indices.sgml
doc/src/sgml/spgist.sgml
doc/src/sgml/xindex.sgml
src/backend/access/gist/gistget.c
src/backend/access/gist/gistutil.c
src/backend/access/index/indexam.c
src/backend/access/spgist/Makefile
src/backend/access/spgist/README
src/backend/access/spgist/spgkdtreeproc.c
src/backend/access/spgist/spgproc.c [new file with mode: 0644]
src/backend/access/spgist/spgquadtreeproc.c
src/backend/access/spgist/spgscan.c
src/backend/access/spgist/spgutils.c
src/backend/access/spgist/spgvalidate.c
src/backend/utils/adt/geo_spgist.c
src/backend/utils/cache/lsyscache.c
src/include/access/genam.h
src/include/access/spgist.h
src/include/access/spgist_private.h
src/include/catalog/catversion.h
src/include/catalog/pg_amop.dat
src/include/utils/lsyscache.h
src/test/regress/expected/amutils.out
src/test/regress/expected/create_index.out
src/test/regress/expected/opr_sanity.out
src/test/regress/expected/polygon.out
src/test/regress/sql/amutils.sql
src/test/regress/sql/create_index.sql
src/test/regress/sql/polygon.sql

index a57c5e2e1f495c1b40a4bb0c09c0e0ee9f0aed0b..df7d16ff68af2e9c0561d548d654d2ded54186ee 100644 (file)
@@ -281,6 +281,13 @@ SELECT * FROM places ORDER BY location <-> point '(101,456)' LIMIT 10;
    For more information see <xref linkend="spgist"/>.
   </para>
 
+  <para>
+   Like GiST, SP-GiST supports <quote>nearest-neighbor</quote> searches.
+   For SP-GiST operator classes that support distance ordering, the 
+   corresponding operator is specified in the <quote>Ordering Operators</quote>
+   column in <xref linkend="spgist-builtin-opclasses-table"/>.
+  </para>
+
   <para>
    <indexterm>
     <primary>index</primary>
index d69f034f1c5f3c2b57125975a6248c44164909ec..126d1f6c156b6c9670003c221f048ddaf96b0749 100644 (file)
 
   <table id="spgist-builtin-opclasses-table">
    <title>Built-in <acronym>SP-GiST</acronym> Operator Classes</title>
-   <tgroup cols="3">
+   <tgroup cols="4">
     <thead>
      <row>
       <entry>Name</entry>
       <entry>Indexed Data Type</entry>
       <entry>Indexable Operators</entry>
+      <entry>Ordering Operators</entry>
      </row>
     </thead>
     <tbody>
@@ -84,6 +85,9 @@
        <literal>&gt;^</literal>
        <literal>~=</literal>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</literal>
+      </entry>
      </row>
      <row>
       <entry><literal>quad_point_ops</literal></entry>
        <literal>&gt;^</literal>
        <literal>~=</literal>
       </entry>
+      <entry>
+       <literal>&lt;-&gt;</literal>
+      </entry>
      </row>
      <row>
       <entry><literal>range_ops</literal></entry>
        <literal>&gt;&gt;</literal>
        <literal>@&gt;</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>box_ops</literal></entry>
        <literal>|&gt;&gt;</literal>
        <literal>|&amp;&gt;</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>poly_ops</literal></entry>
        <literal>|&gt;&gt;</literal>
        <literal>|&amp;&gt;</literal>
       </entry>
+      <entry>
+        <literal>&lt;-&gt;</literal>
+      </entry>      
      </row>
      <row>
       <entry><literal>text_ops</literal></entry>
        <literal>~&gt;~</literal>
        <literal>^@</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
      <row>
       <entry><literal>inet_ops</literal></entry>
        <literal>&lt;=</literal>
        <literal>=</literal>
       </entry>
+      <entry>
+      </entry>
      </row>
     </tbody>
    </tgroup>
   supports the same operators but uses a different index data structure which
   may offer better performance in some applications.
  </para>
+ <para>
+  The <literal>quad_point_ops</literal>, <literal>kd_point_ops</literal> and
+  <literal>poly_ops</literal> operator classes support the <literal>&lt;-&gt;</literal>
+  ordering operator, which enables the k-nearest neighbor (<literal>k-NN</literal>)
+  search over indexed point or polygon datasets.
+ </para>
 
 </sect1>
 
@@ -630,7 +654,10 @@ CREATE FUNCTION my_inner_consistent(internal, internal) RETURNS void ...
 typedef struct spgInnerConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
-    int         nkeys;          /* length of array */
+    ScanKey     orderbys;       /* array of ordering operators and comparison
+                                 * values */
+    int         nkeys;          /* length of scankeys array */
+    int         norderbys;      /* length of orderbys array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -653,6 +680,7 @@ typedef struct spgInnerConsistentOut
     int        *levelAdds;      /* increment level by this much for each */
     Datum      *reconstructedValues;    /* associated reconstructed values */
     void      **traversalValues;        /* opclass-specific traverse values */
+    double    **distances;              /* associated distances */
 } spgInnerConsistentOut;
 </programlisting>
 
@@ -667,6 +695,8 @@ typedef struct spgInnerConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</structfield> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       The array <structfield>orderbys</structfield>, of length <structfield>norderbys</structfield>,
+       describes ordering operators (if any) in the same manner.
        <structfield>reconstructedValue</structfield> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</literal> at the root level or if the
        <function>inner_consistent</function> function did not provide a value at the
@@ -709,6 +739,10 @@ typedef struct spgInnerConsistentOut
        of <structname>spgConfigOut</structname>.<structfield>leafType</structfield> type
        reconstructed for each child node to be visited; otherwise, leave
        <structfield>reconstructedValues</structfield> as NULL.
+       If ordered search is performed, set <structfield>distances</structfield>
+       to an array of distance values according to <structfield>orderbys</structfield>
+       array (nodes with lowest distances will be processed first).  Leave it
+       NULL otherwise.
        If it is desired to pass down additional out-of-band information
        (<quote>traverse values</quote>) to lower levels of the tree search,
        set <structfield>traversalValues</structfield> to an array of the appropriate
@@ -717,6 +751,7 @@ typedef struct spgInnerConsistentOut
        Note that the <function>inner_consistent</function> function is
        responsible for palloc'ing the
        <structfield>nodeNumbers</structfield>, <structfield>levelAdds</structfield>,
+       <structfield>distances</structfield>,
        <structfield>reconstructedValues</structfield>, and
        <structfield>traversalValues</structfield> arrays in the current memory context.
        However, any output traverse values pointed to by
@@ -747,7 +782,10 @@ CREATE FUNCTION my_leaf_consistent(internal, internal) RETURNS bool ...
 typedef struct spgLeafConsistentIn
 {
     ScanKey     scankeys;       /* array of operators and comparison values */
-    int         nkeys;          /* length of array */
+    ScanKey     orderbys;       /* array of ordering operators and comparison
+                                 * values */
+    int         nkeys;          /* length of scankeys array */
+    int         norderbys;      /* length of orderbys array */
 
     Datum       reconstructedValue;     /* value reconstructed at parent */
     void       *traversalValue; /* opclass-specific traverse value */
@@ -759,8 +797,10 @@ typedef struct spgLeafConsistentIn
 
 typedef struct spgLeafConsistentOut
 {
-    Datum       leafValue;      /* reconstructed original data, if any */
-    bool        recheck;        /* set true if operator must be rechecked */
+    Datum       leafValue;        /* reconstructed original data, if any */
+    bool        recheck;          /* set true if operator must be rechecked */
+    bool        recheckDistances; /* set true if distances must be rechecked */
+    double     *distances;        /* associated distances */
 } spgLeafConsistentOut;
 </programlisting>
 
@@ -775,6 +815,8 @@ typedef struct spgLeafConsistentOut
        In particular it is not necessary to check <structfield>sk_flags</structfield> to
        see if the comparison value is NULL, because the SP-GiST core code
        will filter out such conditions.
+       The array <structfield>orderbys</structfield>, of length <structfield>norderbys</structfield>,
+       describes the ordering operators in the same manner.
        <structfield>reconstructedValue</structfield> is the value reconstructed for the
        parent tuple; it is <literal>(Datum) 0</literal> at the root level or if the
        <function>inner_consistent</function> function did not provide a value at the
@@ -803,6 +845,12 @@ typedef struct spgLeafConsistentOut
        <structfield>recheck</structfield> may be set to <literal>true</literal> if the match
        is uncertain and so the operator(s) must be re-applied to the actual
        heap tuple to verify the match.
+       If ordered search is performed, set <structfield>distances</structfield>
+       to an array of distance values according to <structfield>orderbys</structfield>
+       array.  Leave it NULL otherwise.  If at least one of returned distances
+       is not exact, set <structfield>recheckDistances</structfield> to true.
+       In this case, the executor will calculate the exact distances after
+       fetching the tuple from the heap, and will reorder the tuples if needed.
       </para>
      </listitem>
     </varlistentry>
index f7713e8abaf588bfc14ebc85d4433e3f76d1fc96..9446f8b836ca4dd7cb434e03d61bb4e879a50e55 100644 (file)
@@ -1242,7 +1242,7 @@ SELECT sum(x) OVER (ORDER BY x RANGE BETWEEN 5 PRECEDING AND 10 FOLLOWING)
   <title>Ordering Operators</title>
 
   <para>
-   Some index access methods (currently, only GiST) support the concept of
+   Some index access methods (currently, only GiST and SP-GiST) support the concept of
    <firstterm>ordering operators</firstterm>.  What we have been discussing so far
    are <firstterm>search operators</firstterm>.  A search operator is one for which
    the index can be searched to find all rows satisfying
index ad07b9e63c821a923978443b20fa1a798d2b951a..e4a3786be01cfc5d9ecea9441a16435edbc6b339 100644 (file)
@@ -14,9 +14,9 @@
  */
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/gist_private.h"
 #include "access/relscan.h"
-#include "catalog/pg_type.h"
 #include "miscadmin.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -543,7 +543,6 @@ getNextNearest(IndexScanDesc scan)
 {
    GISTScanOpaque so = (GISTScanOpaque) scan->opaque;
    bool        res = false;
-   int         i;
 
    if (scan->xs_hitup)
    {
@@ -564,45 +563,10 @@ getNextNearest(IndexScanDesc scan)
            /* found a heap item at currently minimal distance */
            scan->xs_ctup.t_self = item->data.heap.heapPtr;
            scan->xs_recheck = item->data.heap.recheck;
-           scan->xs_recheckorderby = item->data.heap.recheckDistances;
-           for (i = 0; i < scan->numberOfOrderBys; i++)
-           {
-               if (so->orderByTypes[i] == FLOAT8OID)
-               {
-#ifndef USE_FLOAT8_BYVAL
-                   /* must free any old value to avoid memory leakage */
-                   if (!scan->xs_orderbynulls[i])
-                       pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-                   scan->xs_orderbyvals[i] = Float8GetDatum(item->distances[i]);
-                   scan->xs_orderbynulls[i] = false;
-               }
-               else if (so->orderByTypes[i] == FLOAT4OID)
-               {
-                   /* convert distance function's result to ORDER BY type */
-#ifndef USE_FLOAT4_BYVAL
-                   /* must free any old value to avoid memory leakage */
-                   if (!scan->xs_orderbynulls[i])
-                       pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
-#endif
-                   scan->xs_orderbyvals[i] = Float4GetDatum((float4) item->distances[i]);
-                   scan->xs_orderbynulls[i] = false;
-               }
-               else
-               {
-                   /*
-                    * If the ordering operator's return value is anything
-                    * else, we don't know how to convert the float8 bound
-                    * calculated by the distance function to that.  The
-                    * executor won't actually need the order by values we
-                    * return here, if there are no lossy results, so only
-                    * insist on converting if the *recheck flag is set.
-                    */
-                   if (scan->xs_recheckorderby)
-                       elog(ERROR, "GiST operator family's FOR ORDER BY operator must return float8 or float4 if the distance function is lossy");
-                   scan->xs_orderbynulls[i] = true;
-               }
-           }
+
+           index_store_float8_orderby_distances(scan, so->orderByTypes,
+                                                item->distances,
+                                                item->data.heap.recheckDistances);
 
            /* in an index-only scan, also return the reconstructed tuple. */
            if (scan->xs_want_itup)
index dddfe0ae2c59ac01bff567c2f9e53d876e830830..70627e5df6611f4d3e3f041906985c2b6655a2c6 100644 (file)
@@ -23,6 +23,7 @@
 #include "storage/lmgr.h"
 #include "utils/float.h"
 #include "utils/syscache.h"
+#include "utils/lsyscache.h"
 
 
 /*
@@ -871,12 +872,6 @@ gistproperty(Oid index_oid, int attno,
             IndexAMProperty prop, const char *propname,
             bool *res, bool *isnull)
 {
-   HeapTuple   tuple;
-   Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
-   Form_pg_opclass rd_opclass;
-   Datum       datum;
-   bool        disnull;
-   oidvector  *indclass;
    Oid         opclass,
                opfamily,
                opcintype;
@@ -910,41 +905,19 @@ gistproperty(Oid index_oid, int attno,
    }
 
    /* First we need to know the column's opclass. */
-
-   tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
-   if (!HeapTupleIsValid(tuple))
+   opclass = get_index_column_opclass(index_oid, attno);
+   if (!OidIsValid(opclass))
    {
        *isnull = true;
        return true;
    }
-   rd_index = (Form_pg_index) GETSTRUCT(tuple);
-
-   /* caller is supposed to guarantee this */
-   Assert(attno > 0 && attno <= rd_index->indnatts);
-
-   datum = SysCacheGetAttr(INDEXRELID, tuple,
-                           Anum_pg_index_indclass, &disnull);
-   Assert(!disnull);
-
-   indclass = ((oidvector *) DatumGetPointer(datum));
-   opclass = indclass->values[attno - 1];
-
-   ReleaseSysCache(tuple);
 
    /* Now look up the opclass family and input datatype. */
-
-   tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
-   if (!HeapTupleIsValid(tuple))
+   if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
    {
        *isnull = true;
        return true;
    }
-   rd_opclass = (Form_pg_opclass) GETSTRUCT(tuple);
-
-   opfamily = rd_opclass->opcfamily;
-   opcintype = rd_opclass->opcintype;
-
-   ReleaseSysCache(tuple);
 
    /* And now we can check whether the function is provided. */
 
@@ -967,6 +940,8 @@ gistproperty(Oid index_oid, int attno,
                                      Int16GetDatum(GIST_COMPRESS_PROC));
    }
 
+   *isnull = false;
+
    return true;
 }
 
index 22b5cc921f820eeba776cbfed73399dc5e46b662..eade540ef5da6ec87e742c5194d9e6d86326e4fb 100644 (file)
@@ -74,6 +74,7 @@
 #include "access/transam.h"
 #include "access/xlog.h"
 #include "catalog/index.h"
+#include "catalog/pg_type.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -897,3 +898,72 @@ index_getprocinfo(Relation irel,
 
    return locinfo;
 }
+
+/* ----------------
+ *     index_store_float8_orderby_distances
+ *
+ *     Convert AM distance function's results (that can be inexact)
+ *     to ORDER BY types and save them into xs_orderbyvals/xs_orderbynulls
+ *     for a possible recheck.
+ * ----------------
+ */
+void
+index_store_float8_orderby_distances(IndexScanDesc scan, Oid *orderByTypes,
+                                    double *distances, bool recheckOrderBy)
+{
+   int         i;
+
+   scan->xs_recheckorderby = recheckOrderBy;
+
+   if (!distances)
+   {
+       Assert(!scan->xs_recheckorderby);
+
+       for (i = 0; i < scan->numberOfOrderBys; i++)
+       {
+           scan->xs_orderbyvals[i] = (Datum) 0;
+           scan->xs_orderbynulls[i] = true;
+       }
+
+       return;
+   }
+
+   for (i = 0; i < scan->numberOfOrderBys; i++)
+   {
+       if (orderByTypes[i] == FLOAT8OID)
+       {
+#ifndef USE_FLOAT8_BYVAL
+           /* must free any old value to avoid memory leakage */
+           if (!scan->xs_orderbynulls[i])
+               pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+           scan->xs_orderbyvals[i] = Float8GetDatum(distances[i]);
+           scan->xs_orderbynulls[i] = false;
+       }
+       else if (orderByTypes[i] == FLOAT4OID)
+       {
+           /* convert distance function's result to ORDER BY type */
+#ifndef USE_FLOAT4_BYVAL
+           /* must free any old value to avoid memory leakage */
+           if (!scan->xs_orderbynulls[i])
+               pfree(DatumGetPointer(scan->xs_orderbyvals[i]));
+#endif
+           scan->xs_orderbyvals[i] = Float4GetDatum((float4) distances[i]);
+           scan->xs_orderbynulls[i] = false;
+       }
+       else
+       {
+           /*
+            * If the ordering operator's return value is anything else, we
+            * don't know how to convert the float8 bound calculated by the
+            * distance function to that.  The executor won't actually need
+            * the order by values we return here, if there are no lossy
+            * results, so only insist on converting if the *recheck flag is
+            * set.
+            */
+           if (scan->xs_recheckorderby)
+               elog(ERROR, "ORDER BY operator must return float8 or float4 if the distance function is lossy");
+           scan->xs_orderbynulls[i] = true;
+       }
+   }
+}
index 14948a531ee816b2d2e1185543fe587b1c6802c2..5be3df5992607d252ebc7a14f1c5a4f23d6bb832 100644 (file)
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = spgutils.o spginsert.o spgscan.o spgvacuum.o spgvalidate.o \
    spgdoinsert.o spgxlog.o \
-   spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o
+   spgtextproc.o spgquadtreeproc.o spgkdtreeproc.o \
+   spgproc.o
 
 include $(top_srcdir)/src/backend/common.mk
index 09ab21af264900021b07c40d46b5e6dc25ef9b5d..b55b07383206401aac9aadc5b51904b666b039ec 100644 (file)
@@ -41,7 +41,11 @@ contain exactly one inner tuple.
 
 When the search traversal algorithm reaches an inner tuple, it chooses a set
 of nodes to continue tree traverse in depth.  If it reaches a leaf page it
-scans a list of leaf tuples to find the ones that match the query.
+scans a list of leaf tuples to find the ones that match the query. SP-GiST
+also supports ordered (nearest-neighbor) searches - that is during scan pending
+nodes are put into priority queue, so traversal is performed by the
+closest-first model.
+
 
 The insertion algorithm descends the tree similarly, except it must choose
 just one node to descend to from each inner tuple.  Insertion might also have
index 556f3a4e076db3ca4bc4b4c284af107d44fba296..105fc72c7a2973a267505537bd1efcaa3260e5d9 100644 (file)
 #include "postgres.h"
 
 #include "access/spgist.h"
+#include "access/spgist_private.h"
 #include "access/stratnum.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/float.h"
 #include "utils/geo_decls.h"
 
 
@@ -162,6 +164,7 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
    double      coord;
    int         which;
    int         i;
+   BOX         bboxes[2];
 
    Assert(in->hasPrefix);
    coord = DatumGetFloat8(in->prefixDatum);
@@ -248,12 +251,87 @@ spg_kd_inner_consistent(PG_FUNCTION_ARGS)
    }
 
    /* We must descend into the children identified by which */
-   out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
    out->nNodes = 0;
+
+   /* Fast-path for no matching children */
+   if (!which)
+       PG_RETURN_VOID();
+
+   out->nodeNumbers = (int *) palloc(sizeof(int) * 2);
+
+   /*
+    * When ordering scan keys are specified, we've to calculate distance for
+    * them.  In order to do that, we need calculate bounding boxes for both
+    * children nodes.  Calculation of those bounding boxes on non-zero level
+    * require knowledge of bounding box of upper node.  So, we save bounding
+    * boxes to traversalValues.
+    */
+   if (in->norderbys > 0)
+   {
+       BOX         infArea;
+       BOX        *area;
+
+       out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+       out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+       if (in->level == 0)
+       {
+           float8      inf = get_float8_infinity();
+
+           infArea.high.x = inf;
+           infArea.high.y = inf;
+           infArea.low.x = -inf;
+           infArea.low.y = -inf;
+           area = &infArea;
+       }
+       else
+       {
+           area = (BOX *) in->traversalValue;
+           Assert(area);
+       }
+
+       bboxes[0].low = area->low;
+       bboxes[1].high = area->high;
+
+       if (in->level % 2)
+       {
+           /* split box by x */
+           bboxes[0].high.x = bboxes[1].low.x = coord;
+           bboxes[0].high.y = area->high.y;
+           bboxes[1].low.y = area->low.y;
+       }
+       else
+       {
+           /* split box by y */
+           bboxes[0].high.y = bboxes[1].low.y = coord;
+           bboxes[0].high.x = area->high.x;
+           bboxes[1].low.x = area->low.x;
+       }
+   }
+
    for (i = 1; i <= 2; i++)
    {
        if (which & (1 << i))
-           out->nodeNumbers[out->nNodes++] = i - 1;
+       {
+           out->nodeNumbers[out->nNodes] = i - 1;
+
+           if (in->norderbys > 0)
+           {
+               MemoryContext oldCtx = MemoryContextSwitchTo(
+                                                            in->traversalMemoryContext);
+               BOX        *box = box_copy(&bboxes[i - 1]);
+
+               MemoryContextSwitchTo(oldCtx);
+
+               out->traversalValues[out->nNodes] = box;
+
+               out->distances[out->nNodes] = spg_key_orderbys_distances(
+                                                                        BoxPGetDatum(box), false,
+                                                                        in->orderbys, in->norderbys);
+           }
+
+           out->nNodes++;
+       }
    }
 
    /* Set up level increments, too */
diff --git a/src/backend/access/spgist/spgproc.c b/src/backend/access/spgist/spgproc.c
new file mode 100644 (file)
index 0000000..0bf8001
--- /dev/null
@@ -0,0 +1,88 @@
+/*-------------------------------------------------------------------------
+ *
+ * spgproc.c
+ *   Common supporting procedures for SP-GiST opclasses.
+ *
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *         src/backend/access/spgist/spgproc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include <math.h>
+
+#include "access/spgist_private.h"
+#include "utils/builtins.h"
+#include "utils/float.h"
+#include "utils/geo_decls.h"
+
+#define point_point_distance(p1,p2) \
+   DatumGetFloat8(DirectFunctionCall2(point_distance, \
+                                      PointPGetDatum(p1), PointPGetDatum(p2)))
+
+/* Point-box distance in the assumption that box is aligned by axis */
+static double
+point_box_distance(Point *point, BOX *box)
+{
+   double      dx,
+               dy;
+
+   if (isnan(point->x) || isnan(box->low.x) ||
+       isnan(point->y) || isnan(box->low.y))
+       return get_float8_nan();
+
+   if (point->x < box->low.x)
+       dx = box->low.x - point->x;
+   else if (point->x > box->high.x)
+       dx = point->x - box->high.x;
+   else
+       dx = 0.0;
+
+   if (point->y < box->low.y)
+       dy = box->low.y - point->y;
+   else if (point->y > box->high.y)
+       dy = point->y - box->high.y;
+   else
+       dy = 0.0;
+
+   return HYPOT(dx, dy);
+}
+
+/*
+ * Returns distances from given key to array of ordering scan keys.  Leaf key
+ * is expected to be point, non-leaf key is expected to be box.  Scan key
+ * arguments are expected to be points.
+ */
+double *
+spg_key_orderbys_distances(Datum key, bool isLeaf,
+                          ScanKey orderbys, int norderbys)
+{
+   int         sk_num;
+   double     *distances = (double *) palloc(norderbys * sizeof(double)),
+              *distance = distances;
+
+   for (sk_num = 0; sk_num < norderbys; ++sk_num, ++orderbys, ++distance)
+   {
+       Point      *point = DatumGetPointP(orderbys->sk_argument);
+
+       *distance = isLeaf ? point_point_distance(point, DatumGetPointP(key))
+           : point_box_distance(point, DatumGetBoxP(key));
+   }
+
+   return distances;
+}
+
+BOX *
+box_copy(BOX *orig)
+{
+   BOX        *result = palloc(sizeof(BOX));
+
+   *result = *orig;
+   return result;
+}
index 8700ff357310e82ffd4fa30bb631cfaa1fc6ab90..dee438a307d17846ab5e5e0023059693ca5508c0 100644 (file)
 
 #include "access/spgist.h"
 #include "access/stratnum.h"
+#include "access/spgist_private.h"
 #include "catalog/pg_type.h"
 #include "utils/builtins.h"
+#include "utils/float.h"
 #include "utils/geo_decls.h"
 
 
@@ -77,6 +79,38 @@ getQuadrant(Point *centroid, Point *tst)
    return 0;
 }
 
+/* Returns bounding box of a given quadrant inside given bounding box */
+static BOX *
+getQuadrantArea(BOX *bbox, Point *centroid, int quadrant)
+{
+   BOX        *result = (BOX *) palloc(sizeof(BOX));
+
+   switch (quadrant)
+   {
+       case 1:
+           result->high = bbox->high;
+           result->low = *centroid;
+           break;
+       case 2:
+           result->high.x = bbox->high.x;
+           result->high.y = centroid->y;
+           result->low.x = centroid->x;
+           result->low.y = bbox->low.y;
+           break;
+       case 3:
+           result->high = *centroid;
+           result->low = bbox->low;
+           break;
+       case 4:
+           result->high.x = centroid->x;
+           result->high.y = bbox->high.y;
+           result->low.x = bbox->low.x;
+           result->low.y = centroid->y;
+           break;
+   }
+
+   return result;
+}
 
 Datum
 spg_quad_choose(PG_FUNCTION_ARGS)
@@ -196,19 +230,68 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
    spgInnerConsistentIn *in = (spgInnerConsistentIn *) PG_GETARG_POINTER(0);
    spgInnerConsistentOut *out = (spgInnerConsistentOut *) PG_GETARG_POINTER(1);
    Point      *centroid;
+   BOX         infbbox;
+   BOX        *bbox = NULL;
    int         which;
    int         i;
 
    Assert(in->hasPrefix);
    centroid = DatumGetPointP(in->prefixDatum);
 
+   /*
+    * When ordering scan keys are specified, we've to calculate distance for
+    * them.  In order to do that, we need calculate bounding boxes for all
+    * children nodes.  Calculation of those bounding boxes on non-zero level
+    * require knowledge of bounding box of upper node.  So, we save bounding
+    * boxes to traversalValues.
+    */
+   if (in->norderbys > 0)
+   {
+       out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+       out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+
+       if (in->level == 0)
+       {
+           double      inf = get_float8_infinity();
+
+           infbbox.high.x = inf;
+           infbbox.high.y = inf;
+           infbbox.low.x = -inf;
+           infbbox.low.y = -inf;
+           bbox = &infbbox;
+       }
+       else
+       {
+           bbox = in->traversalValue;
+           Assert(bbox);
+       }
+   }
+
    if (in->allTheSame)
    {
        /* Report that all nodes should be visited */
        out->nNodes = in->nNodes;
        out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
        for (i = 0; i < in->nNodes; i++)
+       {
            out->nodeNumbers[i] = i;
+
+           if (in->norderbys > 0)
+           {
+               MemoryContext oldCtx = MemoryContextSwitchTo(
+                                                            in->traversalMemoryContext);
+
+               /* Use parent quadrant box as traversalValue */
+               BOX        *quadrant = box_copy(bbox);
+
+               MemoryContextSwitchTo(oldCtx);
+
+               out->traversalValues[i] = quadrant;
+               out->distances[i] = spg_key_orderbys_distances(
+                                                              BoxPGetDatum(quadrant), false,
+                                                              in->orderbys, in->norderbys);
+           }
+       }
        PG_RETURN_VOID();
    }
 
@@ -286,13 +369,37 @@ spg_quad_inner_consistent(PG_FUNCTION_ARGS)
            break;              /* no need to consider remaining conditions */
    }
 
+   out->levelAdds = palloc(sizeof(int) * 4);
+   for (i = 0; i < 4; ++i)
+       out->levelAdds[i] = 1;
+
    /* We must descend into the quadrant(s) identified by which */
    out->nodeNumbers = (int *) palloc(sizeof(int) * 4);
    out->nNodes = 0;
+
    for (i = 1; i <= 4; i++)
    {
        if (which & (1 << i))
-           out->nodeNumbers[out->nNodes++] = i - 1;
+       {
+           out->nodeNumbers[out->nNodes] = i - 1;
+
+           if (in->norderbys > 0)
+           {
+               MemoryContext oldCtx = MemoryContextSwitchTo(
+                                                            in->traversalMemoryContext);
+               BOX        *quadrant = getQuadrantArea(bbox, centroid, i);
+
+               MemoryContextSwitchTo(oldCtx);
+
+               out->traversalValues[out->nNodes] = quadrant;
+
+               out->distances[out->nNodes] = spg_key_orderbys_distances(
+                                                                        BoxPGetDatum(quadrant), false,
+                                                                        in->orderbys, in->norderbys);
+           }
+
+           out->nNodes++;
+       }
    }
 
    PG_RETURN_VOID();
@@ -356,5 +463,11 @@ spg_quad_leaf_consistent(PG_FUNCTION_ARGS)
            break;
    }
 
+   if (res && in->norderbys > 0)
+       /* ok, it passes -> let's compute the distances */
+       out->distances = spg_key_orderbys_distances(
+                                                   BoxPGetDatum(in->leafDatum), true,
+                                                   in->orderbys, in->norderbys);
+
    PG_RETURN_BOOL(res);
 }
index 5260d5017d1952a4a6714a87f96b254b0ab1725d..a63fde2c8af8184475c5355fea40ac499ce7a356 100644 (file)
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/relscan.h"
 #include "access/spgist_private.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
 #include "utils/datum.h"
+#include "utils/float.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
-
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
-                              Datum leafValue, bool isnull, bool recheck);
+                              Datum leafValue, bool isNull, bool recheck,
+                              bool recheckDistances, double *distances);
 
-typedef struct ScanStackEntry
+/*
+ * Pairing heap comparison function for the SpGistSearchItem queue.
+ * KNN-searches currently only support NULLS LAST.  So, preserve this logic
+ * here.
+ */
+static int
+pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
+                                const pairingheap_node *b, void *arg)
 {
-   Datum       reconstructedValue; /* value reconstructed from parent */
-   void       *traversalValue; /* opclass-specific traverse value */
-   int         level;          /* level of items on this page */
-   ItemPointerData ptr;        /* block and offset to scan from */
-} ScanStackEntry;
+   const       SpGistSearchItem *sa = (const SpGistSearchItem *) a;
+   const       SpGistSearchItem *sb = (const SpGistSearchItem *) b;
+   SpGistScanOpaque so = (SpGistScanOpaque) arg;
+   int         i;
+
+   if (sa->isNull)
+   {
+       if (!sb->isNull)
+           return -1;
+   }
+   else if (sb->isNull)
+   {
+       return 1;
+   }
+   else
+   {
+       /* Order according to distance comparison */
+       for (i = 0; i < so->numberOfOrderBys; i++)
+       {
+           if (isnan(sa->distances[i]) && isnan(sb->distances[i]))
+               continue;       /* NaN == NaN */
+           if (isnan(sa->distances[i]))
+               return -1;      /* NaN > number */
+           if (isnan(sb->distances[i]))
+               return 1;       /* number < NaN */
+           if (sa->distances[i] != sb->distances[i])
+               return (sa->distances[i] < sb->distances[i]) ? 1 : -1;
+       }
+   }
 
+   /* Leaf items go before inner pages, to ensure a depth-first search */
+   if (sa->isLeaf && !sb->isLeaf)
+       return 1;
+   if (!sa->isLeaf && sb->isLeaf)
+       return -1;
+
+   return 0;
+}
 
-/* Free a ScanStackEntry */
 static void
-freeScanStackEntry(SpGistScanOpaque so, ScanStackEntry *stackEntry)
+spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem * item)
 {
    if (!so->state.attLeafType.attbyval &&
-       DatumGetPointer(stackEntry->reconstructedValue) != NULL)
-       pfree(DatumGetPointer(stackEntry->reconstructedValue));
-   if (stackEntry->traversalValue)
-       pfree(stackEntry->traversalValue);
+       DatumGetPointer(item->value) != NULL)
+       pfree(DatumGetPointer(item->value));
 
-   pfree(stackEntry);
+   if (item->traversalValue)
+       pfree(item->traversalValue);
+
+   pfree(item);
 }
 
-/* Free the entire stack */
+/*
+ * Add SpGistSearchItem to queue
+ *
+ * Called in queue context
+ */
 static void
-freeScanStack(SpGistScanOpaque so)
+spgAddSearchItemToQueue(SpGistScanOpaque so, SpGistSearchItem * item)
 {
-   ListCell   *lc;
+   pairingheap_add(so->scanQueue, &item->phNode);
+}
 
-   foreach(lc, so->scanStack)
-   {
-       freeScanStackEntry(so, (ScanStackEntry *) lfirst(lc));
-   }
-   list_free(so->scanStack);
-   so->scanStack = NIL;
+static SpGistSearchItem *
+spgAllocSearchItem(SpGistScanOpaque so, bool isnull, double *distances)
+{
+   /* allocate distance array only for non-NULL items */
+   SpGistSearchItem *item =
+   palloc(SizeOfSpGistSearchItem(isnull ? 0 : so->numberOfOrderBys));
+
+   item->isNull = isnull;
+
+   if (!isnull && so->numberOfOrderBys > 0)
+       memcpy(item->distances, distances,
+              so->numberOfOrderBys * sizeof(double));
+
+   return item;
+}
+
+static void
+spgAddStartItem(SpGistScanOpaque so, bool isnull)
+{
+   SpGistSearchItem *startEntry =
+   spgAllocSearchItem(so, isnull, so->zeroDistances);
+
+   ItemPointerSet(&startEntry->heapPtr,
+                  isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO,
+                  FirstOffsetNumber);
+   startEntry->isLeaf = false;
+   startEntry->level = 0;
+   startEntry->value = (Datum) 0;
+   startEntry->traversalValue = NULL;
+   startEntry->recheck = false;
+   startEntry->recheckDistances = false;
+
+   spgAddSearchItemToQueue(so, startEntry);
 }
 
 /*
- * Initialize scanStack to search the root page, resetting
+ * Initialize queue to search the root page, resetting
  * any previously active scan
  */
 static void
 resetSpGistScanOpaque(SpGistScanOpaque so)
 {
-   ScanStackEntry *startEntry;
-
-   freeScanStack(so);
+   MemoryContext oldCtx;
 
    /*
     * clear traversal context before proceeding to the next scan; this must
@@ -81,20 +153,29 @@ resetSpGistScanOpaque(SpGistScanOpaque so)
     */
    MemoryContextReset(so->traversalCxt);
 
+   oldCtx = MemoryContextSwitchTo(so->traversalCxt);
+
+   /* initialize queue only for distance-ordered scans */
+   so->scanQueue = pairingheap_allocate(pairingheap_SpGistSearchItem_cmp, so);
+
    if (so->searchNulls)
-   {
-       /* Stack a work item to scan the null index entries */
-       startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-       ItemPointerSet(&startEntry->ptr, SPGIST_NULL_BLKNO, FirstOffsetNumber);
-       so->scanStack = lappend(so->scanStack, startEntry);
-   }
+       /* Add a work item to scan the null index entries */
+       spgAddStartItem(so, true);
 
    if (so->searchNonNulls)
+       /* Add a work item to scan the non-null index entries */
+       spgAddStartItem(so, false);
+
+   MemoryContextSwitchTo(oldCtx);
+
+   if (so->numberOfOrderBys > 0)
    {
-       /* Stack a work item to scan the non-null index entries */
-       startEntry = (ScanStackEntry *) palloc0(sizeof(ScanStackEntry));
-       ItemPointerSet(&startEntry->ptr, SPGIST_ROOT_BLKNO, FirstOffsetNumber);
-       so->scanStack = lappend(so->scanStack, startEntry);
+       /* Must pfree distances to avoid memory leak */
+       int         i;
+
+       for (i = 0; i < so->nPtrs; i++)
+           if (so->distances[i])
+               pfree(so->distances[i]);
    }
 
    if (so->want_itup)
@@ -129,6 +210,9 @@ spgPrepareScanKeys(IndexScanDesc scan)
    int         nkeys;
    int         i;
 
+   so->numberOfOrderBys = scan->numberOfOrderBys;
+   so->orderByData = scan->orderByData;
+
    if (scan->numberOfKeys <= 0)
    {
        /* If no quals, whole-index scan is required */
@@ -189,8 +273,9 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 {
    IndexScanDesc scan;
    SpGistScanOpaque so;
+   int         i;
 
-   scan = RelationGetIndexScan(rel, keysz, 0);
+   scan = RelationGetIndexScan(rel, keysz, orderbysz);
 
    so = (SpGistScanOpaque) palloc0(sizeof(SpGistScanOpaqueData));
    if (keysz > 0)
@@ -198,6 +283,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
    else
        so->keyData = NULL;
    initSpGistState(&so->state, scan->indexRelation);
+
    so->tempCxt = AllocSetContextCreate(CurrentMemoryContext,
                                        "SP-GiST search temporary context",
                                        ALLOCSET_DEFAULT_SIZES);
@@ -208,6 +294,32 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
    /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
    so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
 
+   if (scan->numberOfOrderBys > 0)
+   {
+       so->zeroDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+       so->infDistances = palloc(sizeof(double) * scan->numberOfOrderBys);
+
+       for (i = 0; i < scan->numberOfOrderBys; i++)
+       {
+           so->zeroDistances[i] = 0.0;
+           so->infDistances[i] = get_float8_infinity();
+       }
+
+       scan->xs_orderbyvals = palloc0(sizeof(Datum) * scan->numberOfOrderBys);
+       scan->xs_orderbynulls = palloc(sizeof(bool) * scan->numberOfOrderBys);
+       memset(scan->xs_orderbynulls, true, sizeof(bool) * scan->numberOfOrderBys);
+   }
+
+   fmgr_info_copy(&so->innerConsistentFn,
+                  index_getprocinfo(rel, 1, SPGIST_INNER_CONSISTENT_PROC),
+                  CurrentMemoryContext);
+
+   fmgr_info_copy(&so->leafConsistentFn,
+                  index_getprocinfo(rel, 1, SPGIST_LEAF_CONSISTENT_PROC),
+                  CurrentMemoryContext);
+
+   so->indexCollation = rel->rd_indcollation[0];
+
    scan->opaque = so;
 
    return scan;
@@ -221,15 +333,42 @@ spgrescan(IndexScanDesc scan, ScanKey scankey, int nscankeys,
 
    /* copy scankeys into local storage */
    if (scankey && scan->numberOfKeys > 0)
-   {
        memmove(scan->keyData, scankey,
                scan->numberOfKeys * sizeof(ScanKeyData));
+
+   if (orderbys && scan->numberOfOrderBys > 0)
+   {
+       int         i;
+
+       memmove(scan->orderByData, orderbys,
+               scan->numberOfOrderBys * sizeof(ScanKeyData));
+
+       so->orderByTypes = (Oid *) palloc(sizeof(Oid) * scan->numberOfOrderBys);
+
+       for (i = 0; i < scan->numberOfOrderBys; i++)
+       {
+           ScanKey     skey = &scan->orderByData[i];
+
+           /*
+            * Look up the datatype returned by the original ordering
+            * operator. SP-GiST always uses a float8 for the distance
+            * function, but the ordering operator could be anything else.
+            *
+            * XXX: The distance function is only allowed to be lossy if the
+            * ordering operator's result type is float4 or float8.  Otherwise
+            * we don't know how to return the distance to the executor.  But
+            * we cannot check that here, as we won't know if the distance
+            * function is lossy until it returns *recheck = true for the
+            * first time.
+            */
+           so->orderByTypes[i] = get_func_rettype(skey->sk_func.fn_oid);
+       }
    }
 
    /* preprocess scankeys, set up the representation in *so */
    spgPrepareScanKeys(scan);
 
-   /* set up starting stack entries */
+   /* set up starting queue entries */
    resetSpGistScanOpaque(so);
 }
 
@@ -240,65 +379,332 @@ spgendscan(IndexScanDesc scan)
 
    MemoryContextDelete(so->tempCxt);
    MemoryContextDelete(so->traversalCxt);
+
+   if (scan->numberOfOrderBys > 0)
+   {
+       pfree(so->zeroDistances);
+       pfree(so->infDistances);
+   }
+}
+
+/*
+ * Leaf SpGistSearchItem constructor, called in queue context
+ */
+static SpGistSearchItem *
+spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
+              Datum leafValue, bool recheck, bool recheckDistances,
+              bool isnull, double *distances)
+{
+   SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
+
+   item->level = level;
+   item->heapPtr = *heapPtr;
+   /* copy value to queue cxt out of tmp cxt */
+   item->value = isnull ? (Datum) 0 :
+       datumCopy(leafValue, so->state.attLeafType.attbyval,
+                 so->state.attLeafType.attlen);
+   item->traversalValue = NULL;
+   item->isLeaf = true;
+   item->recheck = recheck;
+   item->recheckDistances = recheckDistances;
+
+   return item;
 }
 
 /*
  * Test whether a leaf tuple satisfies all the scan keys
  *
- * *leafValue is set to the reconstructed datum, if provided
- * *recheck is set true if any of the operators are lossy
+ * *reportedSome is set to true if:
+ *     the scan is not ordered AND the item satisfies the scankeys
  */
 static bool
-spgLeafTest(Relation index, SpGistScanOpaque so,
+spgLeafTest(SpGistScanOpaque so, SpGistSearchItem * item,
            SpGistLeafTuple leafTuple, bool isnull,
-           int level, Datum reconstructedValue,
-           void *traversalValue,
-           Datum *leafValue, bool *recheck)
+           bool *reportedSome, storeRes_func storeRes)
 {
+   Datum       leafValue;
+   double     *distances;
    bool        result;
-   Datum       leafDatum;
-   spgLeafConsistentIn in;
-   spgLeafConsistentOut out;
-   FmgrInfo   *procinfo;
-   MemoryContext oldCtx;
+   bool        recheck;
+   bool        recheckDistances;
 
    if (isnull)
    {
        /* Should not have arrived on a nulls page unless nulls are wanted */
        Assert(so->searchNulls);
-       *leafValue = (Datum) 0;
-       *recheck = false;
-       return true;
+       leafValue = (Datum) 0;
+       distances = NULL;
+       recheck = false;
+       recheckDistances = false;
+       result = true;
+   }
+   else
+   {
+       spgLeafConsistentIn in;
+       spgLeafConsistentOut out;
+
+       /* use temp context for calling leaf_consistent */
+       MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
+
+       in.scankeys = so->keyData;
+       in.nkeys = so->numberOfKeys;
+       in.orderbys = so->orderByData;
+       in.norderbys = so->numberOfOrderBys;
+       in.reconstructedValue = item->value;
+       in.traversalValue = item->traversalValue;
+       in.level = item->level;
+       in.returnData = so->want_itup;
+       in.leafDatum = SGLTDATUM(leafTuple, &so->state);
+
+       out.leafValue = (Datum) 0;
+       out.recheck = false;
+       out.distances = NULL;
+       out.recheckDistances = false;
+
+       result = DatumGetBool(FunctionCall2Coll(&so->leafConsistentFn,
+                                               so->indexCollation,
+                                               PointerGetDatum(&in),
+                                               PointerGetDatum(&out)));
+       recheck = out.recheck;
+       recheckDistances = out.recheckDistances;
+       leafValue = out.leafValue;
+       distances = out.distances;
+
+       MemoryContextSwitchTo(oldCxt);
    }
 
-   leafDatum = SGLTDATUM(leafTuple, &so->state);
+   if (result)
+   {
+       /* item passes the scankeys */
+       if (so->numberOfOrderBys > 0)
+       {
+           /* the scan is ordered -> add the item to the queue */
+           MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
+           SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
+                                                       &leafTuple->heapPtr,
+                                                       leafValue,
+                                                       recheck,
+                                                       recheckDistances,
+                                                       isnull,
+                                                       distances);
+
+           spgAddSearchItemToQueue(so, heapItem);
+
+           MemoryContextSwitchTo(oldCxt);
+       }
+       else
+       {
+           /* non-ordered scan, so report the item right away */
+           Assert(!recheckDistances);
+           storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
+                    recheck, false, NULL);
+           *reportedSome = true;
+       }
+   }
 
-   /* use temp context for calling leaf_consistent */
-   oldCtx = MemoryContextSwitchTo(so->tempCxt);
+   return result;
+}
 
-   in.scankeys = so->keyData;
-   in.nkeys = so->numberOfKeys;
-   in.reconstructedValue = reconstructedValue;
-   in.traversalValue = traversalValue;
-   in.level = level;
-   in.returnData = so->want_itup;
-   in.leafDatum = leafDatum;
+/* A bundle initializer for inner_consistent methods */
+static void
+spgInitInnerConsistentIn(spgInnerConsistentIn *in,
+                        SpGistScanOpaque so,
+                        SpGistSearchItem * item,
+                        SpGistInnerTuple innerTuple)
+{
+   in->scankeys = so->keyData;
+   in->orderbys = so->orderByData;
+   in->nkeys = so->numberOfKeys;
+   in->norderbys = so->numberOfOrderBys;
+   in->reconstructedValue = item->value;
+   in->traversalMemoryContext = so->traversalCxt;
+   in->traversalValue = item->traversalValue;
+   in->level = item->level;
+   in->returnData = so->want_itup;
+   in->allTheSame = innerTuple->allTheSame;
+   in->hasPrefix = (innerTuple->prefixSize > 0);
+   in->prefixDatum = SGITDATUM(innerTuple, &so->state);
+   in->nNodes = innerTuple->nNodes;
+   in->nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
+}
 
-   out.leafValue = (Datum) 0;
-   out.recheck = false;
+static SpGistSearchItem *
+spgMakeInnerItem(SpGistScanOpaque so,
+                SpGistSearchItem * parentItem,
+                SpGistNodeTuple tuple,
+                spgInnerConsistentOut *out, int i, bool isnull,
+                double *distances)
+{
+   SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
 
-   procinfo = index_getprocinfo(index, 1, SPGIST_LEAF_CONSISTENT_PROC);
-   result = DatumGetBool(FunctionCall2Coll(procinfo,
-                                           index->rd_indcollation[0],
-                                           PointerGetDatum(&in),
-                                           PointerGetDatum(&out)));
+   item->heapPtr = tuple->t_tid;
+   item->level = out->levelAdds ? parentItem->level + out->levelAdds[i]
+       : parentItem->level;
 
-   *leafValue = out.leafValue;
-   *recheck = out.recheck;
+   /* Must copy value out of temp context */
+   item->value = out->reconstructedValues
+       ? datumCopy(out->reconstructedValues[i],
+                   so->state.attLeafType.attbyval,
+                   so->state.attLeafType.attlen)
+       : (Datum) 0;
 
-   MemoryContextSwitchTo(oldCtx);
+   /*
+    * Elements of out.traversalValues should be allocated in
+    * in.traversalMemoryContext, which is actually a long lived context of
+    * index scan.
+    */
+   item->traversalValue =
+       out->traversalValues ? out->traversalValues[i] : NULL;
 
-   return result;
+   item->isLeaf = false;
+   item->recheck = false;
+   item->recheckDistances = false;
+
+   return item;
+}
+
+static void
+spgInnerTest(SpGistScanOpaque so, SpGistSearchItem * item,
+            SpGistInnerTuple innerTuple, bool isnull)
+{
+   MemoryContext oldCxt = MemoryContextSwitchTo(so->tempCxt);
+   spgInnerConsistentOut out;
+   int         nNodes = innerTuple->nNodes;
+   int         i;
+
+   memset(&out, 0, sizeof(out));
+
+   if (!isnull)
+   {
+       spgInnerConsistentIn in;
+
+       spgInitInnerConsistentIn(&in, so, item, innerTuple);
+
+       /* use user-defined inner consistent method */
+       FunctionCall2Coll(&so->innerConsistentFn,
+                         so->indexCollation,
+                         PointerGetDatum(&in),
+                         PointerGetDatum(&out));
+   }
+   else
+   {
+       /* force all children to be visited */
+       out.nNodes = nNodes;
+       out.nodeNumbers = (int *) palloc(sizeof(int) * nNodes);
+       for (i = 0; i < nNodes; i++)
+           out.nodeNumbers[i] = i;
+   }
+
+   /* If allTheSame, they should all or none of them match */
+   if (innerTuple->allTheSame && out.nNodes != 0 && out.nNodes != nNodes)
+       elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
+
+   if (out.nNodes)
+   {
+       /* collect node pointers */
+       SpGistNodeTuple node;
+       SpGistNodeTuple *nodes = (SpGistNodeTuple *) palloc(
+                                                           sizeof(SpGistNodeTuple) * nNodes);
+
+       SGITITERATE(innerTuple, i, node)
+       {
+           nodes[i] = node;
+       }
+
+       MemoryContextSwitchTo(so->traversalCxt);
+
+       for (i = 0; i < out.nNodes; i++)
+       {
+           int         nodeN = out.nodeNumbers[i];
+           SpGistSearchItem *innerItem;
+           double     *distances;
+
+           Assert(nodeN >= 0 && nodeN < nNodes);
+
+           node = nodes[nodeN];
+
+           if (!ItemPointerIsValid(&node->t_tid))
+               continue;
+
+           /*
+            * Use infinity distances if innerConsistent() failed to return
+            * them or if is a NULL item (their distances are really unused).
+            */
+           distances = out.distances ? out.distances[i] : so->infDistances;
+
+           innerItem = spgMakeInnerItem(so, item, node, &out, i, isnull,
+                                        distances);
+
+           spgAddSearchItemToQueue(so, innerItem);
+       }
+   }
+
+   MemoryContextSwitchTo(oldCxt);
+}
+
+/* Returns a next item in an (ordered) scan or null if the index is exhausted */
+static SpGistSearchItem *
+spgGetNextQueueItem(SpGistScanOpaque so)
+{
+   if (pairingheap_is_empty(so->scanQueue))
+       return NULL;            /* Done when both heaps are empty */
+
+   /* Return item; caller is responsible to pfree it */
+   return (SpGistSearchItem *) pairingheap_remove_first(so->scanQueue);
+}
+
+enum SpGistSpecialOffsetNumbers
+{
+   SpGistBreakOffsetNumber = InvalidOffsetNumber,
+   SpGistRedirectOffsetNumber = MaxOffsetNumber + 1,
+   SpGistErrorOffsetNumber = MaxOffsetNumber + 2
+};
+
+static OffsetNumber
+spgTestLeafTuple(SpGistScanOpaque so,
+                SpGistSearchItem * item,
+                Page page, OffsetNumber offset,
+                bool isnull, bool isroot,
+                bool *reportedSome,
+                storeRes_func storeRes)
+{
+   SpGistLeafTuple leafTuple = (SpGistLeafTuple)
+   PageGetItem(page, PageGetItemId(page, offset));
+
+   if (leafTuple->tupstate != SPGIST_LIVE)
+   {
+       if (!isroot)            /* all tuples on root should be live */
+       {
+           if (leafTuple->tupstate == SPGIST_REDIRECT)
+           {
+               /* redirection tuple should be first in chain */
+               Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
+               /* transfer attention to redirect point */
+               item->heapPtr = ((SpGistDeadTuple) leafTuple)->pointer;
+               Assert(ItemPointerGetBlockNumber(&item->heapPtr) != SPGIST_METAPAGE_BLKNO);
+               return SpGistRedirectOffsetNumber;
+           }
+
+           if (leafTuple->tupstate == SPGIST_DEAD)
+           {
+               /* dead tuple should be first in chain */
+               Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
+               /* No live entries on this page */
+               Assert(leafTuple->nextOffset == InvalidOffsetNumber);
+               return SpGistBreakOffsetNumber;
+           }
+       }
+
+       /* We should not arrive at a placeholder */
+       elog(ERROR, "unexpected SPGiST tuple state: %d", leafTuple->tupstate);
+       return SpGistErrorOffsetNumber;
+   }
+
+   Assert(ItemPointerIsValid(&leafTuple->heapPtr));
+
+   spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
+
+   return leafTuple->nextOffset;
 }
 
 /*
@@ -317,247 +723,101 @@ spgWalk(Relation index, SpGistScanOpaque so, bool scanWholeIndex,
 
    while (scanWholeIndex || !reportedSome)
    {
-       ScanStackEntry *stackEntry;
-       BlockNumber blkno;
-       OffsetNumber offset;
-       Page        page;
-       bool        isnull;
-
-       /* Pull next to-do item from the list */
-       if (so->scanStack == NIL)
-           break;              /* there are no more pages to scan */
+       SpGistSearchItem *item = spgGetNextQueueItem(so);
 
-       stackEntry = (ScanStackEntry *) linitial(so->scanStack);
-       so->scanStack = list_delete_first(so->scanStack);
+       if (item == NULL)
+           break;              /* No more items in queue -> done */
 
 redirect:
        /* Check for interrupts, just in case of infinite loop */
        CHECK_FOR_INTERRUPTS();
 
-       blkno = ItemPointerGetBlockNumber(&stackEntry->ptr);
-       offset = ItemPointerGetOffsetNumber(&stackEntry->ptr);
-
-       if (buffer == InvalidBuffer)
+       if (item->isLeaf)
        {
-           buffer = ReadBuffer(index, blkno);
-           LockBuffer(buffer, BUFFER_LOCK_SHARE);
+           /* We store heap items in the queue only in case of ordered search */
+           Assert(so->numberOfOrderBys > 0);
+           storeRes(so, &item->heapPtr, item->value, item->isNull,
+                    item->recheck, item->recheckDistances, item->distances);
+           reportedSome = true;
        }
-       else if (blkno != BufferGetBlockNumber(buffer))
+       else
        {
-           UnlockReleaseBuffer(buffer);
-           buffer = ReadBuffer(index, blkno);
-           LockBuffer(buffer, BUFFER_LOCK_SHARE);
-       }
-       /* else new pointer points to the same page, no work needed */
+           BlockNumber blkno = ItemPointerGetBlockNumber(&item->heapPtr);
+           OffsetNumber offset = ItemPointerGetOffsetNumber(&item->heapPtr);
+           Page        page;
+           bool        isnull;
 
-       page = BufferGetPage(buffer);
-       TestForOldSnapshot(snapshot, index, page);
+           if (buffer == InvalidBuffer)
+           {
+               buffer = ReadBuffer(index, blkno);
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+           }
+           else if (blkno != BufferGetBlockNumber(buffer))
+           {
+               UnlockReleaseBuffer(buffer);
+               buffer = ReadBuffer(index, blkno);
+               LockBuffer(buffer, BUFFER_LOCK_SHARE);
+           }
 
-       isnull = SpGistPageStoresNulls(page) ? true : false;
+           /* else new pointer points to the same page, no work needed */
 
-       if (SpGistPageIsLeaf(page))
-       {
-           SpGistLeafTuple leafTuple;
-           OffsetNumber max = PageGetMaxOffsetNumber(page);
-           Datum       leafValue = (Datum) 0;
-           bool        recheck = false;
+           page = BufferGetPage(buffer);
+           TestForOldSnapshot(snapshot, index, page);
+
+           isnull = SpGistPageStoresNulls(page) ? true : false;
 
-           if (SpGistBlockIsRoot(blkno))
+           if (SpGistPageIsLeaf(page))
            {
-               /* When root is a leaf, examine all its tuples */
-               for (offset = FirstOffsetNumber; offset <= max; offset++)
-               {
-                   leafTuple = (SpGistLeafTuple)
-                       PageGetItem(page, PageGetItemId(page, offset));
-                   if (leafTuple->tupstate != SPGIST_LIVE)
-                   {
-                       /* all tuples on root should be live */
-                       elog(ERROR, "unexpected SPGiST tuple state: %d",
-                            leafTuple->tupstate);
-                   }
+               /* Page is a leaf - that is, all it's tuples are heap items */
+               OffsetNumber max = PageGetMaxOffsetNumber(page);
 
-                   Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-                   if (spgLeafTest(index, so,
-                                   leafTuple, isnull,
-                                   stackEntry->level,
-                                   stackEntry->reconstructedValue,
-                                   stackEntry->traversalValue,
-                                   &leafValue,
-                                   &recheck))
-                   {
-                       storeRes(so, &leafTuple->heapPtr,
-                                leafValue, isnull, recheck);
-                       reportedSome = true;
-                   }
+               if (SpGistBlockIsRoot(blkno))
+               {
+                   /* When root is a leaf, examine all its tuples */
+                   for (offset = FirstOffsetNumber; offset <= max; offset++)
+                       (void) spgTestLeafTuple(so, item, page, offset,
+                                               isnull, true,
+                                               &reportedSome, storeRes);
                }
-           }
-           else
-           {
-               /* Normal case: just examine the chain we arrived at */
-               while (offset != InvalidOffsetNumber)
+               else
                {
-                   Assert(offset >= FirstOffsetNumber && offset <= max);
-                   leafTuple = (SpGistLeafTuple)
-                       PageGetItem(page, PageGetItemId(page, offset));
-                   if (leafTuple->tupstate != SPGIST_LIVE)
+                   /* Normal case: just examine the chain we arrived at */
+                   while (offset != InvalidOffsetNumber)
                    {
-                       if (leafTuple->tupstate == SPGIST_REDIRECT)
-                       {
-                           /* redirection tuple should be first in chain */
-                           Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-                           /* transfer attention to redirect point */
-                           stackEntry->ptr = ((SpGistDeadTuple) leafTuple)->pointer;
-                           Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
+                       Assert(offset >= FirstOffsetNumber && offset <= max);
+                       offset = spgTestLeafTuple(so, item, page, offset,
+                                                 isnull, false,
+                                                 &reportedSome, storeRes);
+                       if (offset == SpGistRedirectOffsetNumber)
                            goto redirect;
-                       }
-                       if (leafTuple->tupstate == SPGIST_DEAD)
-                       {
-                           /* dead tuple should be first in chain */
-                           Assert(offset == ItemPointerGetOffsetNumber(&stackEntry->ptr));
-                           /* No live entries on this page */
-                           Assert(leafTuple->nextOffset == InvalidOffsetNumber);
-                           break;
-                       }
-                       /* We should not arrive at a placeholder */
-                       elog(ERROR, "unexpected SPGiST tuple state: %d",
-                            leafTuple->tupstate);
-                   }
-
-                   Assert(ItemPointerIsValid(&leafTuple->heapPtr));
-                   if (spgLeafTest(index, so,
-                                   leafTuple, isnull,
-                                   stackEntry->level,
-                                   stackEntry->reconstructedValue,
-                                   stackEntry->traversalValue,
-                                   &leafValue,
-                                   &recheck))
-                   {
-                       storeRes(so, &leafTuple->heapPtr,
-                                leafValue, isnull, recheck);
-                       reportedSome = true;
                    }
-
-                   offset = leafTuple->nextOffset;
-               }
-           }
-       }
-       else                    /* page is inner */
-       {
-           SpGistInnerTuple innerTuple;
-           spgInnerConsistentIn in;
-           spgInnerConsistentOut out;
-           FmgrInfo   *procinfo;
-           SpGistNodeTuple *nodes;
-           SpGistNodeTuple node;
-           int         i;
-           MemoryContext oldCtx;
-
-           innerTuple = (SpGistInnerTuple) PageGetItem(page,
-                                                       PageGetItemId(page, offset));
-
-           if (innerTuple->tupstate != SPGIST_LIVE)
-           {
-               if (innerTuple->tupstate == SPGIST_REDIRECT)
-               {
-                   /* transfer attention to redirect point */
-                   stackEntry->ptr = ((SpGistDeadTuple) innerTuple)->pointer;
-                   Assert(ItemPointerGetBlockNumber(&stackEntry->ptr) != SPGIST_METAPAGE_BLKNO);
-                   goto redirect;
                }
-               elog(ERROR, "unexpected SPGiST tuple state: %d",
-                    innerTuple->tupstate);
-           }
-
-           /* use temp context for calling inner_consistent */
-           oldCtx = MemoryContextSwitchTo(so->tempCxt);
-
-           in.scankeys = so->keyData;
-           in.nkeys = so->numberOfKeys;
-           in.reconstructedValue = stackEntry->reconstructedValue;
-           in.traversalMemoryContext = so->traversalCxt;
-           in.traversalValue = stackEntry->traversalValue;
-           in.level = stackEntry->level;
-           in.returnData = so->want_itup;
-           in.allTheSame = innerTuple->allTheSame;
-           in.hasPrefix = (innerTuple->prefixSize > 0);
-           in.prefixDatum = SGITDATUM(innerTuple, &so->state);
-           in.nNodes = innerTuple->nNodes;
-           in.nodeLabels = spgExtractNodeLabels(&so->state, innerTuple);
-
-           /* collect node pointers */
-           nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * in.nNodes);
-           SGITITERATE(innerTuple, i, node)
-           {
-               nodes[i] = node;
-           }
-
-           memset(&out, 0, sizeof(out));
-
-           if (!isnull)
-           {
-               /* use user-defined inner consistent method */
-               procinfo = index_getprocinfo(index, 1, SPGIST_INNER_CONSISTENT_PROC);
-               FunctionCall2Coll(procinfo,
-                                 index->rd_indcollation[0],
-                                 PointerGetDatum(&in),
-                                 PointerGetDatum(&out));
-           }
-           else
-           {
-               /* force all children to be visited */
-               out.nNodes = in.nNodes;
-               out.nodeNumbers = (int *) palloc(sizeof(int) * in.nNodes);
-               for (i = 0; i < in.nNodes; i++)
-                   out.nodeNumbers[i] = i;
            }
-
-           MemoryContextSwitchTo(oldCtx);
-
-           /* If allTheSame, they should all or none of 'em match */
-           if (innerTuple->allTheSame)
-               if (out.nNodes != 0 && out.nNodes != in.nNodes)
-                   elog(ERROR, "inconsistent inner_consistent results for allTheSame inner tuple");
-
-           for (i = 0; i < out.nNodes; i++)
+           else                /* page is inner */
            {
-               int         nodeN = out.nodeNumbers[i];
+               SpGistInnerTuple innerTuple = (SpGistInnerTuple)
+               PageGetItem(page, PageGetItemId(page, offset));
 
-               Assert(nodeN >= 0 && nodeN < in.nNodes);
-               if (ItemPointerIsValid(&nodes[nodeN]->t_tid))
+               if (innerTuple->tupstate != SPGIST_LIVE)
                {
-                   ScanStackEntry *newEntry;
-
-                   /* Create new work item for this node */
-                   newEntry = palloc(sizeof(ScanStackEntry));
-                   newEntry->ptr = nodes[nodeN]->t_tid;
-                   if (out.levelAdds)
-                       newEntry->level = stackEntry->level + out.levelAdds[i];
-                   else
-                       newEntry->level = stackEntry->level;
-                   /* Must copy value out of temp context */
-                   if (out.reconstructedValues)
-                       newEntry->reconstructedValue =
-                           datumCopy(out.reconstructedValues[i],
-                                     so->state.attLeafType.attbyval,
-                                     so->state.attLeafType.attlen);
-                   else
-                       newEntry->reconstructedValue = (Datum) 0;
-
-                   /*
-                    * Elements of out.traversalValues should be allocated in
-                    * in.traversalMemoryContext, which is actually a long
-                    * lived context of index scan.
-                    */
-                   newEntry->traversalValue = (out.traversalValues) ?
-                       out.traversalValues[i] : NULL;
-
-                   so->scanStack = lcons(newEntry, so->scanStack);
+                   if (innerTuple->tupstate == SPGIST_REDIRECT)
+                   {
+                       /* transfer attention to redirect point */
+                       item->heapPtr = ((SpGistDeadTuple) innerTuple)->pointer;
+                       Assert(ItemPointerGetBlockNumber(&item->heapPtr) !=
+                              SPGIST_METAPAGE_BLKNO);
+                       goto redirect;
+                   }
+                   elog(ERROR, "unexpected SPGiST tuple state: %d",
+                        innerTuple->tupstate);
                }
+
+               spgInnerTest(so, item, innerTuple, isnull);
            }
        }
 
-       /* done with this scan stack entry */
-       freeScanStackEntry(so, stackEntry);
+       /* done with this scan item */
+       spgFreeSearchItem(so, item);
        /* clear temp context before proceeding to the next one */
        MemoryContextReset(so->tempCxt);
    }
@@ -566,11 +826,14 @@ redirect:
        UnlockReleaseBuffer(buffer);
 }
 
+
 /* storeRes subroutine for getbitmap case */
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
-           Datum leafValue, bool isnull, bool recheck)
+           Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
+           double *distances)
 {
+   Assert(!recheckDistances && !distances);
    tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
    so->ntids++;
 }
@@ -594,11 +857,26 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 /* storeRes subroutine for gettuple case */
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-             Datum leafValue, bool isnull, bool recheck)
+             Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
+             double *distances)
 {
    Assert(so->nPtrs < MaxIndexTuplesPerPage);
    so->heapPtrs[so->nPtrs] = *heapPtr;
    so->recheck[so->nPtrs] = recheck;
+   so->recheckDistances[so->nPtrs] = recheckDistances;
+
+   if (so->numberOfOrderBys > 0)
+   {
+       if (isnull)
+           so->distances[so->nPtrs] = NULL;
+       else
+       {
+           Size        size = sizeof(double) * so->numberOfOrderBys;
+
+           so->distances[so->nPtrs] = memcpy(palloc(size), distances, size);
+       }
+   }
+
    if (so->want_itup)
    {
        /*
@@ -627,14 +905,29 @@ spggettuple(IndexScanDesc scan, ScanDirection dir)
    {
        if (so->iPtr < so->nPtrs)
        {
-           /* continuing to return tuples from a leaf page */
+           /* continuing to return reported tuples */
            scan->xs_ctup.t_self = so->heapPtrs[so->iPtr];
            scan->xs_recheck = so->recheck[so->iPtr];
            scan->xs_hitup = so->reconTups[so->iPtr];
+
+           if (so->numberOfOrderBys > 0)
+               index_store_float8_orderby_distances(scan, so->orderByTypes,
+                                                    so->distances[so->iPtr],
+                                                    so->recheckDistances[so->iPtr]);
            so->iPtr++;
            return true;
        }
 
+       if (so->numberOfOrderBys > 0)
+       {
+           /* Must pfree distances to avoid memory leak */
+           int         i;
+
+           for (i = 0; i < so->nPtrs; i++)
+               if (so->distances[i])
+                   pfree(so->distances[i]);
+       }
+
        if (so->want_itup)
        {
            /* Must pfree reconstructed tuples to avoid memory leak */
index 6d59b316ae3c64e590ebc1262d3532f2c5baf51c..9919e6f0d727c4f6969f9d5ee7a856fab9a8ce7e 100644 (file)
 
 #include "postgres.h"
 
+#include "access/amvalidate.h"
+#include "access/htup_details.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_amop.h"
+#include "optimizer/paths.h"
 #include "storage/bufmgr.h"
 #include "storage/indexfsm.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
+#include "utils/catcache.h"
 #include "utils/index_selfuncs.h"
 #include "utils/lsyscache.h"
+#include "utils/syscache.h"
 
+extern Expr *spgcanorderbyop(IndexOptInfo *index,
+               PathKey *pathkey, int pathkeyno,
+               Expr *orderby_clause, int *indexcol_p);
 
 /*
  * SP-GiST handler function: return IndexAmRoutine with access method parameters
@@ -39,7 +48,7 @@ spghandler(PG_FUNCTION_ARGS)
    amroutine->amstrategies = 0;
    amroutine->amsupport = SPGISTNProc;
    amroutine->amcanorder = false;
-   amroutine->amcanorderbyop = false;
+   amroutine->amcanorderbyop = true;
    amroutine->amcanbackward = false;
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
@@ -61,7 +70,7 @@ spghandler(PG_FUNCTION_ARGS)
    amroutine->amcanreturn = spgcanreturn;
    amroutine->amcostestimate = spgcostestimate;
    amroutine->amoptions = spgoptions;
-   amroutine->amproperty = NULL;
+   amroutine->amproperty = spgproperty;
    amroutine->amvalidate = spgvalidate;
    amroutine->ambeginscan = spgbeginscan;
    amroutine->amrescan = spgrescan;
@@ -949,3 +958,82 @@ SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
 
    return offnum;
 }
+
+/*
+ * spgproperty() -- Check boolean properties of indexes.
+ *
+ * This is optional for most AMs, but is required for SP-GiST because the core
+ * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
+ */
+bool
+spgproperty(Oid index_oid, int attno,
+           IndexAMProperty prop, const char *propname,
+           bool *res, bool *isnull)
+{
+   Oid         opclass,
+               opfamily,
+               opcintype;
+   CatCList   *catlist;
+   int         i;
+
+   /* Only answer column-level inquiries */
+   if (attno == 0)
+       return false;
+
+   switch (prop)
+   {
+       case AMPROP_DISTANCE_ORDERABLE:
+           break;
+       default:
+           return false;
+   }
+
+   /*
+    * Currently, SP-GiST distance-ordered scans require that there be a
+    * distance operator in the opclass with the default types. So we assume
+    * that if such a operator exists, then there's a reason for it.
+    */
+
+   /* First we need to know the column's opclass. */
+   opclass = get_index_column_opclass(index_oid, attno);
+   if (!OidIsValid(opclass))
+   {
+       *isnull = true;
+       return true;
+   }
+
+   /* Now look up the opclass family and input datatype. */
+   if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
+   {
+       *isnull = true;
+       return true;
+   }
+
+   /* And now we can check whether the operator is provided. */
+   catlist = SearchSysCacheList1(AMOPSTRATEGY,
+                                 ObjectIdGetDatum(opfamily));
+
+   *res = false;
+
+   for (i = 0; i < catlist->n_members; i++)
+   {
+       HeapTuple   amoptup = &catlist->members[i]->tuple;
+       Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
+
+       if (amopform->amoppurpose == AMOP_ORDER &&
+           (amopform->amoplefttype == opcintype ||
+            amopform->amoprighttype == opcintype) &&
+           opfamily_can_sort_type(amopform->amopsortfamily,
+                                  get_op_rettype(amopform->amopopr)))
+       {
+           *res = true;
+           break;
+       }
+   }
+
+   ReleaseSysCacheList(catlist);
+
+   *isnull = false;
+
+   return true;
+}
index c7acc7fc0255afb6c02526ca63700058e0969c39..8ba6c26f0c6e8ed16bd3636a39794d4a7bb149c9 100644 (file)
@@ -187,6 +187,7 @@ spgvalidate(Oid opclassoid)
    {
        HeapTuple   oprtup = &oprlist->members[i]->tuple;
        Form_pg_amop oprform = (Form_pg_amop) GETSTRUCT(oprtup);
+       Oid         op_rettype;
 
        /* TODO: Check that only allowed strategy numbers exist */
        if (oprform->amopstrategy < 1 || oprform->amopstrategy > 63)
@@ -200,20 +201,26 @@ spgvalidate(Oid opclassoid)
            result = false;
        }
 
-       /* spgist doesn't support ORDER BY operators */
-       if (oprform->amoppurpose != AMOP_SEARCH ||
-           OidIsValid(oprform->amopsortfamily))
+       /* spgist supports ORDER BY operators */
+       if (oprform->amoppurpose != AMOP_SEARCH)
        {
-           ereport(INFO,
-                   (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
-                    errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
-                           opfamilyname, "spgist",
-                           format_operator(oprform->amopopr))));
-           result = false;
+           /* ... and operator result must match the claimed btree opfamily */
+           op_rettype = get_op_rettype(oprform->amopopr);
+           if (!opfamily_can_sort_type(oprform->amopsortfamily, op_rettype))
+           {
+               ereport(INFO,
+                       (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                        errmsg("operator family \"%s\" of access method %s contains invalid ORDER BY specification for operator %s",
+                               opfamilyname, "spgist",
+                               format_operator(oprform->amopopr))));
+               result = false;
+           }
        }
+       else
+           op_rettype = BOOLOID;
 
        /* Check operator signature --- same for all spgist strategies */
-       if (!check_amop_signature(oprform->amopopr, BOOLOID,
+       if (!check_amop_signature(oprform->amopopr, op_rettype,
                                  oprform->amoplefttype,
                                  oprform->amoprighttype))
        {
index f9e8db63ddc491153e0310e1bb350b86ffc3338a..e52d80f5e13463cf876469d19b57e8583c6a12fa 100644 (file)
 #include "postgres.h"
 
 #include "access/spgist.h"
+#include "access/spgist_private.h"
 #include "access/stratnum.h"
 #include "catalog/pg_type.h"
 #include "utils/float.h"
+#include "utils/fmgroids.h"
 #include "utils/fmgrprotos.h"
 #include "utils/geo_decls.h"
 
@@ -367,6 +369,31 @@ overAbove4D(RectBox *rect_box, RangeBox *query)
    return overHigher2D(&rect_box->range_box_y, &query->right);
 }
 
+/* Lower bound for the distance between point and rect_box */
+static double
+pointToRectBoxDistance(Point *point, RectBox *rect_box)
+{
+   double      dx;
+   double      dy;
+
+   if (point->x < rect_box->range_box_x.left.low)
+       dx = rect_box->range_box_x.left.low - point->x;
+   else if (point->x > rect_box->range_box_x.right.high)
+       dx = point->x - rect_box->range_box_x.right.high;
+   else
+       dx = 0;
+
+   if (point->y < rect_box->range_box_y.left.low)
+       dy = rect_box->range_box_y.left.low - point->y;
+   else if (point->y > rect_box->range_box_y.right.high)
+       dy = point->y - rect_box->range_box_y.right.high;
+   else
+       dy = 0;
+
+   return HYPOT(dx, dy);
+}
+
+
 /*
  * SP-GiST config function
  */
@@ -534,6 +561,15 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
    RangeBox   *centroid,
              **queries;
 
+   /*
+    * We are saving the traversal value or initialize it an unbounded one, if
+    * we have just begun to walk the tree.
+    */
+   if (in->traversalValue)
+       rect_box = in->traversalValue;
+   else
+       rect_box = initRectBox();
+
    if (in->allTheSame)
    {
        /* Report that all nodes should be visited */
@@ -542,18 +578,32 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
        for (i = 0; i < in->nNodes; i++)
            out->nodeNumbers[i] = i;
 
+       if (in->norderbys > 0 && in->nNodes > 0)
+       {
+           double     *distances = palloc(sizeof(double) * in->norderbys);
+           int         j;
+
+           for (j = 0; j < in->norderbys; j++)
+           {
+               Point      *pt = DatumGetPointP(in->orderbys[j].sk_argument);
+
+               distances[j] = pointToRectBoxDistance(pt, rect_box);
+           }
+
+           out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
+           out->distances[0] = distances;
+
+           for (i = 1; i < in->nNodes; i++)
+           {
+               out->distances[i] = palloc(sizeof(double) * in->norderbys);
+               memcpy(out->distances[i], distances,
+                      sizeof(double) * in->norderbys);
+           }
+       }
+
        PG_RETURN_VOID();
    }
 
-   /*
-    * We are saving the traversal value or initialize it an unbounded one, if
-    * we have just begun to walk the tree.
-    */
-   if (in->traversalValue)
-       rect_box = in->traversalValue;
-   else
-       rect_box = initRectBox();
-
    /*
     * We are casting the prefix and queries to RangeBoxes for ease of the
     * following operations.
@@ -571,6 +621,8 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
    out->nNodes = 0;
    out->nodeNumbers = (int *) palloc(sizeof(int) * in->nNodes);
    out->traversalValues = (void **) palloc(sizeof(void *) * in->nNodes);
+   if (in->norderbys > 0)
+       out->distances = (double **) palloc(sizeof(double *) * in->nNodes);
 
    /*
     * We switch memory context, because we want to allocate memory for new
@@ -648,6 +700,22 @@ spg_box_quad_inner_consistent(PG_FUNCTION_ARGS)
        {
            out->traversalValues[out->nNodes] = next_rect_box;
            out->nodeNumbers[out->nNodes] = quadrant;
+
+           if (in->norderbys > 0)
+           {
+               double     *distances = palloc(sizeof(double) * in->norderbys);
+               int         j;
+
+               out->distances[out->nNodes] = distances;
+
+               for (j = 0; j < in->norderbys; j++)
+               {
+                   Point      *pt = DatumGetPointP(in->orderbys[j].sk_argument);
+
+                   distances[j] = pointToRectBoxDistance(pt, next_rect_box);
+               }
+           }
+
            out->nNodes++;
        }
        else
@@ -763,6 +831,17 @@ spg_box_quad_leaf_consistent(PG_FUNCTION_ARGS)
            break;
    }
 
+   if (flag && in->norderbys > 0)
+   {
+       Oid         distfnoid = in->orderbys[0].sk_func.fn_oid;
+
+       out->distances = spg_key_orderbys_distances(leaf, false,
+                                                   in->orderbys, in->norderbys);
+
+       /* Recheck is necessary when computing distance to polygon */
+       out->recheckDistances = distfnoid == F_DIST_POLYP;
+   }
+
    PG_RETURN_BOOL(flag);
 }
 
index bba595ad1daf174eb8333572d9f1a5c1b024afb7..0c116b32efd51b1299203ea798e1b605e98c8fa3 100644 (file)
@@ -1067,6 +1067,32 @@ get_opclass_input_type(Oid opclass)
    return result;
 }
 
+/*
+ * get_opclass_family_and_input_type
+ *
+ *     Returns the OID of the operator family the opclass belongs to,
+ *             the OID of the datatype the opclass indexes
+ */
+bool
+get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype)
+{
+   HeapTuple   tp;
+   Form_pg_opclass cla_tup;
+
+   tp = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
+   if (!HeapTupleIsValid(tp))
+       return false;
+
+   cla_tup = (Form_pg_opclass) GETSTRUCT(tp);
+
+   *opfamily = cla_tup->opcfamily;
+   *opcintype = cla_tup->opcintype;
+
+   ReleaseSysCache(tp);
+
+   return true;
+}
+
 /*             ---------- OPERATOR CACHE ----------                     */
 
 /*
@@ -3106,3 +3132,45 @@ get_range_subtype(Oid rangeOid)
    else
        return InvalidOid;
 }
+
+/*             ---------- PG_INDEX CACHE ----------                 */
+
+/*
+ * get_index_column_opclass
+ *
+ *     Given the index OID and column number,
+ *     return opclass of the index column
+ *         or InvalidOid if the index was not found.
+ */
+Oid
+get_index_column_opclass(Oid index_oid, int attno)
+{
+   HeapTuple   tuple;
+   Form_pg_index rd_index PG_USED_FOR_ASSERTS_ONLY;
+   Datum       datum;
+   bool        isnull;
+   oidvector  *indclass;
+   Oid         opclass;
+
+   /* First we need to know the column's opclass. */
+
+   tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(index_oid));
+   if (!HeapTupleIsValid(tuple))
+       return InvalidOid;
+
+   rd_index = (Form_pg_index) GETSTRUCT(tuple);
+
+   /* caller is supposed to guarantee this */
+   Assert(attno > 0 && attno <= rd_index->indnatts);
+
+   datum = SysCacheGetAttr(INDEXRELID, tuple,
+                           Anum_pg_index_indclass, &isnull);
+   Assert(!isnull);
+
+   indclass = ((oidvector *) DatumGetPointer(datum));
+   opclass = indclass->values[attno - 1];
+
+   ReleaseSysCache(tuple);
+
+   return opclass;
+}
index 24c720bf421d3c6a4619c7452987a443ee50e6a3..534fac7bf2f41b3170d51423d8c66ec077742636 100644 (file)
@@ -174,6 +174,9 @@ extern RegProcedure index_getprocid(Relation irel, AttrNumber attnum,
                uint16 procnum);
 extern FmgrInfo *index_getprocinfo(Relation irel, AttrNumber attnum,
                  uint16 procnum);
+extern void index_store_float8_orderby_distances(IndexScanDesc scan,
+                                    Oid *orderByTypes, double *distances,
+                                    bool recheckOrderBy);
 
 /*
  * index access method support routines (in genam.c)
index c6d7e22a389814ffcff1e0b0be806063b1463237..9c19e9e63822a41f90df95d984336ca83f54d18b 100644 (file)
@@ -136,7 +136,10 @@ typedef struct spgPickSplitOut
 typedef struct spgInnerConsistentIn
 {
    ScanKey     scankeys;       /* array of operators and comparison values */
-   int         nkeys;          /* length of array */
+   ScanKey     orderbys;       /* array of ordering operators and comparison
+                                * values */
+   int         nkeys;          /* length of scankeys array */
+   int         norderbys;      /* length of orderbys array */
 
    Datum       reconstructedValue; /* value reconstructed at parent */
    void       *traversalValue; /* opclass-specific traverse value */
@@ -159,6 +162,7 @@ typedef struct spgInnerConsistentOut
    int        *levelAdds;      /* increment level by this much for each */
    Datum      *reconstructedValues;    /* associated reconstructed values */
    void      **traversalValues;    /* opclass-specific traverse values */
+   double    **distances;      /* associated distances */
 } spgInnerConsistentOut;
 
 /*
@@ -167,7 +171,10 @@ typedef struct spgInnerConsistentOut
 typedef struct spgLeafConsistentIn
 {
    ScanKey     scankeys;       /* array of operators and comparison values */
-   int         nkeys;          /* length of array */
+   ScanKey     orderbys;       /* array of ordering operators and comparison
+                                * values */
+   int         nkeys;          /* length of scankeys array */
+   int         norderbys;      /* length of orderbys array */
 
    Datum       reconstructedValue; /* value reconstructed at parent */
    void       *traversalValue; /* opclass-specific traverse value */
@@ -181,6 +188,8 @@ typedef struct spgLeafConsistentOut
 {
    Datum       leafValue;      /* reconstructed original data, if any */
    bool        recheck;        /* set true if operator must be rechecked */
+   bool        recheckDistances;   /* set true if distances must be rechecked */
+   double     *distances;      /* associated distances */
 } spgLeafConsistentOut;
 
 
index 99365c8a45d8c4f7f74702aab305c1b04bdace7c..d23862ea71df632c98dcdfa691be8ae39848cedf 100644 (file)
@@ -18,6 +18,7 @@
 #include "access/spgist.h"
 #include "nodes/tidbitmap.h"
 #include "storage/buf.h"
+#include "utils/geo_decls.h"
 #include "utils/relcache.h"
 
 
@@ -130,14 +131,35 @@ typedef struct SpGistState
    bool        isBuild;        /* true if doing index build */
 } SpGistState;
 
+typedef struct SpGistSearchItem
+{
+   pairingheap_node phNode;    /* pairing heap node */
+   Datum       value;          /* value reconstructed from parent or
+                                * leafValue if heaptuple */
+   void       *traversalValue; /* opclass-specific traverse value */
+   int         level;          /* level of items on this page */
+   ItemPointerData heapPtr;    /* heap info, if heap tuple */
+   bool        isNull;         /* SearchItem is NULL item */
+   bool        isLeaf;         /* SearchItem is heap item */
+   bool        recheck;        /* qual recheck is needed */
+   bool        recheckDistances;   /* distance recheck is needed */
+
+   /* array with numberOfOrderBys entries */
+   double      distances[FLEXIBLE_ARRAY_MEMBER];
+}          SpGistSearchItem;
+
+#define SizeOfSpGistSearchItem(n_distances) \
+   (offsetof(SpGistSearchItem, distances) + sizeof(double) * (n_distances))
+
 /*
  * Private state of an index scan
  */
 typedef struct SpGistScanOpaqueData
 {
    SpGistState state;          /* see above */
+   pairingheap *scanQueue;     /* queue of to be visited items */
    MemoryContext tempCxt;      /* short-lived memory context */
-   MemoryContext traversalCxt; /* memory context for traversalValues */
+   MemoryContext traversalCxt; /* single scan lifetime memory context */
 
    /* Control flags showing whether to search nulls and/or non-nulls */
    bool        searchNulls;    /* scan matches (all) null entries */
@@ -146,9 +168,18 @@ typedef struct SpGistScanOpaqueData
    /* Index quals to be passed to opclass (null-related quals removed) */
    int         numberOfKeys;   /* number of index qualifier conditions */
    ScanKey     keyData;        /* array of index qualifier descriptors */
+   int         numberOfOrderBys;   /* number of ordering operators */
+   ScanKey     orderByData;    /* array of ordering op descriptors */
+   Oid        *orderByTypes;   /* array of ordering op return types */
+   Oid         indexCollation; /* collation of index column */
 
-   /* Stack of yet-to-be-visited pages */
-   List       *scanStack;      /* List of ScanStackEntrys */
+   /* Opclass defined functions: */
+   FmgrInfo    innerConsistentFn;
+   FmgrInfo    leafConsistentFn;
+
+   /* Pre-allocated workspace arrays: */
+   double     *zeroDistances;
+   double     *infDistances;
 
    /* These fields are only used in amgetbitmap scans: */
    TIDBitmap  *tbm;            /* bitmap being filled */
@@ -161,7 +192,10 @@ typedef struct SpGistScanOpaqueData
    int         iPtr;           /* index for scanning through same */
    ItemPointerData heapPtrs[MaxIndexTuplesPerPage];    /* TIDs from cur page */
    bool        recheck[MaxIndexTuplesPerPage]; /* their recheck flags */
+   bool        recheckDistances[MaxIndexTuplesPerPage];    /* distance recheck
+                                                            * flags */
    HeapTuple   reconTups[MaxIndexTuplesPerPage];   /* reconstructed tuples */
+   double     *distances[MaxIndexTuplesPerPage];   /* distances (for recheck) */
 
    /*
     * Note: using MaxIndexTuplesPerPage above is a bit hokey since
@@ -410,6 +444,9 @@ extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page,
                     Item item, Size size,
                     OffsetNumber *startOffset,
                     bool errorOK);
+extern bool spgproperty(Oid index_oid, int attno,
+           IndexAMProperty prop, const char *propname,
+           bool *res, bool *isnull);
 
 /* spgdoinsert.c */
 extern void spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
@@ -421,4 +458,9 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
 extern bool spgdoinsert(Relation index, SpGistState *state,
            ItemPointer heapPtr, Datum datum, bool isnull);
 
+/* spgproc.c */
+extern double *spg_key_orderbys_distances(Datum key, bool isLeaf,
+                          ScanKey orderbys, int norderbys);
+extern BOX *box_copy(BOX *orig);
+
 #endif                         /* SPGIST_PRIVATE_H */
index 30bf93f7c30d9183bde4e24d8f3a3bc75e46f9fb..6ab2187949a1fa06c59435948e01cc1ff41ee2a0 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 201809181
+#define CATALOG_VERSION_NO 201809191
 
 #endif
index fb58f774b9338bb3689a1d9dc0a16094f39fa091..5f85e9507ca08cef7d3fbb9325696a063d7e08ef 100644 (file)
 { amopfamily => 'spgist/quad_point_ops', amoplefttype => 'point',
   amoprighttype => 'box', amopstrategy => '8', amopopr => '<@(point,box)',
   amopmethod => 'spgist' },
+{ amopfamily => 'spgist/quad_point_ops', amoplefttype => 'point',
+  amoprighttype => 'point', amopstrategy => '15', amopopr => '<->(point,point)',
+  amopmethod => 'spgist', amoppurpose => 'o',
+  amopsortfamily => 'btree/float_ops' },
 
 # SP-GiST kd_point_ops
 { amopfamily => 'spgist/kd_point_ops', amoplefttype => 'point',
 { amopfamily => 'spgist/kd_point_ops', amoplefttype => 'point',
   amoprighttype => 'box', amopstrategy => '8', amopopr => '<@(point,box)',
   amopmethod => 'spgist' },
+{ amopfamily => 'spgist/kd_point_ops', amoplefttype => 'point',
+  amoprighttype => 'point', amopstrategy => '15', amopopr => '<->(point,point)',
+  amopmethod => 'spgist', amoppurpose => 'o',
+  amopsortfamily => 'btree/float_ops' },
 
 # SP-GiST text_ops
 { amopfamily => 'spgist/text_ops', amoplefttype => 'text',
 { amopfamily => 'spgist/poly_ops', amoplefttype => 'polygon',
   amoprighttype => 'polygon', amopstrategy => '12',
   amopopr => '|&>(polygon,polygon)', amopmethod => 'spgist' },
+{ amopfamily => 'spgist/poly_ops', amoplefttype => 'polygon',
+  amoprighttype => 'point', amopstrategy => '15',
+  amopopr => '<->(polygon,point)', amoppurpose => 'o', amopmethod => 'spgist',
+  amopsortfamily => 'btree/float_ops' },
 
 # GiST inet_ops
 { amopfamily => 'gist/network_ops', amoplefttype => 'inet',
index e55ea4035b80fda16ac45a9e53149bb53ce8e57a..e0eea2a0dc503334944f01afcf8c38022a13c395 100644 (file)
@@ -95,6 +95,8 @@ extern char *get_constraint_name(Oid conoid);
 extern char *get_language_name(Oid langoid, bool missing_ok);
 extern Oid get_opclass_family(Oid opclass);
 extern Oid get_opclass_input_type(Oid opclass);
+extern bool get_opclass_opfamily_and_input_type(Oid opclass,
+                                   Oid *opfamily, Oid *opcintype);
 extern RegProcedure get_opcode(Oid opno);
 extern char *get_opname(Oid opno);
 extern Oid get_op_rettype(Oid opno);
@@ -176,6 +178,7 @@ extern void free_attstatsslot(AttStatsSlot *sslot);
 extern char *get_namespace_name(Oid nspid);
 extern char *get_namespace_name_or_temp(Oid nspid);
 extern Oid get_range_subtype(Oid rangeOid);
+extern Oid get_index_column_opclass(Oid index_oid, int attno);
 
 #define type_is_array(typid)  (get_element_type(typid) != InvalidOid)
 /* type_is_array_domain accepts both plain arrays and domains over arrays */
index 24cd3c5e2e4a4936b0f59bbdc07806a43008fe45..4570a39b058795d26ad61f4f1d46411988cb80b9 100644 (file)
@@ -83,7 +83,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
@@ -92,18 +93,18 @@ select prop,
                     'bogus']::text[])
          with ordinality as u(prop,ord)
  order by ord;
-        prop        | btree | hash | gist | spgist | gin | brin 
---------------------+-------+------+------+--------+-----+------
- asc                | t     | f    | f    | f      | f   | f
- desc               | f     | f    | f    | f      | f   | f
- nulls_first        | f     | f    | f    | f      | f   | f
- nulls_last         | t     | f    | f    | f      | f   | f
- orderable          | t     | f    | f    | f      | f   | f
- distance_orderable | f     | f    | t    | f      | f   | f
- returnable         | t     | f    | f    | t      | f   | f
- search_array       | t     | f    | f    | f      | f   | f
- search_nulls       | t     | f    | t    | t      | f   | t
- bogus              |       |      |      |        |     | 
+        prop        | btree | hash | gist | spgist_radix | spgist_quad | gin | brin 
+--------------------+-------+------+------+--------------+-------------+-----+------
+ asc                | t     | f    | f    | f            | f           | f   | f
+ desc               | f     | f    | f    | f            | f           | f   | f
+ nulls_first        | f     | f    | f    | f            | f           | f   | f
+ nulls_last         | t     | f    | f    | f            | f           | f   | f
+ orderable          | t     | f    | f    | f            | f           | f   | f
+ distance_orderable | f     | f    | t    | f            | t           | f   | f
+ returnable         | t     | f    | f    | t            | t           | f   | f
+ search_array       | t     | f    | f    | f            | f           | f   | f
+ search_nulls       | t     | f    | t    | t            | t           | f   | t
+ bogus              |       |      |      |              |             |     | 
 (10 rows)
 
 select prop,
index be25101db24cbbc673aec6230e436dcf44eca5d8..0065e325c2b9cd5ed93c5dcf648d1b5a61e8678e 100644 (file)
@@ -294,6 +294,15 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
      1
 (1 row)
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
  count 
 -------
@@ -888,6 +897,71 @@ SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
      1
 (1 row)
 
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n
+AND (seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n
+AND (seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_quad_ind on quad_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n
+AND (seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
                        QUERY PLAN                        
@@ -993,6 +1067,71 @@ SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
      1
 (1 row)
 
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+                       QUERY PLAN                        
+---------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p <@ '(1000,1000),(200,200)'::box)
+         Order By: (p <-> '(0,0)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+                      QUERY PLAN                       
+-------------------------------------------------------
+ WindowAgg
+   ->  Index Only Scan using sp_kd_ind on kd_point_tbl
+         Index Cond: (p IS NOT NULL)
+         Order By: (p <-> '(333,400)'::point)
+(4 rows)
+
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+ n | dist | p | n | dist | p 
+---+------+---+---+------+---
+(0 rows)
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
                          QUERY PLAN                         
index 3c6d853ffbb516174f2d8b9e085172539a49ec40..7bcc03b9ada37938f9ad91efccc13a0212135aa3 100644 (file)
@@ -1911,6 +1911,7 @@ ORDER BY 1, 2, 3;
        4000 |           12 | <=
        4000 |           12 | |&>
        4000 |           14 | >=
+       4000 |           15 | <->
        4000 |           15 | >
        4000 |           16 | @>
        4000 |           18 | =
@@ -1924,7 +1925,7 @@ ORDER BY 1, 2, 3;
        4000 |           26 | >>
        4000 |           27 | >>=
        4000 |           28 | ^@
-(122 rows)
+(123 rows)
 
 -- Check that all opclass search operators have selectivity estimators.
 -- This is not absolutely required, but it seems a reasonable thing
index 4a1f60427ab748bb9793abcc8982d6baae4a2c1a..cd8c98b3be7fda7e405609b2418cecaf86cd25ac 100644 (file)
@@ -462,6 +462,54 @@ SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(2
   1000
 (1 row)
 
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+                        QUERY PLAN                         
+-----------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Order By: (p <-> '(123,456)'::point)
+(3 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+   ON seq.n = idx.n AND seq.id = idx.id AND
+       (seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+                                   QUERY PLAN                                    
+---------------------------------------------------------------------------------
+ WindowAgg
+   ->  Index Scan using quad_poly_tbl_idx on quad_poly_tbl
+         Index Cond: (p <@ '((300,300),(400,600),(600,500),(700,200))'::polygon)
+         Order By: (p <-> '(123,456)'::point)
+(4 rows)
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+   ON seq.n = idx.n AND seq.id = idx.id AND
+       (seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+ n | dist | id | n | dist | id 
+---+------+----+---+------+----
+(0 rows)
+
 RESET enable_seqscan;
 RESET enable_indexscan;
 RESET enable_bitmapscan;
index 8ca85ecf00696aa6a711885aaffd853368b2da14..06e7fa10d956cdef6bac3f1b1a4f0654e6c60650 100644 (file)
@@ -40,7 +40,8 @@ select prop,
        pg_index_column_has_property('onek_hundred'::regclass, 1, prop) as btree,
        pg_index_column_has_property('hash_i4_index'::regclass, 1, prop) as hash,
        pg_index_column_has_property('gcircleind'::regclass, 1, prop) as gist,
-       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist,
+       pg_index_column_has_property('sp_radix_ind'::regclass, 1, prop) as spgist_radix,
+       pg_index_column_has_property('sp_quad_ind'::regclass, 1, prop) as spgist_quad,
        pg_index_column_has_property('botharrayidx'::regclass, 1, prop) as gin,
        pg_index_column_has_property('brinidx'::regclass, 1, prop) as brin
   from unnest(array['asc', 'desc', 'nulls_first', 'nulls_last',
index f9e7118f0d3d864325de765f83e6015f90bbd778..be7f261871e5b91c6fc740b7c455e2c0a9a6f73a 100644 (file)
@@ -198,6 +198,18 @@ SELECT count(*) FROM quad_point_tbl WHERE p >^ '(5000, 4000)';
 
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
+CREATE TEMP TABLE quad_point_tbl_ord_seq1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+
+CREATE TEMP TABLE quad_point_tbl_ord_seq3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcde';
@@ -363,6 +375,39 @@ EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM quad_point_tbl WHERE p ~= '(4585, 365)';
 
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+CREATE TEMP TABLE quad_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN quad_point_tbl_ord_idx1 idx
+ON seq.n = idx.n
+AND (seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE quad_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM quad_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN quad_point_tbl_ord_idx2 idx
+ON seq.n = idx.n
+AND (seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE quad_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM quad_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN quad_point_tbl_ord_idx3 idx
+ON seq.n = idx.n
+AND (seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
 SELECT count(*) FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
@@ -391,6 +436,39 @@ EXPLAIN (COSTS OFF)
 SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 SELECT count(*) FROM kd_point_tbl WHERE p ~= '(4585, 365)';
 
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+CREATE TEMP TABLE kd_point_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl;
+SELECT * FROM quad_point_tbl_ord_seq1 seq FULL JOIN kd_point_tbl_ord_idx1 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+CREATE TEMP TABLE kd_point_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> '0,0') n, p <-> '0,0' dist, p
+FROM kd_point_tbl WHERE p <@ box '(200,200,1000,1000)';
+SELECT * FROM quad_point_tbl_ord_seq2 seq FULL JOIN kd_point_tbl_ord_idx2 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+CREATE TEMP TABLE kd_point_tbl_ord_idx3 AS
+SELECT rank() OVER (ORDER BY p <-> '333,400') n, p <-> '333,400' dist, p
+FROM kd_point_tbl WHERE p IS NOT NULL;
+SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
+ON seq.n = idx.n AND
+(seq.dist = idx.dist AND seq.p ~= idx.p OR seq.p IS NULL AND idx.p IS NULL)
+WHERE seq.n IS NULL OR idx.n IS NULL;
+
 EXPLAIN (COSTS OFF)
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
 SELECT count(*) FROM radix_text_tbl WHERE t = 'P0123456789abcdef';
index 7e8cb08cd8b3ae36bb6085e18b592c2e7fde9812..ba86669ff2d73f42b873b11556b518707d43a47e 100644 (file)
@@ -206,6 +206,39 @@ EXPLAIN (COSTS OFF)
 SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
 SELECT count(*) FROM quad_poly_tbl WHERE p ~= polygon '((200, 300),(210, 310),(230, 290))';
 
+-- test ORDER BY distance
+SET enable_indexscan = ON;
+SET enable_bitmapscan = OFF;
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx1 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl;
+
+SELECT *
+FROM quad_poly_tbl_ord_seq1 seq FULL JOIN quad_poly_tbl_ord_idx1 idx
+   ON seq.n = idx.n AND seq.id = idx.id AND
+       (seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
+
+EXPLAIN (COSTS OFF)
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+CREATE TEMP TABLE quad_poly_tbl_ord_idx2 AS
+SELECT rank() OVER (ORDER BY p <-> point '123,456') n, p <-> point '123,456' dist, id
+FROM quad_poly_tbl WHERE p <@ polygon '((300,300),(400,600),(600,500),(700,200))';
+
+SELECT *
+FROM quad_poly_tbl_ord_seq2 seq FULL JOIN quad_poly_tbl_ord_idx2 idx
+   ON seq.n = idx.n AND seq.id = idx.id AND
+       (seq.dist = idx.dist OR seq.dist IS NULL AND idx.dist IS NULL)
+WHERE seq.id IS NULL OR idx.id IS NULL;
+
 RESET enable_seqscan;
 RESET enable_indexscan;
 RESET enable_bitmapscan;