Rename the logical replication global "wrconn"
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
The worker.c global wrconn is only meant to be used by logical apply/
tablesync workers, but there are other variables with the same name. To
reduce future confusion rename the global from "wrconn" to
"LogRepWorkerWalRcvConn".

While this is just cosmetic, it seems better to backpatch it all the way
back to 10 where this code appeared, to avoid future backpatching
issues.

Author: Peter Smith <smithpb2250@gmail.com>
Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com

src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index 186057bd9327383b7dfdc2d2dda878290caa7967..d8ff8664fb68863c3464f0d092a81c94446e1dce 100644 (file)
@@ -716,8 +716,8 @@ static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
    /* Disconnect gracefully from the remote side. */
-   if (wrconn)
-       walrcv_disconnect(wrconn);
+   if (LogRepWorkerWalRcvConn)
+       walrcv_disconnect(LogRepWorkerWalRcvConn);
 
    logicalrep_worker_detach();
 
index 7881079e96b2d49c9c7f2d5e1bd6520d59814886..2db88dc41a0cb2caff31717b568e496b496e3700 100644 (file)
@@ -295,7 +295,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                   MyLogicalRepWorker->relstate,
                                   MyLogicalRepWorker->relstate_lsn);
 
-       walrcv_endstreaming(wrconn, &tli);
+       walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
        finish_sync_worker();
    }
    else
@@ -591,7 +591,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
        for (;;)
        {
            /* Try read the data. */
-           len = walrcv_receive(wrconn, &buf, &fd);
+           len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
            CHECK_FOR_INTERRUPTS();
 
@@ -665,7 +665,8 @@ fetch_remote_table_info(char *nspname, char *relname,
                     "   AND c.relkind = 'r'",
                     quote_literal_cstr(nspname),
                     quote_literal_cstr(relname));
-   res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                     lengthof(tableRow), tableRow);
 
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
@@ -701,9 +702,11 @@ fetch_remote_table_info(char *nspname, char *relname,
                     "   AND a.attrelid = %u"
                     " ORDER BY a.attnum",
                     lrel->remoteid,
-                    (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+                    (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+                     "AND a.attgenerated = ''" : ""),
                     lrel->remoteid);
-   res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                     lengthof(attrRow), attrRow);
 
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
@@ -773,7 +776,7 @@ copy_table(Relation rel)
    initStringInfo(&cmd);
    appendStringInfo(&cmd, "COPY %s TO STDOUT",
                     quote_qualified_identifier(lrel.nspname, lrel.relname));
-   res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+   res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
    pfree(cmd.data);
    if (res->status != WALRCV_OK_COPY_OUT)
        ereport(ERROR,
@@ -840,8 +843,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
     * application_name, so that it is different from the main apply worker,
     * so that synchronous replication can distinguish them.
     */
-   wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
-   if (wrconn == NULL)
+   LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                           slotname, &err);
+   if (LogRepWorkerWalRcvConn == NULL)
        ereport(ERROR,
                (errmsg("could not connect to the publisher: %s", err)));
 
@@ -886,7 +890,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 * inside the transaction so that we can use the snapshot made
                 * by the slot to get existing data.
                 */
-               res = walrcv_exec(wrconn,
+               res = walrcv_exec(LogRepWorkerWalRcvConn,
                                  "BEGIN READ ONLY ISOLATION LEVEL "
                                  "REPEATABLE READ", 0, NULL);
                if (res->status != WALRCV_OK_COMMAND)
@@ -903,14 +907,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 * that is consistent with the lsn used by the slot to start
                 * decoding.
                 */
-               walrcv_create_slot(wrconn, slotname, true,
+               walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
                                   CRS_USE_SNAPSHOT, origin_startpos);
 
                PushActiveSnapshot(GetTransactionSnapshot());
                copy_table(rel);
                PopActiveSnapshot();
 
-               res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+               res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
                if (res->status != WALRCV_OK_COMMAND)
                    ereport(ERROR,
                            (errmsg("table copy could not finish transaction on publisher"),
index ce8f4106aa197776df6e41dac0981601b4ff8c5d..9f0d13cbfa916ba62b56063c9b16fc08bdb26f09 100644 (file)
@@ -96,7 +96,7 @@ typedef struct SlotErrCallbackArg
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool       MySubscriptionValid = false;
@@ -1158,7 +1158,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
        MemoryContextSwitchTo(ApplyMessageContext);
 
-       len = walrcv_receive(wrconn, &buf, &fd);
+       len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
        if (len != 0)
        {
@@ -1238,7 +1238,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                    MemoryContextReset(ApplyMessageContext);
                }
 
-               len = walrcv_receive(wrconn, &buf, &fd);
+               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
            }
        }
 
@@ -1268,7 +1268,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        {
            TimeLineID  tli;
 
-           walrcv_endstreaming(wrconn, &tli);
+           walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
            break;
        }
 
@@ -1431,7 +1431,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
         (uint32) (flushpos >> 32), (uint32) flushpos
        );
 
-   walrcv_send(wrconn, reply_message->data, reply_message->len);
+   walrcv_send(LogRepWorkerWalRcvConn,
+               reply_message->data, reply_message->len);
 
    if (recvpos > last_recvpos)
        last_recvpos = recvpos;
@@ -1743,9 +1744,9 @@ ApplyWorkerMain(Datum main_arg)
        origin_startpos = replorigin_session_get_progress(false);
        CommitTransactionCommand();
 
-       wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
-                               &err);
-       if (wrconn == NULL)
+       LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                               MySubscription->name, &err);
+       if (LogRepWorkerWalRcvConn == NULL)
            ereport(ERROR,
                    (errmsg("could not connect to the publisher: %s", err)));
 
@@ -1753,8 +1754,7 @@ ApplyWorkerMain(Datum main_arg)
         * We don't really use the output identify_system for anything but it
         * does some initializations on the upstream so let's still call it.
         */
-       (void) walrcv_identify_system(wrconn, &startpointTLI);
-
+       (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
    }
 
    /*
@@ -1773,7 +1773,7 @@ ApplyWorkerMain(Datum main_arg)
    options.proto.logical.publication_names = MySubscription->publications;
 
    /* Start normal logical streaming replication. */
-   walrcv_startstreaming(wrconn, &options);
+   walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
    /* Run the main loop. */
    LogicalRepApplyLoop(origin_startpos);
index 05f4936419a11673292f3b8d4a600323240043b5..8d605445d88bce972fc6b7e2d456932f4d35dc39 100644 (file)
@@ -60,7 +60,7 @@ typedef struct LogicalRepWorker
 extern MemoryContext ApplyContext;
 
 /* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
 
 /* Worker and subscription objects. */
 extern Subscription *MySubscription;