# summary | refs | log | tree | commit | diff
# path: root/client/benchmarks/runner.py
# blob: a63c4c3ec06c1d81ac063f98cfae370342e2eb1b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import json
import os

from time import gmtime, strftime

from utils.logging import log
from multiprocessing import Process, Queue


class BenchmarkRunner(object):
    """Manage runs of all the benchmarks, including cluster restarts etc.

    Benchmark classes are registered under a name with register_benchmark(),
    and named configurations referring to those benchmarks are registered
    with register_config(). check() then validates every configuration and
    run() executes them one by one, writing one JSON file per configuration
    into the output directory.
    """

    def __init__(self, out_dir, cluster, collector):
        """Store the collaborators; nothing is started or created here.

        out_dir   -- directory the JSON result files are written to (created
                     by run(), reported as an issue by check() if it exists)
        cluster   -- object providing start(config=...) and stop()
        collector -- object providing start(), stop() and result()
        """

        self._output = out_dir  # where to store output files
        self._benchmarks = {}  # bench name => class implementing the benchmark
        self._configs = {}  # config name => dict with benchmark/config/postgres
        self._cluster = cluster
        self._collector = collector

    def register_benchmark(self, benchmark_name, benchmark_class):
        """Register a benchmark class under the given name.

        Registering the same name again silently replaces the earlier class.
        """

        # FIXME check if a mapping for the same name already exists
        self._benchmarks[benchmark_name] = benchmark_class

    def register_config(self, config_name, benchmark_name, postgres_config,
                        **kwargs):
        """Register a named configuration of a benchmark.

        config_name     -- name of the configuration (also used as the name
                           of the JSON result file)
        benchmark_name  -- name the benchmark class was registered under
        postgres_config -- passed to cluster.start(config=...) for this run
        kwargs          -- passed as keyword arguments to the benchmark class
                           constructor; a true 'csv' value additionally
                           enables the CSV result collector process

        Registering the same name again silently replaces the earlier entry.
        """

        # FIXME check if a mapping for the same name already exists
        # FIXME check that the benchmark mapping already exists
        self._configs[config_name] = {'benchmark': benchmark_name,
                                      'config': kwargs,
                                      'postgres': postgres_config}

    def _instantiate(self, config_name):
        """Return (config, benchmark instance) for a registered config name.

        Raises KeyError when the configuration, or the benchmark it refers
        to, has not been registered.
        """

        config = self._configs[config_name]
        bench_class = self._benchmarks[config['benchmark']]

        # expand the attribute names
        return config, bench_class(**config['config'])

    def _check_config(self, config_name):
        """Return whatever the benchmark's check_config() reports."""

        log("checking benchmark configuration '%s'" % (config_name,))

        _, bench = self._instantiate(config_name)

        # run the tests
        return bench.check_config()

    def check(self):
        """Check configurations for all benchmarks.

        Returns a dict of issues, empty when everything is fine. The 'global'
        key reports an already existing output directory; every other key is
        a configuration name mapped to the (truthy) value returned by the
        check_config() method of its benchmark.
        """

        issues = {}

        if os.path.exists(self._output):
            issues['global'] = ["output directory '%s' already exists" %
                                (self._output,)]

        for config_name in self._configs:
            t = self._check_config(config_name)
            if t:
                issues[config_name] = t

        return issues

    def _run_tests(self, config_name, config, bench):
        """Run the benchmark, with a CSV collector process if requested.

        Returns the value of bench.run_tests(). The collector process (if
        any) is told to stop and joined even when the benchmark raises;
        otherwise it would block on the queue forever and keep the
        interpreter from exiting.
        """

        # if requested output to CSV, create a queue and collector process
        csv_queue = None
        csv_collector = None
        if config['config'].get('csv'):
            csv_queue = Queue()
            csv_collector = Process(target=csv_collect_results,
                                    args=(config_name, csv_queue))
            csv_collector.start()

        try:
            return bench.run_tests(csv_queue)
        finally:
            # notify the result collector to end and wait for it to terminate
            if csv_collector is not None:
                csv_queue.put("STOP")
                csv_collector.join()

    def _run_config(self, config_name):
        """Run a single configuration and write '<output>/<name>.json'.

        The cluster is started with the configuration's postgres settings,
        the collectors are started, and the benchmark is executed. The
        collectors and the cluster are stopped again even if the benchmark
        fails, in which case the exception propagates and no result file is
        written.

        On success the benchmark results are merged with the collector
        results, the contents of 'pg.log' (read from the current working
        directory and removed afterwards) and a 'meta' section.
        """

        log("running benchmark configuration '%s'" % (config_name,))

        config, bench = self._instantiate(config_name)

        self._cluster.start(config=config['postgres'])
        try:
            # start collector(s) of additional info
            self._collector.start()
            try:
                # run the tests
                r = self._run_tests(config_name, config, bench)
            finally:
                # stop the collectors only if they were actually started
                log("terminating collectors")
                self._collector.stop()
        finally:
            # never leave the cluster running, whatever happened above
            self._cluster.stop()

        # merge data from the collectors into the JSON document with results
        r.update(self._collector.result())

        # read the postgres log
        with open('pg.log', 'r') as f:
            r['postgres-log'] = f.read()

        # the timestamp is in UTC, with second precision (the microseconds
        # part is a constant)
        r['meta'] = {'benchmark': config['benchmark'],
                     'date': strftime("%Y-%m-%d %H:%M:%S.000000+00", gmtime()),
                     'name': config_name}

        os.remove('pg.log')

        with open('%s/%s.json' % (self._output, config_name), 'w') as f:
            f.write(json.dumps(r, indent=4))

    def run(self):
        """Run all the configured benchmarks.

        Creates the output directory first; os.mkdir() raises OSError when
        the directory already exists (see check()).
        """

        os.mkdir(self._output)

        for config_name in self._configs:
            self._run_config(config_name)


def csv_collect_results(bench_name, queue):
    """Drain result rows from a queue into the file '<bench_name>.csv'.

    Every item taken from the queue is expected to be an iterable of values;
    it is written as one tab-separated line prefixed with the benchmark name
    and flushed right away. The first item that is a plain string (whatever
    its contents) is the signal to terminate the collector.
    """

    with open("%s.csv" % (bench_name,), 'w') as results_file:

        item = queue.get()

        # anything but a string is a row of results to be written out
        while not isinstance(item, str):

            row = "\t".join(str(field) for field in item)

            results_file.write(bench_name + "\t" + row + "\n")
            results_file.flush()

            item = queue.get()

        # we got a string, which means 'terminate'
        log("terminating CSV result collector")