summaryrefslogtreecommitdiff
path: root/python/pgq/consumer.py
diff options
context:
space:
mode:
authorMarko Kreen2007-07-20 16:25:12 +0000
committerMarko Kreen2007-07-20 16:25:12 +0000
commit7d06bf4ae9bbdbd3a8f18af07fb6214949807f95 (patch)
treeaed95576f48f3e385dfc73d277aa393f113031f4 /python/pgq/consumer.py
parent65eb35c30bedb89b57df7c6db6bd5de4eb83056c (diff)
pgq.SerialConsumer: use functions instead of direct access at tables
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r--python/pgq/consumer.py26
1 files changed, 8 insertions, 18 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 8160f1d1..4b9c221b 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -265,7 +265,7 @@ class SerialConsumer(Consumer):
def __init__(self, service_name, db_name, remote_db, args):
Consumer.__init__(self, service_name, db_name, args)
self.remote_db = remote_db
- self.dst_completed_table = "pgq_ext.completed_tick"
+ self.dst_schema = "pgq_ext"
self.cur_batch_info = None
def startup(self):
@@ -319,8 +319,7 @@ class SerialConsumer(Consumer):
prev_tick = self.cur_batch_info['prev_tick_id']
- q = "select last_tick_id from %s where consumer_id = %%s" % (
- self.dst_completed_table ,)
+ q = "select %s.get_last_tick(%%s)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
res = dst_curs.fetchone()
@@ -347,12 +346,8 @@ class SerialConsumer(Consumer):
in external database.
"""
tick_id = self.cur_batch_info['tick_id']
- q = "delete from %s where consumer_id = %%s; "\
- "insert into %s (consumer_id, last_tick_id) values (%%s, %%s)" % (
- self.dst_completed_table,
- self.dst_completed_table)
- dst_curs.execute(q, [ self.consumer_id,
- self.consumer_id, tick_id ])
+ q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema
+ dst_curs.execute(q, [ self.consumer_id, tick_id ])
def attach(self):
new = Consumer.attach(self)
@@ -369,9 +364,7 @@ class SerialConsumer(Consumer):
self.log.info("removing completed tick from dst")
dst_db = self.get_database(self.remote_db)
dst_curs = dst_db.cursor()
-
- q = "delete from %s where consumer_id = %%s" % (
- self.dst_completed_table,)
+ q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
dst_db.commit()
@@ -385,13 +378,12 @@ class SerialConsumer(Consumer):
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- q = "select last_tick_id from %s where consumer_id = %%s" % (
- self.dst_completed_table,)
+ q = "select %s.get_last_tick(%%s)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
row = dst_curs.fetchone()
if row:
dst_tick = row[0]
- q = "select pgq.register_consumer(%s, %s, %s)"
+ q = "select pgq.register_consumer_at(%s, %s, %s)"
src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick])
else:
self.log.warning('No tick found on dst side')
@@ -403,9 +395,7 @@ class SerialConsumer(Consumer):
self.log.info("Resetting queue tracking on dst side")
dst_db = self.get_database(self.remote_db)
dst_curs = dst_db.cursor()
-
- q = "delete from %s where consumer_id = %%s" % (
- self.dst_completed_table,)
+ q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema
dst_curs.execute(q, [self.consumer_id])
dst_db.commit()