summaryrefslogtreecommitdiff
path: root/client
diff options
context:
space:
mode:
authorTomas Vondra2016-10-13 11:41:43 +0000
committerTomas Vondra2017-02-27 00:32:08 +0000
commitd8fe8b8ca939089d3fbe31b8904aa532bac294a5 (patch)
treece23157a377aa1c1eb3b6b41f743b48a9db5e2ed /client
parent128ad6eb0c757c34b9332cd41e79efe6b73e887a (diff)
fix lockups in PostgreSQL collector
When the workers produce results and return them through a queue, we need to first drain the queue and then wait for the thread to join(). Otherwise it seems to lock up - the join() call will wait as long as there are items in the queue. This happens because we do both things (reading from the queue and waiting for the thread) in the same thread.
Diffstat (limited to 'client')
-rw-r--r--client/collectors/postgres.py25
1 file changed, 18 insertions, 7 deletions
diff --git a/client/collectors/postgres.py b/client/collectors/postgres.py
index 306c2b7..7096c27 100644
--- a/client/collectors/postgres.py
+++ b/client/collectors/postgres.py
@@ -3,11 +3,10 @@ import multiprocessing
import os
import psycopg2
import psycopg2.extras
-import Queue
import time
from multiprocessing import Process, Queue
-
+from utils.logging import log
class PostgresCollector(object):
'collects basic PostgreSQL-level statistics (bgwriter, databases, tables, indexes)'
@@ -17,22 +16,29 @@ class PostgresCollector(object):
def start(self):
- self._in_queue = multiprocessing.Queue()
- self._out_queue = multiprocessing.Queue()
+ self._in_queue = Queue()
+ self._out_queue = Queue()
self._worker = Process(target=run_collector, args=(self._in_queue, self._out_queue, self._dbname))
self._worker.start()
def stop(self):
+
# signal the worker process to stop by writing a value into the queue
self._in_queue.put(True)
- # FIXME this gets stuck for some reason (but we'll wait for queue anyway)
- # self._worker.join()
+ log("stopping the PostgreSQL statistics collector")
- # and then read the result
+ # Wait for collector to place result into the output queue. This needs
+ # to happen before calling join() otherwise it causes a deadlock.
+ log("waiting for collector result in a queue")
self._result = self._out_queue.get()
+ # And wait for the worker to terminate. This should be pretty fast as
+ # the collector places result into the queue right before terminating.
+ log("waiting for collector process to terminate")
+ self._worker.join()
+
self._worker = None
self._in_queue = None
self._out_queue = None
@@ -67,6 +73,7 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
# if we've received message in the input queue (not empty), terminate
if not in_queue.empty():
+ log("PostgreSQL collector received request to terminate")
break
# open connection to the benchmark database (if can't open, continue)
@@ -128,6 +135,8 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
conn.close()
+ log("PostgreSQL collector generates CSV results")
+
# close the CSV writers
bgwriter_log = None
tables_log = None
@@ -155,3 +164,5 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
os.remove('database.csv')
out_queue.put(result)
+
+ log("PostgreSQL collector put results into output queue and terminates")