diff options
-rwxr-xr-x | scripts/cube_dispatcher.py | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py index 912dbe73..3ca29058 100755 --- a/scripts/cube_dispatcher.py +++ b/scripts/cube_dispatcher.py @@ -45,10 +45,6 @@ class CubeDispatcher(pgq.SerialConsumer): date_str = self.get_part_date(batch_id) self.dispatch(dst_db, ev_list, self.get_part_date(batch_id)) - # tag as done - for ev in ev_list: - ev.tag_done() - def dispatch(self, dst_db, ev_list, date_str): """Actual event processing.""" @@ -61,11 +57,13 @@ class CubeDispatcher(pgq.SerialConsumer): else: tbl = ev.extra1 + sql = self.make_sql(tbl, ev) + sql_list.append(sql) + if not tbl in tables: tables[tbl] = self.get_table_info(ev, tbl) - sql = self.make_sql(tbl, ev) - sql_list.append(sql) + ev.tag_done() # create tables if needed self.check_tables(dst_db, tables) @@ -86,7 +84,7 @@ class CubeDispatcher(pgq.SerialConsumer): inf = { 'parent': ev.extra1, 'table': tbl, - 'key_list': ev.extra2, + 'key_list': ev.key_list, } return inf @@ -97,8 +95,13 @@ class CubeDispatcher(pgq.SerialConsumer): data = skytools.db_urldecode(ev.data) # parse tbl info - op = ev.type - key_list = ev.extra2.split(',') + if ev.type.find(':') > 0: + op, keys = ev.type.split(':') + else: + op = ev.type + keys = ev.extra2 + ev.key_list = keys + key_list = keys.split(',') if self.keep_latest and len(key_list) == 0: raise Exception('No pkey on table %s' % tbl) |