summaryrefslogtreecommitdiff
path: root/python/qadmin.py
diff options
context:
space:
mode:
authorMarko Kreen2009-11-05 12:57:24 +0000
committerMarko Kreen2009-11-05 12:57:24 +0000
commitb269094eae644b6bf6843a7933ec5cb60125ce31 (patch)
tree0156b317b692e52ce0449968ecce61321a76a769 /python/qadmin.py
parent661a2e367246d3915c387679c13f6e56aade9c82 (diff)
qadmin: extend REGISTER CONSUMER
Custom tick pos: at <tick_id> Copy other registration: copy <consumer>
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-xpython/qadmin.py54
1 files changed, 41 insertions, 13 deletions
diff --git a/python/qadmin.py b/python/qadmin.py
index f2a0a23d..0c8d625b 100755
--- a/python/qadmin.py
+++ b/python/qadmin.py
@@ -11,13 +11,14 @@
drop queue <qname>;
show queue <qname | *>;
- register consumer foo [on <qname>];
+ register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ;
unregister consumer foo [from <qname>];
show consumer <cname | *> [on <qname>];
show node <node | *> [on <qname>];
Following commands expect default queue:
+
show queue;
show batch <batch_id>;
show batch <consumer>;
@@ -216,6 +217,11 @@ class BatchId(DynList):
def get_wlist(self):
return script.get_batch_list()
+class TickId(DynList):
+ tk_type = ("num",)
+ def get_wlist(self):
+ return []
+
class Port(DynList):
tk_type = ("num",)
def get_wlist(self):
@@ -324,13 +330,15 @@ w_create = Word('queue', Queue(w_done, name = 'queue'),
w_drop = Word('queue', Queue(w_done, name = 'queue'),
name = 'cmd2')
-w_on_queue = Word('on',
- Queue(w_done, name = 'queue'),
- name = 'onqueue')
+w_reg_target = Proxy()
+w_reg_target.set_real(WList(
+ Word('on', Queue(w_reg_target, name = 'queue')),
+ Word('copy', Consumer(w_reg_target, name = 'copy_reg')),
+ Word('at', TickId(w_reg_target, name = 'at_tick')),
+ w_done))
w_cons_on_queue = Word('consumer',
- Consumer(WList(w_done, w_on_queue),
- name = 'consumer'),
+ Consumer(w_reg_target, name = 'consumer'),
name = 'cmd2')
w_from_queue = Word('from',
@@ -860,15 +868,35 @@ class AdminConsole:
if not queue:
print 'No queue specified'
return
+ at_tick = params.get('at_tick')
+ copy_reg = params.get('copy_reg')
consumer = params['consumer']
curs = self.db.cursor()
- q = "select * from pgq.get_consumer_info(%s, %s)"
- curs.execute(q, [queue, consumer])
- if curs.fetchone():
- print 'Consumer already registered'
- return
- q = "select * from pgq.register_consumer(%s, %s)"
- curs.execute(q, [queue, consumer])
+
+ # copy other registration
+ if copy_reg:
+ q = "select coalesce(next_tick, last_tick) as pos from pgq.get_consumer_info(%s, %s)"
+ curs.execute(q, [queue, copy_reg])
+ reg = curs.fetchone()
+ if not reg:
+ print "Consumer %s not registered on queue %d" % (copy_reg, queue)
+ return
+ at_tick = reg['pos']
+
+ # avoid double reg if specific pos is not requested
+ if not at_tick:
+ q = "select * from pgq.get_consumer_info(%s, %s)"
+ curs.execute(q, [queue, consumer])
+ if curs.fetchone():
+ print 'Consumer already registered'
+ return
+
+ if at_tick:
+ q = "select * from pgq.register_consumer_at(%s, %s, %s)"
+ curs.execute(q, [queue, consumer, int(at_tick)])
+ else:
+ q = "select * from pgq.register_consumer(%s, %s)"
+ curs.execute(q, [queue, consumer])
print "REGISTER"
def cmd_unregister_consumer(self, params):