diff options
| author | Marko Kreen | 2007-07-20 16:39:53 +0000 |
|---|---|---|
| committer | Marko Kreen | 2007-07-20 16:39:53 +0000 |
| commit | d6cb4b1d7ce9f419fb10247099c41d1d254764e0 (patch) | |
| tree | 15786c5173c0d027977a6034c5d8faf6e37aba26 | |
| parent | 7d06bf4ae9bbdbd3a8f18af07fb6214949807f95 (diff) | |
pgq.SerialConsumer: move tick queries to separate functions to allow overriding them if needed
| -rw-r--r-- | python/pgq/consumer.py | 43 |
1 files changed, 18 insertions, 25 deletions
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 4b9c221b..0cd55d1d 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -319,14 +319,10 @@ class SerialConsumer(Consumer): prev_tick = self.cur_batch_info['prev_tick_id'] - q = "select %s.get_last_tick(%%s)" % self.dst_schema - dst_curs.execute(q, [self.consumer_id]) - res = dst_curs.fetchone() - - if not res or not res[0]: + dst_tick = self.get_last_tick(dst_curs) + if not dst_tick: # seems this consumer has not run yet against dst_db return False - dst_tick = res[0] if prev_tick == dst_tick: # on track @@ -346,27 +342,18 @@ class SerialConsumer(Consumer): in external database. """ tick_id = self.cur_batch_info['tick_id'] - q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema - dst_curs.execute(q, [ self.consumer_id, tick_id ]) + self.set_last_tick(dst_curs, tick_id) def attach(self): new = Consumer.attach(self) if new: - self.clean_completed_tick() + self.dst_reset() def detach(self): """If detaching, also clean completed tick table on dest.""" Consumer.detach(self) - self.clean_completed_tick() - - def clean_completed_tick(self): - self.log.info("removing completed tick from dst") - dst_db = self.get_database(self.remote_db) - dst_curs = dst_db.cursor() - q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema - dst_curs.execute(q, [self.consumer_id]) - dst_db.commit() + self.dst_reset() def process_remote_batch(self, db, batch_id, event_list, dst_db): raise Exception('process_remote_batch not implemented') @@ -378,11 +365,8 @@ class SerialConsumer(Consumer): src_curs = src_db.cursor() dst_curs = dst_db.cursor() - q = "select %s.get_last_tick(%%s)" % self.dst_schema - dst_curs.execute(q, [self.consumer_id]) - row = dst_curs.fetchone() - if row: - dst_tick = row[0] + dst_tick = self.get_last_tick(dst_curs) + if dst_tick: q = "select pgq.register_consumer_at(%s, %s, %s)" src_curs.execute(q, [self.pgq_queue_name, self.consumer_id, dst_tick]) else: @@ -395,8 +379,17 @@ class SerialConsumer(Consumer): self.log.info("Resetting queue tracking on dst side") dst_db = self.get_database(self.remote_db) dst_curs = dst_db.cursor() - q = "select %s.set_last_tick(%%s, NULL)" % self.dst_schema - dst_curs.execute(q, [self.consumer_id]) + self.set_last_tick(dst_curs, None) dst_db.commit() + def get_last_tick(self, dst_curs): + q = "select %s.get_last_tick(%%s)" % self.dst_schema + dst_curs.execute(q, [self.consumer_id]) + res = dst_curs.fetchone() + return res[0] + + def set_last_tick(self, dst_curs, tick_id): + q = "select %s.set_last_tick(%%s, %%s)" % self.dst_schema + dst_curs.execute(q, [ self.consumer_id, tick_id ]) + |
