summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAmit Kapila2023-08-25 03:27:55 +0000
committerAmit Kapila2023-08-25 03:27:55 +0000
commit9c13b6814ac7943036c64b377675184b243f04e8 (patch)
tree84e324e26491c21c36fd4e9ddc3e5bcf66d24ec2 /src
parent252dcb32397f64a5e1ceac05b29a271ab19aa960 (diff)
Reset the logical worker type while cleaning up other worker info.
Commit 2a8b40e36 introduces the worker type field for logical replication workers, but forgot to reset the type when the worker exits. This can lead to recognizing a stopped worker as a valid logical replication worker. Fix it by resetting the worker type and additionally adding the safeguard to not use LogicalRepWorker until ->in_use is verified. Reported-by: Thomas Munro based on cfbot reports. Author: Hou Zhijie, Alvaro Herrera Reviewed-by: Amit Kapila Discussion: http://postgr.es/m/CA+hUKGK2RQh4LifVgBmkHsCYChP-65UwGXOmnCzYVa5aAt4GWg@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/launcher.c5
-rw-r--r--src/include/replication/worker_internal.h8
2 files changed, 9 insertions, 4 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 72e44d5a02d..7882fc91ce6 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -793,6 +793,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker)
{
Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE));
+ worker->type = WORKERTYPE_UNKNOWN;
worker->in_use = false;
worker->proc = NULL;
worker->dbid = InvalidOid;
@@ -862,7 +863,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && isTablesyncWorker(w))
+ if (isTablesyncWorker(w) && w->subid == subid)
res++;
}
@@ -889,7 +890,7 @@ logicalrep_pa_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && isParallelApplyWorker(w))
+ if (isParallelApplyWorker(w) && w->subid == subid)
res++;
}
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index a428663859b..8f4bed09585 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
-#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
+#define isParallelApplyWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->in_use && \
+ (worker)->type == WORKERTYPE_TABLESYNC)
static inline bool
am_tablesync_worker(void)
@@ -339,12 +341,14 @@ am_tablesync_worker(void)
static inline bool
am_leader_apply_worker(void)
{
+ Assert(MyLogicalRepWorker->in_use);
return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
}
static inline bool
am_parallel_apply_worker(void)
{
+ Assert(MyLogicalRepWorker->in_use);
return isParallelApplyWorker(MyLogicalRepWorker);
}