diff options
Diffstat (limited to 'python/londiste/util.py')
-rw-r--r-- | python/londiste/util.py | 22 |
1 files changed, 14 insertions, 8 deletions
diff --git a/python/londiste/util.py b/python/londiste/util.py index cba18f62..07ff9407 100644 --- a/python/londiste/util.py +++ b/python/londiste/util.py @@ -18,7 +18,7 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati @param script: DbScript @param queue_name: name of the cascaded queue - @param copy_table_name: name of the table + @param copy_table_name: name of the table (or list of names) @param node_name: target node name @param node_location: target node location @returns (node_name, node_location, downstream_worker_name) of source node @@ -27,6 +27,11 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati # None means no steps upwards were taken, so local consumer is worker worker_name = None + if isinstance(copy_table_name, str): + need = set([copy_table_name]) + else: + need = set(copy_table_name) + while 1: src_db = script.get_database('_source_db', connstr = node_location, autocommit = 1, profile = 'remote') src_curs = src_db.cursor() @@ -39,12 +44,12 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati script.log.info("Checking if %s can be used for copy", info['node_name']) - q = "select table_name, local, table_attrs from londiste.get_table_list(%s) where table_name = %s" - src_curs.execute(q, [queue_name, copy_table_name]) - got = False + q = "select table_name, local, table_attrs from londiste.get_table_list(%s)" + src_curs.execute(q, [queue_name]) + got = set() for row in src_curs.fetchall(): tbl = row['table_name'] - if tbl != copy_table_name: + if tbl not in need: continue if not row['local']: script.log.debug("Problem: %s is not local", tbl) @@ -53,14 +58,15 @@ def find_copy_source(script, queue_name, copy_table_name, node_name, node_locati script.log.debug("Problem: %s handler does not store data [%s]", tbl, row['table_attrs']) continue script.log.debug("Good: %s is usable", tbl) - got = True - break + got.add(tbl) script.close_database('_source_db') - if got: + if got == need: script.log.info("Node %s seems good source, using it", info['node_name']) return node_name, node_location, worker_name + else: + script.log.info("Node %s does not have all tables", info['node_name']) if info['node_type'] == 'root': raise skytools.UsageError("Found root and no source found") |