summaryrefslogtreecommitdiff
path: root/python/londiste/setup.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/londiste/setup.py')
-rw-r--r--python/londiste/setup.py93
1 files changed, 72 insertions, 21 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 204850b6..20d3f5c0 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -7,6 +7,7 @@ import sys, os, re, skytools
from pgq.cascade.admin import CascadeAdmin
from londiste.exec_attrs import ExecAttrs
+from londiste.util import find_copy_source
import londiste.handler
@@ -139,36 +140,45 @@ class LondisteSetup(CascadeAdmin):
needs_tbl = self.handler_needs_table()
args = self.expand_arg_list(dst_db, 'r', False, args, needs_tbl)
+ # pick proper create flags
+ if self.options.create_full:
+ create_flags = skytools.T_ALL
+ elif self.options.create:
+ create_flags = skytools.T_TABLE | skytools.T_PKEY
+ else:
+ create_flags = 0
+
+ # search for usable copy node if requested & needed
+ if (self.options.find_copy_node and create_flags != 0
+ and needs_tbl and not self.is_root()):
+ src_name, src_loc, _ = find_copy_source(self, self.queue_name, args, None, self.provider_location)
+ self.options.copy_node = src_name
+ self.close_database('provider_db')
+ src_db = self.get_provider_db()
+ src_curs = src_db.cursor()
+ src_tbls = self.fetch_set_tables(src_curs)
+ src_db.commit()
+
# dont check for exist/not here (root handling)
if not self.is_root() and not self.options.expect_sync and not self.options.find_copy_node:
problems = False
for tbl in args:
tbl = skytools.fq_name(tbl)
if (tbl in src_tbls) and not src_tbls[tbl]['local']:
- self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
- problems = True
+ if self.options.skip_non_existing:
+ self.log.warning("Table %s does not exist on provider", tbl)
+ else:
+ self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
+ problems = True
if problems:
self.log.error("Problems, canceling operation")
sys.exit(1)
- # pick proper create flags
- if self.options.create_full:
- create_flags = skytools.T_ALL
- elif self.options.create:
- create_flags = skytools.T_TABLE | skytools.T_PKEY
- else:
- create_flags = 0
-
# sanity check
if self.options.dest_table and len(args) > 1:
self.log.error("--dest-table can be given only for single table")
sys.exit(1)
- # not implemented
- if self.options.find_copy_node and create_flags != 0:
- self.log.error("--find-copy-node does not work with --create")
- sys.exit(1)
-
# seems ok
for tbl in args:
self.add_table(src_db, dst_db, tbl, create_flags, src_tbls)
@@ -445,6 +455,42 @@ class LondisteSetup(CascadeAdmin):
"""Reload data from provider node."""
db = self.get_database('db')
args = self.expand_arg_list(db, 'r', True, args)
+
+ if not self.options.find_copy_node:
+ self.load_local_info()
+ src_db = self.get_provider_db()
+ src_curs = src_db.cursor()
+ src_tbls = self.fetch_set_tables(src_curs)
+ src_db.commit()
+
+ problems = 0
+ for tbl in args:
+ tbl = skytools.fq_name(tbl)
+ if tbl not in src_tbls or not src_tbls[tbl]['local']:
+ self.log.error("Table %s does not exist on provider, need to switch to different provider", tbl)
+ problems += 1
+ if problems > 0:
+ self.log.error("Problems, cancelling operation")
+ sys.exit(1)
+
+ if self.options.find_copy_node or self.options.copy_node:
+ q = "select table_name, table_attrs from londiste.get_table_list(%s) where local"
+ cur = db.cursor()
+ cur.execute(q, [self.set_name])
+ for row in cur.fetchall():
+ if row['table_name'] not in args:
+ continue
+ attrs = skytools.db_urldecode (row['table_attrs'] or '')
+
+ if self.options.find_copy_node:
+ attrs['copy_node'] = '?'
+ elif self.options.copy_node:
+ attrs['copy_node'] = self.options.copy_node
+
+ attrs = skytools.db_urlencode (attrs)
+ q = "select * from londiste.local_set_table_attrs (%s, %s, %s)"
+ self.exec_cmd(db, q, [self.set_name, row['table_name'], attrs])
+
q = "select * from londiste.local_set_table_state(%s, %s, null, null)"
self.exec_cmd_many(db, q, [self.set_name], args)
@@ -530,9 +576,8 @@ class LondisteSetup(CascadeAdmin):
db.commit()
def get_provider_db(self):
-
- # use custom node for copy
if self.options.copy_node:
+ # use custom node for copy
source_node = self.options.copy_node
m = self.queue_info.get_member(source_node)
if not m:
@@ -546,6 +591,7 @@ class LondisteSetup(CascadeAdmin):
q = 'select * from pgq_node.get_node_info(%s)'
res = self.exec_cmd(db, q, [self.queue_name], quiet = True)
self.provider_location = res[0]['provider_location']
+
return self.get_database('provider_db', connstr = self.provider_location, profile = 'remote')
def expand_arg_list(self, db, kind, existing, args, needs_tbl=True):
@@ -586,6 +632,9 @@ class LondisteSetup(CascadeAdmin):
res = self.solve_globbing(args, lst_exists, map_exists, map_missing, allow_nonexist)
else:
res = self.solve_globbing(args, lst_missing, map_missing, map_exists, allow_nonexist)
+
+ if not res:
+ self.log.info("what to do ?")
return res
def solve_globbing(self, args, full_list, full_map, reverse_map, allow_nonexist):
@@ -657,18 +706,19 @@ class LondisteSetup(CascadeAdmin):
dst_curs = dst_db.cursor()
partial = {}
- done_pos = 1
startup_info = 0
while 1:
dst_curs.execute(q, [self.queue_name])
rows = dst_curs.fetchall()
dst_db.commit()
+ total_count = 0
cur_count = 0
done_list = []
for row in rows:
if not row['local']:
continue
+ total_count += 1
tbl = row['table_name']
if row['merge_state'] != 'ok':
partial[tbl] = 0
@@ -678,13 +728,14 @@ class LondisteSetup(CascadeAdmin):
partial[tbl] = 1
done_list.append(tbl)
+ done_count = total_count - cur_count
+
if not startup_info:
- self.log.info("%d table(s) to copy", len(partial))
+ self.log.info("%d/%d table(s) to copy", cur_count, total_count)
startup_info = 1
for done in done_list:
- self.log.info("%s: finished (%d/%d)", done, done_pos, len(partial))
- done_pos += 1
+ self.log.info("%s: finished (%d/%d)", done, done_count, total_count)
if cur_count == 0:
break