In COPY, insert tuples into the heap in batches.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 9 Nov 2011 08:54:41 +0000 (10:54 +0200)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Wed, 9 Nov 2011 08:54:41 +0000 (10:54 +0200)
This greatly reduces the WAL volume, especially when the table is narrow.
The overhead of locking the heap page is also reduced. Reduced WAL traffic
also makes it scale a lot better if you run multiple COPY processes at
the same time.
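
For illustration only, a minimal sketch of the call pattern the new API
enables; this is not code from the patch. bulk_load(), its caller, and the
tuple setup are hypothetical, and BATCH_SIZE simply mirrors the
MAX_BUFFERED_TUPLES constant used by copy.c below.

    #define BATCH_SIZE 1000         /* mirrors MAX_BUFFERED_TUPLES in copy.c */

    /*
     * Hypothetical bulk loader: instead of one heap_insert() call (and one
     * WAL record) per tuple, hand the tuples over in batches, so that each
     * filled page needs only one WAL record and one lock/unlock cycle.
     */
    static void
    bulk_load(Relation rel, HeapTuple *tuples, int ntuples, CommandId cid)
    {
        BulkInsertState bistate = GetBulkInsertState();
        int         done = 0;

        while (done < ntuples)
        {
            int         n = Min(BATCH_SIZE, ntuples - done);

            heap_multi_insert(rel, tuples + done, n, cid, 0, bistate);
            done += n;
        }
        FreeBulkInsertState(bistate);
    }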

src/backend/access/heap/heapam.c
src/backend/commands/copy.c
src/backend/postmaster/pgstat.c
src/include/access/heapam.h
src/include/access/htup.h
src/include/pgstat.h

index 81422afa2f8c42ab770ec5d88b8127c1105ad3aa..72ed6325384d3c53bc545b99a12ba7beb9009882 100644 (file)
@@ -24,6 +24,7 @@
  *             heap_getnext    - retrieve next tuple in scan
  *             heap_fetch              - retrieve tuple with given tid
  *             heap_insert             - insert tuple into a relation
+ *             heap_multi_insert - insert multiple tuples into a relation
  *             heap_delete             - delete a tuple from a relation
  *             heap_update             - replace a tuple in a relation with another tuple
  *             heap_markpos    - mark scan position
@@ -79,6 +80,8 @@ static HeapScanDesc heap_beginscan_internal(Relation relation,
                                                int nkeys, ScanKey key,
                                                bool allow_strat, bool allow_sync,
                                                bool is_bitmapscan);
+static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
+                                       TransactionId xid, CommandId cid, int options);
 static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
                                ItemPointerData from, Buffer newbuf, HeapTuple newtup,
                                bool all_visible_cleared, bool new_all_visible_cleared);
@@ -1866,55 +1869,14 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        Buffer          vmbuffer = InvalidBuffer;
        bool            all_visible_cleared = false;
 
-       if (relation->rd_rel->relhasoids)
-       {
-#ifdef NOT_USED
-               /* this is redundant with an Assert in HeapTupleSetOid */
-               Assert(tup->t_data->t_infomask & HEAP_HASOID);
-#endif
-
-               /*
-                * If the object id of this tuple has already been assigned, trust the
-                * caller.      There are a couple of ways this can happen.  At initial db
-                * creation, the backend program sets oids for tuples. When we define
-                * an index, we set the oid.  Finally, in the future, we may allow
-                * users to set their own object ids in order to support a persistent
-                * object store (objects need to contain pointers to one another).
-                */
-               if (!OidIsValid(HeapTupleGetOid(tup)))
-                       HeapTupleSetOid(tup, GetNewOid(relation));
-       }
-       else
-       {
-               /* check there is not space for an OID */
-               Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
-       }
-
-       tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
-       tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
-       tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
-       HeapTupleHeaderSetXmin(tup->t_data, xid);
-       HeapTupleHeaderSetCmin(tup->t_data, cid);
-       HeapTupleHeaderSetXmax(tup->t_data, 0);         /* for cleanliness */
-       tup->t_tableOid = RelationGetRelid(relation);
-
        /*
-        * If the new tuple is too big for storage or contains already toasted
-        * out-of-line attributes from some other relation, invoke the toaster.
+        * Fill in tuple header fields, assign an OID, and toast the tuple if
+        * necessary.
         *
         * Note: below this point, heaptup is the data we actually intend to store
         * into the relation; tup is the caller's original untoasted data.
         */
-       if (relation->rd_rel->relkind != RELKIND_RELATION)
-       {
-               /* toast table entries should never be recursively toasted */
-               Assert(!HeapTupleHasExternal(tup));
-               heaptup = tup;
-       }
-       else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
-               heaptup = toast_insert_or_update(relation, tup, NULL, options);
-       else
-               heaptup = tup;
+       heaptup = heap_prepare_insert(relation, tup, xid, cid, options);
 
        /*
         * We're about to do the actual insert -- but check for conflict first,
@@ -2035,7 +1997,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
         */
        CacheInvalidateHeapTuple(relation, heaptup, NULL);
 
-       pgstat_count_heap_insert(relation);
+       pgstat_count_heap_insert(relation, 1);
 
        /*
         * If heaptup is a private copy, release it.  Don't forget to copy t_self
@@ -2050,6 +2012,285 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        return HeapTupleGetOid(tup);
 }
 
+/*
+ * Subroutine for heap_insert(). Prepares a tuple for insertion. This sets the
+ * tuple header fields, assigns an OID, and toasts the tuple if necessary.
+ * Returns a toasted version of the tuple if it was toasted, or the original
+ * tuple if not. Note that in any case, the header fields are also set in
+ * the original tuple.
+ */
+static HeapTuple
+heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid,
+                                       CommandId cid, int options)
+{
+       if (relation->rd_rel->relhasoids)
+       {
+#ifdef NOT_USED
+               /* this is redundant with an Assert in HeapTupleSetOid */
+               Assert(tup->t_data->t_infomask & HEAP_HASOID);
+#endif
+
+               /*
+                * If the object id of this tuple has already been assigned, trust the
+                * caller.      There are a couple of ways this can happen.  At initial db
+                * creation, the backend program sets oids for tuples. When we define
+                * an index, we set the oid.  Finally, in the future, we may allow
+                * users to set their own object ids in order to support a persistent
+                * object store (objects need to contain pointers to one another).
+                */
+               if (!OidIsValid(HeapTupleGetOid(tup)))
+                       HeapTupleSetOid(tup, GetNewOid(relation));
+       }
+       else
+       {
+               /* check there is no space for an OID */
+               Assert(!(tup->t_data->t_infomask & HEAP_HASOID));
+       }
+
+       tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);
+       tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);
+       tup->t_data->t_infomask |= HEAP_XMAX_INVALID;
+       HeapTupleHeaderSetXmin(tup->t_data, xid);
+       HeapTupleHeaderSetCmin(tup->t_data, cid);
+       HeapTupleHeaderSetXmax(tup->t_data, 0);         /* for cleanliness */
+       tup->t_tableOid = RelationGetRelid(relation);
+
+       /*
+        * If the new tuple is too big for storage or contains already toasted
+        * out-of-line attributes from some other relation, invoke the toaster.
+        */
+       if (relation->rd_rel->relkind != RELKIND_RELATION)
+       {
+               /* toast table entries should never be recursively toasted */
+               Assert(!HeapTupleHasExternal(tup));
+               return tup;
+       }
+       else if (HeapTupleHasExternal(tup) || tup->t_len > TOAST_TUPLE_THRESHOLD)
+               return toast_insert_or_update(relation, tup, NULL, options);
+       else
+               return tup;
+}
+
+/*
+ *     heap_multi_insert       - insert multiple tuples into a heap
+ *
+ * This is like heap_insert(), but inserts multiple tuples in one operation.
+ * That's faster than calling heap_insert() in a loop, because when multiple
+ * tuples can be inserted on a single page, we can write just a single WAL
+ * record covering all of them, and only need to lock/unlock the page once.
+ *
+ * Note: this leaks memory into the current memory context. You can create a
+ * temporary context before calling this, if that's a problem.
+ */
+void
+heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+                                 CommandId cid, int options, BulkInsertState bistate)
+{
+       TransactionId xid = GetCurrentTransactionId();
+       HeapTuple  *heaptuples;
+       Buffer          buffer;
+       Buffer          vmbuffer = InvalidBuffer;
+       bool            all_visible_cleared = false;
+       int                     i;
+       int                     ndone;
+       char       *scratch = NULL;
+       Page            page;
+       bool            needwal;
+
+       needwal = !(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation);
+
+       /* Toast and set header data in all the tuples */
+       heaptuples = palloc(ntuples * sizeof(HeapTuple));
+       for (i = 0; i < ntuples; i++)
+               heaptuples[i] = heap_prepare_insert(relation, tuples[i],
+                                                                                       xid, cid, options);
+
+       /*
+        * Allocate some memory to use for constructing the WAL record. Using
+        * palloc() within a critical section is not safe, so we allocate this
+        * beforehand.
+        */
+       if (needwal)
+               scratch = palloc(BLCKSZ);
+
+       /*
+        * We're about to do the actual inserts -- but check for conflict first,
+        * to avoid possibly having to roll back work we've just done.
+        *
+        * For a heap insert, we only need to check for table-level SSI locks.
+        * Our new tuple can't possibly conflict with existing tuple locks, and
+        * heap page locks are only consolidated versions of tuple locks; they do
+        * not lock "gaps" as index page locks do.  So we don't need to identify
+        * a buffer before making the call.
+        */
+       CheckForSerializableConflictIn(relation, NULL, InvalidBuffer);
+
+       ndone = 0;
+       while (ndone < ntuples)
+       {
+               int nthispage;
+
+               /*
+                * Find a buffer where at least the next tuple will fit.  If the page
+                * is all-visible, this will also pin the requisite visibility map
+                * page.
+                */
+               buffer = RelationGetBufferForTuple(relation, heaptuples[ndone]->t_len,
+                                                                                  InvalidBuffer, options, bistate,
+                                                                                  &vmbuffer, NULL);
+               page = BufferGetPage(buffer);
+
+               if (PageIsAllVisible(page))
+               {
+                       all_visible_cleared = true;
+                       PageClearAllVisible(page);
+                       visibilitymap_clear(relation,
+                                                               BufferGetBlockNumber(buffer),
+                                                               vmbuffer);
+               }
+
+               /* NO EREPORT(ERROR) from here till changes are logged */
+               START_CRIT_SECTION();
+
+               /* Put as many tuples as fit on this page */
+               for (nthispage = 0; ndone + nthispage < ntuples; nthispage++)
+               {
+                       HeapTuple       heaptup = heaptuples[ndone + nthispage];
+
+                       if (PageGetHeapFreeSpace(page) < MAXALIGN(heaptup->t_len))
+                               break;
+
+                       RelationPutHeapTuple(relation, buffer, heaptup);
+               }
+
+               /*
+                * XXX Should we set PageSetPrunable on this page? See heap_insert().
+                */
+
+               MarkBufferDirty(buffer);
+
+               /* XLOG stuff */
+               if (needwal)
+               {
+                       XLogRecPtr      recptr;
+                       xl_heap_multi_insert *xlrec;
+                       XLogRecData rdata[2];
+                       uint8           info = XLOG_HEAP2_MULTI_INSERT;
+                       char       *tupledata;
+                       int                     totaldatalen;
+                       char       *scratchptr = scratch;
+                       bool            init;
+
+                       /*
+                        * If the page was previously empty, we can reinit the page
+                        * instead of restoring the whole thing.
+                        */
+                       init = (ItemPointerGetOffsetNumber(&(heaptuples[ndone]->t_self)) == FirstOffsetNumber &&
+                                       PageGetMaxOffsetNumber(page) == FirstOffsetNumber + nthispage - 1);
+
+                       /* allocate xl_heap_multi_insert struct from the scratch area */
+                       xlrec = (xl_heap_multi_insert *) scratchptr;
+                       scratchptr += SizeOfHeapMultiInsert;
+
+                       /*
+                        * Allocate the offsets array, unless we're reinitializing the
+                        * page; in that case the tuples are stored in order starting at
+                        * FirstOffsetNumber, so we don't need to store the offsets
+                        * explicitly.
+                        */
+                       if (!init)
+                               scratchptr += nthispage * sizeof(OffsetNumber);
+
+                       /* the rest of the scratch space is used for tuple data */
+                       tupledata = scratchptr;
+
+                       xlrec->all_visible_cleared = all_visible_cleared;
+                       xlrec->node = relation->rd_node;
+                       xlrec->blkno = BufferGetBlockNumber(buffer);
+                       xlrec->ntuples = nthispage;
+
+                       /*
+                        * Write out an xl_multi_insert_tuple and the tuple data itself
+                        * for each tuple.
+                        */
+                       for (i = 0; i < nthispage; i++)
+                       {
+                               HeapTuple       heaptup = heaptuples[ndone + i];
+                               xl_multi_insert_tuple *tuphdr;
+                               int                     datalen;
+
+                               if (!init)
+                                       xlrec->offsets[i] = ItemPointerGetOffsetNumber(&heaptup->t_self);
+                               /* xl_multi_insert_tuple needs two-byte alignment. */
+                               tuphdr = (xl_multi_insert_tuple *) SHORTALIGN(scratchptr);
+                               scratchptr = ((char *) tuphdr) + SizeOfMultiInsertTuple;
+
+                               tuphdr->t_infomask2 = heaptup->t_data->t_infomask2;
+                               tuphdr->t_infomask = heaptup->t_data->t_infomask;
+                               tuphdr->t_hoff = heaptup->t_data->t_hoff;
+
+                               /* write bitmap [+ padding] [+ oid] + data */
+                               datalen = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);
+                               memcpy(scratchptr,
+                                          (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits),
+                                          datalen);
+                               tuphdr->datalen = datalen;
+                               scratchptr += datalen;
+                       }
+                       totaldatalen = scratchptr - tupledata;
+                       Assert((scratchptr - scratch) < BLCKSZ);
+
+                       rdata[0].data = (char *) xlrec;
+                       rdata[0].len = tupledata - scratch;
+                       rdata[0].buffer = InvalidBuffer;
+                       rdata[0].next = &rdata[1];
+
+                       rdata[1].data = tupledata;
+                       rdata[1].len = totaldatalen;
+                       rdata[1].buffer = buffer;
+                       rdata[1].buffer_std = true;
+                       rdata[1].next = NULL;
+
+                       /*
+                        * If we're going to reinitialize the whole page using the WAL
+                        * record, hide the buffer reference from XLogInsert.
+                        */
+                       if (init)
+                       {
+                               rdata[1].buffer = InvalidBuffer;
+                               info |= XLOG_HEAP_INIT_PAGE;
+                       }
+
+                       recptr = XLogInsert(RM_HEAP2_ID, info, rdata);
+
+                       PageSetLSN(page, recptr);
+                       PageSetTLI(page, ThisTimeLineID);
+               }
+
+               END_CRIT_SECTION();
+
+               UnlockReleaseBuffer(buffer);
+               if (vmbuffer != InvalidBuffer)
+                       ReleaseBuffer(vmbuffer);
+
+               ndone += nthispage;
+       }
+
+       /*
+        * If tuples are cachable, mark them for invalidation from the caches in
+        * case we abort.  Note it is OK to do this after releasing the buffer,
+        * because the heaptuples data structure is all in local memory, not in
+        * the shared buffer.
+        */
+       if (IsSystemRelation(relation))
+       {
+               for (i = 0; i < ntuples; i++)
+                       CacheInvalidateHeapTuple(relation, heaptuples[i], NULL);
+       }
+
+       pgstat_count_heap_insert(relation, ntuples);
+}
+
 /*
  *     simple_heap_insert - insert a tuple
  *
@@ -4775,6 +5016,144 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
                XLogRecordPageWithFreeSpace(xlrec->target.node, blkno, freespace);
 }
 
+/*
+ * Handles MULTI_INSERT record type.
+ */
+static void
+heap_xlog_multi_insert(XLogRecPtr lsn, XLogRecord *record)
+{
+       char       *recdata = XLogRecGetData(record);
+       xl_heap_multi_insert *xlrec;
+       Buffer          buffer;
+       Page            page;
+       struct
+       {
+               HeapTupleHeaderData hdr;
+               char            data[MaxHeapTupleSize];
+       }                       tbuf;
+       HeapTupleHeader htup;
+       uint32          newlen;
+       Size            freespace;
+       BlockNumber blkno;
+       int                     i;
+       bool            isinit = (record->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+
+       xlrec = (xl_heap_multi_insert *) recdata;
+       recdata += SizeOfHeapMultiInsert;
+
+       /*
+        * If we're reinitializing the page, the tuples are stored in order from
+        * FirstOffsetNumber. Otherwise there's an array of offsets in the WAL
+        * record.
+        */
+       if (!isinit)
+               recdata += sizeof(OffsetNumber) * xlrec->ntuples;
+
+       blkno = xlrec->blkno;
+
+       /*
+        * The visibility map may need to be fixed even if the heap page is
+        * already up-to-date.
+        */
+       if (xlrec->all_visible_cleared)
+       {
+               Relation        reln = CreateFakeRelcacheEntry(xlrec->node);
+               Buffer          vmbuffer = InvalidBuffer;
+
+               visibilitymap_pin(reln, blkno, &vmbuffer);
+               visibilitymap_clear(reln, blkno, vmbuffer);
+               ReleaseBuffer(vmbuffer);
+               FreeFakeRelcacheEntry(reln);
+       }
+
+       if (record->xl_info & XLR_BKP_BLOCK_1)
+               return;
+
+       if (isinit)
+       {
+               buffer = XLogReadBuffer(xlrec->node, blkno, true);
+               Assert(BufferIsValid(buffer));
+               page = (Page) BufferGetPage(buffer);
+
+               PageInit(page, BufferGetPageSize(buffer), 0);
+       }
+       else
+       {
+               buffer = XLogReadBuffer(xlrec->node, blkno, false);
+               if (!BufferIsValid(buffer))
+                       return;
+               page = (Page) BufferGetPage(buffer);
+
+               if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
+               {
+                       UnlockReleaseBuffer(buffer);
+                       return;
+               }
+       }
+
+       for (i = 0; i < xlrec->ntuples; i++)
+       {
+               OffsetNumber offnum;
+               xl_multi_insert_tuple *xlhdr;
+
+               if (isinit)
+                       offnum = FirstOffsetNumber + i;
+               else
+                       offnum = xlrec->offsets[i];
+               if (PageGetMaxOffsetNumber(page) + 1 < offnum)
+                       elog(PANIC, "heap_multi_insert_redo: invalid max offset number");
+
+               xlhdr = (xl_multi_insert_tuple *) SHORTALIGN(recdata);
+               recdata += SizeOfMultiInsertTuple;
+
+               newlen = xlhdr->datalen;
+               Assert(newlen <= MaxHeapTupleSize);
+               htup = &tbuf.hdr;
+               MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData));
+               /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */
+               memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits),
+                          (char *) recdata,
+                          newlen);
+               recdata += newlen;
+
+               newlen += offsetof(HeapTupleHeaderData, t_bits);
+               htup->t_infomask2 = xlhdr->t_infomask2;
+               htup->t_infomask = xlhdr->t_infomask;
+               htup->t_hoff = xlhdr->t_hoff;
+               HeapTupleHeaderSetXmin(htup, record->xl_xid);
+               HeapTupleHeaderSetCmin(htup, FirstCommandId);
+               ItemPointerSetBlockNumber(&htup->t_ctid, blkno);
+               ItemPointerSetOffsetNumber(&htup->t_ctid, offnum);
+
+               offnum = PageAddItem(page, (Item) htup, newlen, offnum, true, true);
+               if (offnum == InvalidOffsetNumber)
+                       elog(PANIC, "heap_multi_insert_redo: failed to add tuple");
+       }
+
+       freespace = PageGetHeapFreeSpace(page);         /* needed to update FSM below */
+
+       PageSetLSN(page, lsn);
+       PageSetTLI(page, ThisTimeLineID);
+
+       if (xlrec->all_visible_cleared)
+               PageClearAllVisible(page);
+
+       MarkBufferDirty(buffer);
+       UnlockReleaseBuffer(buffer);
+
+       /*
+        * If the page is running low on free space, update the FSM as well.
+        * Arbitrarily, our definition of "low" is less than 20%. We can't do much
+        * better than that without knowing the fill-factor for the table.
+        *
+        * XXX: We don't get here if the page was restored from a full-page
+        * image. We don't bother to update the FSM in that case; it doesn't
+        * need to be totally accurate anyway.
+        */
+       if (freespace < BLCKSZ / 5)
+               XLogRecordPageWithFreeSpace(xlrec->node, blkno, freespace);
+}
+
 /*
  * Handles UPDATE and HOT_UPDATE
  */
@@ -5164,6 +5543,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
                case XLOG_HEAP2_VISIBLE:
                        heap_xlog_visible(lsn, record);
                        break;
+               case XLOG_HEAP2_MULTI_INSERT:
+                       heap_xlog_multi_insert(lsn, record);
+                       break;
                default:
                        elog(PANIC, "heap2_redo: unknown op code %u", info);
        }
@@ -5301,6 +5683,18 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
                                                 xlrec->node.spcNode, xlrec->node.dbNode,
                                                 xlrec->node.relNode, xlrec->block);
        }
+       else if (info == XLOG_HEAP2_MULTI_INSERT)
+       {
+               xl_heap_multi_insert *xlrec = (xl_heap_multi_insert *) rec;
+
+               if (xl_info & XLOG_HEAP_INIT_PAGE)
+                       appendStringInfo(buf, "multi-insert (init): ");
+               else
+                       appendStringInfo(buf, "multi-insert: ");
+               appendStringInfo(buf, "rel %u/%u/%u; blk %u; %d tuples",
+                                                xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode,
+                                                xlrec->blkno, xlrec->ntuples);
+       }
        else
                appendStringInfo(buf, "UNKNOWN");
 }
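
As a cross-check on the scratch-buffer sizing in heap_multi_insert() above
(the Assert that the assembled record stays under BLCKSZ), the serialized
size of one multi-insert record can be computed as below. This helper is
hypothetical, shown only to spell out the record layout: the fixed header,
the offsets array (omitted for XLOG_HEAP_INIT_PAGE records), and one
SHORTALIGNed xl_multi_insert_tuple header plus tuple data per tuple.

    /* Hypothetical helper, not part of the patch. */
    static Size
    multi_insert_record_size(HeapTuple *tups, int ntuples, bool init)
    {
        Size        size = SizeOfHeapMultiInsert;
        int         i;

        if (!init)
            size += ntuples * sizeof(OffsetNumber);

        for (i = 0; i < ntuples; i++)
        {
            size = SHORTALIGN(size);    /* xl_multi_insert_tuple alignment */
            size += SizeOfMultiInsertTuple;
            size += tups[i]->t_len - offsetof(HeapTupleHeaderData, t_bits);
        }

        /* Always < BLCKSZ, because all the tuples fit on a single page. */
        return size;
    }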
index e0c72d9cdc1d503448083cd5e45187f4cf4528e0..7637b689dba300d7f4e313610a74c97e961f3cf0 100644 (file)
@@ -33,6 +33,7 @@
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "optimizer/clauses.h"
 #include "optimizer/planner.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
@@ -149,6 +150,7 @@ typedef struct CopyStateData
        Oid                *typioparams;        /* array of element types for in_functions */
        int                *defmap;                     /* array of default att numbers */
        ExprState **defexprs;           /* array of default att expressions */
+       bool            volatile_defexprs; /* is any of defexprs volatile? */
 
        /*
         * These variables are used to reduce overhead in textual COPY FROM.
@@ -277,6 +279,11 @@ static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, Oid tupleOid,
                         Datum *values, bool *nulls);
 static uint64 CopyFrom(CopyState cstate);
+static void CopyFromInsertBatch(CopyState cstate, EState *estate,
+                                       CommandId mycid, int hi_options,
+                                       ResultRelInfo *resultRelInfo, TupleTableSlot *myslot,
+                                       BulkInsertState bistate,
+                                       int nBufferedTuples, HeapTuple *bufferedTuples);
 static bool CopyReadLine(CopyState cstate);
 static bool CopyReadLineText(CopyState cstate);
 static int     CopyReadAttributesText(CopyState cstate);
@@ -1842,11 +1849,17 @@ CopyFrom(CopyState cstate)
        ExprContext *econtext;
        TupleTableSlot *myslot;
        MemoryContext oldcontext = CurrentMemoryContext;
+
        ErrorContextCallback errcontext;
        CommandId       mycid = GetCurrentCommandId(true);
        int                     hi_options = 0; /* start with default heap_insert options */
        BulkInsertState bistate;
        uint64          processed = 0;
+       bool            useHeapMultiInsert;
+       int                     nBufferedTuples = 0;
+#define MAX_BUFFERED_TUPLES 1000
+       HeapTuple  *bufferedTuples;
+       Size            bufferedTuplesSize = 0;
 
        Assert(cstate->rel);
 
@@ -1941,6 +1954,28 @@ CopyFrom(CopyState cstate)
        /* Triggers might need a slot as well */
        estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
 
+       /*
+        * It's more efficient to prepare a bunch of tuples for insertion and
+        * insert them in one heap_multi_insert() call than to call
+        * heap_insert() separately for every tuple. However, we can't do that
+        * if there are
+        * BEFORE/INSTEAD OF triggers, or we need to evaluate volatile default
+        * expressions. Such triggers or expressions might query the table we're
+        * inserting to, and act differently if the tuples that have already been
+        * processed and prepared for insertion are not there.
+        */
+       if ((resultRelInfo->ri_TrigDesc != NULL &&
+               (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
+                resultRelInfo->ri_TrigDesc->trig_insert_instead_row)) ||
+               cstate->volatile_defexprs)
+       {
+               useHeapMultiInsert = false;
+       }
+       else
+       {
+               useHeapMultiInsert = true;
+               bufferedTuples = palloc(MAX_BUFFERED_TUPLES * sizeof(HeapTuple));
+       }
+
        /* Prepare to catch AFTER triggers. */
        AfterTriggerBeginQuery();
 
@@ -1972,8 +2007,15 @@ CopyFrom(CopyState cstate)
 
                CHECK_FOR_INTERRUPTS();
 
-               /* Reset the per-tuple exprcontext */
-               ResetPerTupleExprContext(estate);
+               if (nBufferedTuples == 0)
+               {
+                       /*
+                        * Reset the per-tuple exprcontext. We can only do this if the
+                        * tuple buffer is empty. (Calling the context the per-tuple
+                        * memory context is a bit of a misnomer now.)
+                        */
+                       ResetPerTupleExprContext(estate);
+               }
 
                /* Switch into its memory context */
                MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@@ -2010,24 +2052,49 @@ CopyFrom(CopyState cstate)
 
                if (!skip_tuple)
                {
-                       List       *recheckIndexes = NIL;
-
                        /* Check the constraints of the tuple */
                        if (cstate->rel->rd_att->constr)
                                ExecConstraints(resultRelInfo, slot, estate);
 
-                       /* OK, store the tuple and create index entries for it */
-                       heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
+                       if (useHeapMultiInsert)
+                       {
+                               /* Add this tuple to the tuple buffer */
+                               bufferedTuples[nBufferedTuples++] = tuple;
+                               bufferedTuplesSize += tuple->t_len;
 
-                       if (resultRelInfo->ri_NumIndices > 0)
-                               recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
-                                                                                                          estate);
+                               /*
+                                * If the buffer filled up, flush it. Also flush if the total
+                                * size of all the tuples in the buffer becomes large, to
+                                * avoid using large amounts of memory for the buffers when
+                                * the tuples are exceptionally wide.
+                                */
+                               if (nBufferedTuples == MAX_BUFFERED_TUPLES ||
+                                       bufferedTuplesSize > 65535)
+                               {
+                                       CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                                                               resultRelInfo, myslot, bistate,
+                                                                               nBufferedTuples, bufferedTuples);
+                                       nBufferedTuples = 0;
+                                       bufferedTuplesSize = 0;
+                               }
+                       }
+                       else
+                       {
+                               List       *recheckIndexes = NIL;
 
-                       /* AFTER ROW INSERT Triggers */
-                       ExecARInsertTriggers(estate, resultRelInfo, tuple,
-                                                                recheckIndexes);
+                               /* OK, store the tuple and create index entries for it */
+                               heap_insert(cstate->rel, tuple, mycid, hi_options, bistate);
 
-                       list_free(recheckIndexes);
+                               if (resultRelInfo->ri_NumIndices > 0)
+                                       recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+                                                                                                                  estate);
+
+                               /* AFTER ROW INSERT Triggers */
+                               ExecARInsertTriggers(estate, resultRelInfo, tuple,
+                                                                        recheckIndexes);
+
+                               list_free(recheckIndexes);
+                       }
 
                        /*
                         * We count only tuples not suppressed by a BEFORE INSERT trigger;
@@ -2038,6 +2105,12 @@ CopyFrom(CopyState cstate)
                }
        }
 
+       /* Flush any remaining buffered tuples */
+       if (nBufferedTuples > 0)
+               CopyFromInsertBatch(cstate, estate, mycid, hi_options,
+                                                       resultRelInfo, myslot, bistate,
+                                                       nBufferedTuples, bufferedTuples);
+
        /* Done, clean up */
        error_context_stack = errcontext.previous;
 
@@ -2070,6 +2143,67 @@ CopyFrom(CopyState cstate)
        return processed;
 }
 
+/*
+ * A subroutine of CopyFrom, to write the current batch of buffered heap
+ * tuples to the heap. Also updates indexes and runs AFTER ROW INSERT
+ * triggers.
+ */
+static void
+CopyFromInsertBatch(CopyState cstate, EState *estate, CommandId mycid,
+                                       int hi_options, ResultRelInfo *resultRelInfo,
+                                       TupleTableSlot *myslot, BulkInsertState bistate,
+                                       int nBufferedTuples, HeapTuple *bufferedTuples)
+{
+       MemoryContext oldcontext;
+       int                     i;
+
+       /*
+        * heap_multi_insert leaks memory, so switch to a short-lived memory
+        * context before calling it.
+        */
+       oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+       heap_multi_insert(cstate->rel,
+                                         bufferedTuples,
+                                         nBufferedTuples,
+                                         mycid,
+                                         hi_options,
+                                         bistate);
+       MemoryContextSwitchTo(oldcontext);
+
+       /*
+        * If there are any indexes, update them for all the inserted tuples,
+        * and run AFTER ROW INSERT triggers.
+        */
+       if (resultRelInfo->ri_NumIndices > 0)
+       {
+               for (i = 0; i < nBufferedTuples; i++)
+               {
+                       List *recheckIndexes;
+
+                       ExecStoreTuple(bufferedTuples[i], myslot, InvalidBuffer, false);
+                       recheckIndexes =
+                               ExecInsertIndexTuples(myslot, &(bufferedTuples[i]->t_self),
+                                                                         estate);
+                       ExecARInsertTriggers(estate, resultRelInfo,
+                                                                bufferedTuples[i],
+                                                                recheckIndexes);
+                       list_free(recheckIndexes);
+               }
+       }
+       /*
+        * There are no indexes, but see if we need to run AFTER ROW INSERT
+        * triggers anyway.
+        */
+       else if (resultRelInfo->ri_TrigDesc != NULL &&
+                        resultRelInfo->ri_TrigDesc->trig_insert_after_row)
+       {
+               for (i = 0; i < nBufferedTuples; i++)
+                       ExecARInsertTriggers(estate, resultRelInfo,
+                                                                bufferedTuples[i],
+                                                                NIL);
+       }
+}
+
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -2099,6 +2233,7 @@ BeginCopyFrom(Relation rel,
        int                *defmap;
        ExprState **defexprs;
        MemoryContext oldcontext;
+       bool            volatile_defexprs;
 
        cstate = BeginCopy(true, rel, NULL, NULL, attnamelist, options);
        oldcontext = MemoryContextSwitchTo(cstate->copycontext);
@@ -2122,6 +2257,7 @@ BeginCopyFrom(Relation rel,
        attr = tupDesc->attrs;
        num_phys_attrs = tupDesc->natts;
        num_defaults = 0;
+       volatile_defexprs = false;
 
        /*
         * Pick up the required catalog information for each attribute in the
@@ -2163,6 +2299,9 @@ BeginCopyFrom(Relation rel,
                                                                 expression_planner((Expr *) defexpr), NULL);
                                defmap[num_defaults] = attnum - 1;
                                num_defaults++;
+
+                               if (!volatile_defexprs)
+                                       volatile_defexprs = contain_volatile_functions(defexpr);
                        }
                }
        }
@@ -2172,6 +2311,7 @@ BeginCopyFrom(Relation rel,
        cstate->typioparams = typioparams;
        cstate->defmap = defmap;
        cstate->defexprs = defexprs;
+       cstate->volatile_defexprs = volatile_defexprs;
        cstate->num_defaults = num_defaults;
 
        if (pipe)
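
A condensed, hypothetical rendering of the buffering discipline the
CopyFrom() changes above implement. read_next_tuple() and flush_batch()
stand in for the real parsing code and CopyFromInsertBatch(), and
MAX_BUFFERED_BYTES is an invented name for the literal 65535 threshold in
the patch.

    #define MAX_BUFFERED_TUPLES     1000
    #define MAX_BUFFERED_BYTES      65535

    for (;;)
    {
        /*
         * Buffered tuples live in the per-tuple memory context, so the
         * context may only be reset while the buffer is empty.
         */
        if (nbuffered == 0)
            ResetPerTupleExprContext(estate);
        MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

        tuple = read_next_tuple();              /* hypothetical */
        if (tuple == NULL)
            break;

        buffered[nbuffered++] = tuple;
        buffered_bytes += tuple->t_len;

        /* Flush on either the tuple-count or the total-size threshold. */
        if (nbuffered == MAX_BUFFERED_TUPLES ||
            buffered_bytes > MAX_BUFFERED_BYTES)
        {
            flush_batch(buffered, nbuffered);   /* heap_multi_insert() etc. */
            nbuffered = 0;
            buffered_bytes = 0;
        }
    }

    if (nbuffered > 0)
        flush_batch(buffered, nbuffered);       /* final partial batch */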
index 24582e304c0f1d0a1724900597c38a244e4a8ae5..24f4cde068e7799053152e42af40d3390efc009e 100644 (file)
@@ -1677,10 +1677,10 @@ add_tabstat_xact_level(PgStat_TableStatus *pgstat_info, int nest_level)
 }
 
 /*
- * pgstat_count_heap_insert - count a tuple insertion
+ * pgstat_count_heap_insert - count an insertion of n tuples
  */
 void
-pgstat_count_heap_insert(Relation rel)
+pgstat_count_heap_insert(Relation rel, int n)
 {
        PgStat_TableStatus *pgstat_info = rel->pgstat_info;
 
@@ -1693,7 +1693,7 @@ pgstat_count_heap_insert(Relation rel)
                        pgstat_info->trans->nest_level != nest_level)
                        add_tabstat_xact_level(pgstat_info, nest_level);
 
-               pgstat_info->trans->tuples_inserted++;
+               pgstat_info->trans->tuples_inserted += n;
        }
 }
 
index 85cbeb3273b80fba42d47f2aa0f3b6d7490c2f71..ed30b5dada63e54e326c96fe176f07961c55cf3a 100644 (file)
@@ -97,6 +97,8 @@ extern void FreeBulkInsertState(BulkInsertState);
 
 extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid,
                        int options, BulkInsertState bistate);
+extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
+                                 CommandId cid, int options, BulkInsertState bistate);
 extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
                        ItemPointer ctid, TransactionId *update_xmax,
                        CommandId cid, Snapshot crosscheck, bool wait);
index 966e2d0299e95c280a3acaaea257416bfbbbb2ce..3ca25acb897d06be87568b31b6842deb66bfb4d0 100644 (file)
@@ -608,6 +608,7 @@ typedef HeapTupleData *HeapTuple;
 /* 0x20 is free, was XLOG_HEAP2_CLEAN_MOVE */
 #define XLOG_HEAP2_CLEANUP_INFO 0x30
 #define XLOG_HEAP2_VISIBLE             0x40
+#define XLOG_HEAP2_MULTI_INSERT        0x50
 
 /*
  * All that we need to find a changed tuple
@@ -661,6 +662,36 @@ typedef struct xl_heap_insert
 
 #define SizeOfHeapInsert       (offsetof(xl_heap_insert, all_visible_cleared) + sizeof(bool))
 
+/*
+ * This is what we need to know about a multi-insert. The record consists of
+ * an xl_heap_multi_insert header, followed by an xl_multi_insert_tuple and
+ * the tuple data for each tuple. The 'offsets' array is omitted if the whole
+ * page is reinitialized (XLOG_HEAP_INIT_PAGE).
+ */
+typedef struct xl_heap_multi_insert
+{
+       RelFileNode node;
+       BlockNumber     blkno;
+       bool            all_visible_cleared;
+       uint16          ntuples;
+       OffsetNumber offsets[1];
+
+       /* TUPLE DATA (xl_multi_insert_tuple structs) FOLLOWS AT END OF STRUCT */
+} xl_heap_multi_insert;
+
+#define SizeOfHeapMultiInsert  offsetof(xl_heap_multi_insert, offsets)
+
+typedef struct xl_multi_insert_tuple
+{
+       uint16          datalen;                                /* size of tuple data that follows */
+       uint16          t_infomask2;
+       uint16          t_infomask;
+       uint8           t_hoff;
+       /* TUPLE DATA FOLLOWS AT END OF STRUCT */
+} xl_multi_insert_tuple;
+
+#define SizeOfMultiInsertTuple (offsetof(xl_multi_insert_tuple, t_hoff) + sizeof(uint8))
+
 /* This is what we need to know about update|hot_update */
 typedef struct xl_heap_update
 {
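
For reference, the wire format implied by the two structs above, as a
sketch (this comment is not text from the header file):

    /*
     * XLOG_HEAP2_MULTI_INSERT record body:
     *
     *   xl_heap_multi_insert header      SizeOfHeapMultiInsert bytes
     *   OffsetNumber offsets[ntuples]    omitted if XLOG_HEAP_INIT_PAGE
     *   then, for each of the ntuples tuples:
     *     padding to a 2-byte boundary   SHORTALIGN
     *     xl_multi_insert_tuple header   SizeOfMultiInsertTuple bytes
     *     tuple data                     datalen bytes, starting at the
     *                                    tuple's t_bits null bitmap
     */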
index 7b2bd4edc0e1d01c27442ca09b9d9031584af84f..651b7d9ddf85c6e1dded59d68253caf5ab2b7fd8 100644 (file)
@@ -766,7 +766,7 @@ extern void pgstat_initstats(Relation rel);
                        (rel)->pgstat_info->t_counts.t_blocks_hit++;                    \
        } while (0)
 
-extern void pgstat_count_heap_insert(Relation rel);
+extern void pgstat_count_heap_insert(Relation rel, int n);
 extern void pgstat_count_heap_update(Relation rel, bool hot);
 extern void pgstat_count_heap_delete(Relation rel);
 extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);