LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeSequenceCB sequence_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeFilterPrepareCB filter_prepare_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamSequenceCB stream_sequence_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
and <function>commit_cb</function> callbacks are required,
while <function>startup_cb</function>,
<function>filter_by_origin_cb</function>, <function>truncate_cb</function>,
- and <function>shutdown_cb</function> are optional.
- If <function>truncate_cb</function> is not set but a
+ <function>sequence_cb</function>, and <function>shutdown_cb</function> are
+ optional. If <function>truncate_cb</function> is not set but a
<command>TRUNCATE</command> is to be decoded, the action will be ignored.
+ Similarly, if <function>sequence_cb</function> is not set and a sequence
+ change is to be decoded, the action will be ignored.
</para>
<para>
<function>stream_stop_cb</function>, <function>stream_abort_cb</function>,
<function>stream_commit_cb</function>, <function>stream_change_cb</function>,
and <function>stream_prepare_cb</function>
- are required, while <function>stream_message_cb</function> and
+ are required, while <function>stream_message_cb</function>,
+ <function>stream_sequence_cb</function>, and
<function>stream_truncate_cb</function> are optional.
</para>
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-sequence">
+ <title>Sequence Callback</title>
+
+ <para>
+ The optional <function>sequence_cb</function> callback is called for
+ actions that update a sequence value.
+<programlisting>
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+</programlisting>
+ The <parameter>txn</parameter> parameter contains meta information about
+ the transaction the sequence change is part of. Note however that for
+ non-transactional increments, the transaction may be either NULL or not
+ NULL, depending on if the transaction already has XID assigned.
+ The <parameter>sequence_lsn</parameter> has WAL location of the sequence
+ update. The <parameter>transactional</parameter> says if the sequence has
+ to be replayed as part of the transaction or directly.
+
+ The <parameter>last_value</parameter>, <parameter>log_cnt</parameter> and
+ <parameter>is_called</parameter> parameters describe the sequence change.
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-filter-prepare">
<title>Prepare Filter Callback</title>
</para>
</sect3>
+ <sect3 id="logicaldecoding-output-plugin-stream-sequence">
+ <title>Stream Sequence Callback</title>
+ <para>
+ The optional <function>stream_sequence_cb</function> callback is called
+ for actions that change a sequence in a block of streamed changes
+ (demarcated by <function>stream_start_cb</function> and
+ <function>stream_stop_cb</function> calls).
+<programlisting>
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+</programlisting>
+ </para>
+ </sect3>
+
<sect3 id="logicaldecoding-output-plugin-stream-truncate">
<title>Stream Truncate Callback</title>
<para>
in-progress transactions. There are multiple required streaming callbacks
(<function>stream_start_cb</function>, <function>stream_stop_cb</function>,
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
- and <function>stream_change_cb</function>) and two optional callbacks
- (<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
+ and <function>stream_change_cb</function>) and multiple optional callbacks
+ (<function>stream_message_cb</function>, <function>stream_sequence_cb</function>,
+ and <function>stream_truncate_cb</function>).
Also, if streaming of two-phase commands is to be supported, then additional
callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
for details).
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(rel))
+ {
GetTopTransactionId();
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+ }
+
START_CRIT_SECTION();
MarkBufferDirty(buf);
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = rel->rd_node;
+ xlrec.created = true;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) tuple->t_data, tuple->t_len);
* It's sufficient to ensure the toplevel transaction has an xid, no need
* to assign xids subxacts, that'll already trigger an appropriate wait.
* (Have to do that here, so we're outside the critical section)
+ *
+ * We have to ensure we have a proper XID, which will be included in
+ * the XLOG record by XLogRecordAssemble. Otherwise the first nextval()
+ * in a subxact (without any preceding changes) would get XID 0, and it
+ * would then be impossible to decide which top xact it belongs to.
+ * It'd also trigger assert in DecodeSequence. We only do that with
+ * wal_level=logical, though.
+ *
+ * XXX This might seem unnecessary, because if there's no XID the xact
+ * couldn't have done anything important yet, e.g. it could not have
+ * created a sequence. But that's incorrect, because of subxacts. The
+ * current subtransaction might not have done anything yet (thus no XID),
+ * but an earlier one might have created the sequence.
*/
if (logit && RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
seq->log_cnt = 0;
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
/* check the comment above nextval_internal()'s equivalent call. */
if (RelationNeedsWAL(seqrel))
+ {
GetTopTransactionId();
+ if (XLogLogicalInfoActive())
+ GetCurrentTransactionId();
+ }
+
/* ready to change the on-disk (or really, in-buffer) tuple */
START_CRIT_SECTION();
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
xlrec.node = seqrel->rd_node;
+ xlrec.created = false;
+
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/standby.h"
+#include "commands/sequence.h"
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* common function to decode tuples */
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
+static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
/* helper functions for decoding transactions */
static inline bool FilterPrepare(LogicalDecodingContext *ctx,
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
}
+
+/*
+ * DecodeSeqTuple
+ * decode tuple describing the sequence increment
+ *
+ * Sequences are represented as a table with a single row, which gets updated
+ * by nextval(). The tuple is stored in WAL right after the xl_seq_rec, so we
+ * simply copy it into the tuplebuf (similar to seq_redo).
+ */
+static void
+DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
+{
+ int datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
+
+ Assert(datalen >= 0);
+
+ tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;
+
+ ItemPointerSetInvalid(&tuple->tuple.t_self);
+
+ tuple->tuple.t_tableOid = InvalidOid;
+
+ memcpy(((char *) tuple->tuple.t_data),
+ data + sizeof(xl_seq_rec),
+ SizeofHeapTupleHeader);
+
+ memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
+ data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
+ datalen);
+}
+
+/*
+ * Handle sequence decode
+ *
+ * Decoding sequences is a bit tricky, because while most sequence actions
+ * are non-transactional (not subject to rollback), some need to be handled
+ * as transactional.
+ *
+ * By default, a sequence increment is non-transactional - we must not queue
+ * it in a transaction as other changes, because the transaction might get
+ * rolled back and we'd discard the increment. The downstream would not be
+ * notified about the increment, which is wrong.
+ *
+ * On the other hand, the sequence may be created in a transaction. In this
+ * case we *should* queue the change as other changes in the transaction,
+ * because we don't want to send the increments for unknown sequence to the
+ * plugin - it might get confused about which sequence it's related to etc.
+ */
+void
+sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferTupleBuf *tuplebuf;
+ RelFileNode target_node;
+ XLogReaderState *r = buf->record;
+ char *tupledata = NULL;
+ Size tuplelen;
+ Size datalen = 0;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ xl_seq_rec *xlrec;
+ Snapshot snapshot;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
+ bool transactional;
+
+ /* only decode changes flagged with XLOG_SEQ_LOG */
+ if (info != XLOG_SEQ_LOG)
+ elog(ERROR, "unexpected RM_SEQ_ID record type: %u", info);
+
+ /*
+ * If we don't have snapshot or we are just fast-forwarding, there is no
+ * point in decoding messages.
+ */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+ ctx->fast_forward)
+ return;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ tupledata = XLogRecGetData(r);
+ datalen = XLogRecGetDataLen(r);
+ tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
+
+ /* extract the WAL record, with "created" flag */
+ xlrec = (xl_seq_rec *) XLogRecGetData(r);
+
+ /* XXX how could we have sequence change without data? */
+ if(!datalen || !tupledata)
+ return;
+
+ tuplebuf = ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
+ DecodeSeqTuple(tupledata, datalen, tuplebuf);
+
+ /*
+ * Should we handle the sequence increment as transactional or not?
+ *
+ * If the sequence was created in a still-running transaction, treat
+ * it as transactional and queue the increments. Otherwise it needs
+ * to be treated as non-transactional, in which case we send it to
+ * the plugin right away.
+ */
+ transactional = ReorderBufferSequenceIsTransactional(ctx->reorder,
+ target_node,
+ xlrec->created);
+
+ /* Skip the change if already processed (per the snapshot). */
+ if (transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ /* Queue the increment (or send immediately if not transactional). */
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr,
+ origin_id, target_node, transactional,
+ xlrec->created, tuplebuf);
+}
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
/* streaming callbacks */
static void stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
static void stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
+static void stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
static void stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change);
ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
+ ctx->reorder->sequence = sequence_cb_wrapper;
/*
* To support streaming, we require start/stop/abort/commit/change
(ctx->callbacks.stream_commit_cb != NULL) ||
(ctx->callbacks.stream_change_cb != NULL) ||
(ctx->callbacks.stream_message_cb != NULL) ||
+ (ctx->callbacks.stream_sequence_cb != NULL) ||
(ctx->callbacks.stream_truncate_cb != NULL);
/*
ctx->reorder->stream_commit = stream_commit_cb_wrapper;
ctx->reorder->stream_change = stream_change_cb_wrapper;
ctx->reorder->stream_message = stream_message_cb_wrapper;
+ ctx->reorder->stream_sequence = stream_sequence_cb_wrapper;
ctx->reorder->stream_truncate = stream_truncate_cb_wrapper;
error_context_stack = errcallback.previous;
}
+static void
+sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (ctx->callbacks.sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr first_lsn)
error_context_stack = errcallback.previous;
}
+static void
+stream_sequence_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ /* We're only supposed to call this when streaming is supported. */
+ Assert(ctx->streaming);
+
+ /* this callback is optional */
+ if (ctx->callbacks.stream_sequence_cb == NULL)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "stream_sequence";
+ state.report_location = sequence_lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+ ctx->write_location = sequence_lsn;
+
+ /* do the actual work: call callback */
+ ctx->callbacks.sequence_cb(ctx, txn, sequence_lsn, rel, transactional,
+ last_value, log_cnt, is_called);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
static void
stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
int nrelations, Relation relations[],
* a bit more memory to the oldest subtransactions, because it's likely
* they are the source for the next sequence of changes.
*
+ * When decoding sequences, we differentiate between a sequences created
+ * in a (running) transaction, and sequences created in other (already
+ * committed) transactions. Changes for sequences created in the same
+ * top-level transaction are treated as "transactional" i.e. just like
+ * any other change from that transaction (and discarded in case of a
+ * rollback). Changes for sequences created earlier are treated as not
+ * transactional - are processed immediately, as if performed outside
+ * any transaction (and thus not rolled back).
+ *
+ * This mixed behavior is necessary - sequences are non-transactional
+ * (e.g. ROLLBACK does not undo the sequence increments). But for new
+ * sequences, we need to handle them in a transactional way, because if
+ * we ever get some DDL support, the sequence won't exist until the
+ * transaction gets applied. So we need to ensure the increments don't
+ * happen until the sequence gets created.
+ *
+ * To differentiate which sequences are "old" and which were created
+ * in a still-running transaction, we track sequences created in running
+ * transactions in a hash table. Sequences are identified by relfilenode,
+ * and we track XID of the (sub)transaction that created it. This means
+ * that if a transaction does something that changes the relfilenode
+ * (like an alter / reset of a sequence), the new relfilenode will be
+ * treated as if created in the transaction. The list of sequences gets
+ * discarded when the transaction completes (commit/rollback).
+ *
+ * We don't use the XID to check if it's the same top-level transaction.
+ * It's enough to know it was created in an in-progress transaction,
+ * and we know it must be the current one because otherwise it wouldn't
+ * see the sequence object.
+ *
+ * The XID may be valid even for non-transactional sequences - we simply
+ * keep the XID logged to WAL, it's up to the reorderbuffer to decide if
+ * the increment is transactional.
+ *
* -------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
#include "catalog/catalog.h"
+#include "commands/sequence.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pgstat.h"
ReorderBufferTXN *txn;
} ReorderBufferTXNByIdEnt;
+/* entry for hash table we use to track sequences created in running xacts */
+typedef struct ReorderBufferSequenceEnt
+{
+ RelFileNode rnode;
+ TransactionId xid;
+} ReorderBufferSequenceEnt;
+
/* data structures for (relfilenode, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* hash table of sequences, mapping relfilenode to XID of transaction */
+ hash_ctl.keysize = sizeof(RelFileNode);
+ hash_ctl.entrysize = sizeof(ReorderBufferSequenceEnt);
+ hash_ctl.hcxt = buffer->context;
+
+ buffer->sequences = hash_create("ReorderBufferSequenceHash", 1000, &hash_ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
change->data.truncate.relids = NULL;
}
break;
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ ReorderBufferReturnTupleBuf(rb, change->data.sequence.tuple);
+ change->data.sequence.tuple = NULL;
+ }
+ break;
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
}
}
+/*
+ * Treat the sequence increment as transactional?
+ *
+ * The hash table tracks all sequences created in in-progress transactions,
+ * so we simply do a lookup (the sequence is identified by relfilende). If
+ * we find a match, the increment should be handled as transactional.
+ */
+bool
+ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created)
+{
+ bool found = false;
+
+ if (created)
+ return true;
+
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND,
+ &found);
+
+ return found;
+}
+
+/*
+ * Cleanup sequences created in in-progress transactions.
+ *
+ * There's no way to search by XID, so we simply do a seqscan of all
+ * the entries in the hash table. Hopefully there are only a couple
+ * entries in most cases - people generally don't create many new
+ * sequences over and over.
+ */
+static void
+ReorderBufferSequenceCleanup(ReorderBuffer *rb, TransactionId xid)
+{
+ HASH_SEQ_STATUS scan_status;
+ ReorderBufferSequenceEnt *ent;
+
+ hash_seq_init(&scan_status, rb->sequences);
+ while ((ent = (ReorderBufferSequenceEnt *) hash_seq_search(&scan_status)) != NULL)
+ {
+ /* skip sequences not from this transaction */
+ if (ent->xid != xid)
+ continue;
+
+ (void) hash_search(rb->sequences,
+ (void *) &(ent->rnode),
+ HASH_REMOVE, NULL);
+ }
+}
+
+/*
+ * A transactional sequence increment is queued to be processed upon commit
+ * and a non-transactional increment gets processed immediately.
+ *
+ * A sequence update may be both transactional and non-transactional. When
+ * created in a running transaction, treat it as transactional and queue
+ * the change in it. Otherwise treat it as non-transactional, so that we
+ * don't forget the increment in case of a rollback.
+ */
+void
+ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf)
+{
+ /*
+ * Change needs to be handled as transactional, because the sequence was
+ * created in a transaction that is still running. In that case all the
+ * changes need to be queued in that transaction, we must not send them
+ * to the downstream until the transaction commits.
+ *
+ * There's a bit of a trouble with subtransactions - we can't queue it
+ * into the subxact, because it might be rolled back and we'd lose the
+ * increment. We need to queue it into the same (sub)xact that created
+ * the sequence, which is why we track the XID in the hash table.
+ */
+ if (transactional)
+ {
+ MemoryContext oldcontext;
+ ReorderBufferChange *change;
+
+ /* lookup sequence by relfilenode */
+ ReorderBufferSequenceEnt *ent;
+ bool found;
+
+ /* transactional changes require a transaction */
+ Assert(xid != InvalidTransactionId);
+
+ /* search the lookup table (we ignore the return value, found is enough) */
+ ent = hash_search(rb->sequences,
+ (void *) &rnode,
+ created ? HASH_ENTER : HASH_FIND,
+ &found);
+
+ /*
+ * If this is the "create" increment, we must not have found any
+ * pre-existing entry in the hash table (i.e. there must not be
+ * any conflicting sequence).
+ */
+ Assert(!(created && found));
+
+ /* But we must have either created or found an existing entry. */
+ Assert(created || found);
+
+ /*
+ * When creating the sequence, remember the XID of the transaction
+ * that created id.
+ */
+ if (created)
+ ent->xid = xid;
+
+ /* XXX Maybe check that we're still in the same top-level xact? */
+
+ /* OK, allocate and queue the change */
+ oldcontext = MemoryContextSwitchTo(rb->context);
+
+ change = ReorderBufferGetChange(rb);
+
+ change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
+ change->origin_id = origin_id;
+
+ memcpy(&change->data.sequence.relnode, &rnode, sizeof(RelFileNode));
+
+ change->data.sequence.tuple = tuplebuf;
+
+ /* add it to the same subxact that created the sequence */
+ ReorderBufferQueueChange(rb, ent->xid, lsn, change, false);
+
+ MemoryContextSwitchTo(oldcontext);
+ }
+ else
+ {
+ /*
+ * This increment is for a sequence that was not created in any
+ * running transaction, so we treat it as non-transactional and
+ * just send it to the output plugin directly.
+ */
+ ReorderBufferTXN *txn = NULL;
+ volatile Snapshot snapshot_now = snapshot;
+ bool using_subtxn;
+
+#ifdef USE_ASSERT_CHECKING
+ /* All "creates" have to be handled as transactional. */
+ Assert(!created);
+
+ /* Make sure the sequence is not in the hash table. */
+ {
+ bool found;
+ hash_search(rb->sequences,
+ (void *) &rnode,
+ HASH_FIND, &found);
+ Assert(!found);
+ }
+#endif
+
+ if (xid != InvalidTransactionId)
+ txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+ /* setup snapshot to allow catalog access */
+ SetupHistoricSnapshot(snapshot_now, NULL);
+
+ /*
+ * Decoding needs access to syscaches et al., which in turn use
+ * heavyweight locks and such. Thus we need to have enough state around to
+ * keep track of those. The easiest way is to simply use a transaction
+ * internally. That also allows us to easily enforce that nothing writes
+ * to the database by checking for xid assignments.
+ *
+ * When we're called via the SQL SRF there's already a transaction
+ * started, so start an explicit subtransaction there.
+ */
+ using_subtxn = IsTransactionOrTransactionBlock();
+
+ PG_TRY();
+ {
+ Relation relation;
+ HeapTuple tuple;
+ Form_pg_sequence_data seq;
+ Oid reloid;
+
+ if (using_subtxn)
+ BeginInternalSubTransaction("sequence");
+ else
+ StartTransactionCommand();
+
+ reloid = RelidByRelfilenode(rnode.spcNode, rnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(rnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+ tuple = &tuplebuf->tuple;
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+
+ rb->sequence(rb, txn, lsn, relation, transactional,
+ seq->last_value, seq->log_cnt, seq->is_called);
+
+ RelationClose(relation);
+
+ TeardownHistoricSnapshot(false);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+ }
+ PG_CATCH();
+ {
+ TeardownHistoricSnapshot(true);
+
+ AbortCurrentTransaction();
+
+ if (using_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+}
+
/*
* AssertTXNLsnOrder
* Verify LSN ordering of transaction lists in the reorderbuffer
&found);
Assert(found);
+ /* Remove sequences created in this transaction (if any). */
+ ReorderBufferSequenceCleanup(rb, txn->xid);
+
/* remove entries spilled to disk */
if (rbtxn_is_serialized(txn))
ReorderBufferRestoreCleanup(rb, txn);
change->data.msg.message);
}
+/*
+ * Helper function for ReorderBufferProcessTXN for applying sequences.
+ */
+static inline void
+ReorderBufferApplySequence(ReorderBuffer *rb, ReorderBufferTXN *txn,
+ Relation relation, ReorderBufferChange *change,
+ bool streaming)
+{
+ HeapTuple tuple;
+ Form_pg_sequence_data seq;
+
+ tuple = &change->data.sequence.tuple->tuple;
+ seq = (Form_pg_sequence_data) GETSTRUCT(tuple);
+
+ /* Only ever called from ReorderBufferApplySequence, so transational. */
+ if (streaming)
+ rb->stream_sequence(rb, txn, change->lsn, relation, true,
+ seq->last_value, seq->log_cnt, seq->is_called);
+ else
+ rb->sequence(rb, txn, change->lsn, relation, true,
+ seq->last_value, seq->log_cnt, seq->is_called);
+}
+
/*
* Function to store the command id and snapshot at the end of the current
* stream so that we can reuse the same while sending the next stream.
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
elog(ERROR, "tuplecid value in changequeue");
break;
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ Assert(snapshot_now);
+
+ reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
+ change->data.sequence.relnode.relNode);
+
+ if (reloid == InvalidOid)
+ elog(ERROR, "could not map filenode \"%s\" to relation OID",
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ relation = RelationIdGetRelation(reloid);
+
+ if (!RelationIsValid(relation))
+ elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
+ reloid,
+ relpathperm(change->data.sequence.relnode,
+ MAIN_FORKNUM));
+
+ if (RelationIsLogicallyLogged(relation))
+ ReorderBufferApplySequence(rb, txn, relation, change, streaming);
+
+ RelationClose(relation);
+ break;
}
}
memcpy(data, change->data.truncate.relids, size);
data += size;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ char *data;
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
+ /* make sure we have enough space */
+ ReorderBufferSerializeReserve(rb, sz);
+
+ data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+ /* might have been reallocated above */
+ ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+ if (len)
+ {
+ memcpy(data, &tup->tuple, sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ memcpy(data, tup->tuple.t_data, len);
+ data += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
{
sz += sizeof(Oid) * change->data.truncate.nrelids;
+ break;
+ }
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ {
+ ReorderBufferTupleBuf *tup;
+ Size len = 0;
+
+ tup = change->data.sequence.tuple;
+
+ if (tup)
+ {
+ sz += sizeof(HeapTupleData);
+ len = tup->tuple.t_len;
+ sz += len;
+ }
+
break;
}
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
break;
}
+
+ case REORDER_BUFFER_CHANGE_SEQUENCE:
+ if (change->data.sequence.tuple)
+ {
+ uint32 tuplelen = ((HeapTuple) data)->t_len;
+
+ change->data.sequence.tuple =
+ ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
+
+ /* restore ->tuple */
+ memcpy(&change->data.sequence.tuple->tuple, data,
+ sizeof(HeapTupleData));
+ data += sizeof(HeapTupleData);
+
+ /* reset t_data pointer into the new tuplebuf */
+ change->data.sequence.tuple->tuple.t_data =
+ ReorderBufferTupleBufData(change->data.sequence.tuple);
+
+ /* restore tuple data itself */
+ memcpy(change->data.sequence.tuple->tuple.t_data, data, tuplelen);
+ data += tuplelen;
+ }
+ break;
+
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL)
PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL)
PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL)
-PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL)
+PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, sequence_decode)
PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL)
PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL)
PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL)
typedef struct xl_seq_rec
{
RelFileNode node;
+ bool created; /* creates a new relfilenode (CREATE/ALTER) */
/* SEQUENCE TUPLE DATA FOLLOWS AT THE END */
} xl_seq_rec;
extern void xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+extern void sequence_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
extern void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx,
XLogReaderState *record);
Size message_size,
const char *message);
+/*
+ * Called for the generic logical decoding sequences.
+ */
+typedef void (*LogicalDecodeSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+
/*
* Filter changes by origin.
*/
Size message_size,
const char *message);
+/*
+ * Called for the streaming generic logical decoding sequences from in-progress
+ * transactions.
+ */
+typedef void (*LogicalDecodeStreamSequenceCB) (struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ int64 last_value,
+ int64 log_cnt,
+ bool is_called);
+
/*
* Callback for streaming truncates from in-progress transactions.
*/
LogicalDecodeTruncateCB truncate_cb;
LogicalDecodeCommitCB commit_cb;
LogicalDecodeMessageCB message_cb;
+ LogicalDecodeSequenceCB sequence_cb;
LogicalDecodeFilterByOriginCB filter_by_origin_cb;
LogicalDecodeShutdownCB shutdown_cb;
LogicalDecodeStreamCommitCB stream_commit_cb;
LogicalDecodeStreamChangeCB stream_change_cb;
LogicalDecodeStreamMessageCB stream_message_cb;
+ LogicalDecodeStreamSequenceCB stream_sequence_cb;
LogicalDecodeStreamTruncateCB stream_truncate_cb;
} OutputPluginCallbacks;
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
- REORDER_BUFFER_CHANGE_TRUNCATE
+ REORDER_BUFFER_CHANGE_TRUNCATE,
+ REORDER_BUFFER_CHANGE_SEQUENCE
};
/* forward declaration */
uint32 ninvalidations; /* Number of messages */
SharedInvalidationMessage *invalidations; /* invalidation message */
} inval;
+
+ /* Context data for Sequence changes */
+ struct
+ {
+ RelFileNode relnode;
+ ReorderBufferTupleBuf *tuple;
+ } sequence;
} data;
/*
const char *prefix, Size sz,
const char *message);
+/* sequence callback signature */
+typedef void (*ReorderBufferSequenceCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+
/* begin prepare callback signature */
typedef void (*ReorderBufferBeginPrepareCB) (ReorderBuffer *rb,
ReorderBufferTXN *txn);
const char *prefix, Size sz,
const char *message);
+/* stream sequence callback signature */
+typedef void (*ReorderBufferStreamSequenceCB) (ReorderBuffer *rb,
+ ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn,
+ Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt,
+ bool is_called);
+
/* stream truncate callback signature */
typedef void (*ReorderBufferStreamTruncateCB) (
ReorderBuffer *rb,
*/
HTAB *by_txn;
+ /*
+ * relfilenode => XID lookup table for sequences created in a transaction
+ * (also includes altered sequences, which assigns new relfilenode)
+ */
+ HTAB *sequences;
+
/*
* Transactions that could be a toplevel xact, ordered by LSN of the first
* record bearing that xid.
ReorderBufferApplyTruncateCB apply_truncate;
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;
+ ReorderBufferSequenceCB sequence;
/*
* Callbacks to be called when streaming a transaction at prepare time.
ReorderBufferStreamCommitCB stream_commit;
ReorderBufferStreamChangeCB stream_change;
ReorderBufferStreamMessageCB stream_message;
+ ReorderBufferStreamSequenceCB stream_sequence;
ReorderBufferStreamTruncateCB stream_truncate;
/*
void ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
bool transactional, const char *prefix,
Size message_size, const char *message);
+void ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid,
+ Snapshot snapshot, XLogRecPtr lsn, RepOriginId origin_id,
+ RelFileNode rnode, bool transactional, bool created,
+ ReorderBufferTupleBuf *tuplebuf);
void ReorderBufferCommit(ReorderBuffer *, TransactionId,
XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
void StartupReorderBuffer(void);
+bool ReorderBufferSequenceIsTransactional(ReorderBuffer *rb,
+ RelFileNode rnode, bool created);
+
#endif