Don't use Asserts to check for violations of replication protocol.
author: Tom Lane <tgl@sss.pgh.pa.us>
Sat, 12 Jun 2021 16:59:15 +0000 (12:59 -0400)
committer: Tom Lane <tgl@sss.pgh.pa.us>
Sat, 12 Jun 2021 16:59:15 +0000 (12:59 -0400)
Using an Assert to check the validity of incoming messages is an
extremely poor decision.  In a debug build, it should not be that easy
for a broken or malicious remote client to crash the logrep worker.
The consequences could be even worse in non-debug builds, which will
fail to make such checks at all, leading to who-knows-what misbehavior.
Hence, promote every Assert that could possibly be triggered by wrong
or out-of-order replication messages to a full test-and-ereport.

To avoid bloating the set of messages the translation team has to cope
with, establish a policy that replication protocol violation error
reports don't need to be translated.  Hence, all the new messages here
use errmsg_internal().  A couple of old messages are changed likewise
for consistency.

Along the way, fix some non-idiomatic or outright wrong uses of
hash_search().

Most of these mistakes are new with the "streaming replication"
patch (commit 464824323), but a couple go back a long way.
Back-patch as appropriate.

Discussion: https://postgr.es/m/1719083.1623351052@sss.pgh.pa.us

src/backend/replication/logical/reorderbuffer.c
src/backend/replication/logical/worker.c

index 2d9e1279bb27f5e488b910627b24de9402983094..f96029f15a45ea9b0335943e6ba8f686051af7b8 100644 (file)
@@ -1703,7 +1703,7 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
        ent = (ReorderBufferTupleCidEnt *)
            hash_search(txn->tuplecid_hash,
                        (void *) &key,
-                       HASH_ENTER | HASH_FIND,
+                       HASH_ENTER,
                        &found);
        if (!found)
        {
index 689a66cc72ddbe0d5ca6231627347509f8f425ab..4b112593c65eecc90abf3322d7d5a7a1fe043ac6 100644 (file)
@@ -177,7 +177,7 @@ bool        in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
 /* fields valid only when processing streamed transaction */
-bool       in_streamed_transaction = false;
+static bool in_streamed_transaction = false;
 
 static TransactionId stream_xid = InvalidTransactionId;
 
@@ -345,7 +345,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
     */
    xid = pq_getmsgint(s, 4);
 
-   Assert(TransactionIdIsValid(xid));
+   if (!TransactionIdIsValid(xid))
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("invalid transaction ID in streamed replication transaction")));
 
    /* Add the new subxact to the array (unless already there). */
    subxact_info_add(xid);
@@ -785,7 +788,12 @@ apply_handle_commit(StringInfo s)
 
    logicalrep_read_commit(s, &commit_data);
 
-   Assert(commit_data.commit_lsn == remote_final_lsn);
+   if (commit_data.commit_lsn != remote_final_lsn)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+                                LSN_FORMAT_ARGS(commit_data.commit_lsn),
+                                LSN_FORMAT_ARGS(remote_final_lsn))));
 
    apply_handle_commit_internal(s, &commit_data);
 
@@ -812,7 +820,7 @@ apply_handle_origin(StringInfo s)
         (IsTransactionState() && !am_tablesync_worker())))
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                errmsg("ORIGIN message sent out of order")));
+                errmsg_internal("ORIGIN message sent out of order")));
 }
 
 /*
@@ -824,7 +832,10 @@ apply_handle_stream_start(StringInfo s)
    bool        first_segment;
    HASHCTL     hash_ctl;
 
-   Assert(!in_streamed_transaction);
+   if (in_streamed_transaction)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("duplicate STREAM START message")));
 
    /*
     * Start a transaction on stream start, this transaction will be committed
@@ -841,6 +852,11 @@ apply_handle_stream_start(StringInfo s)
    /* extract XID of the top-level transaction */
    stream_xid = logicalrep_read_stream_start(s, &first_segment);
 
+   if (!TransactionIdIsValid(stream_xid))
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("invalid transaction ID in streamed replication transaction")));
+
    /*
     * Initialize the xidhash table if we haven't yet. This will be used for
     * the entire duration of the apply worker so create it in permanent
@@ -873,7 +889,10 @@ apply_handle_stream_start(StringInfo s)
 static void
 apply_handle_stream_stop(StringInfo s)
 {
-   Assert(in_streamed_transaction);
+   if (!in_streamed_transaction)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("STREAM STOP message without STREAM START")));
 
    /*
     * Close the file with serialized changes, and serialize information about
@@ -905,7 +924,10 @@ apply_handle_stream_abort(StringInfo s)
    TransactionId xid;
    TransactionId subxid;
 
-   Assert(!in_streamed_transaction);
+   if (in_streamed_transaction)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("STREAM ABORT message without STREAM STOP")));
 
    logicalrep_read_stream_abort(s, &xid, &subxid);
 
@@ -932,7 +954,6 @@ apply_handle_stream_abort(StringInfo s)
         * performed rollback to savepoint for one of the earlier
         * sub-transaction.
         */
-
        int64       i;
        int64       subidx;
        BufFile    *fd;
@@ -967,13 +988,15 @@ apply_handle_stream_abort(StringInfo s)
            return;
        }
 
-       Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
-
        ent = (StreamXidHash *) hash_search(xidhash,
                                            (void *) &xid,
                                            HASH_FIND,
-                                           &found);
-       Assert(found);
+                                           NULL);
+       if (!ent)
+           ereport(ERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg_internal("transaction %u not found in stream XID hash table",
+                                    xid)));
 
        /* open the changes file */
        changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1006,13 +1029,15 @@ apply_handle_stream_commit(StringInfo s)
    int         nchanges;
    char        path[MAXPGPATH];
    char       *buffer = NULL;
-   bool        found;
    LogicalRepCommitData commit_data;
    StreamXidHash *ent;
    MemoryContext oldcxt;
    BufFile    *fd;
 
-   Assert(!in_streamed_transaction);
+   if (in_streamed_transaction)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("STREAM COMMIT message without STREAM STOP")));
 
    xid = logicalrep_read_stream_commit(s, &commit_data);
 
@@ -1031,11 +1056,17 @@ apply_handle_stream_commit(StringInfo s)
    /* open the spool file for the committed transaction */
    changes_filename(path, MyLogicalRepWorker->subid, xid);
    elog(DEBUG1, "replaying changes from file \"%s\"", path);
+
    ent = (StreamXidHash *) hash_search(xidhash,
                                        (void *) &xid,
                                        HASH_FIND,
-                                       &found);
-   Assert(found);
+                                       NULL);
+   if (!ent)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("transaction %u not found in stream XID hash table",
+                                xid)));
+
    fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
 
    buffer = palloc(BLCKSZ);
@@ -1080,7 +1111,9 @@ apply_handle_stream_commit(StringInfo s)
                     errmsg("could not read from streaming transaction's changes file \"%s\": %m",
                            path)));
 
-       Assert(len > 0);
+       if (len <= 0)
+           elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
+                len, path);
 
        /* make sure we have sufficiently large buffer */
        buffer = repalloc(buffer, len);
@@ -1108,7 +1141,7 @@ apply_handle_stream_commit(StringInfo s)
        nchanges++;
 
        if (nchanges % 1000 == 0)
-           elog(DEBUG1, "replayed %d changes from file '%s'",
+           elog(DEBUG1, "replayed %d changes from file \"%s\"",
                 nchanges, path);
    }
 
@@ -2053,7 +2086,8 @@ apply_dispatch(StringInfo s)
 
    ereport(ERROR,
            (errcode(ERRCODE_PROTOCOL_VIOLATION),
-            errmsg("invalid logical replication message type \"%c\"", action)));
+            errmsg_internal("invalid logical replication message type \"%c\"",
+                            action)));
 }
 
 /*
@@ -2589,20 +2623,19 @@ static void
 subxact_info_write(Oid subid, TransactionId xid)
 {
    char        path[MAXPGPATH];
-   bool        found;
    Size        len;
    StreamXidHash *ent;
    BufFile    *fd;
 
    Assert(TransactionIdIsValid(xid));
 
-   /* find the xid entry in the xidhash */
+   /* Find the xid entry in the xidhash */
    ent = (StreamXidHash *) hash_search(xidhash,
                                        (void *) &xid,
                                        HASH_FIND,
-                                       &found);
-   /* we must found the entry for its top transaction by this time */
-   Assert(found);
+                                       NULL);
+   /* By this time we must have created the transaction entry */
+   Assert(ent);
 
    /*
     * If there is no subtransaction then nothing to do, but if already have
@@ -2667,13 +2700,11 @@ static void
 subxact_info_read(Oid subid, TransactionId xid)
 {
    char        path[MAXPGPATH];
-   bool        found;
    Size        len;
    BufFile    *fd;
    StreamXidHash *ent;
    MemoryContext oldctx;
 
-   Assert(TransactionIdIsValid(xid));
    Assert(!subxact_data.subxacts);
    Assert(subxact_data.nsubxacts == 0);
    Assert(subxact_data.nsubxacts_max == 0);
@@ -2682,7 +2713,12 @@ subxact_info_read(Oid subid, TransactionId xid)
    ent = (StreamXidHash *) hash_search(xidhash,
                                        (void *) &xid,
                                        HASH_FIND,
-                                       &found);
+                                       NULL);
+   if (!ent)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("transaction %u not found in stream XID hash table",
+                                xid)));
 
    /*
     * If subxact_fileset is not valid that mean we don't have any subxact
@@ -2836,14 +2872,17 @@ stream_cleanup_files(Oid subid, TransactionId xid)
 {
    char        path[MAXPGPATH];
    StreamXidHash *ent;
-   bool        found = false;
 
-   /* By this time we must have created the transaction entry */
+   /* Find the xid entry in the xidhash */
    ent = (StreamXidHash *) hash_search(xidhash,
                                        (void *) &xid,
                                        HASH_FIND,
-                                       &found);
-   Assert(found);
+                                       NULL);
+   if (!ent)
+       ereport(ERROR,
+               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                errmsg_internal("transaction %u not found in stream XID hash table",
+                                xid)));
 
    /* Delete the change file and release the stream fileset memory */
    changes_filename(path, subid, xid);
@@ -2893,9 +2932,9 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    /* create or find the xid entry in the xidhash */
    ent = (StreamXidHash *) hash_search(xidhash,
                                        (void *) &xid,
-                                       HASH_ENTER | HASH_FIND,
+                                       HASH_ENTER,
                                        &found);
-   Assert(first_segment || found);
+
    changes_filename(path, subid, xid);
    elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
 
@@ -2915,6 +2954,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
        MemoryContext savectx;
        SharedFileSet *fileset;
 
+       if (found)
+           ereport(ERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
        /*
         * We need to maintain shared fileset across multiple stream
         * start/stop calls. So, need to allocate it in a persistent context.
@@ -2934,6 +2978,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
    }
    else
    {
+       if (!found)
+           ereport(ERROR,
+                   (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                    errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
+
        /*
         * Open the file and seek to the end of the file because we always
         * append the changes file.
@@ -3140,7 +3189,8 @@ ApplyWorkerMain(Datum main_arg)
         */
        if (!myslotname)
            ereport(ERROR,
-                   (errmsg("subscription has no replication slot set")));
+                   (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                    errmsg("subscription has no replication slot set")));
 
        /* Setup replication origin tracking. */
        StartTransactionCommand();