author      Tomas Vondra    2016-08-10 21:23:55 +0000
committer   Tomas Vondra    2017-02-27 00:21:09 +0000
commit      72e6220f64a89cd215660311a5680f07f543b150 (patch)
tree        c76f2df22da3819a28cad200f4b2a45642dfacdf /client/collectors
parent      cbac00d3965ad4f27f1e812668b5732c1c50b1dd (diff)
Import initial version of the client
Diffstat (limited to 'client/collectors')
-rw-r--r--  client/collectors/__init__.py     0
-rw-r--r--  client/collectors/collector.py    28
-rw-r--r--  client/collectors/linux.py        93
-rw-r--r--  client/collectors/postgres.py     157
4 files changed, 278 insertions, 0 deletions
diff --git a/client/collectors/__init__.py b/client/collectors/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/client/collectors/__init__.py
diff --git a/client/collectors/collector.py b/client/collectors/collector.py
new file mode 100644
index 0000000..d73ef5b
--- /dev/null
+++ b/client/collectors/collector.py
@@ -0,0 +1,28 @@
+
+class MultiCollector(object):
+    'a collector combining multiple other collectors'
+
+    def __init__(self):
+        self._collectors = {}
+
+
+    def register(self, name, collector):
+        self._collectors[name] = collector
+
+
+    def start(self):
+        for name in self._collectors:
+            self._collectors[name].start()
+
+
+    def stop(self):
+        for name in self._collectors:
+            self._collectors[name].stop()
+
+
+    def result(self):
+        r = {}
+        for name in self._collectors:
+            r.update({name : self._collectors[name].result()})
+
+        return r
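The MultiCollector only requires that each registered collector exposes start(), stop() and result(). A minimal usage sketch (not part of this commit; the registered names and the 'pgbench' database are illustrative):

    from collectors.collector import MultiCollector
    from collectors.linux import LinuxCollector
    from collectors.postgres import PostgresCollector

    collectors = MultiCollector()
    collectors.register('system', LinuxCollector())
    collectors.register('postgres', PostgresCollector(dbname='pgbench'))

    collectors.start()
    # ... run the benchmark ...
    collectors.stop()

    # dict keyed by the registered names, e.g. {'system': {...}, 'postgres': {...}}
    results = collectors.result()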
diff --git a/client/collectors/linux.py b/client/collectors/linux.py
new file mode 100644
index 0000000..9112a10
--- /dev/null
+++ b/client/collectors/linux.py
@@ -0,0 +1,93 @@
+from datetime import datetime, timedelta, time
+from utils.logging import log
+from utils.misc import run_cmd
+
+
+class LinuxCollector(object):
+    'collects various Linux-specific statistics (cpuinfo, mounts, sar)'
+
+    def __init__(self, sar_path = '/var/log/sa'):
+        self._start_ts = None
+        self._end_ts = None
+        self._sar = sar_path
+
+
+    def start(self):
+        self._start_ts = datetime.now()
+
+
+    def stop(self):
+        self._end_ts = datetime.now()
+
+
+    def result(self):
+        'build the results'
+
+        r = {
+            'sar' : self._collect_sar_stats(),
+            'sysctl' : self._collect_sysctl()
+        }
+        r.update(self._collect_system_info())
+
+        return r
+
+
+    def _collect_sar_stats(self):
+        'extracts all data available in sar, filters by timestamp range'
+
+        sar = {}
+        log("collecting sar stats")
+
+        d = self._start_ts.date()
+        while d <= self._end_ts.date():
+
+            # FIXME maybe skip if the file does not exist
+            filename = '%(path)s/sa%(day)s' % {'path' : self._sar, 'day' : d.strftime('%d')}
+
+            log("extracting sar data from '%s'" % (filename,))
+
+            # need to use the right combination of start/end timestamps
+            s = self._start_ts.strftime('%H:%M:%S')
+            e = self._end_ts.strftime('%H:%M:%S')
+
+            if d == self._start_ts.date() and d == self._end_ts.date():
+                r = run_cmd(['sar', '-A', '-p', '-s', s, '-e', e, '-f', filename])
+            elif d == self._start_ts.date():
+                r = run_cmd(['sar', '-A', '-p', '-s', s, '-f', filename])
+            elif d == self._end_ts.date():
+                r = run_cmd(['sar', '-A', '-p', '-e', e, '-f', filename])
+            else:
+                r = run_cmd(['sar', '-A', '-p', '-f', filename])
+
+            sar[str(d)] = r[1]
+
+            # proceed to the next day
+            d += timedelta(days=1)
+
+        return sar
+
+
+    def _collect_sysctl(self):
+        'collect kernel configuration'
+
+        log("collecting sysctl")
+        r = run_cmd(['/usr/sbin/sysctl', '-a'])
+
+        return r[1]
+
+
+    def _collect_system_info(self):
+        'collect cpuinfo, meminfo, mounts'
+
+        system = {}
+
+        with open('/proc/cpuinfo', 'r') as f:
+            system['cpuinfo'] = f.read()
+
+        with open('/proc/meminfo', 'r') as f:
+            system['meminfo'] = f.read()
+
+        with open('/proc/mounts', 'r') as f:
+            system['mounts'] = f.read()
+
+        return system
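linux.py relies on run_cmd() from utils.misc, which is not part of this diff; judging by the calls above (r[1] is treated as the captured text of sar and sysctl), it returns a tuple whose second element is the command output. A stand-in with that assumed interface could look like this (an assumption, not the actual helper shipped with the client):

    import subprocess

    def run_cmd(args):
        'run a command and return (return code, output) - interface guessed from the usage above'
        p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        out, _ = p.communicate()
        return (p.returncode, out)

Note that the sar filenames are built as '<sar_path>/saDD' (e.g. /var/log/sa/sa10 for the 10th), and the -s/-e options are only added on the first and last day of the run, so multi-day benchmarks keep the full days in between.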
diff --git a/client/collectors/postgres.py b/client/collectors/postgres.py
new file mode 100644
index 0000000..306c2b7
--- /dev/null
+++ b/client/collectors/postgres.py
@@ -0,0 +1,157 @@
+import csv
+import multiprocessing
+import os
+import psycopg2
+import psycopg2.extras
+import Queue
+import time
+
+from multiprocessing import Process, Queue
+
+
+class PostgresCollector(object):
+    'collects basic PostgreSQL-level statistics (bgwriter, databases, tables, indexes)'
+
+    def __init__(self, dbname):
+        self._dbname = dbname
+
+
+    def start(self):
+        self._in_queue = multiprocessing.Queue()
+        self._out_queue = multiprocessing.Queue()
+        self._worker = Process(target=run_collector, args=(self._in_queue, self._out_queue, self._dbname))
+        self._worker.start()
+
+
+    def stop(self):
+        # signal the worker process to stop by writing a value into the queue
+        self._in_queue.put(True)
+
+        # FIXME this gets stuck for some reason (but we'll wait for queue anyway)
+        # self._worker.join()
+
+        # and then read the result
+        self._result = self._out_queue.get()
+
+        self._worker = None
+        self._in_queue = None
+        self._out_queue = None
+
+
+    def result(self):
+        return self._result
+
+
+def run_collector(in_queue, out_queue, dbname, interval=1.0):
+    'collector code for a separate process, communicating through a pair of queues'
+
+    bgwriter_log = None
+    tables_log = None
+    indexes_log = None
+    database_log = None
+
+    # get current timestamp
+    ts = time.time()
+
+    while True:
+
+        # wait until the next tick
+        ts += interval
+
+        # if we're behind, skip forward
+        if ts < time.time():
+            continue
+
+        # sleep (but only for the remaining time, to prevent drift)
+        time.sleep(ts - time.time())
+
+        # if we've received a message in the input queue (not empty), terminate
+        if not in_queue.empty():
+            break
+
+        # open connection to the benchmark database (if we can't open it, retry)
+        # notice this is intentionally after the wait, so we'll wait before the
+        # next connection attempt
+        try:
+            conn = psycopg2.connect('host=localhost dbname=%s' % (dbname,))
+            cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+        except Exception as ex:
+            continue
+
+        # background writer stats
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_bgwriter')
+
+        # on the first iteration, construct the CSV files
+        if not bgwriter_log:
+            fields = [desc[0] for desc in cur.description]
+            bgwriter_log = csv.DictWriter(open('bgwriter.csv', 'w'), fields)
+            bgwriter_log.writeheader()
+
+        bgwriter_log.writerows(cur.fetchall())
+
+        # TODO we can assume statistics for most objects (tables, indexes) won't
+        # change every second, so we can optimize the amount of data by detecting
+        # changes and only keeping the two rows next to it
+
+        # table statistics
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_all_tables JOIN pg_statio_all_tables USING (relid, schemaname, relname)')
+
+        # on the first iteration, construct the CSV files
+        if not tables_log:
+            fields = [desc[0] for desc in cur.description]
+            tables_log = csv.DictWriter(open('tables.csv', 'w'), fields)
+            tables_log.writeheader()
+
+        tables_log.writerows(cur.fetchall())
+
+        # index statistics
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_all_indexes JOIN pg_statio_all_indexes USING (relid, indexrelid, schemaname, relname, indexrelname)')
+
+        # on the first iteration, construct the CSV files
+        if not indexes_log:
+            fields = [desc[0] for desc in cur.description]
+            indexes_log = csv.DictWriter(open('indexes.csv', 'w'), fields)
+            indexes_log.writeheader()
+
+        indexes_log.writerows(cur.fetchall())
+
+        # database statistics
+        cur.execute('SELECT EXTRACT(EPOCH FROM now()) AS ts, * FROM pg_stat_database')
+
+        # on the first iteration, construct the CSV files
+        if not database_log:
+            fields = [desc[0] for desc in cur.description]
+            database_log = csv.DictWriter(open('database.csv', 'w'), fields)
+            database_log.writeheader()
+
+        database_log.writerows(cur.fetchall())
+
+        conn.close()
+
+    # close the CSV writers
+    bgwriter_log = None
+    tables_log = None
+    indexes_log = None
+    database_log = None
+
+    result = {}
+
+    with open('bgwriter.csv', 'r') as f:
+        result.update({'bgwriter' : f.read()})
+
+    with open('tables.csv', 'r') as f:
+        result.update({'tables' : f.read()})
+
+    with open('indexes.csv', 'r') as f:
+        result.update({'indexes' : f.read()})
+
+    with open('database.csv', 'r') as f:
+        result.update({'database' : f.read()})
+
+    # remove the files
+    os.remove('bgwriter.csv')
+    os.remove('tables.csv')
+    os.remove('indexes.csv')
+    os.remove('database.csv')
+
+    out_queue.put(result)
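PostgresCollector samples the statistics views roughly once per interval (1 second by default) in a separate process and hands the four CSV files back as strings through the output queue. A usage sketch (the database name is illustrative):

    coll = PostgresCollector(dbname='pgbench')

    coll.start()        # forks the worker process, sampling starts
    # ... run the benchmark ...
    coll.stop()         # signals the worker and waits for the result on the queue

    stats = coll.result()
    # stats is a dict with 'bgwriter', 'tables', 'indexes' and 'database' keys,
    # each holding the contents of the corresponding CSV file
    print(stats['bgwriter'].splitlines()[0])   # CSV header produced by DictWriter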