summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rw-r--r--python/londiste/handler.py119
-rw-r--r--python/londiste/handlers/dispatch.py50
2 files changed, 117 insertions, 52 deletions
diff --git a/python/londiste/handler.py b/python/londiste/handler.py
index ecce1cba..252bf2de 100644
--- a/python/londiste/handler.py
+++ b/python/londiste/handler.py
@@ -34,7 +34,7 @@ import logging
import skytools
import londiste.handlers
-__all__ = ['RowCache', 'BaseHandler', 'build_handler',
+__all__ = ['RowCache', 'BaseHandler', 'build_handler', 'EncodingValidator',
'load_handler_modules', 'create_handler_string']
class RowCache:
@@ -129,16 +129,27 @@ class TableHandler(BaseHandler):
'D': "delete from only %s where %s;",
}
+ allow_sql_event = 1
+
+ def __init__(self, table_name, args, dest_table):
+ BaseHandler.__init__(self, table_name, args, dest_table)
+
+ enc = args.get('encoding')
+ if enc:
+ self.enc = EncodingValidator(self.log, enc)
+ else:
+ self.enc = None
+
def process_event(self, ev, sql_queue_func, arg):
+ row = self.parse_row_data(ev)
if len(ev.type) == 1:
# sql event
fqname = self.fq_dest_table
fmt = self.sql_command[ev.type]
- sql = fmt % (fqname, ev.data)
+ sql = fmt % (fqname, row)
else:
# urlenc event
pklist = ev.type[2:].split(',')
- row = skytools.db_urldecode(ev.data)
op = ev.type[0]
tbl = self.dest_table
if op == 'I':
@@ -150,6 +161,108 @@ class TableHandler(BaseHandler):
sql_queue_func(sql, arg)
+ def parse_row_data(self, ev):
+ """Extract row data from event, with optional encoding fixes.
+
+ Returns either string (sql event) or dict (urlenc event).
+ """
+
+ if len(ev.type) == 1:
+ if not self.allow_sql_event:
+ raise Exception('SQL events not suppoted by this handler')
+ if self.enc:
+ return self.enc.validate_string(ev.data, self.table_name)
+ return ev.data
+ else:
+ row = skytools.db_urldecode(ev.data)
+ if self.enc:
+ return self.enc.validate_dict(row, self.table_name)
+ return row
+
+ def real_copy(self, src_tablename, src_curs, dst_curs, column_list, cond_list):
+ """do actual table copy and return tuple with number of bytes and rows
+ copyed
+ """
+
+ condition = ' and '.join(cond_list)
+
+ if self.enc:
+ def _write_hook(obj, data):
+ return self.enc.validate_copy(data, column_list, src_tablename)
+ else:
+ _write_hook = None
+
+ return skytools.full_copy(src_tablename, src_curs, dst_curs,
+ column_list, condition,
+ dst_tablename = self.dest_table,
+ write_hook = _write_hook)
+
+
+#------------------------------------------------------------------------------
+# ENCODING VALIDATOR
+#------------------------------------------------------------------------------
+
+class EncodingValidator:
+ def __init__(self, log, encoding = 'utf-8', replacement = u'\ufffd'):
+ """validates the correctness of given encoding. when data contains
+ illegal symbols, replaces them with <replacement> and logs the
+ incident
+ """
+
+ if encoding.lower() not in ('utf8', 'utf-8'):
+ raise Exception('only utf8 supported')
+
+ self.encoding = encoding
+ self.log = log
+ self.columns = None
+ self.error_count = 0
+
+ def show_error(self, col, val, pfx, unew):
+ if pfx:
+ col = pfx + '.' + col
+ self.log.info('Fixed invalid UTF8 in column <%s>', col)
+ self.log.debug('<%s>: old=%r new=%r', col, val, unew)
+
+ def validate_copy(self, data, columns, pfx=""):
+ """Validate tab-separated fields"""
+
+ ok, _unicode = skytools.safe_utf8_decode(data)
+ if ok:
+ return data
+
+ # log error
+ vals = data.split('\t')
+ for i, v in enumerate(vals):
+ ok, tmp = skytools.safe_utf8_decode(v)
+ if not ok:
+ self.show_error(columns[i], v, pfx, tmp)
+
+ # return safe data
+ return _unicode.encode('utf8')
+
+ def validate_dict(self, data, pfx=""):
+ """validates data in dict"""
+ for k, v in data.items():
+ if v:
+ ok, u = skytools.safe_utf8_decode(v)
+ if not ok:
+ self.show_error(k, v, pfx, u)
+ data[k] = u.encode('utf8')
+ return data
+
+ def validate_string(self, value, pfx=""):
+ """validate string"""
+ ok, u = skytools.safe_utf8_decode(value)
+ if ok:
+ return value
+ _pfx = pfx and (pfx+': ') or ""
+ self.log.info('%sFixed invalid UTF8 in string <%s>', _pfx, value)
+ return u.encode('utf8')
+
+#
+# handler management
+#
+
_handler_map = {
'londiste': TableHandler,
}
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index c3acfc6d..dcfede52 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -149,7 +149,7 @@ import datetime
import codecs
import re
import skytools
-from londiste.handler import BaseHandler
+from londiste.handler import BaseHandler, EncodingValidator
from skytools import quote_ident, quote_fqident, UsageError
from skytools.dbstruct import *
from skytools.utf8 import safe_utf8_decode
@@ -601,54 +601,6 @@ ROW_HANDLERS = {'plain': RowHandler,
'keep_latest': KeepLatestRowHandler}
-
-#------------------------------------------------------------------------------
-# ENCODING VALIDATOR
-#------------------------------------------------------------------------------
-
-class EncodingValidator:
- def __init__(self, log, encoding = 'utf-8', replacement = u'\ufffd'):
- """validates the correctness of given encoding. when data contains
- illegal symbols, replaces them with <replacement> and logs the
- incident"""
- self.log = log
- self.columns = None
- self.error_count = 0
-
- def show_error(self, col, val, pfx, unew):
- if pfx:
- col = pfx + '.' + col
- self.log.info('Fixed invalid UTF8 in column <%s>', col)
- self.log.debug('<%s>: old=%r new=%r', col, val, unew)
-
- def validate_copy(self, data, columns, pfx=""):
- """Validate tab-separated fields"""
-
- ok, _unicode = safe_utf8_decode(data)
- if ok:
- return data
-
- # log error
- vals = data.split('\t')
- for i, v in enumerate(vals):
- ok, tmp = safe_utf8_decode(v)
- if not ok:
- self.show_error(columns[i], v, pfx, tmp)
-
- # return safe data
- return _unicode.encode('utf8')
-
- def validate_dict(self, data, pfx=""):
- """validates data in dict"""
- for k, v in data.items():
- if v:
- ok, u = safe_utf8_decode(v)
- if not ok:
- self.show_error(k, v, pfx, u)
- data[k] = u.encode('utf8')
- return data
-
-
#------------------------------------------------------------------------------
# DISPATCHER
#------------------------------------------------------------------------------