| author | Tomas Vondra | 2016-08-10 21:23:55 +0000 |
|---|---|---|
| committer | Tomas Vondra | 2017-02-27 00:21:09 +0000 |
| commit | 72e6220f64a89cd215660311a5680f07f543b150 | (patch) |
| tree | c76f2df22da3819a28cad200f4b2a45642dfacdf /client/collectors | |
| parent | cbac00d3965ad4f27f1e812668b5732c1c50b1dd | (diff) |
Import initial version of the client
Diffstat (limited to 'client/collectors')
| mode | file | lines added |
|---|---|---|
| -rw-r--r-- | client/collectors/__init__.py | 0 |
| -rw-r--r-- | client/collectors/collector.py | 28 |
| -rw-r--r-- | client/collectors/linux.py | 93 |
| -rw-r--r-- | client/collectors/postgres.py | 157 |
4 files changed, 278 insertions, 0 deletions
diff --git a/client/collectors/__init__.py b/client/collectors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/client/collectors/__init__.py
diff --git a/client/collectors/collector.py b/client/collectors/collector.py
new file mode 100644
index 0000000..d73ef5b
--- /dev/null
+++ b/client/collectors/collector.py
@@ -0,0 +1,28 @@
+
+class MultiCollector(object):
+    'a collector combining multiple other collectors'
+
+    def __init__(self):
+        self._collectors = {}
+
+
+    def register(self, name, collector):
+        self._collectors[name] = collector
+
+
+    def start(self):
+        for name in self._collectors:
+            self._collectors[name].start()
+
+
+    def stop(self):
+        for name in self._collectors:
+            self._collectors[name].stop()
+
+
+    def result(self):
+        r = {}
+        for name in self._collectors:
+            r.update({name : self._collectors[name].result()})
+
+        return r
diff --git a/client/collectors/linux.py b/client/collectors/linux.py
new file mode 100644
index 0000000..9112a10
--- /dev/null
+++ b/client/collectors/linux.py
@@ -0,0 +1,93 @@
+from datetime import datetime, timedelta, time
+from utils.logging import log
+from utils.misc import run_cmd
+
+
+class LinuxCollector(object):
+    'collects various Linux-specific statistics (cpuinfo, mounts, sar)'
+
+    def __init__(self, sar_path = '/var/log/sa'):
+        self._start_ts = None
+        self._end_ts = None
+        self._sar = sar_path
+
+
+    def start(self):
+        self._start_ts = datetime.now()
+
+
+    def stop(self):
+        self._end_ts = datetime.now()
+
+
+    def result(self):
+        'build the results'
+
+        r = {
+            'sar' : self._collect_sar_stats(),
+            'sysctl' : self._collect_sysctl()
+        }
+        r.update(self._collect_system_info())
+
+        return r
+
+
+    def _collect_sar_stats(self):
+        'extracts all data available in sar, filters by timestamp range'
+
+        sar = {}
+        log("collecting sar stats")
+
+        d = self._start_ts.date()
+        while d <= self._end_ts.date():
+
+            # FIXME maybe skip if the file does not exist
+            filename = '%(path)s/sa%(day)s' % {'path' : self._sar, 'day' : d.strftime('%d')}
+
+            log("extracting sar data from '%s'" % (filename,))
+
+            # need to use the right combination of start/end timestamps
+            s = self._start_ts.strftime('%H:%M:%S')
+            e = self._end_ts.strftime('%H:%M:%S')
+
+            if d == self._start_ts.date() and d == self._end_ts.date():
+                r = run_cmd(['sar', '-A', '-p', '-s', s, '-e', e, '-f', filename])
+            elif d == self._start_ts.date():
+                r = run_cmd(['sar', '-A', '-p', '-s', s, '-f', filename])
+            elif d == self._end_ts.date():
+                r = run_cmd(['sar', '-A', '-p', '-e', e, '-f', filename])
+            else:
+                r = run_cmd(['sar', '-A', '-p', '-f', filename])
+
+            sar[str(d)] = r[1]
+
+            # proceed to the next day
+            d += timedelta(days=1)
+
+        return sar
+
+
+    def _collect_sysctl(self):
+        'collect kernel configuration'
+
+        log("collecting sysctl")
+        r = run_cmd(['/usr/sbin/sysctl', '-a'])
+
+        return r[1]
+
+
+    def _collect_system_info(self):
+        'collect cpuinfo, meminfo, mounts'
+
+        system = {}
+
+        with open('/proc/cpuinfo', 'r') as f:
+            system['cpuinfo'] = f.read()
+
+        with open('/proc/meminfo', 'r') as f:
+            system['meminfo'] = f.read()
+
+        with open('/proc/mounts', 'r') as f:
+            system['mounts'] = f.read()
+
+        return system
diff --git a/client/collectors/postgres.py b/client/collectors/postgres.py
new file mode 100644
index 0000000..306c2b7
--- /dev/null
+++ b/client/collectors/postgres.py
@@ -0,0 +1,157 @@
+import csv
+import multiprocessing
+import os
+import psycopg2
+import psycopg2.extras
+import Queue
+import time
+
+from multiprocessing import Process, Queue
+
+
+class PostgresCollector(object):
+    'collects basic PostgreSQL-level statistics (bgwriter, databases, tables, indexes)'
+
+    def __init__(self, dbname):
+        self._dbname = dbname
+
+
+    def start(self):
+        self._in_queue = multiprocessing.Queue()
+        self._out_queue = multiprocessing.Queue()
+        self._worker = Process(target=run_collector, args=(self._in_queue, self._out_queue, self._dbname))
+        self._worker.start()
+
+
+    def stop(self):
+        # signal the worker process to stop by writing a value into the queue
+        self._in_queue.put(True)
+
+        # FIXME this gets stuck for some reason (but we'll wait for queue anyway)
+        # self._worker.join()
+
+        # and then read the result
+        self._result = self._out_queue.get()
+
+        self._worker = None
+        self._in_queue = None
+        self._out_queue = None
+
+
+    def result(self):
+        return self._result
+
+
+def run_collector(in_queue, out_queue, dbname, interval=1.0):
+    'collector code for a separate process, communicating through a pair of queues'
+
+    bgwriter_log = None
+    tables_log = None
+    indexes_log = None
+    database_log = None
+
+    # get current timestamp
+    ts = time.time()
+
+    while True:
+
+        # wait until the next tick
+        ts += interval
+
+        # if we're behind, skip forward
+        if ts < time.time():
+            continue
+
+        # sleep (but only for the remaining time, to prevent drift)
+        time.sleep(ts - time.time())
+
+        # if we've received message in the input queue (not empty), terminate
+        if not in_queue.empty():
+            break
+
+        # open connection to the benchmark database (if can't open, continue)
+        # notice this is intentionally after the wait, so we'll wait before
+        # next connection attempt
+        try:
+            conn = psycopg2.connect('host=localhost dbname=%s' % (dbname,))
+            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+        except Exception as ex:
+            continue
+
+        # background writer stats
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_bgwriter')
+
+        # on the first iteration, construct the CSV files
+        if not bgwriter_log:
+            fields = [desc[0] for desc in cur.description]
+            bgwriter_log = csv.DictWriter(open('bgwriter.csv', 'w'), fields)
+            bgwriter_log.writeheader()
+
+        bgwriter_log.writerows(cur.fetchall())
+
+        # TODO we can assume statistics for most objects (tables, indexes) won't
+        # change every second, so we can optimize the amount of data by detecting
+        # changes and only keeping the two rows next to it
+
+        # table statistics
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_all_tables JOIN pg_statio_all_tables USING (relid, schemaname, relname)')
+
+        # on the first iteration, construct the CSV files
+        if not tables_log:
+            fields = [desc[0] for desc in cur.description]
+            tables_log = csv.DictWriter(open('tables.csv', 'w'), fields)
+            tables_log.writeheader()
+
+        tables_log.writerows(cur.fetchall())
+
+        # index statistics
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_all_indexes JOIN pg_statio_all_indexes USING (relid, indexrelid, schemaname, relname, indexrelname)')
+
+        # on the first iteration, construct the CSV files
+        if not indexes_log:
+            fields = [desc[0] for desc in cur.description]
+            indexes_log = csv.DictWriter(open('indexes.csv', 'w'), fields)
+            indexes_log.writeheader()
+
+        indexes_log.writerows(cur.fetchall())
+
+        # database statistics
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_database')
+
+        # on the first iteration, construct the CSV files
+        if not database_log:
+            fields = [desc[0] for desc in cur.description]
+            database_log = csv.DictWriter(open('database.csv', 'w'), fields)
+            database_log.writeheader()
+
+        database_log.writerows(cur.fetchall())
+
+        conn.close()
+
+    # close the CSV writers
+    bgwriter_log = None
+    tables_log = None
+    indexes_log = None
+    database_log = None
+
+    result = {}
+
+    with open('bgwriter.csv', 'r') as f:
+        result.update({'bgwriter' : f.read()})
+
+    with open('tables.csv', 'r') as f:
+        result.update({'tables' : f.read()})
+
+    with open('indexes.csv', 'r') as f:
+        result.update({'indexes' : f.read()})
+
+    with open('database.csv', 'r') as f:
+        result.update({'database' : f.read()})
+
+    # remove the files
+    os.remove('bgwriter.csv')
+    os.remove('tables.csv')
+    os.remove('indexes.csv')
+    os.remove('database.csv')
+
+    out_queue.put(result)
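For orientation, the sketch below shows how these pieces appear intended to fit together, using only the register/start/stop/result interface introduced in this commit. The driver code itself and the 'pgbench' database name are assumptions for illustration; no caller is part of this diff.

```python
# Hypothetical driver, not part of this commit; it exercises only the
# interface defined in collector.py, linux.py and postgres.py above.
from collectors.collector import MultiCollector
from collectors.linux import LinuxCollector
from collectors.postgres import PostgresCollector

collectors = MultiCollector()
collectors.register('system', LinuxCollector())                 # sar, sysctl, /proc snapshots
collectors.register('postgres', PostgresCollector('pgbench'))   # 'pgbench' is an assumed dbname

collectors.start()
# ... run the benchmark workload here ...
collectors.stop()

results = collectors.result()
# => {'system':   {'sar': ..., 'sysctl': ..., 'cpuinfo': ..., 'meminfo': ..., 'mounts': ...},
#     'postgres': {'bgwriter': ..., 'tables': ..., 'indexes': ..., 'database': ...}}
```

Note the asymmetry between the two collectors: LinuxCollector merely records start/stop timestamps and extracts the matching sar data after the run, while PostgresCollector forks a worker process that samples the pg_stat_* views once per second into CSV files and hands the accumulated data back through a queue when stopped.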
