summaryrefslogtreecommitdiff
path: root/test/stress.py
blob: 836cc79b1cfa8240d3ce4087bdfaf4fd08194d59 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#! /usr/bin/env python

import sys, os, re, time, psycopg
import threading, thread, random

# Number of WorkThread instances started by main().
n_thread = 100
# When true, workers leave autocommit off and commit explicitly after each
# query (see WorkThread.main_loop / WorkThread.do_work).
longtx = 0
# Scale (in seconds) of the random client-side pause after each query; the
# pause is tx_sleep * random() + 1.  The second assignment overrides the
# first, so the effective value is 8.
tx_sleep = 0
tx_sleep = 8

# Connection settings; get_connstr() joins them into a "key=value ..." string.
# All values are given as strings.
conn_data = {
    'dbname': 'marko',
    #'host': '127.0.0.1',
    # NOTE(review): a filesystem path as host presumably selects a unix
    # socket directory -- confirm against the target server setup.
    'host': '/tmp',
    'port': '6000',
    'user': 'marko',
    #'password': '',
    'connect_timeout': '5',
}

def get_connstr(params=None):
    """Build a space-separated ``key=value`` connection string.

    params -- optional mapping of connection settings.  When omitted
              (None), the module-level ``conn_data`` is used.

    Returns the pairs joined by single spaces, in the mapping's iteration
    order; an empty mapping yields an empty string.

    Values are formatted with ``%s``, so non-string settings such as an
    integer port are accepted.  Values are not quoted or escaped.
    """
    # Compare against None explicitly so an empty mapping is honoured
    # instead of silently falling back to conn_data.
    if params is None:
        params = conn_data
    return " ".join(['%s=%s' % (key, value) for key, value in params.items()])

class WorkThread(threading.Thread):
    """Daemon worker that repeatedly connects and runs short sleep queries.

    Each pass (main_loop) opens a new database connection, runs
    ``queries_per_conn`` queries on it and closes it again.  Every finished
    query is tallied in ``query_cnt``; the reporting loop drains that
    counter through fetch_cnt().
    """

    # Number of queries issued on one connection before reconnecting.
    queries_per_conn = 10
    # Seconds to wait before the next pass after a pass failed.
    retry_delay = 5

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        # Guards query_cnt, which is written by this thread and read/reset
        # by whichever thread calls fetch_cnt().
        self.stat_lock = threading.Lock()
        self.query_cnt = 0

    def inc_cnt(self):
        """Add one to the query counter under the stats lock."""
        with self.stat_lock:
            self.query_cnt += 1

    def fetch_cnt(self):
        """Return the number of queries counted since the last call.

        The counter is reset to zero in the same locked section, so each
        query is reported exactly once.
        """
        with self.stat_lock:
            val = self.query_cnt
            self.query_cnt = 0
        return val

    def run(self):
        """Thread body: wait a random moment, then run main_loop() forever.

        KeyboardInterrupt and SystemExit end the thread.  Any other
        exception raised by a pass is printed, and the next pass starts
        after ``retry_delay`` seconds.
        """
        try:
            # Random initial delay of up to 10 seconds -- presumably so the
            # workers do not all connect at the same instant.
            time.sleep(random.random() * 10.0)
        except Exception:
            # Best effort: an interrupted sleep only shortens the delay.
            pass
        while True:
            try:
                self.main_loop()
            except (KeyboardInterrupt, SystemExit):
                break
            except Exception as exc:
                print(exc)
                try:
                    time.sleep(self.retry_delay)
                except Exception:
                    # Best effort: a failed sleep just retries sooner.
                    pass

    def main_loop(self):
        """Open one connection, run ``queries_per_conn`` queries, close it.

        Autocommit is switched on unless the module-level ``longtx`` flag
        is set.  Exceptions from connecting or querying propagate to the
        caller; the connection is closed either way.
        """
        db = psycopg.connect(get_connstr())
        try:
            if not longtx:
                db.autocommit(1)
            for _ in range(self.queries_per_conn):
                self.do_work(db)
                self.inc_cnt()
        finally:
            # Release the connection explicitly so a failing pass does not
            # leave it open until the object happens to be collected.
            db.close()

    def do_work(self, db):
        """Run one ``pg_sleep`` query on *db*, then pause on the client side.

        The server-side sleep is a random duration below one second; the
        client-side pause is ``tx_sleep * random() + 1`` seconds.  With
        ``longtx`` set, the transaction is committed only after that pause.
        """
        curs = db.cursor()
        q = "select pg_sleep(%.02f)" % (random.random() * 1)
        curs.execute(q)
        time.sleep(tx_sleep * random.random() + 1)
        if longtx:
            db.commit()

def main():
    print "connstr", get_connstr()

    thread_list = []
    while len(thread_list) < n_thread:
        t = WorkThread()
        t.start()
        thread_list.append(t)

    print "started %d threads" % len(thread_list)

    last = time.time()
    while 1:
        time.sleep(1)
        now = time.time()
        dur = now - last
        if dur >= 5:
            last = now
            cnt = 0
            for t in thread_list:
                cnt += t.fetch_cnt()
            avg = cnt / dur
            print "avg", avg

if __name__ == '__main__':
    # Exit quietly on sys.exit() or Ctrl-C.  Other exceptions are not caught
    # here, so they propagate and show their traceback.
    try:
        main()
    except (SystemExit, KeyboardInterrupt):
        pass