'latency': latency,
'tps': tps}
- @staticmethod
- def _merge_logs():
- 'merge log files produced by pgbench threads (aggregated per second)'
-
- r = {}
-
- # find pgbench transaction logs in current directory
- logs = [v for v in os.listdir(os.getcwd())
- if re.match('pgbench_log.[0-9]+(\.[0-9]+)?', v)]
-
- # parse each transaction log, and merge it into the existing results
- for l in logs:
- worker_log = open(l, 'r')
- for row in worker_log:
- values = row.split(' ')
-
- timestamp = values[0]
- tps = int(values[1])
- lat_sum = long(values[2])
- lat_sum2 = long(values[3])
- lat_min = int(values[4])
- lat_max = int(values[5])
-
- # if first record for the timestamp, store it, otherwise merge
- if timestamp not in r:
- r[timestamp] = {'tps': tps,
- 'lat_sum': lat_sum, 'lat_sum2': lat_sum2,
- 'lat_min': lat_min, 'lat_max': lat_max}
- else:
- r[timestamp]['tps'] += int(tps)
- r[timestamp]['lat_sum'] += long(lat_sum)
- r[timestamp]['lat_sum2'] += long(lat_sum2)
- r[timestamp]['lat_min'] = min(r[timestamp]['lat_min'],
- int(lat_min))
- r[timestamp]['lat_max'] = max(r[timestamp]['lat_max'],
- int(lat_max))
-
- os.remove(l)
-
- # now produce a simple text log sorted by the timestamp
- o = []
- for t in sorted(r.keys()):
- o.append('%s %d %d %d %d %d' % (t, r[t]['tps'], r[t]['lat_sum'],
- r[t]['lat_sum2'], r[t]['lat_min'],
- r[t]['lat_max']))
-
- return '\n'.join(o)
-
def check_config(self):
'check pgbench configuration (existence of binaries etc.)'
r = PgBench._parse_results(r[1])
r.update({'read-only': read_only})
- if aggregate:
- r.update({'transaction-log': PgBench._merge_logs()})
-
r.update({'start': start, 'end': end})
if csv_queue is not None:
# merge data from the collectors into the JSON document with results
r.update(self._collector.result())
- # read the postgres log
- with open('pg.log', 'r') as f:
- r['postgres-log'] = f.read()
-
r['meta'] = {'benchmark': config['benchmark'],
'date': strftime("%Y-%m-%d %H:%M:%S.000000+00", gmtime()),
'name': config_name}
- os.remove('pg.log')
-
with open('%s/%s.json' % (self._output, config_name), 'w') as f:
f.write(json.dumps(r, indent=4))
class LinuxCollector(object):
'collects various Linux-specific statistics (cpuinfo, mounts, sar)'
- def __init__(self, sar_path='/var/log/sa'):
+ def __init__(self, outdir, sar_path='/var/log/sa'):
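+ # outdir - directory where collected files (sar output etc.) are stored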
+ self._outdir = outdir
+ self._sar = sar_path
+
self._start_ts = None
self._end_ts = None
- self._sar = sar_path
def start(self):
self._start_ts = datetime.now()
r = {'sysctl': self._collect_sysctl()}
# ignore sar if we've not found it
- sar = self._collect_sar_stats()
- if sar:
- r['sar'] = sar
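+ # sar output is written to files in the output directory, not returned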
+ self._collect_sar_stats()
r.update(self._collect_system_info())
def _collect_sar_stats(self):
'extracts all data available in sar, filters by timestamp range'
- sar = {}
log("collecting sar stats")
d = self._start_ts.date()
else:
r = run_cmd(['sar', '-A', '-p', '-f', filename])
- sar[str(d)] = r[1]
-
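+ # save this day's sar output as <outdir>/sar-DD.txt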
+ with open(''.join([self._outdir, '/sar-', d.strftime('%d'),
+ '.txt']), 'w') as f:
+ f.write(r[1])
else:
log("file '%s' does not exist, skipping" % (filename,))
# proceed to the next day
d += timedelta(days=1)
- if not sar:
- return None
-
- return sar
-
def _collect_sysctl(self):
'collect kernel configuration'
from multiprocessing import Process, Queue
from utils.logging import log
+from utils.misc import run_cmd
class PostgresCollector(object):
indexes)
"""
- def __init__(self, dbname):
+ def __init__(self, outdir, dbname, bin_path):
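+ # outdir - directory for the CSV outputs, bin_path - location of pg_config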
+ self._outdir = outdir
self._dbname = dbname
+ self._bin_path = bin_path
def start(self):
self._in_queue = Queue()
self._out_queue = Queue()
self._worker = Process(target=run_collector,
args=(self._in_queue, self._out_queue,
- self._dbname))
+ self._dbname, self._bin_path,
+ self._outdir))
self._worker.start()
def stop(self):
return self._result
-def run_collector(in_queue, out_queue, dbname, interval=1.0):
+def run_collector(in_queue, out_queue, dbname, bin_path, outdir, interval=1.0):
"""
collector code for a separate process, communicating through a pair of
queues
# on the first iteration, construct the CSV files
if bgwriter_log is None:
fields = [desc[0] for desc in cur.description]
- bgwriter_log = csv.DictWriter(open('bgwriter.csv', 'w'), fields)
+ filename = ''.join([outdir, '/bgwriter.csv'])
+ bgwriter_log = csv.DictWriter(open(filename, 'w'), fields)
bgwriter_log.writeheader()
bgwriter_log.writerows(cur.fetchall())
# on the first iteration, construct the CSV files
if tables_log is None:
fields = [desc[0] for desc in cur.description]
- tables_log = csv.DictWriter(open('tables.csv', 'w'), fields)
+ filename = ''.join([outdir, '/tables.csv'])
+ tables_log = csv.DictWriter(open(filename, 'w'), fields)
tables_log.writeheader()
tables_log.writerows(cur.fetchall())
# on the first iteration, construct the CSV files
if indexes_log is None:
fields = [desc[0] for desc in cur.description]
- indexes_log = csv.DictWriter(open('indexes.csv', 'w'), fields)
+ filename = ''.join([outdir, '/indexes.csv'])
+ indexes_log = csv.DictWriter(open(filename, 'w'), fields)
indexes_log.writeheader()
indexes_log.writerows(cur.fetchall())
# on the first iteration, construct the CSV files
if database_log is None:
fields = [desc[0] for desc in cur.description]
- database_log = csv.DictWriter(open('database.csv', 'w'), fields)
+ filename = ''.join([outdir, '/database.csv'])
+ database_log = csv.DictWriter(open(filename, 'w'), fields)
database_log.writeheader()
database_log.writerows(cur.fetchall())
conn.close()
- log("PostgreSQL collector generates CSV results")
+ try:
+ conn = psycopg2.connect('host=localhost dbname=%s' % (dbname,))
+ cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+ cur.execute('SELECT name, setting, source '
+ 'FROM pg_settings ORDER BY lower(name)')
+ fields = [desc[0] for desc in cur.description]
+ filename = ''.join([outdir, '/settings.csv'])
+ settings_log = csv.DictWriter(open(filename, 'w'), fields)
+ settings_log.writeheader()
+ settings_log.writerows(cur.fetchall())
+ conn.close()
+ except Exception as ex:
+ log("failed to collect pg_settings: %s" % (ex,))
# close the CSV writers
bgwriter_log = None
result = {}
- for file in ['bgwriter', 'tables', 'indexes', 'database']:
- if os.path.isfile(''.join([file, '.csv'])):
- with open(''.join([file, '.csv']), 'r') as f:
- result.update({file : f.read()})
-
- # remove the files
- os.remove(''.join([file, '.csv']))
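+ # store pg_config output (build/configure information) under the 'config' key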
+ r = run_cmd([bin_path + '/pg_config'])
+ result['config'] = r[1]
out_queue.put(result)
# build and start a postgres cluster
- cluster = PgCluster(bin_path=BIN_PATH, data_path=DATADIR_PATH)
+ cluster = PgCluster(OUTPUT_DIR, bin_path=BIN_PATH,
+ data_path=DATADIR_PATH)
# create collectors
collectors = MultiCollector()
- collectors.register('system', LinuxCollector())
- collectors.register('postgres',
- PostgresCollector(dbname=DATABASE_NAME))
+ collectors.register('system', LinuxCollector(OUTPUT_DIR))
+ pg_collector = PostgresCollector(OUTPUT_DIR, dbname=DATABASE_NAME,
+ bin_path='%s/bin' % (BUILD_PATH,))
+ collectors.register('postgres', pg_collector)
runner = BenchmarkRunner(OUTPUT_DIR, cluster, collectors)
class PgCluster(object):
'basic manipulation of postgres cluster (init, start, stop, destroy)'
- def __init__(self, bin_path, data_path):
+ def __init__(self, outdir, bin_path, data_path):
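+ # outdir - directory where the server log (pg.log) is written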
+ self._outdir = outdir
self._bin = bin_path
self._data = data_path
with TemporaryFile() as strout:
log("starting cluster in '%s' using '%s' binaries" %
(self._data, self._bin))
- cmd = ['pg_ctl', '-D', self._data, '-l', 'pg.log', '-w']
+ cmd = ['pg_ctl', '-D', self._data, '-l',
+ ''.join([self._outdir, '/pg.log']), '-w']
if len(self._options) > 0:
cmd.extend(['-o', self._options])
cmd.append('start')