import os
from utils.logging import log
-
+from multiprocessing import Process, Queue
class BenchmarkRunner(object):
'manages runs of all the benchmarks, including cluster restarts etc.'
# start collector(s) of additional info
self._collector.start()
+ # if requested output to CSV, create a queue and collector process
+ csv_queue = None
+ csv_collector = None
+ if config['benchmark']['csv']:
+ csv_queue = Queue()
+ csv_collector = Process(target=csv_collect_results, args=(config_name, csv_queue))
+ csv_collector.start()
+
# run the tests
- r = bench.run_tests()
+ r = bench.run_tests(csv_queue)
+
+ # notify the result collector to end and wait for it to terminate
+ if csv_queue:
+ csv_queue.put("STOP")
+ csv_collector.join()
# stop the cluster and collector
+ log("terminating collectors")
self._collector.stop()
self._cluster.stop()
for config_name in self._configs:
self._run_config(config_name)
+
+
+def csv_collect_results(bench_name, queue):
+ 'collect results into a CSV files (through a queue)'
+
+ with open("%s.csv" % (bench_name,), 'w') as results_file:
+
+ # collect data from the queue - once we get a plain string (instead of
+ # a list), it's a sign to terminate the collector
+ while True:
+
+ v = queue.get()
+
+ # if we got a string, it means 'terminate'
+ if isinstance(v, str):
+ log("terminating CSV result collector")
+ return
+
+ v = [str(x) for x in v]
+
+ # otherwise we expect the value to be a list, and we just print it
+ results_file.write(bench_name + "\t" + "\t".join(v) + "\n")
+ results_file.flush()