static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
/* fields valid only when processing streamed transaction */
-bool in_streamed_transaction = false;
+static bool in_streamed_transaction = false;
static TransactionId stream_xid = InvalidTransactionId;
*/
xid = pq_getmsgint(s, 4);
- Assert(TransactionIdIsValid(xid));
+ if (!TransactionIdIsValid(xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid transaction ID in streamed replication transaction")));
/* Add the new subxact to the array (unless already there). */
subxact_info_add(xid);
logicalrep_read_commit(s, &commit_data);
- Assert(commit_data.commit_lsn == remote_final_lsn);
+ if (commit_data.commit_lsn != remote_final_lsn)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ LSN_FORMAT_ARGS(commit_data.commit_lsn),
+ LSN_FORMAT_ARGS(remote_final_lsn))));
apply_handle_commit_internal(s, &commit_data);
(IsTransactionState() && !am_tablesync_worker())))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("ORIGIN message sent out of order")));
+ errmsg_internal("ORIGIN message sent out of order")));
}
/*
bool first_segment;
HASHCTL hash_ctl;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("duplicate STREAM START message")));
/*
* Start a transaction on stream start, this transaction will be committed
/* extract XID of the top-level transaction */
stream_xid = logicalrep_read_stream_start(s, &first_segment);
+ if (!TransactionIdIsValid(stream_xid))
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("invalid transaction ID in streamed replication transaction")));
+
/*
* Initialize the xidhash table if we haven't yet. This will be used for
* the entire duration of the apply worker so create it in permanent
static void
apply_handle_stream_stop(StringInfo s)
{
- Assert(in_streamed_transaction);
+ if (!in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM STOP message without STREAM START")));
/*
* Close the file with serialized changes, and serialize information about
TransactionId xid;
TransactionId subxid;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM ABORT message without STREAM STOP")));
logicalrep_read_stream_abort(s, &xid, &subxid);
* performed rollback to savepoint for one of the earlier
* sub-transaction.
*/
-
int64 i;
int64 subidx;
BufFile *fd;
return;
}
- Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- Assert(found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
/* open the changes file */
changes_filename(path, MyLogicalRepWorker->subid, xid);
int nchanges;
char path[MAXPGPATH];
char *buffer = NULL;
- bool found;
LogicalRepCommitData commit_data;
StreamXidHash *ent;
MemoryContext oldcxt;
BufFile *fd;
- Assert(!in_streamed_transaction);
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
/* open the spool file for the committed transaction */
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- Assert(found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
+
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
buffer = palloc(BLCKSZ);
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
path)));
- Assert(len > 0);
+ if (len <= 0)
+ elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+ len, path);
/* make sure we have sufficiently large buffer */
buffer = repalloc(buffer, len);
nchanges++;
if (nchanges % 1000 == 0)
- elog(DEBUG1, "replayed %d changes from file '%s'",
+ elog(DEBUG1, "replayed %d changes from file \"%s\"",
nchanges, path);
}
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid logical replication message type \"%c\"", action)));
+ errmsg_internal("invalid logical replication message type \"%c\"",
+ action)));
}
/*
subxact_info_write(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- bool found;
Size len;
StreamXidHash *ent;
BufFile *fd;
Assert(TransactionIdIsValid(xid));
- /* find the xid entry in the xidhash */
+ /* Find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- /* we must found the entry for its top transaction by this time */
- Assert(found);
+ NULL);
+ /* By this time we must have created the transaction entry */
+ Assert(ent);
/*
* If there is no subtransaction then nothing to do, but if already have
subxact_info_read(Oid subid, TransactionId xid)
{
char path[MAXPGPATH];
- bool found;
Size len;
BufFile *fd;
StreamXidHash *ent;
MemoryContext oldctx;
- Assert(TransactionIdIsValid(xid));
Assert(!subxact_data.subxacts);
Assert(subxact_data.nsubxacts == 0);
Assert(subxact_data.nsubxacts_max == 0);
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
/*
* If subxact_fileset is not valid that mean we don't have any subxact
{
char path[MAXPGPATH];
StreamXidHash *ent;
- bool found = false;
- /* By this time we must have created the transaction entry */
+ /* Find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
HASH_FIND,
- &found);
- Assert(found);
+ NULL);
+ if (!ent)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("transaction %u not found in stream XID hash table",
+ xid)));
/* Delete the change file and release the stream fileset memory */
changes_filename(path, subid, xid);
/* create or find the xid entry in the xidhash */
ent = (StreamXidHash *) hash_search(xidhash,
(void *) &xid,
- HASH_ENTER | HASH_FIND,
+ HASH_ENTER,
&found);
- Assert(first_segment || found);
+
changes_filename(path, subid, xid);
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
MemoryContext savectx;
SharedFileSet *fileset;
+ if (found)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
/*
* We need to maintain shared fileset across multiple stream
* start/stop calls. So, need to allocate it in a persistent context.
}
else
{
+ if (!found)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
/*
* Open the file and seek to the end of the file because we always
* append the changes file.
*/
if (!myslotname)
ereport(ERROR,
- (errmsg("subscription has no replication slot set")));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("subscription has no replication slot set")));
/* Setup replication origin tracking. */
StartTransactionCommand();