Diffstat (limited to 'python')
 python/londiste.py            |  17
 python/londiste/playback.py   | 128
 python/londiste/setup.py      |   2
 python/londiste/table_copy.py |   9
 python/pgq/rawconsumer.py     |   4
 python/pgq/setadmin.py        |  32
 python/pgq/setconsumer.py     | 160
 python/skytools/scripting.py  |   2
 python/skytools/sqltools.py   |   5
 9 files changed, 215 insertions(+), 144 deletions(-)
diff --git a/python/londiste.py b/python/londiste.py
index 82e49281..e2ac37df 100755
--- a/python/londiste.py
+++ b/python/londiste.py
@@ -61,6 +61,20 @@ class NodeSetup(pgq.setadmin.SetAdmin):
     extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ]
     def __init__(self, args):
         pgq.setadmin.SetAdmin.__init__(self, 'londiste', args)
+    def extra_init(self, node_type, node_db, provider_db):
+        if not provider_db:
+            return
+        pcurs = provider_db.cursor()
+        ncurs = node_db.cursor()
+        q = "select table_name from londiste.set_get_table_list(%s)"
+        pcurs.execute(q, [self.set_name])
+        for row in pcurs.fetchall():
+            tbl = row['table_name']
+            q = "select * from londiste.set_add_table(%s, %s)"
+            ncurs.execute(q, [self.set_name, tbl])
+        node_db.commit()
+        provider_db.commit()
+
 cmd_handlers = (
     (('init-root', 'init-branch', 'init-leaf', 'members', 'tag-dead', 'tag-alive',
@@ -70,6 +84,7 @@ cmd_handlers = (
       'missing', 'resync', 'check', 'fkeys'), londiste.LondisteSetup),
     (('compare',), londiste.Comparator),
     (('repair',), londiste.Repairer),
+    (('copy',), londiste.CopyTable),
 )
 
 class Londiste(skytools.DBScript):
@@ -105,6 +120,8 @@ class Londiste(skytools.DBScript):
                      help = "add: no copy needed", default=False)
         g.add_option("--skip-truncate", action="store_true", dest="skip_truncate",
                      help = "add: keep old data", default=False)
+        g.add_option("--provider",
+                     help = "init: upstream node temp connect string", default=None)
         p.add_option_group(g)
 
         return p
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 34513c4a..67b40951 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -263,18 +263,18 @@ class Replicator(pgq.SetConsumer):
         self.copy_thread = 0
         self.seq_cache = SeqCache()
 
-    def process_set_batch(self, src_db, dst_db, ev_list, copy_queue):
+    def process_set_batch(self, src_db, dst_db, ev_list):
         "All work for a batch. Entry point from SetConsumer."
 
         # this part can play freely with transactions
         dst_curs = dst_db.cursor()
 
-        self.cur_tick = self.cur_batch_info['tick_id']
-        self.prev_tick = self.cur_batch_info['prev_tick_id']
+        self.cur_tick = self.src_queue.cur_tick
+        self.prev_tick = self.src_queue.prev_tick
 
         self.load_table_state(dst_curs)
-        self.sync_tables(dst_db)
+        self.sync_tables(src_db, dst_db)
 
         self.copy_snapshot_cleanup(dst_db)
@@ -292,13 +292,14 @@ class Replicator(pgq.SetConsumer):
         self.handle_seqs(dst_curs)
 
         self.sql_list = []
-        SetConsumer.process_set_batch(self, src_db, dst_db, ev_list, copy_queue)
+        pgq.SetConsumer.process_set_batch(self, src_db, dst_db, ev_list)
         self.flush_sql(dst_curs)
 
         # finalize table changes
         self.save_table_state(dst_curs)
 
     def handle_seqs(self, dst_curs):
+        return # FIXME
         if self.copy_thread:
             return
@@ -313,7 +314,7 @@ class Replicator(pgq.SetConsumer):
         src_curs = self.get_database('provider_db').cursor()
         self.seq_cache.resync(src_curs, dst_curs)
 
-    def sync_tables(self, dst_db):
+    def sync_tables(self, src_db, dst_db):
         """Table sync loop.
        Calls appropriate handles, which is expected to
@@ -323,13 +324,14 @@ class Replicator(pgq.SetConsumer):
         while 1:
             cnt = Counter(self.table_list)
             if self.copy_thread:
-                res = self.sync_from_copy_thread(cnt, dst_db)
+                res = self.sync_from_copy_thread(cnt, src_db, dst_db)
             else:
-                res = self.sync_from_main_thread(cnt, dst_db)
+                res = self.sync_from_main_thread(cnt, src_db, dst_db)
             if res == SYNC_EXIT:
                 self.log.debug('Sync tables: exit')
-                self.detach()
+                self.unregister_consumer(src_db.cursor())
+                src_db.commit()
                 sys.exit(0)
             elif res == SYNC_OK:
                 return
@@ -342,7 +344,7 @@ class Replicator(pgq.SetConsumer):
             self.load_table_state(dst_db.cursor())
             dst_db.commit()
 
-    def sync_from_main_thread(self, cnt, dst_db):
+    def sync_from_main_thread(self, cnt, src_db, dst_db):
         "Main thread sync logic."
 
         #
@@ -386,7 +388,7 @@ class Replicator(pgq.SetConsumer):
         # seems everything is in sync
         return SYNC_OK
 
-    def sync_from_copy_thread(self, cnt, dst_db):
+    def sync_from_copy_thread(self, cnt, src_db, dst_db):
         "Copy thread sync logic."
 
         #
@@ -419,7 +421,7 @@ class Replicator(pgq.SetConsumer):
         elif cnt.copy:
             # table is not copied yet, do it
             t = self.get_table_by_state(TABLE_IN_COPY)
-            self.do_copy(t)
+            self.do_copy(t, src_db, dst_db)
 
         # forget previous value
         self.work_state = 1
@@ -431,21 +433,27 @@ class Replicator(pgq.SetConsumer):
 
     def process_set_event(self, dst_curs, ev):
         """handle one event"""
-        if not self.interesting(ev):
-            self.stat_inc('ignored_events')
-            ev.tag_done()
-        elif ev.type in ('I', 'U', 'D'):
+        self.log.debug("New event: id=%s / type=%s / data=%s / extra1=%s" % (ev.id, ev.type, ev.data, ev.extra1))
+        if ev.type in ('I', 'U', 'D'):
             self.handle_data_event(ev, dst_curs)
+        elif ev.type == 'add-table':
+            self.add_set_table(dst_curs, ev.data)
+        elif ev.type == 'remove-table':
+            self.remove_set_table(dst_curs, ev.data)
         else:
-            self.handle_system_event(ev, dst_curs)
+            pgq.SetConsumer.process_set_event(self, dst_curs, ev)
 
     def handle_data_event(self, ev, dst_curs):
-        # buffer SQL statements, then send them together
-        fmt = self.sql_command[ev.type]
-        sql = fmt % (ev.extra1, ev.data)
-        self.sql_list.append(sql)
-        if len(self.sql_list) > 200:
-            self.flush_sql(dst_curs)
+        t = self.get_table_by_name(ev.extra1)
+        if t and t.interesting(ev, self.cur_tick, self.copy_thread):
+            # buffer SQL statements, then send them together
+            fmt = self.sql_command[ev.type]
+            sql = fmt % (ev.extra1, ev.data)
+            self.sql_list.append(sql)
+            if len(self.sql_list) > 200:
+                self.flush_sql(dst_curs)
+        else:
+            self.stat_inc('ignored_events')
         ev.tag_done()
 
     def flush_sql(self, dst_curs):
@@ -461,44 +469,24 @@ class Replicator(pgq.SetConsumer):
 
     def interesting(self, ev):
         if ev.type not in ('I', 'U', 'D'):
-            return 1
+            raise Exception('bug - bad event type in .interesting')
         t = self.get_table_by_name(ev.extra1)
         if t:
             return t.interesting(ev, self.cur_tick, self.copy_thread)
         else:
             return 0
 
-    def handle_system_event(self, ev, dst_curs):
-        "System event."
-
-        if ev.type == "T":
-            self.log.info("got new table event: "+ev.data)
-            # check tables to be dropped
-            name_list = []
-            for name in ev.data.split(','):
-                name_list.append(name.strip())
-
-            del_list = []
-            for tbl in self.table_list:
-                if tbl.name in name_list:
-                    continue
-                del_list.append(tbl)
+    def add_set_table(self, dst_curs, tbl):
+        q = "select londiste.set_add_table(%s, %s)"
+        dst_curs.execute(q, [self.set_name, tbl])
 
-            # separate loop to avoid changing while iterating
-            for tbl in del_list:
-                self.log.info("Removing table %s from set" % tbl.name)
-                self.remove_table(tbl, dst_curs)
-
-            ev.tag_done()
-        else:
-            self.log.warning("Unknows op %s" % ev.type)
-            ev.tag_failed("Unknown operation")
-
-    def remove_table(self, tbl, dst_curs):
-        del self.table_map[tbl.name]
-        self.table_list.remove(tbl)
-        q = "select londiste.subscriber_remove_table(%s, %s)"
-        dst_curs.execute(q, [self.pgq_queue_name, tbl.name])
+    def remove_set_table(self, dst_curs, tbl):
+        if tbl in self.table_map:
+            t = self.table_map[tbl]
+            del self.table_map[tbl]
+            self.table_list.remove(t)
+        q = "select londiste.set_remove_table(%s, %s)"
+        dst_curs.execute(q, [self.set_name, tbl])
 
     def load_table_state(self, curs):
         """Load table state from database.
@@ -507,9 +495,9 @@ class Replicator(pgq.SetConsumer):
         to load state on every batch.
         """
 
-        q = "select table_name, snapshot, merge_state, skip_truncate"\
-            " from londiste.subscriber_get_table_list(%s)"
-        curs.execute(q, [self.pgq_queue_name])
+        q = "select table_name, custom_snapshot, merge_state, skip_truncate"\
+            " from londiste.node_get_table_list(%s)"
+        curs.execute(q, [self.set_name])
 
         new_list = []
         new_map = {}
@@ -517,7 +505,7 @@ class Replicator(pgq.SetConsumer):
             t = self.get_table_by_name(row['table_name'])
             if not t:
                 t = TableState(row['table_name'], self.log)
-            t.loaded_state(row['merge_state'], row['snapshot'], row['skip_truncate'])
+            t.loaded_state(row['merge_state'], row['custom_snapshot'], row['skip_truncate'])
             new_list.append(t)
             new_map[t.name] = t
 
@@ -534,8 +522,8 @@ class Replicator(pgq.SetConsumer):
             merge_state = t.render_state()
             self.log.info("storing state of %s: copy:%d new_state:%s" % (
                           t.name, self.copy_thread, merge_state))
-            q = "select londiste.subscriber_set_table_state(%s, %s, %s, %s)"
-            curs.execute(q, [self.pgq_queue_name,
+            q = "select londiste.node_set_table_state(%s, %s, %s, %s)"
+            curs.execute(q, [self.set_name,
                              t.name, t.str_snapshot, merge_state])
             t.changed = 0
             got_changes = 1
@@ -563,18 +551,6 @@ class Replicator(pgq.SetConsumer):
             return self.table_map[name]
         return None
 
-    def fill_mirror_queue(self, ev_list, dst_curs):
-        # insert events
-        rows = []
-        fields = ['ev_type', 'ev_data', 'ev_extra1']
-        for ev in mirror_list:
-            rows.append((ev.type, ev.data, ev.extra1))
-        pgq.bulk_insert_events(dst_curs, rows, fields, self.mirror_queue)
-
-        # create tick
-        q = "select pgq.ticker(%s, %s)"
-        dst_curs.execute(q, [self.mirror_queue, self.cur_tick])
-
     def launch_copy(self, tbl_stat):
         self.log.info("Launching copy process")
         script = sys.argv[0]
@@ -631,12 +607,12 @@ class Replicator(pgq.SetConsumer):
         """Restore fkeys that have both tables on sync."""
         dst_curs = dst_db.cursor()
         # restore fkeys -- one at a time
-        q = "select * from londiste.subscriber_get_queue_valid_pending_fkeys(%s)"
-        dst_curs.execute(q, [self.pgq_queue_name])
+        q = "select * from londiste.node_get_valid_pending_fkeys(%s)"
+        dst_curs.execute(q, [self.set_name])
         list = dst_curs.dictfetchall()
         for row in list:
             self.log.info('Creating fkey: %(fkey_name)s (%(from_table)s --> %(to_table)s)' % row)
-            q2 = "select londiste.subscriber_restore_table_fkey(%(from_table)s, %(fkey_name)s)"
+            q2 = "select londiste.restore_table_fkey(%(from_table)s, %(fkey_name)s)"
             dst_curs.execute(q2, row)
             dst_db.commit()
 
@@ -649,7 +625,7 @@ class Replicator(pgq.SetConsumer):
         list = dst_curs.dictfetchall()
         for row in list:
             self.log.info('Dropping fkey: %s' % row['fkey_name'])
-            q2 = "select londiste.subscriber_drop_table_fkey(%(from_table)s, %(fkey_name)s)"
+            q2 = "select londiste.drop_table_fkey(%(from_table)s, %(fkey_name)s)"
             dst_curs.execute(q2, row)
         dst_db.commit()
 
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index ef3acd02..15991311 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -93,6 +93,8 @@ class LondisteSetup(skytools.DBScript):
                      help="force", default=False)
         p.add_option("--all", action="store_true",
                      help="include all tables", default=False)
+        p.add_option("--provider",
+                     help="init: upstream node temp connect string", default=None)
         return p
 
     def exec_checked(self, curs, sql, args):
diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py
index 5e21483a..c712d8bd 100644
--- a/python/londiste/table_copy.py
+++ b/python/londiste/table_copy.py
@@ -18,12 +18,11 @@ class CopyTable(Replicator):
 
         if copy_thread:
             self.pidfile += ".copy"
-            self.consumer_id += "_copy"
+            self.consumer_name += "_copy"
             self.copy_thread = 1
+            self.main_worker = False
 
-    def do_copy(self, tbl_stat):
-        src_db = self.get_database('provider_db')
-        dst_db = self.get_database('subscriber_db')
+    def do_copy(self, tbl_stat, src_db, dst_db):
 
         # it should not matter to pgq
         src_db.commit()
@@ -84,7 +83,7 @@ class CopyTable(Replicator):
         # to make state juggling faster. on mostly idle db-s
         # each step may take tickers idle_timeout secs, which is pain.
q = "select pgq.force_tick(%s)" - src_curs.execute(q, [self.pgq_queue_name]) + src_curs.execute(q, [self.src_queue.queue_name]) src_db.commit() def real_copy(self, srccurs, dstcurs, tbl_stat): diff --git a/python/pgq/rawconsumer.py b/python/pgq/rawconsumer.py index c1df916d..1ab452fc 100644 --- a/python/pgq/rawconsumer.py +++ b/python/pgq/rawconsumer.py @@ -31,7 +31,7 @@ class RawQueue: return self.batch_id - def finish_batch(self, curs, batch_id): + def finish_batch(self, curs): q = "select * from pgq.finish_batch(%s)" curs.execute(q, [self.batch_id]) @@ -39,7 +39,7 @@ class RawQueue: return pgq.consumer._BatchWalker(curs, self.batch_id, self.queue_name) def bulk_insert(self, curs, ev): - row = map(ev.__getattribute__, self.bulk_insert_fields) + row = map(ev.__getattr__, self.bulk_insert_fields) self.bulk_insert_buf.append(row) if len(self.bulk_insert_buf) >= self.bulk_insert_size: self.finish_bulk_insert(curs) diff --git a/python/pgq/setadmin.py b/python/pgq/setadmin.py index a389f389..d1b10e33 100644 --- a/python/pgq/setadmin.py +++ b/python/pgq/setadmin.py @@ -40,9 +40,9 @@ class SetAdmin(skytools.DBScript): g = optparse.OptionGroup(p, "actual setadm options") g.add_option("--connstr", action="store_true", - help = "add: ignore table differences, repair: ignore lag") + help = "initial connect string") g.add_option("--provider", - help = "add: ignore table differences, repair: ignore lag") + help = "init: connect string for provider") p.add_option_group(g) return p @@ -71,6 +71,8 @@ class SetAdmin(skytools.DBScript): return True def init_node(self, node_type, node_name, node_location): + provider_loc = self.options.provider + # connect to database db = self.get_database("new_node", connstr = node_location) @@ -98,8 +100,9 @@ class SetAdmin(skytools.DBScript): [self.set_name, node_name, node_location]) self.exec_sql(db, "select pgq_set.create_node(%s, %s, %s, %s, %s, %s, %s)", [self.set_name, node_type, node_name, worker_name, provider_name, global_watermark, combined_set]) + provider_db = None else: - root_db = self.find_root_db() + root_db = self.find_root_db(provider_loc) set = self.load_root_info(root_db) # check if member already exists @@ -109,7 +112,15 @@ class SetAdmin(skytools.DBScript): global_watermark = set.global_watermark combined_set = None - provider_name = self.options.provider + + provider_db = self.get_database('provider_db', connstr = provider_loc) + curs = provider_db.cursor() + curs.execute("select node_type, node_name from pgq_set.get_node_info(%s)", [self.set_name]) + provider_db.commit() + row = curs.fetchone() + if not row: + raise Exceotion("provider node not found") + provider_name = row['node_name'] # register member on root self.exec_sql(root_db, "select pgq_set.add_member(%s, %s, %s, false)", @@ -123,7 +134,6 @@ class SetAdmin(skytools.DBScript): sys.exit(1) # register on provider - provider_db = self.get_database('provider_db', connstr = provider.location) self.exec_sql(provider_db, "select pgq_set.add_member(%s, %s, %s, false)", [self.set_name, node_name, node_location]) self.exec_sql(provider_db, "select pgq_set.subscribe_node(%s, %s, %s)", @@ -140,10 +150,18 @@ class SetAdmin(skytools.DBScript): global_watermark, combined_set]) db.commit() + self.extra_init(node_type, db, provider_db) + self.log.info("Done") - def find_root_db(self): - loc = self.cf.get(self.initial_db_name) + def extra_init(self, node_type, node_db, provider_db): + pass + + def find_root_db(self, initial_loc): + if initial_loc: + loc = initial_loc + else: + loc = 
 
         while 1:
             db = self.get_database('root_db', connstr = loc)
diff --git a/python/pgq/setconsumer.py b/python/pgq/setconsumer.py
index 6e01ccf6..3b87cbfd 100644
--- a/python/pgq/setconsumer.py
+++ b/python/pgq/setconsumer.py
@@ -20,8 +20,9 @@ class MemberInfo:
         self.dead = row['dead']
 
 class NodeInfo:
-    def __init__(self, row, member_list):
+    def __init__(self, row, member_list, main_worker = True):
         self.member_map = {}
+        self.main_worker = main_worker
         for r in member_list:
             m = MemberInfo(r)
             self.member_map[m.name] = m
@@ -43,6 +44,9 @@ class NodeInfo:
         self.worker_name = row['worker_name']
 
     def need_action(self, action_name):
+        if not self.main_worker:
+            return action_name in ('process-batch', 'process-events')
+
         typ = self.type
         if type == 'merge-leaf':
             if self.target_type == 'combined-branch':
@@ -69,12 +73,13 @@ class NodeInfo:
 
 action_map = {
 'process-batch':   {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
-'process-events':  {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
-'copy-events':     {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'process-events':  {'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
+'copy-events':     {'root':0, 'branch':1, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
 'tick-event':      {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':0},
 'global-wm-event': {'root':1, 'branch':0, 'leaf':0, 'combined-root':1, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
 'wait-behind':     {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':0, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':1},
 'sync-part-pos':   {'root':0, 'branch':0, 'leaf':0, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':0, 'merge-leaf-to-branch':0},
+'local-wm-publish':{'root':0, 'branch':1, 'leaf':1, 'combined-root':0, 'combined-branch':1, 'merge-leaf-to-root':1, 'merge-leaf-to-branch':1},
 }
 
 node_properties = {
@@ -83,84 +88,101 @@ node_properties = {
 }
 
 class SetConsumer(skytools.DBScript):
-    last_global_wm_event = 0
+    last_local_wm_publish_time = 0
+    last_global_wm_publish_time = 0
+
+    main_worker = True
+    reg_ok = False
+
+    def __init__(self, service_name, args,
+                 node_db_name = 'node_db'):
+        skytools.DBScript.__init__(self, service_name, args)
+        self.node_db_name = node_db_name
+        self.consumer_name = self.cf.get('consumer_name', self.job_name)
+
     def work(self):
         self.tick_id_cache = {}
 
         self.set_name = self.cf.get('set_name')
-        target_db = self.get_database('subscriber_db')
-
-        node = self.load_node_info(target_db)
-        self.consumer_name = node.worker_name
+        dst_db = self.get_database(self.node_db_name)
+        dst_curs = dst_db.cursor()
 
-        if not node.up_to_date:
-            self.tag_node_uptodate(target_db)
+        dst_node = self.load_node_info(dst_db)
+        if self.main_worker:
+            self.consumer_name = dst_node.worker_name
+            if not dst_node.up_to_date:
+                self.tag_node_uptodate(dst_db)
 
-        if node.paused:
+        if dst_node.paused:
             return 0
 
-        if node.need_action('global-wm-event'):
-            curs = target_db.cursor()
-            self.set_global_watermark(curs, node.local_watermark)
-            target_db.commit()
+        if dst_node.need_action('global-wm-event'):
+            self.publish_global_watermark(dst_db, dst_node.local_watermark)
 
-        if not node.need_action('process-batch'):
+        if not dst_node.need_action('process-batch'):
             return 0
 
         #
         # batch processing follows
         #
 
-        source_db = self.get_database('source_db', connstr = node.provider_location)
-        srcnode = self.load_node_info(source_db)
+        src_db = self.get_database('src_db', connstr = dst_node.provider_location)
+        src_curs = src_db.cursor()
+        src_node = self.load_node_info(src_db)
 
         # get batch
-        srcqueue = RawQueue(srcnode.queue_name, self.consumer_name)
+        src_queue = RawQueue(src_node.queue_name, self.consumer_name)
+        self.src_queue = src_queue
+        self.dst_queue = None
+
+        if not self.main_worker and not self.reg_ok:
+            self.register_consumer(src_curs)
 
-        batch_id = srcqueue.next_batch(source_db.cursor())
-        source_db.commit()
+        batch_id = src_queue.next_batch(src_curs)
+        src_db.commit()
         if batch_id is None:
             return 0
 
-        if node.need_action('wait-behind'):
-            if node.should_wait(queue.cur_tick):
+        self.log.debug("New batch: tick_id=%d / batch_id=%d" % (src_queue.cur_tick, batch_id))
+
+        if dst_node.need_action('wait-behind'):
+            if dst_node.should_wait(src_queue.cur_tick):
                 return 0
 
-        if node.need_action('process-event'):
+        if dst_node.need_action('process-events'):
             # load and process batch data
-            ev_list = self.get_batch_events(source_db, batch_id)
+            ev_list = src_queue.get_batch_events(src_curs)
 
-            copy_queue = None
-            if node.need_action('copy-events'):
-                copy_queue = node.get_target_queue()
-            self.process_set_batch(target_db, ev_list, copy_queue)
-            if copy_queue:
-                copy_queue.finish_bulk_insert(curs)
-                self.copy_tick(target_curs, srcqueue, copy_queue)
+            if dst_node.need_action('copy-events'):
+                self.dst_queue = RawQueue(dst_node.get_target_queue(), self.consumer_name)
+            self.process_set_batch(src_db, dst_db, ev_list)
+            if self.dst_queue:
+                self.dst_queue.finish_bulk_insert(dst_curs)
+                self.copy_tick(dst_curs, src_queue, self.dst_queue)
 
             # COMBINED_BRANCH needs to sync with part sets
-            if node.need_action('sync-part-pos'):
-                self.move_part_positions(target_curs)
+            if dst_node.need_action('sync-part-pos'):
+                self.move_part_positions(dst_curs)
 
         # we are done on target
-        self.set_tick_complete(target_curs)
-        target_db.commit()
+        self.set_tick_complete(dst_curs, src_queue.cur_tick)
+        dst_db.commit()
 
         # done on source
-        self.finish_batch(source_db, batch_id)
+        src_queue.finish_batch(src_curs)
+        src_db.commit()
 
         # occasionally send watermark upwards
-        self.send_local_watermark_upwards(target_db, source_db)
+        if dst_node.need_action('local-wm-publish'):
+            self.send_local_watermark_upwards(src_db, dst_node)
 
         # got a batch so there can be more
         return 1
 
-    def process_set_batch(self, src_db, dst_db, ev_list, copy_queue = None):
+    def process_set_batch(self, src_db, dst_db, ev_list):
         dst_curs = dst_db.cursor()
         for ev in ev_list:
             self.process_set_event(dst_curs, ev)
-            if copy_queue:
-                copy_queue.bulk_insert(dst_curs, ev)
+            if self.dst_queue:
+                self.dst_queue.bulk_insert(dst_curs, ev)
         self.stat_add('count', len(ev_list))
 
     def process_set_event(self, dst_curs, ev):
@@ -203,21 +225,35 @@ class SetConsumer(skytools.DBScript):
         q = "select * from pgq_set.add_member(%s, %s, %s, %s)"
         dst_curs.execute(q, [set_name, node_name, node_location, dead])
 
-    def send_local_watermark_upwards(self, target_db, source_db):
-        target_curs = target_db.cursor()
-        source_curs = source_db.cursor()
-        q = "select pgq_ext.get_local_watermark(%s)"
-        target_curs.execute(q, [self.set_name])
-        wm = target_curs.fetchone()[0]
-        target_db.commit()
-
-        q = "select pgq_ext.set_subscriber_watermark(%s, %s, %s)"
-        source_curs.execute(q, [self.set_name])
+    def send_local_watermark_upwards(self, src_db, node):
+        # fixme - delay
+        now = time.time()
+        delay = now - self.last_local_wm_publish_time
+        if delay < 1*60:
+            return
+        self.last_local_wm_publish_time = now
+
+        self.log.debug("send_local_watermark_upwards")
+        src_curs = src_db.cursor()
+        q = "select pgq_set.set_subscriber_watermark(%s, %s, %s)"
+        src_curs.execute(q, [self.set_name, node.name, node.local_watermark])
+        src_db.commit()
 
     def set_global_watermark(self, dst_curs, tick_id):
+        self.log.debug("set_global_watermark: %s" % tick_id)
         q = "select pgq_set.set_global_watermark(%s, %s)"
         dst_curs.execute(q, [self.set_name, tick_id])
 
+    def publish_global_watermark(self, dst_db, watermark):
+        now = time.time()
+        delay = now - self.last_global_wm_publish_time
+        if delay < 1*60:
+            return
+        self.last_global_wm_publish_time = now
+
+        self.set_global_watermark(dst_db.cursor(), watermark)
+        dst_db.commit()
+
     def load_node_info(self, db):
         curs = db.cursor()
 
@@ -232,10 +268,10 @@ class SetConsumer(skytools.DBScript):
         mbr_list = curs.dictfetchall()
         db.commit()
 
-        return NodeInfo(node_row, mbr_list)
+        return NodeInfo(node_row, mbr_list, self.main_worker)
 
     def tag_node_uptodate(self, dst_db):
-        dst_curs = db.cursor()
+        dst_curs = dst_db.cursor()
         q = "select * from pgq_set.set_node_uptodate(%s, true)"
         dst_curs.execute(q, [self.set_name])
         dst_db.commit()
@@ -244,6 +280,24 @@
         q = "select * from pgq.ticker(%s, %s)"
         dst_curs.execute(q, [dst_queue.queue_name, src_queue.cur_tick])
 
+    def set_tick_complete(self, dst_curs, tick_id):
+        q = "select * from pgq_set.set_completed_tick(%s, %s, %s)"
+        dst_curs.execute(q, [self.set_name, self.consumer_name, tick_id])
+
+    def register_consumer(self, src_curs):
+        if self.main_worker:
+            raise Exception('main set worker should not play with registrations')
+
+        q = "select * from pgq.register_consumer(%s, %s)"
+        src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
+
+    def unregister_consumer(self, src_curs):
+        if self.main_worker:
+            raise Exception('main set worker should not play with registrations')
+
+        q = "select * from pgq.unregister_consumer(%s, %s)"
+        src_curs.execute(q, [self.src_queue.queue_name, self.consumer_name])
+
 if __name__ == '__main__':
     script = SetConsumer('setconsumer', sys.argv[1:])
     script.start()
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index cb6ee802..3a3556a2 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -510,6 +510,8 @@ class DBScript(object):
         if self.looping and not self.do_single_loop:
             time.sleep(20)
             return 1
+        else:
+            sys.exit(1)
 
     def work(self):
         """Here should user's processing happen.
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py
index 726f94f7..566576eb 100644
--- a/python/skytools/sqltools.py
+++ b/python/skytools/sqltools.py
@@ -330,7 +330,10 @@ class DBObject(object):
             sql = open(fn, "r").read()
         else:
             raise Exception('object not defined')
-        curs.execute(sql)
+        for stmt in skytools.parse_statements(sql):
+            if log:
+                log.debug(repr(stmt))
+            curs.execute(stmt)
 
     def find_file(self):
         full_fn = None
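
Note: the reworked SetConsumer.work() above is easier to follow once the underlying pgq batch lifecycle is clear. Below is a minimal, self-contained sketch of that lifecycle — it is not code from this commit, just plain psycopg2 against the same pgq SQL functions the diff itself calls (pgq.next_batch, pgq.get_batch_events, pgq.finish_batch). The connection string, queue name, and consumer name are placeholders.

# Minimal sketch of the pgq batch lifecycle driven by SetConsumer.work().
# Assumes the pgq schema is installed and the consumer was registered once
# beforehand with pgq.register_consumer(QUEUE, CONSUMER), as the diff does
# for non-main workers.
import psycopg2

SRC_CONNSTR = "dbname=provider"   # placeholder for dst_node.provider_location
QUEUE = "fooset"                  # placeholder for src_node.queue_name
CONSUMER = "branch1_worker"       # placeholder for self.consumer_name

def process_one_batch(src_conn, apply_event):
    """Fetch one batch from the provider, apply its events, close it.

    Returns True if a batch was processed, False when fully caught up."""
    curs = src_conn.cursor()

    # Reserve the next unprocessed batch for this consumer.
    curs.execute("select pgq.next_batch(%s, %s)", [QUEUE, CONSUMER])
    batch_id = curs.fetchone()[0]
    src_conn.commit()
    if batch_id is None:
        return False    # no new ticks yet

    # Load the batch events; Replicator buffers I/U/D statements here.
    curs.execute("select * from pgq.get_batch_events(%s)", [batch_id])
    for ev in curs.fetchall():
        apply_event(ev)

    # Closing the batch is what advances this consumer's queue position.
    curs.execute("select pgq.finish_batch(%s)", [batch_id])
    src_conn.commit()
    return True

if __name__ == '__main__':
    src = psycopg2.connect(SRC_CONNSTR)
    while process_one_batch(src, lambda ev: None):
        pass    # "got a batch so there can be more"

Only finish_batch moves the consumer forward: if the worker dies mid-batch, the next call to pgq.next_batch hands out the same batch again. That is why the commit can commit all destination-side work first and close the batch on the source afterwards ("we are done on target ... done on source") without losing events.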
