summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorMartin Pihlak2009-11-27 09:09:04 +0000
committerMartin Pihlak2009-11-27 09:09:04 +0000
commitfbead402fb934296aa8337d37a77b1fc430f901b (patch)
treece8b9795223a593d5f90805aa5648b39acaa827f /python
parentd7db381ed4365848e7fefdfbee2ffa7f99da4903 (diff)
parent0e00bd1e946f0b2153b24bda460b345be0aa79f9 (diff)
Merge branch 'master' of git://github.com/markokr/skytools-dev
Diffstat (limited to 'python')
-rwxr-xr-xpython/qadmin.py54
-rw-r--r--python/skytools/psycopgwrapper.py21
2 files changed, 60 insertions, 15 deletions
diff --git a/python/qadmin.py b/python/qadmin.py
index f2a0a23d..0c8d625b 100755
--- a/python/qadmin.py
+++ b/python/qadmin.py
@@ -11,13 +11,14 @@
drop queue <qname>;
show queue <qname | *>;
- register consumer foo [on <qname>];
+ register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ;
unregister consumer foo [from <qname>];
show consumer <cname | *> [on <qname>];
show node <node | *> [on <qname>];
Following commands expect default queue:
+
show queue;
show batch <batch_id>;
show batch <consumer>;
@@ -216,6 +217,11 @@ class BatchId(DynList):
def get_wlist(self):
return script.get_batch_list()
+class TickId(DynList):
+ tk_type = ("num",)
+ def get_wlist(self):
+ return []
+
class Port(DynList):
tk_type = ("num",)
def get_wlist(self):
@@ -324,13 +330,15 @@ w_create = Word('queue', Queue(w_done, name = 'queue'),
w_drop = Word('queue', Queue(w_done, name = 'queue'),
name = 'cmd2')
-w_on_queue = Word('on',
- Queue(w_done, name = 'queue'),
- name = 'onqueue')
+w_reg_target = Proxy()
+w_reg_target.set_real(WList(
+ Word('on', Queue(w_reg_target, name = 'queue')),
+ Word('copy', Consumer(w_reg_target, name = 'copy_reg')),
+ Word('at', TickId(w_reg_target, name = 'at_tick')),
+ w_done))
w_cons_on_queue = Word('consumer',
- Consumer(WList(w_done, w_on_queue),
- name = 'consumer'),
+ Consumer(w_reg_target, name = 'consumer'),
name = 'cmd2')
w_from_queue = Word('from',
@@ -860,15 +868,35 @@ class AdminConsole:
if not queue:
print 'No queue specified'
return
+ at_tick = params.get('at_tick')
+ copy_reg = params.get('copy_reg')
consumer = params['consumer']
curs = self.db.cursor()
- q = "select * from pgq.get_consumer_info(%s, %s)"
- curs.execute(q, [queue, consumer])
- if curs.fetchone():
- print 'Consumer already registered'
- return
- q = "select * from pgq.register_consumer(%s, %s)"
- curs.execute(q, [queue, consumer])
+
+ # copy other registration
+ if copy_reg:
+ q = "select coalesce(next_tick, last_tick) as pos from pgq.get_consumer_info(%s, %s)"
+ curs.execute(q, [queue, copy_reg])
+ reg = curs.fetchone()
+ if not reg:
+ print "Consumer %s not registered on queue %d" % (copy_reg, queue)
+ return
+ at_tick = reg['pos']
+
+ # avoid double reg if specific pos is not requested
+ if not at_tick:
+ q = "select * from pgq.get_consumer_info(%s, %s)"
+ curs.execute(q, [queue, consumer])
+ if curs.fetchone():
+ print 'Consumer already registered'
+ return
+
+ if at_tick:
+ q = "select * from pgq.register_consumer_at(%s, %s, %s)"
+ curs.execute(q, [queue, consumer, int(at_tick)])
+ else:
+ q = "select * from pgq.register_consumer(%s, %s)"
+ curs.execute(q, [queue, consumer])
print "REGISTER"
def cmd_unregister_consumer(self, params):
diff --git a/python/skytools/psycopgwrapper.py b/python/skytools/psycopgwrapper.py
index aeaa7a74..6aa5835b 100644
--- a/python/skytools/psycopgwrapper.py
+++ b/python/skytools/psycopgwrapper.py
@@ -62,6 +62,7 @@ __all__ = ['connect_database']
# to the point of avoiding optimized access.
# only backwards compat thing we need is dict* methods
+import socket
import psycopg2.extensions, psycopg2.extras
from skytools.sqltools import dbdict
@@ -102,10 +103,15 @@ class _CompatConnection(psycopg2.extensions.connection):
def cursor(self):
return psycopg2.extensions.connection.cursor(self, cursor_factory = _CompatCursor)
-def connect_database(connstr):
- """Create a db connection with connect_timeout option.
+def connect_database(connstr, keepalive = True,
+ tcp_keepidle = 4 * 60, # 7200
+ tcp_keepcnt = 4, # 9
+ tcp_keepintvl = 15): # 75
+ """Create a db connection with connect_timeout and TCP keepalive.
Default connect_timeout is 15, to change put it directly into dsn.
+
+ The extra tcp_* options are Linux-specific, see `man 7 tcp` for details.
"""
# allow override
@@ -115,6 +121,16 @@ def connect_database(connstr):
# create connection
db = _CompatConnection(connstr)
+ # turn on keepalive on the connection
+ if keepalive and hasattr(socket, 'SO_KEEPALIVE'):
+ fd = db.cursor().fileno()
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
+ if hasattr(socket, 'TCP_KEEPCNT'):
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, tcp_keepidle)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, tcp_keepcnt)
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, tcp_keepintvl)
+
# fill .server_version on older psycopg
if not hasattr(db, 'server_version'):
iso = db.isolation_level
@@ -123,5 +139,6 @@ def connect_database(connstr):
curs.execute('show server_version_num')
db.server_version = int(curs.fetchone()[0])
db.set_isolation_level(iso)
+
return db