diff options
author | Marko Kreen | 2007-07-20 16:25:12 +0000 |
---|---|---|
committer | Marko Kreen | 2007-07-20 16:25:12 +0000 |
commit | 7d06bf4ae9bbdbd3a8f18af07fb6214949807f95 (patch) | |
tree | aed95576f48f3e385dfc73d277aa393f113031f4 /python/pgq/consumer.py | |
parent | 65eb35c30bedb89b57df7c6db6bd5de4eb83056c (diff) |
pgq.SerialConsumer: use functions instead of direct access at tables
Diffstat (limited to 'python/pgq/consumer.py')
-rw-r--r-- | python/pgq/consumer.py | 26 |
1 files changed, 8 insertions, 18 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 8160f1d1..4b9c221b 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -265,7 +265,7 @@ class SerialConsumer(Consumer): def __init__(self, service_name, db_name, remote_db, args): Consumer.__init__(self, service_name, db_name, args) self.remote_db = remote_db - self.dst_completed_table = "pgq_ext.completed_tick" + self.dst_schema = "pgq_ext" self.cur_batch_info = None def startup(self): @@ -319,8 +319,7 @@ class SerialConsumer(Consumer): prev_tick = self.cur_batch_info['prev_tick_id'] - q = "select last_tick_id from %s where consumer_id = %%s" % ( - self.dst_completed_table ,) + q = "select %s.get_last_tick(%%s)" % self.dst_schema dst_curs.execute(q, [self.consumer_id]) res = dst_curs.fetchone() @@ -347,12 +346,8 @@ class SerialConsumer(Consumer): in external database. """ tick_id = self.cur_batch_info['tick_id'] - q = "delete from %s where consumer_id = %%s; "\ - "insert into %s (consumer_id, last_tick_id) values (%%s, %%s)" % ( - self.dst_completed_table, - self.dst_completed_table) - dst_curs.execute(q, [ self.consumer_id, - self.consumer_id, tick_id ]) + q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema + dst_curs.execute(q, [ self.consumer_id, tick_id ]) def attach(self): new = Consumer.attach(self) @@ -369,9 +364,7 @@ class SerialConsumer(Consumer): self.log.info("removing completed tick from dst") dst_db = self.get_database(self.remote_db) dst_curs = dst_db.cursor() - - q = "delete from %s where consumer_id = %%s" % ( - self.dst_completed_table,) + q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema dst_curs.execute(q, [self.consumer_id]) dst_db.commit() @@ -385,13 +378,12 @@ class SerialConsumer(Consumer): src_curs = src_db.cursor() dst_curs = dst_db.cursor() - q = "select last_tick_id from %s where consumer_id = %%s" % ( - self.dst_completed_table,) + q = "select %s.get_last_tick(%%s)" % self.dst_schema dst_curs.execute(q, [self.consumer_id]) row = dst_curs.fetchone() if row: dst_tick = row[0] - q = "select pgq.register_consumer(%s, %s, %s)" + q = "select pgq.register_consumer_at(%s, %s, %s)" src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick]) else: self.log.warning('No tick found on dst side') @@ -403,9 +395,7 @@ class SerialConsumer(Consumer): self.log.info("Resetting queue tracking on dst side") dst_db = self.get_database(self.remote_db) dst_curs = dst_db.cursor() - - q = "delete from %s where consumer_id = %%s" % ( - self.dst_completed_table,) + q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema dst_curs.execute(q, [self.consumer_id]) dst_db.commit() |