diff options
| author | Egon Valdmees | 2011-07-19 10:33:34 +0000 |
|---|---|---|
| committer | Egon Valdmees | 2011-08-07 18:52:10 +0000 |
| commit | 068bfc7c01ff5463ed516f48c03145df7414a626 (patch) | |
| tree | e25b5520e97009b50369a152e20836530bd764cd /python | |
| parent | 32b86d54cada0baf64579325a02224c74a9531e5 (diff) | |
multimaster handler
Diffstat (limited to 'python')
| -rw-r--r-- | python/londiste/handlers/__init__.py | 28 | ||||
| -rw-r--r-- | python/londiste/handlers/applyfn.py | 7 | ||||
| -rw-r--r-- | python/londiste/handlers/dispatch.py | 34 | ||||
| -rw-r--r-- | python/londiste/handlers/multimaster.py | 42 |
4 files changed, 82 insertions, 29 deletions
diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py index ef3bc87d..66853d75 100644 --- a/python/londiste/handlers/__init__.py +++ b/python/londiste/handlers/__init__.py @@ -1,10 +1,36 @@ # handlers module +import new +import sys DEFAULT_HANDLERS = [ 'londiste.handlers.bulk', 'londiste.handlers.qtable', 'londiste.handlers.dispatch', 'londiste.handlers.applyfn', - 'londiste.handlers.part' + 'londiste.handlers.part', + 'londiste.handlers.multimaster', ] +def handler_args(name, cls): + """Handler arguments initialization decorator + + Define successor for handler class cls with func as argument generator + """ + def wrapper(func): + def _init_override(self, table_name, args, log): + cls.__init__(self, table_name, func(args.copy()), log) + dct = {'__init__': _init_override, 'handler_name': name} + module = sys.modules[cls.__module__] + newname = '%s_%s' % (cls.__name__, name.replace('.','_')) + newcls = new.classobj(newname, (cls,), dct) + setattr(module, newname, newcls) + module.__londiste_handlers__.append(newcls) + module.__all__.append(newname) + return func + return wrapper + +def update(*p): + """ Update dicts given in params with its precessor param dict + in reverse order """ + return reduce(lambda x, y: x.update(y) or x, + (p[i] for i in range(len(p)-1,-1,-1)), {}) diff --git a/python/londiste/handlers/applyfn.py b/python/londiste/handlers/applyfn.py index 48414e6d..23529c49 100644 --- a/python/londiste/handlers/applyfn.py +++ b/python/londiste/handlers/applyfn.py @@ -29,8 +29,13 @@ class ApplyFuncHandler(BaseHandler): qfn = skytools.quote_fqident(fn) qargs = [skytools.quote_literal(a) for a in args] sql = "select %s(%s);" % (qfn, ', '.join(qargs)) - + self.log.debug('applyfn.sql: %s' % sql) sql_queue_func(sql, qfunc_arg) +#------------------------------------------------------------------------------ +# register handler class +#------------------------------------------------------------------------------ + __londiste_handlers__ = [ApplyFuncHandler] + diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 932e0bcf..19fea548 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -142,7 +142,6 @@ creating or coping initial data to destination table. --expect-sync and import sys import datetime -import new import codecs import re import skytools @@ -150,6 +149,9 @@ from londiste.handler import BaseHandler from skytools import quote_ident, quote_fqident, UsageError from skytools.dbstruct import * from skytools.utf8 import safe_utf8_decode +from functools import partial +from londiste.handlers import handler_args, update + __all__ = ['Dispatcher'] @@ -896,33 +898,11 @@ class Dispatcher(BaseHandler): __londiste_handlers__ = [Dispatcher] - #------------------------------------------------------------------------------ # helper function for creating dispachers with different default values #------------------------------------------------------------------------------ - -def handler(name): - def wrapper(func): - def _init_override(self, table_name, args, log): - Dispatcher.__init__(self, table_name, func(args.copy()), log) - dct = {'__init__': _init_override, 'handler_name': name} - clsname = 'Dispatcher_%s' % name.replace('.','_') - cls = new.classobj(clsname, (Dispatcher,), dct) - setattr(sys.modules[__name__], clsname, cls) - __londiste_handlers__.append(cls) - __all__.append(clsname) - return func - return wrapper - - -def update(*p): - """ Update dicts given in params with its precessor param dict - in reverse order """ - return reduce(lambda x, y: x.update(y) or x, - (p[i] for i in range(len(p)-1,-1,-1)), {}) - - +handler_args = partial(handler_args, cls=Dispatcher) #------------------------------------------------------------------------------ # build set of handlers with different default values for easier use @@ -953,16 +933,16 @@ for load, load_dict in LOAD.items(): def create_handler(): handler_name = '_'.join(p for p in (load, period, mode) if p) default = update(mode_dict, period_dict, load_dict, BASE) - @handler(handler_name) + @handler_args(handler_name) def handler_func(args): return update(args, default) create_handler() -@handler('bulk_direct') +@handler_args('bulk_direct') def bulk_direct_handler(args): return update(args, {'load_mode': 'bulk', 'table_mode': 'direct'}) -@handler('direct') +@handler_args('direct') def direct_handler(args): return update(args, {'load_mode': 'direct', 'table_mode': 'direct'}) diff --git a/python/londiste/handlers/multimaster.py b/python/londiste/handlers/multimaster.py new file mode 100644 index 00000000..494bd866 --- /dev/null +++ b/python/londiste/handlers/multimaster.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +Handler for replica with multiple master nodes. + +Can only handle initial copy from one master. Add other masters with +expect-sync option. + +NB! needs merge_on_time function to be compiled on database first. +""" + +import skytools +from londiste.handlers.applyfn import ApplyFuncHandler +from londiste.handlers import update + +__all__ = ['MultimasterHandler'] + +class MultimasterHandler(ApplyFuncHandler): + """Handle multimaster replicas""" + handler_name = 'multimaster' + + def __init__(self, table_name, args, log): + """Init per-batch table data cache.""" + conf = args.copy() + # remove Multimaster args from conf + for name in ['func_name','func_conf']: + if name in conf: + conf.pop(name) + conf = skytools.db_urlencode(conf) + args = update(args, {'func_name': 'merge_on_time', 'func_conf': conf}) + ApplyFuncHandler.__init__(self, table_name, args, log) + + def add(self, trigger_arg_list): + """Create SKIP and BEFORE INSERT trigger""" + trigger_arg_list.append('no_merge') + + +#------------------------------------------------------------------------------ +# register handler class +#------------------------------------------------------------------------------ + +__londiste_handlers__ = [MultimasterHandler] |
