summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/londiste.py2
-rw-r--r--python/londiste/compare.py12
-rw-r--r--python/londiste/handler.py16
-rw-r--r--python/londiste/handlers/dispatch.py4
-rw-r--r--python/londiste/handlers/part.py9
-rw-r--r--python/londiste/handlers/qtable.py2
-rw-r--r--python/londiste/repair.py19
-rw-r--r--python/londiste/setup.py4
-rw-r--r--python/londiste/syncer.py11
-rw-r--r--python/londiste/table_copy.py23
10 files changed, 52 insertions, 50 deletions
diff --git a/python/londiste.py b/python/londiste.py
index 5d46bbdc..b6c59e15 100755
--- a/python/londiste.py
+++ b/python/londiste.py
@@ -138,8 +138,6 @@ class Londiste(skytools.DBScript):
help = "add: walk upstream to find node to copy from")
g.add_option("--copy-node", dest="copy_node",
help = "add: use NODE as source for initial COPY")
- g.add_option("--copy-condition", dest="copy_condition",
- help = "add: set WHERE expression for copy")
g.add_option("--merge-all", action="store_true",
help="merge tables from all source queues", default=False)
g.add_option("--no-merge", action="store_true",
diff --git a/python/londiste/compare.py b/python/londiste/compare.py
index b08a04e4..1412336c 100644
--- a/python/londiste/compare.py
+++ b/python/londiste/compare.py
@@ -15,12 +15,18 @@ class Comparator(Syncer):
"""Simple checker based in Syncer.
When tables are in sync runs simple SQL query on them.
"""
- def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
+ def process_sync(self, t1, t2, src_db, dst_db):
"""Actual comparision."""
+ src_tbl = t1.dest_table
+ dst_tbl = t2.dest_table
+
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
+ src_where = t1.plugin.get_copy_condition(src_curs, dst_curs)
+ dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
+
self.log.info('Counting %s' % dst_tbl)
# get common cols
@@ -37,7 +43,11 @@ class Comparator(Syncer):
q = self.cf.get('compare_sql', q)
q = q.replace("_COLS_", cols)
src_q = q.replace('_TABLE_', skytools.quote_fqident(src_tbl))
+ if src_where:
+ src_q = src_q + " WHERE " + src_where
dst_q = q.replace('_TABLE_', skytools.quote_fqident(dst_tbl))
+ if dst_where:
+ dst_q = dst_q + " WHERE " + dst_where
f = "%(cnt)d rows, checksum=%(chksum)s"
f = self.cf.get('compare_fmt', f)
diff --git a/python/londiste/handler.py b/python/londiste/handler.py
index 87a16b62..ad4239ff 100644
--- a/python/londiste/handler.py
+++ b/python/londiste/handler.py
@@ -106,11 +106,15 @@ class BaseHandler:
"""Called when batch finishes."""
pass
- def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
+ def get_copy_condition(self, src_curs, dst_curs):
+ """ Use if you want to filter data """
+ return ''
+
+ def real_copy(self, src_tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copyed
"""
- condition = ' and '.join(cond_list)
+ condition = self.get_copy_condition(src_curs, dst_curs)
return skytools.full_copy(src_tablename, src_curs, dst_curs,
column_list, condition,
dst_tablename = self.dest_table)
@@ -184,19 +188,17 @@ class TableHandler(BaseHandler):
return self.enc.validate_dict(row, self.table_name)
return row
- def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, src_tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copyed
"""
-
- condition = ' and '.join(cond_list)
-
+
if self.enc:
def _write_hook(obj, data):
return self.enc.validate_copy(data, column_list, src_tablename)
else:
_write_hook = None
-
+ condition = self.get_copy_condition(src_curs, dst_curs)
return skytools.full_copy(src_tablename, src_curs, dst_curs,
column_list, condition,
dst_tablename = self.dest_table,
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index dcfede52..2917472b 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -866,12 +866,12 @@ class Dispatcher(BaseHandler):
exec_with_vals(self.conf.post_part)
self.log.info("Created table: %s" % dst)
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copyed
"""
_src_cols = _dst_cols = column_list
- condition = ' and '.join(cond_list)
+ condition = ''
if self.conf.skip_fields:
_src_cols = [col for col in column_list
diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/part.py
index 8f0eb378..5213f67d 100644
--- a/python/londiste/handlers/part.py
+++ b/python/londiste/handlers/part.py
@@ -81,15 +81,12 @@ class PartHandler(TableHandler):
self.log.debug('part.process_event: my event, processing')
TableHandler.process_event(self, ev, sql_queue_func, arg)
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
- """Copy only slots needed locally."""
+ def get_copy_condition(self, src_curs, dst_curs):
+ """Prepare the where condition for copy and replay filtering"""
self.load_part_info(dst_curs)
w = "(%s & %d) = %d" % (self.hashexpr, self.max_part, self.local_part)
self.log.debug('part: copy_condition=%s' % w)
- cond_list.append(w)
-
- return TableHandler.real_copy(self, tablename, src_curs, dst_curs,
- column_list, cond_list)
+ return w
def load_part_info(self, curs):
"""Load slot info from database."""
diff --git a/python/londiste/handlers/qtable.py b/python/londiste/handlers/qtable.py
index b93543e9..13ca4099 100644
--- a/python/londiste/handlers/qtable.py
+++ b/python/londiste/handlers/qtable.py
@@ -33,7 +33,7 @@ class QueueTableHandler(BaseHandler):
trigger_arg_list.append('SKIP')
trigger_arg_list.append('expect_sync')
- def real_copy(self, tablename, src_curs, dst_curs, column_list, cond_list):
+ def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""Force copy not to start"""
return (0,0)
diff --git a/python/londiste/repair.py b/python/londiste/repair.py
index 02494b53..5a04d7a0 100644
--- a/python/londiste/repair.py
+++ b/python/londiste/repair.py
@@ -33,7 +33,7 @@ class Repairer(Syncer):
p.add_option("--apply", action="store_true", help="apply fixes")
return p
- def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
+ def process_sync(self, t1, t2, src_db, dst_db):
"""Actual comparision."""
apply_db = None
@@ -43,6 +43,9 @@ class Repairer(Syncer):
self.apply_curs = apply_db.cursor()
self.apply_curs.execute("set session_replication_role = 'replica'")
+ src_tbl = t1.dest_table
+ dst_tbl = t2.dest_table
+
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
@@ -57,10 +60,12 @@ class Repairer(Syncer):
dump_dst = dst_tbl + ".dst"
self.log.info("Dumping src table: %s" % src_tbl)
- self.dump_table(src_tbl, src_curs, dump_src)
+ src_where = t1.plugin.get_copy_condition(src_curs, dst_curs)
+ self.dump_table(src_tbl, src_curs, dump_src, src_where)
src_db.commit()
self.log.info("Dumping dst table: %s" % dst_tbl)
- self.dump_table(dst_tbl, dst_curs, dump_dst)
+ dst_where = t2.plugin.get_copy_condition(src_curs, dst_curs)
+ self.dump_table(dst_tbl, dst_curs, dump_dst, dst_where)
dst_db.commit()
self.log.info("Sorting src table: %s" % dump_src)
@@ -123,11 +128,13 @@ class Repairer(Syncer):
cols = ",".join(fqlist)
self.log.debug("using columns: %s" % cols)
- def dump_table(self, tbl, curs, fn):
+ def dump_table(self, tbl, curs, fn, whr):
"""Dump table to disk."""
cols = ','.join(self.fq_common_fields)
- q = "copy %s (%s) to stdout" % (skytools.quote_fqident(tbl), cols)
-
+ if len(whr) == 0:
+ whr = 'true'
+ q = "copy (SELECT %s FROM %s WHERE %s) to stdout" % (cols, skytools.quote_fqident(tbl), whr)
+ self.log.debug("Query: %s" % q)
f = open(fn, "w", 64*1024)
curs.copy_expert(q, f)
size = f.tell()
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index b8ca4a0c..f06d9b0b 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -50,8 +50,6 @@ class LondisteSetup(CascadeAdmin):
help = "add: find table source for copy by walking upwards")
p.add_option("--copy-node", dest="copy_node",
help = "add: use NODE as source for initial copy")
- p.add_option("--copy-condition", dest="copy_condition",
- help = "copy: where expression")
p.add_option("--force", action="store_true",
help="force", default=False)
p.add_option("--all", action="store_true",
@@ -247,8 +245,6 @@ class LondisteSetup(CascadeAdmin):
if not self.options.expect_sync:
if self.options.skip_truncate:
attrs['skip_truncate'] = 1
- if self.options.copy_condition:
- attrs['copy_condition'] = self.options.copy_condition
if self.options.max_parallel_copy:
attrs['max_parallel_copy'] = self.options.max_parallel_copy
diff --git a/python/londiste/syncer.py b/python/londiste/syncer.py
index 05df41ad..5eb5da73 100644
--- a/python/londiste/syncer.py
+++ b/python/londiste/syncer.py
@@ -155,7 +155,7 @@ class Syncer(skytools.DBScript):
self.check_consumer(setup_db)
- self.check_table(t1.dest_table, t2.dest_table, lock_db, src_db, dst_db, setup_db)
+ self.check_table(t1, t2, lock_db, src_db, dst_db, setup_db)
lock_db.commit()
src_db.commit()
dst_db.commit()
@@ -185,9 +185,12 @@ class Syncer(skytools.DBScript):
if dur > 10 and not self.options.force:
raise Exception("Ticker seems dead")
- def check_table(self, src_tbl, dst_tbl, lock_db, src_db, dst_db, setup_db):
+ def check_table(self, t1, t2, lock_db, src_db, dst_db, setup_db):
"""Get transaction to same state, then process."""
+ src_tbl = t1.dest_table
+ dst_tbl = t2.dest_table
+
lock_curs = lock_db.cursor()
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
@@ -221,7 +224,7 @@ class Syncer(skytools.DBScript):
self.unlock_table_branch(lock_db, setup_db)
# do work
- bad = self.process_sync(src_tbl, dst_tbl, src_db, dst_db)
+ bad = self.process_sync(t1, t2, src_db, dst_db)
if bad:
self.bad_tables += 1
@@ -320,7 +323,7 @@ class Syncer(skytools.DBScript):
setup_curs = setup_db.cursor()
self.resume_consumer(setup_curs, self.provider_node['worker_name'])
- def process_sync(self, src_tbl, dst_tbl, src_db, dst_db):
+ def process_sync(self, t1, t2, src_db, dst_db):
"""It gets 2 connections in state where tbl should be in same state.
"""
raise Exception('process_sync not implemented')
diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py
index 65a702fb..a2138d98 100644
--- a/python/londiste/table_copy.py
+++ b/python/londiste/table_copy.py
@@ -159,7 +159,12 @@ class CopyTable(Replicator):
tbl_stat.dropped_ddl = ddl
# do truncate & copy
- self.real_copy(src_curs, dst_curs, tbl_stat, common_cols, src_real_table)
+ self.log.info("%s: start copy" % tbl_stat.name)
+ p = tbl_stat.get_plugin()
+ stats = p.real_copy(src_real_table, src_curs, dst_curs, common_cols)
+ if stats:
+ self.log.info("%s: copy finished: %d bytes, %d rows" % (
+ tbl_stat.name, stats[0], stats[1]))
# get snapshot
src_curs.execute("select txid_current_snapshot()")
@@ -215,22 +220,6 @@ class CopyTable(Replicator):
src_curs.execute(q, [self.queue_name])
src_db.commit()
- def real_copy(self, srccurs, dstcurs, tbl_stat, col_list, src_real_table):
- "Actual copy."
-
- tablename = tbl_stat.name
- # do copy
- self.log.info("%s: start copy" % tablename)
- p = tbl_stat.get_plugin()
- cond_list = []
- cond = tbl_stat.table_attrs.get('copy_condition')
- if cond:
- cond_list.append(cond)
- stats = p.real_copy(src_real_table, srccurs, dstcurs, col_list, cond_list)
- if stats:
- self.log.info("%s: copy finished: %d bytes, %d rows" % (
- tablename, stats[0], stats[1]))
-
def work(self):
if not self.reg_ok:
# check if needed? (table, not existing reg)