| author | Marko Kreen | 2012-11-22 18:21:20 +0000 |
|---|---|---|
| committer | Marko Kreen | 2012-11-26 06:56:34 +0000 |
| commit | 32208f0d51d998fbec56f32c777a6feef8f3486e (patch) | |
| tree | 27977e0581237d5fc9f3669bb5f47aa4c0f2bf2a /python/pgq/cascade | |
| parent | 22af8879b35279317e6a37f54eca190d9ba16895 (diff) | |
londiste resurrect: Get dead root back into cascade
In some cases 'takeover --dead-root' will be used even though the
actual data on the node is fine, e.g. after network downtime or lost power.
It would be good to have a way to bring such a node back into the cascade
without needing a full rebuild.
This command syncs the queue contents and re-subscribes the node.
Any events found on the old root that have not spread to the rest of the
cascade are dumped and deleted. No actual table changes are made,
so there must be an external mechanism to avoid data conflicts.
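Since no table changes are replayed, reconciling the dumped events is left to the operator. As a rough illustration (not part of the patch), the dump written by resurrect_dump_event() below is a plain JSON array, so it can be inspected along these lines:

```python
import json

# Minimal sketch: list the events that 'resurrect' dumped from the old root
# before deleting them from the queue.  File name and field names match the
# patch below; what to do with the events is up to the operator.
with open("resurrect-lost-events.json") as f:
    lost_events = json.load(f)

for ev in lost_events:
    print("%s %s %s %s" % (ev['ev_txid'], ev['ev_id'], ev['ev_type'], ev['ev_data']))
```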
Diffstat (limited to 'python/pgq/cascade')
| -rw-r--r-- | python/pgq/cascade/admin.py | 352 |
1 file changed, 349 insertions, 3 deletions
diff --git a/python/pgq/cascade/admin.py b/python/pgq/cascade/admin.py
index 3779ada6..cc4d1265 100644
--- a/python/pgq/cascade/admin.py
+++ b/python/pgq/cascade/admin.py
@@ -11,13 +11,15 @@ setadm.py INI pause NODE [CONS]
 
 """
 
-import sys, time, optparse, skytools
+import sys, time, optparse, skytools, os.path
 
 from skytools import UsageError, DBError
 from pgq.cascade.nodeinfo import *
 
 __all__ = ['CascadeAdmin']
 
+RESURRECT_DUMP_FILE = "resurrect-lost-events.json"
+
 command_usage = """\
 %prog [options] INI CMD [subcmd args]
 
@@ -596,13 +598,24 @@ class CascadeAdmin(skytools.AdminScript):
             self.demote_node(old_node_name, 1, new_node_name)
             last_tick = self.demote_node(old_node_name, 2, new_node_name)
             self.wait_for_catchup(new_node_name, last_tick)
+        else:
+            q = "select * from pgq.get_queue_info(%s)"
+            db = self.get_node_database(new_node_name)
+            curs = db.cursor()
+            curs.execute(q, [self.queue_name])
+            row = curs.fetchone()
+            last_tick = row['last_tick_id']
+            db.commit()
 
         self.pause_node(new_node_name)
         self.promote_branch(new_node_name)
 
         if self.node_alive(old_node_name):
-            q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
-            self.node_cmd(new_node_name, q, [self.queue_name, old_node_name, old_info.worker_name, last_tick])
+            old_worker_name = old_info.worker_name
+        else:
+            old_worker_name = self.failover_consumer_name(old_node_name)
+        q = 'select * from pgq_node.register_subscriber(%s, %s, %s, %s)'
+        self.node_cmd(new_node_name, q, [self.queue_name, old_node_name, old_worker_name, last_tick])
 
         q = "select * from pgq_node.unregister_subscriber(%s, %s)"
         self.node_cmd(new_info.provider_node, q, [self.queue_name, new_node_name])
@@ -840,6 +853,339 @@ class CascadeAdmin(skytools.AdminScript):
             self.sleep(2)
 
+    def cmd_resurrect(self):
+        """Convert out-of-sync old root to branch and sync queue contents.
+        """
+        self.load_local_info()
+
+        db = self.get_database(self.initial_db_name)
+        curs = db.cursor()
+
+        # stop if leaf
+        if self.queue_info.local_node.type == 'leaf':
+            self.log.info("Current node is leaf, nothing to do")
+            return
+
+        # stop if dump file exists
+        if os.path.lexists(RESURRECT_DUMP_FILE):
+            self.log.error("Dump file exists, cannot perform resurrection: %s", RESURRECT_DUMP_FILE)
+            sys.exit(1)
+
+        #
+        # Find failover position
+        #
+
+        self.log.info("** Searching for gravestone **")
+
+        # load subscribers
+        sub_list = []
+        q = "select * from pgq_node.get_subscriber_info(%s)"
+        curs.execute(q, [self.queue_name])
+        for row in curs.fetchall():
+            sub_list.append(row['node_name'])
+        db.commit()
+
+        # find backup subscription
+        this_node = self.queue_info.local_node.name
+        failover_cons = self.failover_consumer_name(this_node)
+        full_list = self.queue_info.member_map.keys()
+        done_nodes = { this_node: 1 }
+        prov_node = None
+        root_node = None
+        for node_name in sub_list + full_list:
+            if node_name in done_nodes:
+                continue
+            done_nodes[node_name] = 1
+            if not self.node_alive(node_name):
+                self.log.info('Node %s is dead, skipping', node_name)
+                continue
+            self.log.info('Looking on node %s', node_name)
+            node_db = None
+            try:
+                node_db = self.get_node_database(node_name)
+                node_curs = node_db.cursor()
+                node_curs.execute("select * from pgq.get_consumer_info(%s, %s)", [self.queue_name, failover_cons])
+                cons_rows = node_curs.fetchall()
+                node_curs.execute("select * from pgq_node.get_node_info(%s)", [self.queue_name])
+                node_info = node_curs.fetchone()
+                node_db.commit()
+                if len(cons_rows) == 1:
+                    if prov_node:
+                        raise Exception('Unexcpeted situation: there are two gravestones - on nodes %s and %s' % (prov_node, node_name))
+                    prov_node = node_name
+                    failover_tick = cons_rows[0]['last_tick']
+                    self.log.info("Found gravestone on node: %s", node_name)
+                if node_info['node_type'] == 'root':
+                    self.log.info("Found new root node: %s", node_name)
+                    root_node = node_name
+                self.close_node_database(node_name)
+                node_db = None
+                if root_node and prov_node:
+                    break
+            except skytools.DBError:
+                self.log.warning("failed to check node %s", node_name)
+                if node_db:
+                    self.close_node_database(node_name)
+                    node_db = None
+
+        if not root_node:
+            self.log.error("Cannot find new root node", failover_cons)
+            sys.exit(1)
+        if not prov_node:
+            self.log.error("Cannot find failover position (%s)", failover_cons)
+            sys.exit(1)
+
+        # load worker state
+        q = "select * from pgq_node.get_worker_state(%s)"
+        rows = self.exec_cmd(db, q, [self.queue_name])
+        state = rows[0]
+
+        # demote & pause
+        self.log.info("** Demote & pause local node **")
+        if self.queue_info.local_node.type == 'root':
+            self.log.info('Node %s is root, demoting', this_node)
+            q = "select * from pgq_node.demote_root(%s, %s, %s)"
+            self.exec_cmd(db, q, [self.queue_name, 1, prov_node])
+            self.exec_cmd(db, q, [self.queue_name, 2, prov_node])
+
+            # change node type and set worker paused in same TX
+            curs = db.cursor()
+            self.exec_cmd(curs, q, [self.queue_name, 3, prov_node])
+            q = "select * from pgq_node.set_consumer_paused(%s, %s, true)"
+            self.exec_cmd(curs, q, [self.queue_name, state['worker_name']])
+            db.commit()
+        elif not state['paused']:
+            # pause worker, don't wait for reaction, as it may be dead
+            self.log.info('Node %s is branch, pausing worker: %s', this_node, state['worker_name'])
+            q = "select * from pgq_node.set_consumer_paused(%s, %s, true)"
+            self.exec_cmd(db, q, [self.queue_name, state['worker_name']])
+        else:
+            self.log.info('Node %s is branch and worker is paused', this_node)
+
+        #
+        # Drop old consumers and subscribers
+        #
+        self.log.info("** Dropping old subscribers and consumers **")
+
+        # unregister subscriber nodes
+        q = "select pgq_node.unregister_subscriber(%s, %s)"
+        for node_name in sub_list:
+            self.log.info("Dropping old subscriber node: %s", node_name)
+            curs.execute(q, [self.queue_name, node_name])
+
+        # unregister consumers
+        q = "select consumer_name from pgq.get_consumer_info(%s)"
+        curs.execute(q, [self.queue_name])
+        for row in curs.fetchall():
+            cname = row['consumer_name']
+            if cname[0] == '.':
+                self.log.info("Keeping consumer: %s", cname)
+                continue
+            self.log.info("Dropping old consumer: %s", cname)
+            q = "pgq.unregister_consumer(%s, %s)"
+            curs.execute(q, [self.queue_name, cname])
+        db.commit()
+
+        # dump events
+        self.log.info("** Dump & delete lost events **")
+        stats = self.resurrect_process_lost_events(db, failover_tick)
+
+        self.log.info("** Subscribing %s to %s **", this_node, prov_node)
+
+        # set local position
+        self.log.info("Reset local completed pos")
+        q = "select * from pgq_node.set_consumer_completed(%s, %s, %s)"
+        self.exec_cmd(db, q, [self.queue_name, state['worker_name'], failover_tick])
+
+        # rename gravestone
+        self.log.info("Rename gravestone to worker: %s", state['worker_name'])
+        prov_db = self.get_node_database(prov_node)
+        prov_curs = prov_db.cursor()
+        q = "select * from pgq_node.unregister_subscriber(%s, %s)"
+        self.exec_cmd(prov_curs, q, [self.queue_name, this_node], quiet = True)
+        q = "select ret_code, ret_note, global_watermark"\
+            " from pgq_node.register_subscriber(%s, %s, %s, %s)"
+        res = self.exec_cmd(prov_curs, q, [self.queue_name, this_node, state['worker_name'], failover_tick], quiet = True)
+        global_wm = res[0]['global_watermark']
+        prov_db.commit()
+
+        # import new global watermark
+        self.log.info("Reset global watermark")
+        q = "select * from pgq_node.set_global_watermark(%s, %s)"
+        self.exec_cmd(db, q, [self.queue_name, global_wm], quiet = True)
+
+        # show stats
+        if stats:
+            self.log.info("** Statistics **")
+            klist = stats.keys()
+            klist.sort()
+            for k in klist:
+                v = stats[k]
+                self.log.info("  %s: %s", k, str(v))
+        self.log.info("** Resurrection done, worker paused **")
+
+    def resurrect_process_lost_events(self, db, failover_tick):
+        curs = db.cursor()
+        this_node = self.queue_info.local_node.name
+        cons_name = this_node + '.dumper'
+
+        self.log.info("Dumping lost events")
+
+        # register temp consumer on queue
+        q = "select pgq.register_consumer_at(%s, %s, %s)"
+        curs.execute(q, [self.queue_name, cons_name, failover_tick])
+        db.commit()
+
+        # process events as usual
+        total_count = 0
+        final_tick_id = -1
+        stats = {}
+        while 1:
+            q = "select * from pgq.next_batch_info(%s, %s)"
+            curs.execute(q, [self.queue_name, cons_name])
+            b = curs.fetchone()
+            batch_id = b['batch_id']
+            if batch_id is None:
+                break
+            final_tick_id = b['cur_tick_id']
+            q = "select * from pgq.get_batch_events(%s)"
+            curs.execute(q, [batch_id])
+            cnt = 0
+            for ev in curs.fetchall():
+                cnt += 1
+                total_count += 1
+                self.resurrect_dump_event(ev, stats, b)
+
+            q = "select pgq.finish_batch(%s)"
+            curs.execute(q, [batch_id])
+            if cnt > 0:
+                db.commit()
+
+        stats['dumped_count'] = total_count
+
+        self.resurrect_dump_finish()
+
+        self.log.info("%s events dumped", total_count)
+
+        # unregiser consumer
+        q = "select pgq.unregister_consumer(%s, %s)"
+        curs.execute(q, [self.queue_name, cons_name])
+        db.commit()
+
+        if failover_tick == final_tick_id:
+            self.log.info("No batches found")
+            return None
+
+        #
+        # Delete the events from queue
+        #
+        # This is done snapshots, to make sure we delete only events
+        # that were dumped out previously.  This uses the long-tx
+        # resustant logic described in pgq.batch_event_sql().
+        #
+
+        # find snapshots
+        q = "select t1.tick_snapshot as s1, t2.tick_snapshot as s2"\
+            " from pgq.tick t1, pgq.tick t2"\
+            " where t1.tick_id = %s"\
+            "   and t2.tick_id = %s"
+        curs.execute(q, [failover_tick, final_tick_id])
+        ticks = curs.fetchone()
+        s1 = skytools.Snapshot(ticks['s1'])
+        s2 = skytools.Snapshot(ticks['s2'])
+
+        xlist = []
+        for tx in s1.txid_list:
+            if s2.contains(tx):
+                xlist.append(str(tx))
+
+        # create where clauses
+        W1 = None
+        if len(xlist) > 0:
+            W1 = "ev_txid in (%s)" % (",".join(xlist),)
+        W2 = "ev_txid >= %d AND ev_txid <= %d"\
+            " and not txid_visible_in_snapshot(ev_txid, '%s')"\
+            " and txid_visible_in_snapshot(ev_txid, '%s')" % (
+            s1.xmax, s2.xmax, ticks['s1'], ticks['s2'])
+
+        # loop over all queue data tables
+        q = "select * from pgq.queue where queue_name = %s"
+        curs.execute(q, [self.queue_name])
+        row = curs.fetchone()
+        ntables = row['queue_ntables']
+        tbl_pfx = row['queue_data_pfx']
+        schema, table = tbl_pfx.split('.')
+        total_del_count = 0
+        self.log.info("Deleting lost events")
+        for i in range(ntables):
+            del_count = 0
+            self.log.debug("Deleting events from table %d" % i)
+            qtbl = "%s.%s" % (skytools.quote_ident(schema),
+                              skytools.quote_ident(table + '_' + str(i)))
+            q = "delete from " + qtbl + " where "
+            if W1:
+                self.log.debug(q + W1)
+                curs.execute(q + W1)
+                if curs.rowcount and curs.rowcount > 0:
+                    del_count += curs.rowcount
+            self.log.debug(q + W2)
+            curs.execute(q + W2)
+            if curs.rowcount and curs.rowcount > 0:
+                del_count += curs.rowcount
+            total_del_count += del_count
+            self.log.debug('%d events deleted', del_count)
+        self.log.info('%d events deleted', total_del_count)
+        stats['deleted_count'] = total_del_count
+
+        # delete new ticks
+        q = "delete from pgq.tick t using pgq.queue q"\
+            " where q.queue_name = %s"\
+            "   and t.tick_queue = q.queue_id"\
+            "   and t.tick_id > %s"\
+            "   and t.tick_id <= %s"
+        curs.execute(q, [self.queue_name, failover_tick, final_tick_id])
+        self.log.info("%s ticks deleted", curs.rowcount)
+
+        db.commit()
+
+        return stats
+
+    _json_dump_file = None
+    def resurrect_dump_event(self, ev, stats, batch_info):
+        if self._json_dump_file is None:
+            self._json_dump_file = open(RESURRECT_DUMP_FILE, 'w')
+            sep = '['
+        else:
+            sep = ','
+
+        # create orinary dict to avoid problems with row class and datetime
+        d = {
+            'ev_id': ev.ev_id,
+            'ev_type': ev.ev_type,
+            'ev_data': ev.ev_data,
+            'ev_extra1': ev.ev_extra1,
+            'ev_extra2': ev.ev_extra2,
+            'ev_extra3': ev.ev_extra3,
+            'ev_extra4': ev.ev_extra4,
+            'ev_time': ev.ev_time.isoformat(),
+            'ev_txid': ev.ev_txid,
+            'ev_retry': ev.ev_retry,
+            'tick_id': batch_info['cur_tick_id'],
+            'prev_tick_id': batch_info['prev_tick_id'],
+        }
+        jsev = skytools.json_encode(d)
+        s = sep + '\n' + jsev
+        self._json_dump_file.write(s)
+
+    def resurrect_dump_finish(self):
+        if self._json_dump_file:
+            self._json_dump_file.write('\n]\n')
+            self._json_dump_file.close()
+            self._json_dump_file = None
+
+    def failover_consumer_name(self, node_name):
+        return node_name + ".gravestone"
+
     #
     # Shortcuts for operating on nodes.
     #
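The delete step in resurrect_process_lost_events() picks events by comparing the snapshots of the failover tick and the final dumped tick. A simplified, self-contained sketch of that visibility rule follows; the Snapshot class here is a small stand-in for skytools.Snapshot and the example txid ranges are made up, while the patch itself performs the same checks in SQL via txid_visible_in_snapshot():

```python
# Illustration only: an event belongs to the dumped range when its transaction
# is visible (committed) in the final tick's snapshot s2 but was not yet
# visible in the failover tick's snapshot s1.  That is what the W1/W2 where
# clauses in the patch express in SQL.

class Snapshot:
    """Tiny stand-in for skytools.Snapshot, parsing 'xmin:xmax:xip1,xip2,...'."""
    def __init__(self, txt):
        xmin, xmax, xip = txt.split(':')
        self.xmin = int(xmin)
        self.xmax = int(xmax)
        self.txid_list = [int(x) for x in xip.split(',')] if xip else []

    def contains(self, txid):
        # True if the transaction is visible (committed) in this snapshot
        if txid < self.xmin:
            return True
        if txid >= self.xmax:
            return False
        return txid not in self.txid_list

def in_dumped_range(txid, s1, s2):
    return s2.contains(txid) and not s1.contains(txid)

s1 = Snapshot("90:100:95,97")    # snapshot at the failover tick (made-up values)
s2 = Snapshot("110:120:115")     # snapshot at the final dumped tick
for txid in (80, 95, 105, 115, 125):
    print("%d -> %s" % (txid, in_dumped_range(txid, s1, s2)))
```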
