Stop storing log files as JSON
author Mark Wong <mark@2ndQuadrant.com>
Fri, 28 Jul 2017 00:06:06 +0000 (17:06 -0700)
committer Mark Wong <mark@2ndQuadrant.com>
Fri, 5 Jan 2018 19:36:20 +0000 (11:36 -0800)
Keep the JSON results document small by saving log information
externally (a short sketch of the pattern follows the list).  This
includes:

* postgres log
* pgbench output
* sar output
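
A minimal sketch of the pattern, assuming one output directory per run
(save_log and save_results are illustrative names, not the client's
actual API):

    import json
    import os

    def save_log(outdir, name, contents):
        'keep bulky log data in its own file under the output directory'
        with open(os.path.join(outdir, name), 'w') as f:
            f.write(contents)

    def save_results(outdir, config_name, results):
        'the results document itself stays small: metrics, not raw logs'
        path = os.path.join(outdir, '%s.json' % (config_name,))
        with open(path, 'w') as f:
            f.write(json.dumps(results, indent=4))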

client/benchmarks/pgbench.py
client/benchmarks/runner.py
client/collectors/linux.py
client/collectors/postgres.py
client/perffarm-client.py
client/utils/cluster.py

index 9b2b3c826522111052e4221afdb2aca8d6728516..bd6a0e68db33d8f4981aa27f4f9ebad5fb76c80f 100644 (file)
@@ -119,54 +119,6 @@ class PgBench(object):
                 'latency': latency,
                 'tps': tps}
 
-    @staticmethod
-    def _merge_logs():
-        'merge log files produced by pgbench threads (aggregated per second)'
-
-        r = {}
-
-        # find pgbench transaction logs in current directory
-        logs = [v for v in os.listdir(os.getcwd())
-                if re.match('pgbench_log.[0-9]+(\.[0-9]+)?', v)]
-
-        # parse each transaction log, and merge it into the existing results
-        for l in logs:
-            worker_log = open(l, 'r')
-            for row in worker_log:
-                values = row.split(' ')
-
-                timestamp = values[0]
-                tps = int(values[1])
-                lat_sum = long(values[2])
-                lat_sum2 = long(values[3])
-                lat_min = int(values[4])
-                lat_max = int(values[5])
-
-                # if first record for the timestamp, store it, otherwise merge
-                if timestamp not in r:
-                    r[timestamp] = {'tps': tps,
-                                    'lat_sum': lat_sum, 'lat_sum2': lat_sum2,
-                                    'lat_min': lat_min, 'lat_max': lat_max}
-                else:
-                    r[timestamp]['tps'] += int(tps)
-                    r[timestamp]['lat_sum'] += long(lat_sum)
-                    r[timestamp]['lat_sum2'] += long(lat_sum2)
-                    r[timestamp]['lat_min'] = min(r[timestamp]['lat_min'],
-                                                  int(lat_min))
-                    r[timestamp]['lat_max'] = max(r[timestamp]['lat_max'],
-                                                  int(lat_max))
-
-            os.remove(l)
-
-        # now produce a simple text log sorted by the timestamp
-        o = []
-        for t in sorted(r.keys()):
-            o.append('%s %d %d %d %d %d' % (t, r[t]['tps'], r[t]['lat_sum'],
-                                            r[t]['lat_sum2'], r[t]['lat_min'],
-                                            r[t]['lat_max']))
-
-        return '\n'.join(o)
-
     def check_config(self):
         'check pgbench configuration (existence of binaries etc.)'
 
@@ -225,9 +177,6 @@ class PgBench(object):
         r = PgBench._parse_results(r[1])
         r.update({'read-only': read_only})
 
-        if aggregate:
-            r.update({'transaction-log': PgBench._merge_logs()})
-
         r.update({'start': start, 'end': end})
 
         if csv_queue is not None:
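
The removed _merge_logs() helper (which also relied on Python 2's
long()) folded the per-thread aggregated transaction logs into the
results document.  With the logs now left on disk, the same merge can
be done offline; a minimal Python 3 sketch of the equivalent logic:

    import os
    import re

    def merge_pgbench_logs(directory):
        'merge aggregated per-thread pgbench logs, keyed by timestamp'
        r = {}
        logs = [v for v in os.listdir(directory)
                if re.match(r'pgbench_log\.[0-9]+(\.[0-9]+)?', v)]
        for name in logs:
            with open(os.path.join(directory, name), 'r') as f:
                for row in f:
                    v = row.split(' ')
                    ts = v[0]
                    tps, lat_sum, lat_sum2 = int(v[1]), int(v[2]), int(v[3])
                    lat_min, lat_max = int(v[4]), int(v[5])
                    # first record for the timestamp is stored; later
                    # records from other threads are merged into it
                    if ts not in r:
                        r[ts] = {'tps': tps, 'lat_sum': lat_sum,
                                 'lat_sum2': lat_sum2,
                                 'lat_min': lat_min, 'lat_max': lat_max}
                    else:
                        r[ts]['tps'] += tps
                        r[ts]['lat_sum'] += lat_sum
                        r[ts]['lat_sum2'] += lat_sum2
                        r[ts]['lat_min'] = min(r[ts]['lat_min'], lat_min)
                        r[ts]['lat_max'] = max(r[ts]['lat_max'], lat_max)
        return '\n'.join('%s %d %d %d %d %d' %
                         (t, r[t]['tps'], r[t]['lat_sum'],
                          r[t]['lat_sum2'], r[t]['lat_min'],
                          r[t]['lat_max'])
                         for t in sorted(r))
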
index a63c4c3ec06c1d81ac063f98cfae370342e2eb1b..80cee215d13d8bb181520405d17cc1dc82cbcb93 100644 (file)
@@ -108,16 +108,10 @@ class BenchmarkRunner(object):
         # merge data from the collectors into the JSON document with results
         r.update(self._collector.result())
 
-        # read the postgres log
-        with open('pg.log', 'r') as f:
-            r['postgres-log'] = f.read()
-
         r['meta'] = {'benchmark': config['benchmark'],
                      'date': strftime("%Y-%m-%d %H:%M:%S.000000+00", gmtime()),
                      'name': config_name}
 
-        os.remove('pg.log')
-
         with open('%s/%s.json' % (self._output, config_name), 'w') as f:
             f.write(json.dumps(r, indent=4))
 
index 8bacacfb7fe788a5f9c4f20fce336a906ce3311d..31c042805cffe6754fdfc3061c083f34da12c98f 100644 (file)
@@ -8,10 +8,12 @@ from utils.misc import run_cmd
 class LinuxCollector(object):
     'collects various Linux-specific statistics (cpuinfo, mounts, sar)'
 
-    def __init__(self, sar_path='/var/log/sa'):
+    def __init__(self, outdir, sar_path='/var/log/sa'):
+        self._outdir = outdir
+        self._sar = sar_path
+
         self._start_ts = None
         self._end_ts = None
-        self._sar = sar_path
 
     def start(self):
         self._start_ts = datetime.now()
@@ -25,9 +27,7 @@ class LinuxCollector(object):
         r = {'sysctl': self._collect_sysctl()}
 
-        # ignore sar if we've not found it
-        sar = self._collect_sar_stats()
-        if sar:
-            r['sar'] = sar
+        # sar output is now written to files in the output directory
+        self._collect_sar_stats()
 
         r.update(self._collect_system_info())
 
@@ -36,7 +36,6 @@ class LinuxCollector(object):
     def _collect_sar_stats(self):
         'extracts all data available in sar, filters by timestamp range'
 
-        sar = {}
         log("collecting sar stats")
 
         d = self._start_ts.date()
@@ -65,8 +64,11 @@ class LinuxCollector(object):
                 else:
                     r = run_cmd(['sar', '-A', '-p', '-f', filename])
 
-                sar[str(d)] = r[1]
-
+                # write one sar dump per day, named sar-<DD>.txt; the
+                # 'with' block closes the file
+                with open(''.join([self._outdir, '/sar-', d.strftime('%d'),
+                                   '.txt']),
+                          'w') as f:
+                    f.write(r[1])
             else:
 
                 log("file '%s' does not exist, skipping" % (filename,))
@@ -74,11 +76,6 @@ class LinuxCollector(object):
             # proceed to the next day
             d += timedelta(days=1)
 
-        if not sar:
-            return None
-
-        return sar
-
     def _collect_sysctl(self):
         'collect kernel configuration'
 
index 319fe408fb4c57d51674f30681fa621cdd697f0e..8a726b6f5b31470d03f2b4896bb03ebfd7e65a39 100644 (file)
@@ -7,6 +7,7 @@ import time
 
 from multiprocessing import Process, Queue
 from utils.logging import log
+from utils.misc import run_cmd
 
 
 class PostgresCollector(object):
@@ -15,15 +16,18 @@ class PostgresCollector(object):
     indexes)
     """
 
-    def __init__(self, dbname):
+    def __init__(self, outdir, dbname, bin_path):
+        self._outdir = outdir
         self._dbname = dbname
+        self._bin_path = bin_path
 
     def start(self):
         self._in_queue = Queue()
         self._out_queue = Queue()
         self._worker = Process(target=run_collector,
                                args=(self._in_queue, self._out_queue,
-                                     self._dbname))
+                                     self._dbname, self._bin_path,
+                                     self._outdir))
         self._worker.start()
 
     def stop(self):
@@ -51,7 +55,7 @@ class PostgresCollector(object):
         return self._result
 
 
-def run_collector(in_queue, out_queue, dbname, interval=1.0):
+def run_collector(in_queue, out_queue, dbname, bin_path, outdir, interval=1.0):
     """
     collector code for a separate process, communicating through a pair of
     queues
@@ -98,7 +102,8 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
         # on the first iteration, construct the CSV files
         if bgwriter_log is None:
             fields = [desc[0] for desc in cur.description]
-            bgwriter_log = csv.DictWriter(open('bgwriter.csv', 'w'), fields)
+            filename = ''.join([outdir, '/bgwriter.csv'])
+            bgwriter_log = csv.DictWriter(open(filename, 'w'), fields)
             bgwriter_log.writeheader()
 
         bgwriter_log.writerows(cur.fetchall())
@@ -115,7 +120,8 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
         # on the first iteration, construct the CSV files
         if tables_log is None:
             fields = [desc[0] for desc in cur.description]
-            tables_log = csv.DictWriter(open('tables.csv', 'w'), fields)
+            filename = ''.join([outdir, '/tables.csv'])
+            tables_log = csv.DictWriter(open(filename, 'w'), fields)
             tables_log.writeheader()
 
         tables_log.writerows(cur.fetchall())
@@ -129,7 +135,8 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
         # on the first iteration, construct the CSV files
         if indexes_log is None:
             fields = [desc[0] for desc in cur.description]
-            indexes_log = csv.DictWriter(open('indexes.csv', 'w'), fields)
+            filename = ''.join([outdir, '/indexes.csv'])
+            indexes_log = csv.DictWriter(open(filename, 'w'), fields)
             indexes_log.writeheader()
 
         indexes_log.writerows(cur.fetchall())
@@ -141,14 +148,27 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
         # on the first iteration, construct the CSV files
         if database_log is None:
             fields = [desc[0] for desc in cur.description]
-            database_log = csv.DictWriter(open('database.csv', 'w'), fields)
+            filename = ''.join([outdir, '/database.csv'])
+            database_log = csv.DictWriter(open(filename, 'w'), fields)
             database_log.writeheader()
 
         database_log.writerows(cur.fetchall())
 
         conn.close()
 
-    log("PostgreSQL collector generates CSV results")
+    try:
+        conn = psycopg2.connect('host=localhost dbname=%s' % (dbname,))
+        cur = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
+        cur.execute('SELECT name, setting, source '
+                    'FROM pg_settings ORDER BY lower(name)')
+        fields = [desc[0] for desc in cur.description]
+        filename = ''.join([outdir, '/settings.csv'])
+        settings_log = csv.DictWriter(open(filename, 'w'), fields)
+        settings_log.writeheader()
+        settings_log.writerows(cur.fetchall())
+        conn.close()
+    except Exception:
+        pass    # the settings dump is best-effort; ignore failures
 
     # close the CSV writers
     bgwriter_log = None
@@ -158,13 +178,8 @@ def run_collector(in_queue, out_queue, dbname, interval=1.0):
 
     result = {}
 
-    for file in ['bgwriter', 'tables', 'indexes', 'database']:
-        if os.path.isfile(''.join([file, '.csv'])):
-            with open(''.join([file, '.csv']), 'r') as f:
-                result.update({file : f.read()})
-
-                # remove the files
-                os.remove(''.join([file, '.csv']))
+    r = run_cmd([bin_path + '/pg_config'])
+    result['config'] = r[1]
 
     out_queue.put(result)
 
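
The collector's CSVs (bgwriter, tables, indexes, database, and the new
settings dump) likewise stay on disk instead of being read back into
the results; only the pg_config output is kept in the JSON.  Reading
one of them afterwards is plain csv work, e.g.:

    import csv
    import os

    def read_stats_csv(outdir, name):
        'return the rows of one collector CSV as a list of dicts'
        with open(os.path.join(outdir, '%s.csv' % (name,)), 'r') as f:
            return list(csv.DictReader(f))

    # e.g. settings = read_stats_csv(outdir, 'settings')
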
index 1a37ede6e45e287acbf1773182a68d920bf85bc0..49a73aa519b669a4e245254a26d6d8e15a327b55 100755 (executable)
@@ -31,15 +31,17 @@ if __name__ == '__main__':
 
         # build and start a postgres cluster
 
-        cluster = PgCluster(bin_path=BIN_PATH, data_path=DATADIR_PATH)
+        cluster = PgCluster(OUTPUT_DIR, bin_path=BIN_PATH,
+                            data_path=DATADIR_PATH)
 
         # create collectors
 
         collectors = MultiCollector()
 
-        collectors.register('system', LinuxCollector())
-        collectors.register('postgres',
-                            PostgresCollector(dbname=DATABASE_NAME))
+        collectors.register('system', LinuxCollector(OUTPUT_DIR))
+        pg_collector = PostgresCollector(OUTPUT_DIR, dbname=DATABASE_NAME,
+                                         bin_path=('%s/bin' % (BUILD_PATH)))
+        collectors.register('postgres', pg_collector)
 
         runner = BenchmarkRunner(OUTPUT_DIR, cluster, collectors)
 
index 81bca893678bf1b2d50923734c786f859a56ffee..0efdb8b1a30d5078b1549860f2535bbfd7c6c762 100644 (file)
@@ -11,7 +11,8 @@ from utils.logging import log
 class PgCluster(object):
     'basic manipulation of postgres cluster (init, start, stop, destroy)'
 
-    def __init__(self, bin_path, data_path):
+    def __init__(self, outdir, bin_path, data_path):
+        self._outdir = outdir
         self._bin = bin_path
         self._data = data_path
 
@@ -61,7 +62,8 @@ class PgCluster(object):
         with TemporaryFile() as strout:
             log("starting cluster in '%s' using '%s' binaries" %
                 (self._data, self._bin))
-            cmd = ['pg_ctl', '-D', self._data, '-l', 'pg.log', '-w']
+            cmd = ['pg_ctl', '-D', self._data, '-l',
+                   ''.join([self._outdir, '/pg.log']), '-w']
             if len(self._options) > 0:
                 cmd.extend(['-o', self._options])
             cmd.append('start')