diff options
author | Marko Kreen | 2009-11-05 12:57:24 +0000 |
---|---|---|
committer | Marko Kreen | 2009-11-05 12:57:24 +0000 |
commit | b269094eae644b6bf6843a7933ec5cb60125ce31 (patch) | |
tree | 0156b317b692e52ce0449968ecce61321a76a769 /python/qadmin.py | |
parent | 661a2e367246d3915c387679c13f6e56aade9c82 (diff) |
qadmin: extend REGISTER CONSUMER
Custom tick pos: at <tick_id>
Copy other registration: copy <consumer>
Diffstat (limited to 'python/qadmin.py')
-rwxr-xr-x | python/qadmin.py | 54 |
1 files changed, 41 insertions, 13 deletions
diff --git a/python/qadmin.py b/python/qadmin.py index f2a0a23d..0c8d625b 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -11,13 +11,14 @@ drop queue <qname>; show queue <qname | *>; - register consumer foo [on <qname>]; + register consumer foo [on <qname> | at <tick_id> | copy <consumer> ]* ; unregister consumer foo [from <qname>]; show consumer <cname | *> [on <qname>]; show node <node | *> [on <qname>]; Following commands expect default queue: + show queue; show batch <batch_id>; show batch <consumer>; @@ -216,6 +217,11 @@ class BatchId(DynList): def get_wlist(self): return script.get_batch_list() +class TickId(DynList): + tk_type = ("num",) + def get_wlist(self): + return [] + class Port(DynList): tk_type = ("num",) def get_wlist(self): @@ -324,13 +330,15 @@ w_create = Word('queue', Queue(w_done, name = 'queue'), w_drop = Word('queue', Queue(w_done, name = 'queue'), name = 'cmd2') -w_on_queue = Word('on', - Queue(w_done, name = 'queue'), - name = 'onqueue') +w_reg_target = Proxy() +w_reg_target.set_real(WList( + Word('on', Queue(w_reg_target, name = 'queue')), + Word('copy', Consumer(w_reg_target, name = 'copy_reg')), + Word('at', TickId(w_reg_target, name = 'at_tick')), + w_done)) w_cons_on_queue = Word('consumer', - Consumer(WList(w_done, w_on_queue), - name = 'consumer'), + Consumer(w_reg_target, name = 'consumer'), name = 'cmd2') w_from_queue = Word('from', @@ -860,15 +868,35 @@ class AdminConsole: if not queue: print 'No queue specified' return + at_tick = params.get('at_tick') + copy_reg = params.get('copy_reg') consumer = params['consumer'] curs = self.db.cursor() - q = "select * from pgq.get_consumer_info(%s, %s)" - curs.execute(q, [queue, consumer]) - if curs.fetchone(): - print 'Consumer already registered' - return - q = "select * from pgq.register_consumer(%s, %s)" - curs.execute(q, [queue, consumer]) + + # copy other registration + if copy_reg: + q = "select coalesce(next_tick, last_tick) as pos from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, copy_reg]) + reg = curs.fetchone() + if not reg: + print "Consumer %s not registered on queue %d" % (copy_reg, queue) + return + at_tick = reg['pos'] + + # avoid double reg if specific pos is not requested + if not at_tick: + q = "select * from pgq.get_consumer_info(%s, %s)" + curs.execute(q, [queue, consumer]) + if curs.fetchone(): + print 'Consumer already registered' + return + + if at_tick: + q = "select * from pgq.register_consumer_at(%s, %s, %s)" + curs.execute(q, [queue, consumer, int(at_tick)]) + else: + q = "select * from pgq.register_consumer(%s, %s)" + curs.execute(q, [queue, consumer]) print "REGISTER" def cmd_unregister_consumer(self, params): |