Reduce the number of pallocs() in BRIN
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 7 Apr 2017 21:54:26 +0000 (18:54 -0300)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Fri, 7 Apr 2017 22:08:43 +0000 (19:08 -0300)
Instead of allocating memory in brin_deform_tuple and brin_copy_tuple
over and over during a scan, allow reuse of previously allocated memory.
This is said to make for a measurable performance improvement.

Author: Jinyu Zhang, Álvaro Herrera
Reviewed by: Tomas Vondra
Discussion: https://postgr.es/m/495deb78.4186.1500dacaa63.Coremail.beijing_pg@163.com

contrib/pageinspect/brinfuncs.c
src/backend/access/brin/brin.c
src/backend/access/brin/brin_pageops.c
src/backend/access/brin/brin_tuple.c
src/include/access/brin_tuple.h

index 2c7963ec19b74973313a5e1dc542c63caed3a82b..dc9cc2d09aa3c67b2e1b2c7d544b1de91c2b136c 100644 (file)
@@ -226,7 +226,8 @@ brin_page_items(PG_FUNCTION_ARGS)
            if (ItemIdIsUsed(itemId))
            {
                dtup = brin_deform_tuple(bdesc,
-                                   (BrinTuple *) PageGetItem(page, itemId));
+                                   (BrinTuple *) PageGetItem(page, itemId),
+                                   NULL);
                attno = 1;
                unusedItem = false;
            }
index cf3139cbabf04b803ca2eade33db3ba94f6fb2c1..2594407754696bf2f839d88474d94242ed32c541 100644 (file)
@@ -217,7 +217,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
            MemoryContextSwitchTo(tupcxt);
        }
 
-       dtup = brin_deform_tuple(bdesc, brtup);
+       dtup = brin_deform_tuple(bdesc, brtup, NULL);
 
        /*
         * Compare the key values of the new tuple to the stored index values;
@@ -268,7 +268,7 @@ brininsert(Relation idxRel, Datum *values, bool *nulls,
             * re-acquiring the lock.
             */
            origsz = ItemIdGetLength(lp);
-           origtup = brin_copy_tuple(brtup, origsz);
+           origtup = brin_copy_tuple(brtup, origsz, NULL, NULL);
 
            /*
             * Before releasing the lock, check if we can attempt a same-page
@@ -363,6 +363,9 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
    FmgrInfo   *consistentFn;
    MemoryContext oldcxt;
    MemoryContext perRangeCxt;
+   BrinMemTuple *dtup;
+   BrinTuple    *btup = NULL;
+   Size        btupsz = 0;
 
    opaque = (BrinOpaque *) scan->opaque;
    bdesc = opaque->bo_bdesc;
@@ -384,6 +387,9 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
     */
    consistentFn = palloc0(sizeof(FmgrInfo) * bdesc->bd_tupdesc->natts);
 
+   /* allocate an initial in-memory tuple, out of the per-range memcxt */
+   dtup = brin_new_memtuple(bdesc);
+
    /*
     * Setup and use a per-range memory context, which is reset every time we
     * loop below.  This avoids having to free the tuples within the loop.
@@ -401,6 +407,7 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
    for (heapBlk = 0; heapBlk < nblocks; heapBlk += opaque->bo_pagesPerRange)
    {
        bool        addrange;
+       bool        gottuple = false;
        BrinTuple  *tup;
        OffsetNumber off;
        Size        size;
@@ -414,7 +421,8 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
                                       scan->xs_snapshot);
        if (tup)
        {
-           tup = brin_copy_tuple(tup, size);
+           gottuple = true;
+           btup = brin_copy_tuple(tup, size, btup, &btupsz);
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
        }
 
@@ -422,15 +430,13 @@ bringetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
         * For page ranges with no indexed tuple, we must return the whole
         * range; otherwise, compare it to the scan keys.
         */
-       if (tup == NULL)
+       if (!gottuple)
        {
            addrange = true;
        }
        else
        {
-           BrinMemTuple *dtup;
-
-           dtup = brin_deform_tuple(bdesc, tup);
+           dtup = brin_deform_tuple(bdesc, btup, dtup);
            if (dtup->bt_placeholder)
            {
                /*
@@ -1210,7 +1216,7 @@ summarize_range(IndexInfo *indexInfo, BrinBuildState *state, Relation heapRel,
        /* the placeholder tuple must exist */
        if (phtup == NULL)
            elog(ERROR, "missing placeholder tuple");
-       phtup = brin_copy_tuple(phtup, phsz);
+       phtup = brin_copy_tuple(phtup, phsz, NULL, NULL);
        LockBuffer(phbuf, BUFFER_LOCK_UNLOCK);
 
        /* merge it into the tuple from the heap scan */
@@ -1358,7 +1364,7 @@ union_tuples(BrinDesc *bdesc, BrinMemTuple *a, BrinTuple *b)
                                "brin union",
                                ALLOCSET_DEFAULT_SIZES);
    oldcxt = MemoryContextSwitchTo(cxt);
-   db = brin_deform_tuple(bdesc, b);
+   db = brin_deform_tuple(bdesc, b, NULL);
    MemoryContextSwitchTo(oldcxt);
 
    for (keyno = 0; keyno < bdesc->bd_tupdesc->natts; keyno++)
index da94efc61cad5d4392dcb8da9d3f968777571570..1725591b0515e69bb838bd39833994bd28728c73 100644 (file)
@@ -548,6 +548,8 @@ brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
    OffsetNumber off;
    OffsetNumber maxoff;
    Page        page;
+   BrinTuple  *btup = NULL;
+   Size        btupsz = 0;
 
    page = BufferGetPage(buf);
 
@@ -567,7 +569,7 @@ brin_evacuate_page(Relation idxRel, BlockNumber pagesPerRange,
        {
            sz = ItemIdGetLength(lp);
            tup = (BrinTuple *) PageGetItem(page, lp);
-           tup = brin_copy_tuple(tup, sz);
+           tup = brin_copy_tuple(tup, sz, btup, &btupsz);
 
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 
index ec5fc5660ffa54c05ab379e10e43a4615f4cf535..e2e1d23377a6a0dbcbaf0662479494d77141997f 100644 (file)
@@ -311,17 +311,26 @@ brin_free_tuple(BrinTuple *tuple)
 }
 
 /*
- * Create a palloc'd copy of a BrinTuple.
+ * Given a brin tuple of size len, create a copy of it.  If 'dest' is not
+ * NULL, its size is destsz, and can be used as output buffer; if the tuple
+ * to be copied does not fit, it is enlarged by repalloc, and the size is
+ * updated to match.  This avoids palloc/free cycles when many brin tuples
+ * are being processed in loops.
  */
 BrinTuple *
-brin_copy_tuple(BrinTuple *tuple, Size len)
+brin_copy_tuple(BrinTuple *tuple, Size len, BrinTuple *dest, Size *destsz)
 {
-   BrinTuple  *newtup;
+   if (!destsz || *destsz == 0)
+       dest = palloc(len);
+   else if (len > *destsz)
+   {
+       dest = repalloc(dest, len);
+       *destsz = len;
+   }
 
-   newtup = palloc(len);
-   memcpy(newtup, tuple, len);
+   memcpy(dest, tuple, len);
 
-   return newtup;
+   return dest;
 }
 
 /*
@@ -348,54 +357,69 @@ BrinMemTuple *
 brin_new_memtuple(BrinDesc *brdesc)
 {
    BrinMemTuple *dtup;
-   char       *currdatum;
    long        basesize;
-   int         i;
 
    basesize = MAXALIGN(sizeof(BrinMemTuple) +
                        sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
    dtup = palloc0(basesize + sizeof(Datum) * brdesc->bd_totalstored);
-   currdatum = (char *) dtup + basesize;
-   for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
-   {
-       dtup->bt_columns[i].bv_attno = i + 1;
-       dtup->bt_columns[i].bv_allnulls = true;
-       dtup->bt_columns[i].bv_hasnulls = false;
-       dtup->bt_columns[i].bv_values = (Datum *) currdatum;
-       currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
-   }
+
+   dtup->bt_values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
+   dtup->bt_allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
+   dtup->bt_hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
 
    dtup->bt_context = AllocSetContextCreate(CurrentMemoryContext,
                                             "brin dtuple",
                                             ALLOCSET_DEFAULT_SIZES);
+
+   brin_memtuple_initialize(dtup, brdesc);
+
    return dtup;
 }
 
 /*
- * Reset a BrinMemTuple to initial state
+ * Reset a BrinMemTuple to initial state.  We return the same tuple, for
+ * notational convenience.
  */
-void
+BrinMemTuple *
 brin_memtuple_initialize(BrinMemTuple *dtuple, BrinDesc *brdesc)
 {
    int         i;
+   char       *currdatum;
 
    MemoryContextReset(dtuple->bt_context);
+
+   currdatum = (char *) dtuple +
+       MAXALIGN(sizeof(BrinMemTuple) +
+                sizeof(BrinValues) * brdesc->bd_tupdesc->natts);
    for (i = 0; i < brdesc->bd_tupdesc->natts; i++)
    {
        dtuple->bt_columns[i].bv_allnulls = true;
        dtuple->bt_columns[i].bv_hasnulls = false;
+
+       dtuple->bt_columns[i].bv_attno = i + 1;
+       dtuple->bt_columns[i].bv_allnulls = true;
+       dtuple->bt_columns[i].bv_hasnulls = false;
+       dtuple->bt_columns[i].bv_values = (Datum *) currdatum;
+       currdatum += sizeof(Datum) * brdesc->bd_info[i]->oi_nstored;
    }
+
+   return dtuple;
 }
 
 /*
  * Convert a BrinTuple back to a BrinMemTuple.  This is the reverse of
  * brin_form_tuple.
  *
+ * As an optimization, the caller can pass a previously allocated 'dMemtuple'.
+ * This avoids having to allocate it here, which can be useful when this
+ * function is called many times in a loop.  It is caller's responsibility
+ * that the given BrinMemTuple matches what we need here.
+ *
  * Note we don't need the "on disk tupdesc" here; we rely on our own routine to
  * deconstruct the tuple from the on-disk format.
  */
 BrinMemTuple *
-brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
+brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple, BrinMemTuple *dMemtuple)
 {
    BrinMemTuple *dtup;
    Datum      *values;
@@ -407,15 +431,16 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
    int         valueno;
    MemoryContext oldcxt;
 
-   dtup = brin_new_memtuple(brdesc);
+   dtup = dMemtuple ? brin_memtuple_initialize(dMemtuple, brdesc) :
+       brin_new_memtuple(brdesc);
 
    if (BrinTupleIsPlaceholder(tuple))
        dtup->bt_placeholder = true;
    dtup->bt_blkno = tuple->bt_blkno;
 
-   values = palloc(sizeof(Datum) * brdesc->bd_totalstored);
-   allnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
-   hasnulls = palloc(sizeof(bool) * brdesc->bd_tupdesc->natts);
+   values = dtup->bt_values;
+   allnulls = dtup->bt_allnulls;
+   hasnulls = dtup->bt_hasnulls;
 
    tp = (char *) tuple + BrinTupleDataOffset(tuple);
 
@@ -458,10 +483,6 @@ brin_deform_tuple(BrinDesc *brdesc, BrinTuple *tuple)
 
    MemoryContextSwitchTo(oldcxt);
 
-   pfree(values);
-   pfree(allnulls);
-   pfree(hasnulls);
-
    return dtup;
 }
 
index 6927865182cb50d8cc4b528e93f98693e6cbe867..b042fa8d50b8e67843aae01469fa920cc26cd2df 100644 (file)
@@ -38,6 +38,11 @@ typedef struct BrinMemTuple
    bool        bt_placeholder; /* this is a placeholder tuple */
    BlockNumber bt_blkno;       /* heap blkno that the tuple is for */
    MemoryContext bt_context;   /* memcxt holding the bt_columns values */
+   /* output arrays for brin_deform_tuple: */
+   Datum       *bt_values;     /* values array */
+   bool        *bt_allnulls;   /* allnulls array */
+   bool        *bt_hasnulls;   /* hasnulls array */
+   /* not an output array, but must be last */
    BrinValues  bt_columns[FLEXIBLE_ARRAY_MEMBER];
 } BrinMemTuple;
 
@@ -83,14 +88,15 @@ extern BrinTuple *brin_form_tuple(BrinDesc *brdesc, BlockNumber blkno,
 extern BrinTuple *brin_form_placeholder_tuple(BrinDesc *brdesc,
                            BlockNumber blkno, Size *size);
 extern void brin_free_tuple(BrinTuple *tuple);
-extern BrinTuple *brin_copy_tuple(BrinTuple *tuple, Size len);
+extern BrinTuple *brin_copy_tuple(BrinTuple *tuple, Size len,
+               BrinTuple *dest, Size *destsz);
 extern bool brin_tuples_equal(const BrinTuple *a, Size alen,
                  const BrinTuple *b, Size blen);
 
 extern BrinMemTuple *brin_new_memtuple(BrinDesc *brdesc);
-extern void brin_memtuple_initialize(BrinMemTuple *dtuple,
+extern BrinMemTuple *brin_memtuple_initialize(BrinMemTuple *dtuple,
                         BrinDesc *brdesc);
 extern BrinMemTuple *brin_deform_tuple(BrinDesc *brdesc,
-                 BrinTuple *tuple);
+                 BrinTuple *tuple, BrinMemTuple *dMemtuple);
 
 #endif   /* BRIN_TUPLE_H */