#include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */
#include "storage/bufmgr.h"
#include "storage/fd.h"
+#include "storage/procarray.h"
#include "storage/sinval.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
bool txn_prepared);
+static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
TransactionId xid, XLogSegNo segno);
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
/*
- * While streaming the previous changes we have detected that the
- * transaction is aborted. So there is no point in collecting further
- * changes for it.
+ * If we have detected that the transaction is aborted while streaming the
+ * previous changes or by checking its CLOG, there is no point in
+ * collecting further changes for it.
*/
- if (txn->concurrent_abort)
+ if (rbtxn_is_aborted(txn))
{
/*
* We don't need to update memory accounting for this change as we
/*
* Discard changes from a transaction (and subtransactions), either after
- * streaming or decoding them at PREPARE. Keep the remaining info -
- * transactions, tuplecids, invalidations and snapshots.
+ * streaming, decoding them at PREPARE, or detecting the transaction abort.
+ * Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
*
* We additionally remove tuplecids after decoding the transaction at prepare
* time as we only need to perform invalidation at rollback or commit prepared.
Assert(rbtxn_is_known_subxact(subtxn));
Assert(subtxn->nsubtxns == 0);
+ ReorderBufferMaybeMarkTXNStreamed(rb, subtxn);
ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
}
/* Update the memory counter */
ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
- /*
- * Mark the transaction as streamed.
- *
- * The top-level transaction, is marked as streamed always, even if it
- * does not contain any changes (that is, when all the changes are in
- * subtransactions).
- *
- * For subtransactions, we only mark them as streamed when there are
- * changes in them.
- *
- * We do it this way because of aborts - we don't want to send aborts for
- * XIDs the downstream is not aware of. And of course, it always knows
- * about the toplevel xact (we send the XID in all messages), but we never
- * stream XIDs of empty subxacts.
- */
- if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
- txn->txn_flags |= RBTXN_IS_STREAMED;
-
if (txn_prepared)
{
/*
txn->nentries = 0;
}
+/*
+ * Check the transaction status by CLOG lookup and discard all changes if
+ * the transaction is aborted. The transaction status is cached in
+ * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
+ * next call.
+ *
+ * Return true if the transaction is aborted, otherwise return false.
+ *
+ * When the 'debug_logical_replication_streaming' is set to "immediate", we
+ * don't check the transaction status, meaning the caller will always process
+ * this transaction.
+ */
+static bool
+ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ /* Quick return for regression tests */
+ if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
+ return false;
+
+ /*
+ * Quick return if the transaction status is already known.
+ */
+
+ if (rbtxn_is_committed(txn))
+ return false;
+ if (rbtxn_is_aborted(txn))
+ {
+ /* Already-aborted transactions should not have any changes */
+ Assert(txn->size == 0);
+
+ return true;
+ }
+
+ /* Otherwise, check the transaction status using CLOG lookup */
+
+ if (TransactionIdIsInProgress(txn->xid))
+ return false;
+
+ if (TransactionIdDidCommit(txn->xid))
+ {
+ /*
+ * Remember the transaction is committed so that we can skip CLOG
+ * check next time, avoiding the pressure on CLOG lookup.
+ */
+ Assert(!rbtxn_is_aborted(txn));
+ txn->txn_flags |= RBTXN_IS_COMMITTED;
+ return false;
+ }
+
+ /*
+ * The transaction aborted. We discard both the changes collected so far
+ * and the toast reconstruction data. The full cleanup will happen as part
+ * of decoding ABORT record of this transaction.
+ */
+ ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+ ReorderBufferToastReset(rb, txn);
+
+ /* All changes should be discarded */
+ Assert(txn->size == 0);
+
+ /*
+ * Mark the transaction as aborted so we can ignore future changes of this
+ * transaction.
+ */
+ Assert(!rbtxn_is_committed(txn));
+ txn->txn_flags |= RBTXN_IS_ABORTED;
+
+ return true;
+}
+
/*
* Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
* HeapTupleSatisfiesHistoricMVCC.
* Note, we send stream prepare even if a concurrent abort is
* detected. See DecodePrepare for more information.
*/
+ Assert(!rbtxn_sent_prepare(txn));
rb->stream_prepare(rb, txn, txn->final_lsn);
+ txn->txn_flags |= RBTXN_SENT_PREPARE;
/*
* This is a PREPARED transaction, part of a two-phase commit. The
txn, command_id);
}
+/*
+ * Mark the given transaction as streamed if it's a top-level transaction
+ * or has changes.
+ */
+static void
+ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+ /*
+ * The top-level transaction, is marked as streamed always, even if it
+ * does not contain any changes (that is, when all the changes are in
+ * subtransactions).
+ *
+ * For subtransactions, we only mark them as streamed when there are
+ * changes in them.
+ *
+ * We do it this way because of aborts - we don't want to send aborts for
+ * XIDs the downstream is not aware of. And of course, it always knows
+ * about the top-level xact (we send the XID in all messages), but we
+ * never stream XIDs of empty subxacts.
+ */
+ if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
+ txn->txn_flags |= RBTXN_IS_STREAMED;
+}
+
/*
* Helper function for ReorderBufferProcessTXN to handle the concurrent
* abort of the streaming transaction. This resets the TXN such that it
* regular ones).
*/
if (rbtxn_prepared(txn))
+ {
+ Assert(!rbtxn_sent_prepare(txn));
rb->prepare(rb, txn, commit_lsn);
+ txn->txn_flags |= RBTXN_SENT_PREPARE;
+ }
else
rb->commit(rb, txn, commit_lsn);
}
*/
if (streaming || rbtxn_prepared(txn))
{
+ if (streaming)
+ ReorderBufferMaybeMarkTXNStreamed(rb, txn);
+
ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
/* Reset the CheckXidAlive */
CheckXidAlive = InvalidTransactionId;
FlushErrorState();
FreeErrorData(errdata);
errdata = NULL;
- curtxn->concurrent_abort = true;
+
+ /* Remember the transaction is aborted. */
+ Assert(!rbtxn_is_committed(curtxn));
+ curtxn->txn_flags |= RBTXN_IS_ABORTED;
+
+ /* Mark the transaction is streamed if appropriate */
+ if (stream_started)
+ ReorderBufferMaybeMarkTXNStreamed(rb, txn);
/* Reset the TXN so that it is allowed to stream remaining data. */
ReorderBufferResetTXN(rb, txn, snapshot_now,
txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
/*
- * We send the prepare for the concurrently aborted xacts so that later
- * when rollback prepared is decoded and sent, the downstream should be
- * able to rollback such a xact. See comments atop DecodePrepare.
- *
- * Note, for the concurrent_abort + streaming case a stream_prepare was
- * already sent within the ReorderBufferReplay call above.
+ * Send a prepare if not already done so. This might occur if we have
+ * detected a concurrent abort while replaying the non-streaming
+ * transaction.
*/
- if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+ if (!rbtxn_sent_prepare(txn))
+ {
rb->prepare(rb, txn, txn->final_lsn);
+ txn->txn_flags |= RBTXN_SENT_PREPARE;
+ }
}
/*
}
/*
- * Find the largest streamable toplevel transaction to evict (by streaming).
+ * Find the largest streamable (and non-aborted) toplevel transaction to evict
+ * (by streaming).
*
* This can be seen as an optimized version of ReorderBufferLargestTXN, which
* should give us the same transaction (because we don't update memory account
/* base_snapshot must be set */
Assert(txn->base_snapshot != NULL);
+ /* Don't consider these kinds of transactions for eviction. */
+ if (rbtxn_has_partial_change(txn) ||
+ !rbtxn_has_streamable_change(txn) ||
+ rbtxn_is_aborted(txn))
+ continue;
+
+ /* Find the largest of the eviction candidates. */
if ((largest == NULL || txn->total_size > largest_size) &&
- (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
- rbtxn_has_streamable_change(txn))
+ (txn->total_size > 0))
{
largest = txn;
largest_size = txn->total_size;
rb->size > 0))
{
/*
- * Pick the largest transaction and evict it from memory by streaming,
- * if possible. Otherwise, spill to disk.
+ * Pick the largest non-aborted transaction and evict it from memory
+ * by streaming, if possible. Otherwise, spill to disk.
*/
if (ReorderBufferCanStartStreaming(rb) &&
(txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
Assert(txn->total_size > 0);
Assert(rb->size >= txn->total_size);
+ /* skip the transaction if aborted */
+ if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
+ continue;
+
ReorderBufferStreamTXN(rb, txn);
}
else
Assert(txn->size > 0);
Assert(rb->size >= txn->size);
+ /* skip the transaction if aborted */
+ if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
+ continue;
+
ReorderBufferSerializeTXN(rb, txn);
}