Replace nth() calls in inner loops with traversal of the list via
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 5 Feb 2000 23:19:44 +0000 (23:19 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 5 Feb 2000 23:19:44 +0000 (23:19 +0000)
lnext, to eliminate O(N^2) behavior with lots of indexquals.

src/backend/executor/nodeIndexscan.c

index b9a54cb938acc73c73cbe8c19dd48723f9b13de1..e5f7642c85cc6e2de127c0f9673c07557614d9d7 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.45 2000/01/26 05:56:23 momjian Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/executor/nodeIndexscan.c,v 1.46 2000/02/05 23:19:44 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -124,7 +124,7 @@ IndexNext(IndexScan *node)
    if (estate->es_evTuple != NULL &&
        estate->es_evTuple[node->scan.scanrelid - 1] != NULL)
    {
-       int         iptr;
+       List       *qual;
 
        ExecClearTuple(slot);
        if (estate->es_evTupleNull[node->scan.scanrelid - 1])
@@ -134,20 +134,23 @@ IndexNext(IndexScan *node)
        slot->val = estate->es_evTuple[node->scan.scanrelid - 1];
        slot->ttc_shouldFree = false;
 
-       for (iptr = 0; iptr < numIndices; iptr++)
+       scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
+
+       /* Does the tuple meet any of the OR'd indxqual conditions? */
+       foreach(qual, node->indxqualorig)
        {
-           scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
-           if (ExecQual(nth(iptr, node->indxqualorig),
+           if (ExecQual((List *) lfirst(qual),
                         scanstate->cstate.cs_ExprContext,
                         false))
                break;
        }
-       if (iptr == numIndices) /* would not be returned by indices */
+       if (qual == NIL)        /* would not be returned by indices */
            slot->val = NULL;
 
        /* Flag for the next call that no more tuples */
        estate->es_evTupleNull[node->scan.scanrelid - 1] = true;
-       return (slot);
+
+       return slot;
    }
 
    tuple = &(indexstate->iss_htup);
@@ -189,14 +192,14 @@ IndexNext(IndexScan *node)
            {
                bool        prev_matches = false;
                int         prev_index;
+               List       *qual;
 
-               /* ----------------
+               /*
                 *  store the scanned tuple in the scan tuple slot of
                 *  the scan state.  Eventually we will only do this and not
                 *  return a tuple.  Note: we pass 'false' because tuples
                 *  returned by amgetnext are pointers onto disk pages and
                 *  must not be pfree()'d.
-                * ----------------
                 */
                ExecStoreTuple(tuple,   /* tuple to store */
                               slot,    /* slot to store in */
@@ -211,28 +214,29 @@ IndexNext(IndexScan *node)
                ReleaseBuffer(buffer);
 
                /*
-                * We must check to see if the current tuple would have
-                * been matched by an earlier index, so we don't double
-                * report it. We do this by passing the tuple through
-                * ExecQual and look for failure with all previous
-                * qualifications.
+                * We must check to see if the current tuple was already
+                * matched by an earlier index, so we don't double-report it.
+                * We do this by passing the tuple through ExecQual and
+                * checking for failure with all previous qualifications.
                 */
+               scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
+               qual = node->indxqualorig;
                for (prev_index = 0; prev_index < indexstate->iss_IndexPtr;
                     prev_index++)
                {
-                   scanstate->cstate.cs_ExprContext->ecxt_scantuple = slot;
-                   if (ExecQual(nth(prev_index, node->indxqualorig),
+                   if (ExecQual((List *) lfirst(qual),
                                 scanstate->cstate.cs_ExprContext,
                                 false))
                    {
                        prev_matches = true;
                        break;
                    }
+                   qual = lnext(qual);
                }
                if (!prev_matches)
-                   return slot;
-               else
-                   ExecClearTuple(slot);
+                   return slot;        /* OK to return tuple */
+               /* Duplicate tuple, so drop it and loop back for another */
+               ExecClearTuple(slot);
            }
        }
        if (indexNumber < numIndices)
@@ -329,7 +333,6 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
    scanDescs = indexstate->iss_ScanDescs;
    scanKeys = indexstate->iss_ScanKeys;
    runtimeKeyInfo = (Pointer *) indexstate->iss_RuntimeKeyInfo;
-   indxqual = node->indxqual;
    numScanKeys = indexstate->iss_NumScanKeys;
    indexstate->iss_IndexPtr = -1;
    if (ScanDirectionIsBackward(node->indxorderdir))
@@ -352,9 +355,11 @@ ExecIndexReScan(IndexScan *node, ExprContext *exprCtxt, Plan *parent)
    /*
     * get the index qualifications and recalculate the appropriate values
     */
+   indxqual = node->indxqual;
    for (i = 0; i < numIndices; i++)
    {
-       qual = nth(i, indxqual);
+       qual = lfirst(indxqual);
+       indxqual = lnext(indxqual);
        n_keys = numScanKeys[i];
        scan_keys = (ScanKey) scanKeys[i];
 
@@ -672,7 +677,6 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
     * ----------------
     */
    indxid = node->indxid;
-   indxqual = node->indxqual;
    numIndices = length(indxid);
    indexPtr = -1;
 
@@ -701,6 +705,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
     *  build the index scan keys from the index qualification
     * ----------------
     */
+   indxqual = node->indxqual;
    for (i = 0; i < numIndices; i++)
    {
        int         j;
@@ -709,7 +714,8 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
        ScanKey     scan_keys;
        int        *run_keys;
 
-       qual = nth(i, indxqual);
+       qual = lfirst(indxqual);
+       indxqual = lnext(indxqual);
        n_keys = length(qual);
        scan_keys = (n_keys <= 0) ? NULL :
            (ScanKey) palloc(n_keys * sizeof(ScanKeyData));
@@ -743,8 +749,8 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
            clause = nth(j, qual);
 
            op = (Oper *) clause->oper;
-           if (!IsA(op, Oper))
-               elog(ERROR, "ExecInitIndexScan: op not an Oper!");
+           if (!IsA(clause, Expr) || !IsA(op, Oper))
+               elog(ERROR, "ExecInitIndexScan: indxqual not an opclause!");
 
            opid = op->opid;
 
@@ -1066,9 +1072,7 @@ ExecInitIndexScan(IndexScan *node, EState *estate, Plan *parent)
     */
    for (i = 0; i < numIndices; i++)
    {
-       Oid         indexOid;
-
-       indexOid = (Oid) nthi(i, indxid);
+       Oid         indexOid = (Oid) nthi(i, indxid);
 
        if (indexOid != 0)
        {