Rename the logical replication global "wrconn"
authorAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
committerAlvaro Herrera <alvherre@alvh.no-ip.org>
Wed, 12 May 2021 23:13:54 +0000 (19:13 -0400)
The worker.c global wrconn is only meant to be used by logical apply/
tablesync workers, but there are other variables with the same name. To
reduce future confusion rename the global from "wrconn" to
"LogRepWorkerWalRcvConn".

While this is just cosmetic, it seems better to backpatch it all the way
back to 10 where this code appeared, to avoid future backpatching
issues.

Author: Peter Smith <smithpb2250@gmail.com>
Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com

src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index 85f325c38960f8a00ba09bb636e1ec3f29f12a19..e3b11daa897ff7426158845b5f4b339eeb0952a3 100644 (file)
@@ -643,8 +643,8 @@ static void
 logicalrep_worker_onexit(int code, Datum arg)
 {
        /* Disconnect gracefully from the remote side. */
-       if (wrconn)
-               walrcv_disconnect(wrconn);
+       if (LogRepWorkerWalRcvConn)
+               walrcv_disconnect(LogRepWorkerWalRcvConn);
 
        logicalrep_worker_detach();
 
index 0638f5c7f8768a5b5c75b5c47fe0272e2028510f..67f907cdd968fc9f3ceea30af786b5682efb479b 100644 (file)
@@ -302,8 +302,11 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                                                   MyLogicalRepWorker->relstate,
                                                                   MyLogicalRepWorker->relstate_lsn);
 
-               /* End wal streaming so wrconn can be re-used to drop the slot. */
-               walrcv_endstreaming(wrconn, &tli);
+               /*
+                * End streaming so that LogRepWorkerWalRcvConn can be used to drop
+                * the slot.
+                */
+               walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 
                /*
                 * Cleanup the tablesync slot.
@@ -322,7 +325,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                 * otherwise, it won't be dropped till the corresponding subscription
                 * is dropped. So passing missing_ok = false.
                 */
-               ReplicationSlotDropAtPubNode(wrconn, syncslotname, false);
+               ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
 
                finish_sync_worker();
        }
@@ -642,7 +645,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
                for (;;)
                {
                        /* Try read the data. */
-                       len = walrcv_receive(wrconn, &buf, &fd);
+                       len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
                        CHECK_FOR_INTERRUPTS();
 
@@ -715,7 +718,8 @@ fetch_remote_table_info(char *nspname, char *relname,
                                         "   AND c.relname = %s",
                                         quote_literal_cstr(nspname),
                                         quote_literal_cstr(relname));
-       res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                                         lengthof(tableRow), tableRow);
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
@@ -752,9 +756,11 @@ fetch_remote_table_info(char *nspname, char *relname,
                                         "   AND a.attrelid = %u"
                                         " ORDER BY a.attnum",
                                         lrel->remoteid,
-                                        (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
+                                        (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+                                                "AND a.attgenerated = ''" : ""),
                                         lrel->remoteid);
-       res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
+                                         lengthof(attrRow), attrRow);
 
        if (res->status != WALRCV_OK_TUPLES)
                ereport(ERROR,
@@ -841,7 +847,7 @@ copy_table(Relation rel)
                appendStringInfo(&cmd, " FROM %s) TO STDOUT",
                                                 quote_qualified_identifier(lrel.nspname, lrel.relname));
        }
-       res = walrcv_exec(wrconn, cmd.data, 0, NULL);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
        pfree(cmd.data);
        if (res->status != WALRCV_OK_COPY_OUT)
                ereport(ERROR,
@@ -957,8 +963,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
         * application_name, so that it is different from the main apply worker,
         * so that synchronous replication can distinguish them.
         */
-       wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
-       if (wrconn == NULL)
+       LogRepWorkerWalRcvConn =
+               walrcv_connect(MySubscription->conninfo, true, slotname, &err);
+       if (LogRepWorkerWalRcvConn == NULL)
                ereport(ERROR,
                                (errmsg("could not connect to the publisher: %s", err)));
 
@@ -985,7 +992,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
                 * breakdown then it wouldn't have succeeded so trying it next time
                 * seems like a better bet.
                 */
-               ReplicationSlotDropAtPubNode(wrconn, slotname, true);
+               ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, slotname, true);
        }
        else if (MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY)
        {
@@ -1038,7 +1045,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
         * ensures that both the replication slot we create (see below) and the
         * COPY are consistent with each other.
         */
-       res = walrcv_exec(wrconn,
+       res = walrcv_exec(LogRepWorkerWalRcvConn,
                                          "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
                                          0, NULL);
        if (res->status != WALRCV_OK_COMMAND)
@@ -1058,7 +1065,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
         * slot leading to a dangling slot on the server.
         */
        HOLD_INTERRUPTS();
-       walrcv_create_slot(wrconn, slotname, false /* permanent */ ,
+       walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ ,
                                           CRS_USE_SNAPSHOT, origin_startpos);
        RESUME_INTERRUPTS();
 
@@ -1100,7 +1107,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
        copy_table(rel);
        PopActiveSnapshot();
 
-       res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
+       res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
        if (res->status != WALRCV_OK_COMMAND)
                ereport(ERROR,
                                (errmsg("table copy could not finish transaction on publisher: %s",
index d9f157172b2347120e4df3a9f12ab5dcd864ca9b..1432554d5a718a6b0f520026b2ef04e9f1eb4cf7 100644 (file)
@@ -156,7 +156,7 @@ MemoryContext ApplyContext = NULL;
 /* per stream context for streaming transactions */
 static MemoryContext LogicalStreamingContext = NULL;
 
-WalReceiverConn *wrconn = NULL;
+WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
 
 Subscription *MySubscription = NULL;
 bool           MySubscriptionValid = false;
@@ -2126,7 +2126,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
 
                MemoryContextSwitchTo(ApplyMessageContext);
 
-               len = walrcv_receive(wrconn, &buf, &fd);
+               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
 
                if (len != 0)
                {
@@ -2206,7 +2206,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                        MemoryContextReset(ApplyMessageContext);
                                }
 
-                               len = walrcv_receive(wrconn, &buf, &fd);
+                               len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
                        }
                }
 
@@ -2312,7 +2312,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        }
 
        /* All done */
-       walrcv_endstreaming(wrconn, &tli);
+       walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
 }
 
 /*
@@ -2396,7 +2396,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
                 LSN_FORMAT_ARGS(writepos),
                 LSN_FORMAT_ARGS(flushpos));
 
-       walrcv_send(wrconn, reply_message->data, reply_message->len);
+       walrcv_send(LogRepWorkerWalRcvConn,
+                               reply_message->data, reply_message->len);
 
        if (recvpos > last_recvpos)
                last_recvpos = recvpos;
@@ -3090,9 +3091,9 @@ ApplyWorkerMain(Datum main_arg)
                origin_startpos = replorigin_session_get_progress(false);
                CommitTransactionCommand();
 
-               wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
-                                                               &err);
-               if (wrconn == NULL)
+               LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
+                                                                                               MySubscription->name, &err);
+               if (LogRepWorkerWalRcvConn == NULL)
                        ereport(ERROR,
                                        (errmsg("could not connect to the publisher: %s", err)));
 
@@ -3100,7 +3101,7 @@ ApplyWorkerMain(Datum main_arg)
                 * We don't really use the output identify_system for anything but it
                 * does some initializations on the upstream so let's still call it.
                 */
-               (void) walrcv_identify_system(wrconn, &startpointTLI);
+               (void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
        }
 
        /*
@@ -3116,14 +3117,14 @@ ApplyWorkerMain(Datum main_arg)
        options.startpoint = origin_startpos;
        options.slotname = myslotname;
        options.proto.logical.proto_version =
-               walrcv_server_version(wrconn) >= 140000 ?
+               walrcv_server_version(LogRepWorkerWalRcvConn) >= 140000 ?
                LOGICALREP_PROTO_STREAM_VERSION_NUM : LOGICALREP_PROTO_VERSION_NUM;
        options.proto.logical.publication_names = MySubscription->publications;
        options.proto.logical.binary = MySubscription->binary;
        options.proto.logical.streaming = MySubscription->stream;
 
        /* Start normal logical streaming replication. */
-       walrcv_startstreaming(wrconn, &options);
+       walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
 
        /* Run the main loop. */
        LogicalRepApplyLoop(origin_startpos);
index 1cac75e5a9b498713e77f2f7d42e676778d45e8e..179eb43900d512f38789a9b98208dee6bd8d62f6 100644 (file)
@@ -62,7 +62,7 @@ typedef struct LogicalRepWorker
 extern MemoryContext ApplyContext;
 
 /* libpqreceiver connection */
-extern struct WalReceiverConn *wrconn;
+extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
 
 /* Worker and subscription objects. */
 extern Subscription *MySubscription;