From b269094eae644b6bf6843a7933ec5cb60125ce31 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Thu, 5 Nov 2009 14:57:24 +0200 Subject: qadmin: extend REGISTER CONSUMER Custom tick pos: at Copy other registration: copy --- python/qadmin.py | 54 +++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 41 insertions(+), 13 deletions(-) (limited to 'python') diff --git a/python/qadmin.py b/python/qadmin.py index f2a0a23d..0c8d625b 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -11,13 +11,14 @@ drop queue ; show queue ; - register consumer foo [on ]; + register consumer foo [on | at | copy ]* ; unregister consumer foo [from ]; show consumer [on ]; show node [on ]; Following commands expect default queue: + show queue; show batch ; show batch ; @@ -216,6 +217,11 @@ class BatchId(DynList): def get_wlist(self): return script.get_batch_list() +class TickId(DynList): + tk_type = ("num",) + def get_wlist(self): + return [] + class Port(DynList): tk_type = ("num",) def get_wlist(self): @@ -324,13 +330,15 @@ w_create = Word('queue', Queue(w_done, name = 'queue'), w_drop = Word('queue', Queue(w_done, name = 'queue'), name = 'cmd2') -w_on_queue = Word('on', - Queue(w_done, name = 'queue'), - name = 'onqueue') +w_reg_target = Proxy() +w_reg_target.set_real(WList( + Word('on', Queue(w_reg_target, name = 'queue')), + Word('copy', Consumer(w_reg_target, name = 'copy_reg')), + Word('at', TickId(w_reg_target, name = 'at_tick')), + w_done)) w_cons_on_queue = Word('consumer', - Consumer(WList(w_done, w_on_queue), - name = 'consumer'), + Consumer(w_reg_target, name = 'consumer'), name = 'cmd2') w_from_queue = Word('from', @@ -860,15 +868,35 @@ class AdminConsole: if not queue: print 'No queue specified' return + at_tick = params.get('at_tick') + copy_reg = params.get('copy_reg') consumer = params['consumer'] curs = self.db.cursor() - q = "select * from pgq.get_consumer_info(%s, %s)" - curs.execute(q, [queue, consumer]) - if curs.fetchone(): - print 'Consumer already registered' - return - q = "select * from pgq.register_consumer(%s, %s)" - curs.execute(q, [queue, consumer]) + + # copy other registration + if copy_reg: + q = "select coalesce(next_tick, last_tick) as pos from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, copy_reg]) + reg = curs.fetchone() + if not reg: + print "Consumer %s not registered on queue %d" % (copy_reg, queue) + return + at_tick = reg['pos'] + + # avoid double reg if specific pos is not requested + if not at_tick: + q = "select * from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, consumer]) + if curs.fetchone(): + print 'Consumer already registered' + return + + if at_tick: + q = "select * from pgq.register_consumer_at(%s, %s, %s)" + curs.execute(q, [queue, consumer, int(at_tick)]) + else: + q = "select * from pgq.register_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) print "REGISTER" def cmd_unregister_consumer(self, params): -- cgit v1.2.3 From 0e00bd1e946f0b2153b24bda460b345be0aa79f9 Mon Sep 17 00:00:00 2001 From: Marko Kreen Date: Tue, 24 Nov 2009 15:55:09 +0200 Subject: psycopgwrapper: Activate TCP keepalive on database connections Turn on keepalive by default, to avoid hanged scripts in case of network problems. Also tune the keepalive timeouts smaller, on Linux at least, so that the error happens after 5 min blackout. (4 min idle + 4x15sec keepalive pings) --- python/skytools/psycopgwrapper.py | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) (limited to 'python') diff --git a/python/skytools/psycopgwrapper.py b/python/skytools/psycopgwrapper.py index aeaa7a74..6aa5835b 100644 --- a/python/skytools/psycopgwrapper.py +++ b/python/skytools/psycopgwrapper.py @@ -62,6 +62,7 @@ __all__ = ['connect_database'] # to the point of avoiding optimized access. # only backwards compat thing we need is dict* methods +import socket import psycopg2.extensions, psycopg2.extras from skytools.sqltools import dbdict @@ -102,10 +103,15 @@ class _CompatConnection(psycopg2.extensions.connection): def cursor(self): return psycopg2.extensions.connection.cursor(self, cursor_factory = _CompatCursor) -def connect_database(connstr): - """Create a db connection with connect_timeout option. +def connect_database(connstr, keepalive = True, + tcp_keepidle = 4 * 60, # 7200 + tcp_keepcnt = 4, # 9 + tcp_keepintvl = 15): # 75 + """Create a db connection with connect_timeout and TCP keepalive. Default connect_timeout is 15, to change put it directly into dsn. + + The extra tcp_* options are Linux-specific, see `man 7 tcp` for details. """ # allow override @@ -115,6 +121,16 @@ def connect_database(connstr): # create connection db = _CompatConnection(connstr) + # turn on keepalive on the connection + if keepalive and hasattr(socket, 'SO_KEEPALIVE'): + fd = db.cursor().fileno() + s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) + if hasattr(socket, 'TCP_KEEPCNT'): + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keepidle) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, tcp_keepcnt) + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, tcp_keepintvl) + # fill .server_version on older psycopg if not hasattr(db, 'server_version'): iso = db.isolation_level @@ -123,5 +139,6 @@ def connect_database(connstr): curs.execute('show server_version_num') db.server_version = int(curs.fetchone()[0]) db.set_isolation_level(iso) + return db -- cgit v1.2.3