summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xscripts/cube_dispatcher.py21
1 files changed, 12 insertions, 9 deletions
diff --git a/scripts/cube_dispatcher.py b/scripts/cube_dispatcher.py
index 912dbe73..3ca29058 100755
--- a/scripts/cube_dispatcher.py
+++ b/scripts/cube_dispatcher.py
@@ -45,10 +45,6 @@ class CubeDispatcher(pgq.SerialConsumer):
date_str = self.get_part_date(batch_id)
self.dispatch(dst_db, ev_list, self.get_part_date(batch_id))
- # tag as done
- for ev in ev_list:
- ev.tag_done()
-
def dispatch(self, dst_db, ev_list, date_str):
"""Actual event processing."""
@@ -61,11 +57,13 @@ class CubeDispatcher(pgq.SerialConsumer):
else:
tbl = ev.extra1
+ sql = self.make_sql(tbl, ev)
+ sql_list.append(sql)
+
if not tbl in tables:
tables[tbl] = self.get_table_info(ev, tbl)
- sql = self.make_sql(tbl, ev)
- sql_list.append(sql)
+ ev.tag_done()
# create tables if needed
self.check_tables(dst_db, tables)
@@ -86,7 +84,7 @@ class CubeDispatcher(pgq.SerialConsumer):
inf = {
'parent': ev.extra1,
'table': tbl,
- 'key_list': ev.extra2,
+ 'key_list': ev.key_list,
}
return inf
@@ -97,8 +95,13 @@ class CubeDispatcher(pgq.SerialConsumer):
data = skytools.db_urldecode(ev.data)
# parse tbl info
- op = ev.type
- key_list = ev.extra2.split(',')
+ if ev.type.find(':') > 0:
+ op, keys = ev.type.split(':')
+ else:
+ op = ev.type
+ keys = ev.extra2
+ ev.key_list = keys
+ key_list = keys.split(',')
if self.keep_latest and len(key_list) == 0:
raise Exception('No pkey on table %s' % tbl)