diff options
author | Martin Pihlak | 2009-11-27 09:09:04 +0000 |
---|---|---|
committer | Martin Pihlak | 2009-11-27 09:09:04 +0000 |
commit | fbead402fb934296aa8337d37a77b1fc430f901b (patch) | |
tree | ce8b9795223a593d5f90805aa5648b39acaa827f /python | |
parent | d7db381ed4365848e7fefdfbee2ffa7f99da4903 (diff) | |
parent | 0e00bd1e946f0b2153b24bda460b345be0aa79f9 (diff) |
Merge branch 'master' of git://github.com/markokr/skytools-dev
Diffstat (limited to 'python')
-rwxr-xr-x | python/qadmin.py | 54 | ||||
-rw-r--r-- | python/skytools/psycopgwrapper.py | 21 |
2 files changed, 60 insertions, 15 deletions
diff --git a/python/qadmin.py b/python/qadmin.py index f2a0a23d..0c8d625b 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -11,13 +11,14 @@ drop queue <qname>; show queue <qname | *>; - register consumer foo [on <qname>]; + register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ; unregister consumer foo [from <qname>]; show consumer <cname | *> [on <qname>]; show node <node | *> [on <qname>]; Following commands expect default queue: + show queue; show batch <batch_id>; show batch <consumer>; @@ -216,6 +217,11 @@ class BatchId(DynList): def get_wlist(self): return script.get_batch_list() +class TickId(DynList): + tk_type = ("num",) + def get_wlist(self): + return [] + class Port(DynList): tk_type = ("num",) def get_wlist(self): @@ -324,13 +330,15 @@ w_create = Word('queue', Queue(w_done, name = 'queue'), w_drop = Word('queue', Queue(w_done, name = 'queue'), name = 'cmd2') -w_on_queue = Word('on', - Queue(w_done, name = 'queue'), - name = 'onqueue') +w_reg_target = Proxy() +w_reg_target.set_real(WList( + Word('on', Queue(w_reg_target, name = 'queue')), + Word('copy', Consumer(w_reg_target, name = 'copy_reg')), + Word('at', TickId(w_reg_target, name = 'at_tick')), + w_done)) w_cons_on_queue = Word('consumer', - Consumer(WList(w_done, w_on_queue), - name = 'consumer'), + Consumer(w_reg_target, name = 'consumer'), name = 'cmd2') w_from_queue = Word('from', @@ -860,15 +868,35 @@ class AdminConsole: if not queue: print 'No queue specified' return + at_tick = params.get('at_tick') + copy_reg = params.get('copy_reg') consumer = params['consumer'] curs = self.db.cursor() - q = "select * from pgq.get_consumer_info(%s, %s)" - curs.execute(q, [queue, consumer]) - if curs.fetchone(): - print 'Consumer already registered' - return - q = "select * from pgq.register_consumer(%s, %s)" - curs.execute(q, [queue, consumer]) + + # copy other registration + if copy_reg: + q = "select coalesce(next_tick, last_tick) as pos from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, copy_reg]) + reg = curs.fetchone() + if not reg: + print "Consumer %s not registered on queue %d" % (copy_reg, queue) + return + at_tick = reg['pos'] + + # avoid double reg if specific pos is not requested + if not at_tick: + q = "select * from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, consumer]) + if curs.fetchone(): + print 'Consumer already registered' + return + + if at_tick: + q = "select * from pgq.register_consumer_at(%s, %s, %s)" + curs.execute(q, [queue, consumer, int(at_tick)]) + else: + q = "select * from pgq.register_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) print "REGISTER" def cmd_unregister_consumer(self, params): diff --git a/python/skytools/psycopgwrapper.py b/python/skytools/psycopgwrapper.py index aeaa7a74..6aa5835b 100644 --- a/python/skytools/psycopgwrapper.py +++ b/python/skytools/psycopgwrapper.py @@ -62,6 +62,7 @@ __all__ = ['connect_database'] # to the point of avoiding optimized access. # only backwards compat thing we need is dict* methods +import socket import psycopg2.extensions, psycopg2.extras from skytools.sqltools import dbdict @@ -102,10 +103,15 @@ class _CompatConnection(psycopg2.extensions.connection): def cursor(self): return psycopg2.extensions.connection.cursor(self, cursor_factory = _CompatCursor) -def connect_database(connstr): - """Create a db connection with connect_timeout option. +def connect_database(connstr, keepalive = True, + tcp_keepidle = 4 * 60, # 7200 + tcp_keepcnt = 4, # 9 + tcp_keepintvl = 15): # 75 + """Create a db connection with connect_timeout and TCP keepalive. Default connect_timeout is 15, to change put it directly into dsn. + + The extra tcp_* options are Linux-specific, see `man 7 tcp` for details. """ # allow override @@ -115,6 +121,16 @@ def connect_database(connstr): # create connection db = _CompatConnection(connstr) + # turn on keepalive on the connection + if keepalive and hasattr(socket, 'SO_KEEPALIVE'): + fd = db.cursor().fileno() + s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if hasattr(socket, 'TCP_KEEPCNT'): + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keepidle) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, tcp_keepcnt) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, tcp_keepintvl) + # fill .server_version on older psycopg if not hasattr(db, 'server_version'): iso = db.isolation_level @@ -123,5 +139,6 @@ def connect_database(connstr): curs.execute('show server_version_num') db.server_version = int(curs.fetchone()[0]) db.set_isolation_level(iso) + return db |