summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorEgon Valdmees2011-07-19 10:33:34 +0000
committerEgon Valdmees2011-08-07 18:52:10 +0000
commit068bfc7c01ff5463ed516f48c03145df7414a626 (patch)
treee25b5520e97009b50369a152e20836530bd764cd /python
parent32b86d54cada0baf64579325a02224c74a9531e5 (diff)
multimaster handler
Diffstat (limited to 'python')
-rw-r--r--python/londiste/handlers/__init__.py28
-rw-r--r--python/londiste/handlers/applyfn.py7
-rw-r--r--python/londiste/handlers/dispatch.py34
-rw-r--r--python/londiste/handlers/multimaster.py42
4 files changed, 82 insertions, 29 deletions
diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py
index ef3bc87d..66853d75 100644
--- a/python/londiste/handlers/__init__.py
+++ b/python/londiste/handlers/__init__.py
@@ -1,10 +1,36 @@
# handlers module
+import new
+import sys
DEFAULT_HANDLERS = [
'londiste.handlers.bulk',
'londiste.handlers.qtable',
'londiste.handlers.dispatch',
'londiste.handlers.applyfn',
- 'londiste.handlers.part'
+ 'londiste.handlers.part',
+ 'londiste.handlers.multimaster',
]
+def handler_args(name, cls):
+ """Handler arguments initialization decorator
+
+ Define successor for handler class cls with func as argument generator
+ """
+ def wrapper(func):
+ def _init_override(self, table_name, args, log):
+ cls.__init__(self, table_name, func(args.copy()), log)
+ dct = {'__init__': _init_override, 'handler_name': name}
+ module = sys.modules[cls.__module__]
+ newname = '%s_%s' % (cls.__name__, name.replace('.','_'))
+ newcls = new.classobj(newname, (cls,), dct)
+ setattr(module, newname, newcls)
+ module.__londiste_handlers__.append(newcls)
+ module.__all__.append(newname)
+ return func
+ return wrapper
+
+def update(*p):
+ """ Update dicts given in params with its precessor param dict
+ in reverse order """
+ return reduce(lambda x, y: x.update(y) or x,
+ (p[i] for i in range(len(p)-1,-1,-1)), {})
diff --git a/python/londiste/handlers/applyfn.py b/python/londiste/handlers/applyfn.py
index 48414e6d..23529c49 100644
--- a/python/londiste/handlers/applyfn.py
+++ b/python/londiste/handlers/applyfn.py
@@ -29,8 +29,13 @@ class ApplyFuncHandler(BaseHandler):
qfn = skytools.quote_fqident(fn)
qargs = [skytools.quote_literal(a) for a in args]
sql = "select %s(%s);" % (qfn, ', '.join(qargs))
-
+ self.log.debug('applyfn.sql: %s' % sql)
sql_queue_func(sql, qfunc_arg)
+#------------------------------------------------------------------------------
+# register handler class
+#------------------------------------------------------------------------------
+
__londiste_handlers__ = [ApplyFuncHandler]
+
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 932e0bcf..19fea548 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -142,7 +142,6 @@ creating or coping initial data to destination table. --expect-sync and
import sys
import datetime
-import new
import codecs
import re
import skytools
@@ -150,6 +149,9 @@ from londiste.handler import BaseHandler
from skytools import quote_ident, quote_fqident, UsageError
from skytools.dbstruct import *
from skytools.utf8 import safe_utf8_decode
+from functools import partial
+from londiste.handlers import handler_args, update
+
__all__ = ['Dispatcher']
@@ -896,33 +898,11 @@ class Dispatcher(BaseHandler):
__londiste_handlers__ = [Dispatcher]
-
#------------------------------------------------------------------------------
# helper function for creating dispachers with different default values
#------------------------------------------------------------------------------
-
-def handler(name):
- def wrapper(func):
- def _init_override(self, table_name, args, log):
- Dispatcher.__init__(self, table_name, func(args.copy()), log)
- dct = {'__init__': _init_override, 'handler_name': name}
- clsname = 'Dispatcher_%s' % name.replace('.','_')
- cls = new.classobj(clsname, (Dispatcher,), dct)
- setattr(sys.modules[__name__], clsname, cls)
- __londiste_handlers__.append(cls)
- __all__.append(clsname)
- return func
- return wrapper
-
-
-def update(*p):
- """ Update dicts given in params with its precessor param dict
- in reverse order """
- return reduce(lambda x, y: x.update(y) or x,
- (p[i] for i in range(len(p)-1,-1,-1)), {})
-
-
+handler_args = partial(handler_args, cls=Dispatcher)
#------------------------------------------------------------------------------
# build set of handlers with different default values for easier use
@@ -953,16 +933,16 @@ for load, load_dict in LOAD.items():
def create_handler():
handler_name = '_'.join(p for p in (load, period, mode) if p)
default = update(mode_dict, period_dict, load_dict, BASE)
- @handler(handler_name)
+ @handler_args(handler_name)
def handler_func(args):
return update(args, default)
create_handler()
-@handler('bulk_direct')
+@handler_args('bulk_direct')
def bulk_direct_handler(args):
return update(args, {'load_mode': 'bulk', 'table_mode': 'direct'})
-@handler('direct')
+@handler_args('direct')
def direct_handler(args):
return update(args, {'load_mode': 'direct', 'table_mode': 'direct'})
diff --git a/python/londiste/handlers/multimaster.py b/python/londiste/handlers/multimaster.py
new file mode 100644
index 00000000..494bd866
--- /dev/null
+++ b/python/londiste/handlers/multimaster.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+# encoding: utf-8
+"""
+Handler for replica with multiple master nodes.
+
+Can only handle initial copy from one master. Add other masters with
+expect-sync option.
+
+NB! needs merge_on_time function to be compiled on database first.
+"""
+
+import skytools
+from londiste.handlers.applyfn import ApplyFuncHandler
+from londiste.handlers import update
+
+__all__ = ['MultimasterHandler']
+
+class MultimasterHandler(ApplyFuncHandler):
+ """Handle multimaster replicas"""
+ handler_name = 'multimaster'
+
+ def __init__(self, table_name, args, log):
+ """Init per-batch table data cache."""
+ conf = args.copy()
+ # remove Multimaster args from conf
+ for name in ['func_name','func_conf']:
+ if name in conf:
+ conf.pop(name)
+ conf = skytools.db_urlencode(conf)
+ args = update(args, {'func_name': 'merge_on_time', 'func_conf': conf})
+ ApplyFuncHandler.__init__(self, table_name, args, log)
+
+ def add(self, trigger_arg_list):
+ """Create SKIP and BEFORE INSERT trigger"""
+ trigger_arg_list.append('no_merge')
+
+
+#------------------------------------------------------------------------------
+# register handler class
+#------------------------------------------------------------------------------
+
+__londiste_handlers__ = [MultimasterHandler]