ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
-static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("streaming requested, but not supported by output plugin")));
- /* Also remember we're currently not streaming any transaction. */
- in_streaming = false;
-
/*
* Here, we just check whether the two-phase option is passed by
* plugin and decide whether to enable it at later point of time. It
ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool schema_sent;
TransactionId xid = InvalidTransactionId;
TransactionId topxid = InvalidTransactionId;
* If we're not in a streaming block, just use InvalidTransactionId and
* the write methods will not include it.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
if (rbtxn_is_subtxn(change->txn))
* doing that we need to study its impact on the case where we have a mix
* of streaming and non-streaming transactions.
*/
- if (in_streaming)
+ if (data->in_streaming)
schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
else
schema_sent = relentry->schema_sent;
send_relation_and_attrs(relation, xid, ctx, relentry->columns);
- if (in_streaming)
+ if (data->in_streaming)
set_schema_sent_in_streamed_txn(relentry, topxid);
else
relentry->schema_sent = true;
* their association and on aborts, it can discard the corresponding
* changes.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
relentry = get_rel_sync_entry(data, relation);
TransactionId xid = InvalidTransactionId;
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
- if (in_streaming)
+ if (data->in_streaming)
xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
* Remember the xid for the message in streaming mode. See
* pgoutput_change.
*/
- if (in_streaming)
+ if (data->in_streaming)
xid = txn->xid;
/*
pgoutput_stream_start(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
/* we can't nest streaming of transactions */
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/*
* If we already sent the first stream for this transaction then don't
OutputPluginWrite(ctx, true);
/* we're streaming a chunk of transaction now */
- in_streaming = true;
+ data->in_streaming = true;
}
/*
pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+
/* we should be streaming a transaction */
- Assert(in_streaming);
+ Assert(data->in_streaming);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_stop(ctx->out);
OutputPluginWrite(ctx, true);
/* we've stopped streaming a transaction */
- in_streaming = false;
+ data->in_streaming = false;
}
/*
* The abort should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
/* determine the toplevel transaction */
toptxn = rbtxn_get_toptxn(txn);
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
+ PGOutputData *data PG_USED_FOR_ASSERTS_ONLY = (PGOutputData *) ctx->output_plugin_private;
+
/*
* The commit should happen outside streaming block, even for streamed
* transactions. The transaction has to be marked as streamed, though.
*/
- Assert(!in_streaming);
+ Assert(!data->in_streaming);
Assert(rbtxn_is_streamed(txn));
OutputPluginUpdateProgress(ctx, false);