diff options
author | Marko Kreen | 2012-10-05 13:18:05 +0000 |
---|---|---|
committer | Marko Kreen | 2012-10-05 13:18:05 +0000 |
commit | 4a0ad470d63de122c0f1d8111e07f5005442b988 (patch) | |
tree | 62b4fbf64ce42c65c5a718873827741729d23b4d | |
parent | e556c8359ed87f611c7aed1d653e4c9fcbe23cc0 (diff) |
Remove bublin.py - its unmaintained
It was supposed to support more elastic clustering,
but the effort has stalled, and the code is unfinished.
Remove it as it makes maintenance harder.
-rw-r--r-- | python/londiste/bublin.py | 83 |
1 files changed, 0 insertions, 83 deletions
diff --git a/python/londiste/bublin.py b/python/londiste/bublin.py deleted file mode 100644 index 78e2ae5c..00000000 --- a/python/londiste/bublin.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -Experimental event filtering by hash. -""" - -import skytools -from londiste.handler import BaseHandler - -__all__ = ['Bublin'] - -class Bublin(BaseHandler): - handler_name = 'bublin' - - bubbles_max_slot = None # NUM_SLOTS - 1 (NUM_SLOTS -> power of 2) - bubbles_local_slots = None # dict with local slot numbers - - def __init__(self, name, next, args, log): - BaseHandler.__init__(self, name, next, args, log) - self.key = args[0] - - def reset(self): - """Forget config info.""" - if Bublin.bubbles_max_slot: - Bublin.bubbles_max_slot = None - if Bublin.bubbles_local_slots: - Bublin.bubbles_local_slots = None - BaseHandler.reset(self) - - def add(self, trigger_arg_list): - """Let trigger put hash into extra3""" - - arg = "ev_extra3='hash='||hashtext(%s)" % skytools.quote_ident(self.key) - trigger_arg_list.append(arg) - - BaseHandler.add(self, trigger_arg_list) - - def prepare_batch(self, batch_info, dst_curs): - """Called on first event for this table in current batch.""" - if not self.bubbles_max_slot: - self.load_bubbles(dst_curs) - BaseHandler.prepare_batch(self, batch_info, dst_curs) - - def process_event(self, ev, sql_queue_func, arg): - """Filter event by hash in extra3, apply only local slots.""" - if ev.extra3: - meta = skytools.db_urldecode(ev.extra3) - slot = int(meta['hash']) & self.bubbles_max_slot - if slot not in self.bubbles_local_slots: - return - BaseHandler.process_event(self, ev, sql_queue_func, arg) - - def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list): - """Copy only slots needed locally.""" - self.load_bubbles(dst_curs) - - slist = self.bubbles_local_slots.keys() - fn = 'hashtext(%s)' % skytools.quote_ident(self.key) - w = "(((%s) & %d) in (%s))" % (fn, self.bubbles_max_slot, slist) - cond_list.append(w) - - return BaseHandler.real_copy(self, tablename, src_curs, dst_curs, - column_list, cond_list) - - def load_bubbles(self, curs): - """Load slot info from database.""" - - q = "select c.max_slot, m.slot_nr from partconf.slot_map m, partconf.conf c"\ - " where c.part_nr = m.part_nr" - curs.execute(q) - max_slot = 0 - slot_map = {} - for row in curs.fetchall(): - if not max_slot: - max_slot = row['max_slot'] - snr = row['slot_nr'] - slot_map[snr] = 1 - self.bubbles_max_slot = max_slot - self.bubbles_local_slots = slot_map - if not max_slot: - raise Exception("Bubble broke - invalid max_slot") - -# register handler class -__londiste_handlers__ = [Bublin] - |