/* common function to decode tuples */
-static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
+static void DecodeXLogTuple(char *data, Size len, HeapTuple tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
ReorderBufferChange *change;
xl_multi_insert_tuple *xlhdr;
int datalen;
- ReorderBufferTupleBuf *tuple;
+ HeapTuple tuple;
HeapTupleHeader header;
change = ReorderBufferGetChange(ctx->reorder);
ReorderBufferGetTupleBuf(ctx->reorder, datalen);
tuple = change->data.tp.newtuple;
- header = tuple->tuple.t_data;
+ header = tuple->t_data;
/* not a disk based tuple */
- ItemPointerSetInvalid(&tuple->tuple.t_self);
+ ItemPointerSetInvalid(&tuple->t_self);
/*
* We can only figure this out after reassembling the transactions.
*/
- tuple->tuple.t_tableOid = InvalidOid;
+ tuple->t_tableOid = InvalidOid;
- tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+ tuple->t_len = datalen + SizeofHeapTupleHeader;
memset(header, 0, SizeofHeapTupleHeader);
- memcpy((char *) tuple->tuple.t_data + SizeofHeapTupleHeader,
+ memcpy((char *) tuple->t_data + SizeofHeapTupleHeader,
(char *) data,
datalen);
header->t_infomask = xlhdr->t_infomask;
* computed outside as they are record specific.
*/
static void
-DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+DecodeXLogTuple(char *data, Size len, HeapTuple tuple)
{
xl_heap_header xlhdr;
int datalen = len - SizeOfHeapHeader;
Assert(datalen >= 0);
- tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
- header = tuple->tuple.t_data;
+ tuple->t_len = datalen + SizeofHeapTupleHeader;
+ header = tuple->t_data;
/* not a disk based tuple */
- ItemPointerSetInvalid(&tuple->tuple.t_self);
+ ItemPointerSetInvalid(&tuple->t_self);
/* we can only figure this out after reassembling the transactions */
- tuple->tuple.t_tableOid = InvalidOid;
+ tuple->t_tableOid = InvalidOid;
/* data is not stored aligned, copy to aligned storage */
memcpy((char *) &xlhdr,
memset(header, 0, SizeofHeapTupleHeader);
- memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+ memcpy(((char *) tuple->t_data) + SizeofHeapTupleHeader,
data + SizeOfHeapHeader,
datalen);
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
if (change->data.tp.newtuple)
{
- ReorderBufferReturnTupleBuf(rb, change->data.tp.newtuple);
+ ReorderBufferReturnTupleBuf(change->data.tp.newtuple);
change->data.tp.newtuple = NULL;
}
if (change->data.tp.oldtuple)
{
- ReorderBufferReturnTupleBuf(rb, change->data.tp.oldtuple);
+ ReorderBufferReturnTupleBuf(change->data.tp.oldtuple);
change->data.tp.oldtuple = NULL;
}
break;
}
/*
- * Get a fresh ReorderBufferTupleBuf fitting at least a tuple of size
- * tuple_len (excluding header overhead).
+ * Get a fresh HeapTuple fitting a tuple of size tuple_len (excluding header
+ * overhead).
*/
-ReorderBufferTupleBuf *
+HeapTuple
ReorderBufferGetTupleBuf(ReorderBuffer *rb, Size tuple_len)
{
- ReorderBufferTupleBuf *tuple;
+ HeapTuple tuple;
Size alloc_len;
alloc_len = tuple_len + SizeofHeapTupleHeader;
- tuple = (ReorderBufferTupleBuf *)
- MemoryContextAlloc(rb->tup_context,
- sizeof(ReorderBufferTupleBuf) +
- MAXIMUM_ALIGNOF + alloc_len);
- tuple->alloc_tuple_size = alloc_len;
- tuple->tuple.t_data = ReorderBufferTupleBufData(tuple);
+ tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
+ HEAPTUPLESIZE + alloc_len);
+ tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
return tuple;
}
/*
- * Free a ReorderBufferTupleBuf.
+ * Free a HeapTuple returned by ReorderBufferGetTupleBuf().
*/
void
-ReorderBufferReturnTupleBuf(ReorderBuffer *rb, ReorderBufferTupleBuf *tuple)
+ReorderBufferReturnTupleBuf(HeapTuple tuple)
{
pfree(tuple);
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{
char *data;
- ReorderBufferTupleBuf *oldtup,
- *newtup;
+ HeapTuple oldtup,
+ newtup;
Size oldlen = 0;
Size newlen = 0;
if (oldtup)
{
sz += sizeof(HeapTupleData);
- oldlen = oldtup->tuple.t_len;
+ oldlen = oldtup->t_len;
sz += oldlen;
}
if (newtup)
{
sz += sizeof(HeapTupleData);
- newlen = newtup->tuple.t_len;
+ newlen = newtup->t_len;
sz += newlen;
}
if (oldlen)
{
- memcpy(data, &oldtup->tuple, sizeof(HeapTupleData));
+ memcpy(data, oldtup, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
- memcpy(data, oldtup->tuple.t_data, oldlen);
+ memcpy(data, oldtup->t_data, oldlen);
data += oldlen;
}
if (newlen)
{
- memcpy(data, &newtup->tuple, sizeof(HeapTupleData));
+ memcpy(data, newtup, sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
- memcpy(data, newtup->tuple.t_data, newlen);
+ memcpy(data, newtup->t_data, newlen);
data += newlen;
}
break;
case REORDER_BUFFER_CHANGE_DELETE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
{
- ReorderBufferTupleBuf *oldtup,
- *newtup;
+ HeapTuple oldtup,
+ newtup;
Size oldlen = 0;
Size newlen = 0;
if (oldtup)
{
sz += sizeof(HeapTupleData);
- oldlen = oldtup->tuple.t_len;
+ oldlen = oldtup->t_len;
sz += oldlen;
}
if (newtup)
{
sz += sizeof(HeapTupleData);
- newlen = newtup->tuple.t_len;
+ newlen = newtup->t_len;
sz += newlen;
}
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */
- memcpy(&change->data.tp.oldtuple->tuple, data,
+ memcpy(change->data.tp.oldtuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
- change->data.tp.oldtuple->tuple.t_data =
- ReorderBufferTupleBufData(change->data.tp.oldtuple);
+ change->data.tp.oldtuple->t_data =
+ (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);
/* restore tuple data itself */
- memcpy(change->data.tp.oldtuple->tuple.t_data, data, tuplelen);
+ memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
data += tuplelen;
}
ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
/* restore ->tuple */
- memcpy(&change->data.tp.newtuple->tuple, data,
+ memcpy(change->data.tp.newtuple, data,
sizeof(HeapTupleData));
data += sizeof(HeapTupleData);
/* reset t_data pointer into the new tuplebuf */
- change->data.tp.newtuple->tuple.t_data =
- ReorderBufferTupleBufData(change->data.tp.newtuple);
+ change->data.tp.newtuple->t_data =
+ (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);
/* restore tuple data itself */
- memcpy(change->data.tp.newtuple->tuple.t_data, data, tuplelen);
+ memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
data += tuplelen;
}
Relation relation, ReorderBufferChange *change)
{
ReorderBufferToastEnt *ent;
- ReorderBufferTupleBuf *newtup;
+ HeapTuple newtup;
bool found;
int32 chunksize;
bool isnull;
Assert(IsToastRelation(relation));
newtup = change->data.tp.newtuple;
- chunk_id = DatumGetObjectId(fastgetattr(&newtup->tuple, 1, desc, &isnull));
+ chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
Assert(!isnull);
- chunk_seq = DatumGetInt32(fastgetattr(&newtup->tuple, 2, desc, &isnull));
+ chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
Assert(!isnull);
ent = (ReorderBufferToastEnt *)
elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
chunk_seq, chunk_id, ent->last_chunk_seq + 1);
- chunk = DatumGetPointer(fastgetattr(&newtup->tuple, 3, desc, &isnull));
+ chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
Assert(!isnull);
/* calculate size so we can allocate the right size at once later */
Relation toast_rel;
TupleDesc toast_desc;
MemoryContext oldcontext;
- ReorderBufferTupleBuf *newtup;
+ HeapTuple newtup;
Size old_size;
/* no toast tuples changed */
newtup = change->data.tp.newtuple;
- heap_deform_tuple(&newtup->tuple, desc, attrs, isnull);
+ heap_deform_tuple(newtup, desc, attrs, isnull);
for (natt = 0; natt < desc->natts; natt++)
{
{
bool cisnull;
ReorderBufferChange *cchange;
- ReorderBufferTupleBuf *ctup;
+ HeapTuple ctup;
Pointer chunk;
cchange = dlist_container(ReorderBufferChange, node, it.cur);
ctup = cchange->data.tp.newtuple;
- chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &cisnull));
+ chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));
Assert(!cisnull);
Assert(!VARATT_IS_EXTERNAL(chunk));
* the tuplebuf because attrs[] will point back into the current content.
*/
tmphtup = heap_form_tuple(desc, attrs, isnull);
- Assert(newtup->tuple.t_len <= MaxHeapTupleSize);
- Assert(ReorderBufferTupleBufData(newtup) == newtup->tuple.t_data);
+ Assert(newtup->t_len <= MaxHeapTupleSize);
+ Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));
- memcpy(newtup->tuple.t_data, tmphtup->t_data, tmphtup->t_len);
- newtup->tuple.t_len = tmphtup->t_len;
+ memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
+ newtup->t_len = tmphtup->t_len;
/*
* free resources we won't further need, more persistent stuff will be
DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE,
} DebugLogicalRepStreamingMode;
-/* an individual tuple, stored in one chunk of memory */
-typedef struct ReorderBufferTupleBuf
-{
- /* position in preallocated list */
- slist_node node;
-
- /* tuple header, the interesting bit for users of logical decoding */
- HeapTupleData tuple;
-
- /* pre-allocated size of tuple buffer, different from tuple size */
- Size alloc_tuple_size;
-
- /* actual tuple data follows */
-} ReorderBufferTupleBuf;
-
-/* pointer to the data stored in a TupleBuf */
-#define ReorderBufferTupleBufData(p) \
- ((HeapTupleHeader) MAXALIGN(((char *) p) + sizeof(ReorderBufferTupleBuf)))
-
/*
* Types of the change passed to a 'change' callback.
*
bool clear_toast_afterwards;
/* valid for DELETE || UPDATE */
- ReorderBufferTupleBuf *oldtuple;
+ HeapTuple oldtuple;
/* valid for INSERT || UPDATE */
- ReorderBufferTupleBuf *newtuple;
+ HeapTuple newtuple;
} tp;
/*
extern ReorderBuffer *ReorderBufferAllocate(void);
extern void ReorderBufferFree(ReorderBuffer *rb);
-extern ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *rb,
- Size tuple_len);
-extern void ReorderBufferReturnTupleBuf(ReorderBuffer *rb,
- ReorderBufferTupleBuf *tuple);
+extern HeapTuple ReorderBufferGetTupleBuf(ReorderBuffer *rb,
+ Size tuple_len);
+extern void ReorderBufferReturnTupleBuf(HeapTuple tuple);
+
extern ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *rb);
extern void ReorderBufferReturnChange(ReorderBuffer *rb,
ReorderBufferChange *change, bool upd_mem);