summaryrefslogtreecommitdiff
path: root/python/londiste
diff options
context:
space:
mode:
authormartinko2013-05-02 09:19:42 +0000
committermartinko2013-05-02 09:19:42 +0000
commitf944c53c995a84d382e09f95de88b86b590b1e56 (patch)
treec748aa8ad8167fa0d6c7a16828ec1eaba72694fa /python/londiste
parentddeed966f08ff7a3445c6e1520999d072b46a16b (diff)
londiste: changed handler argument 'key' to 'hash_key'
Diffstat (limited to 'python/londiste')
-rw-r--r--python/londiste/handlers/dispatch.py6
-rw-r--r--python/londiste/handlers/part.py13
2 files changed, 10 insertions, 9 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index b50b95a2..0f22e1fe 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -724,7 +724,7 @@ class Dispatcher (PartHandler):
raise Exception('Bad argument %s value %r' % (name, val))
return val
- def _validate_key(self):
+ def _validate_hash_key(self):
pass
def reset(self):
@@ -737,7 +737,7 @@ class Dispatcher (PartHandler):
if self.conf.table_mode != 'ignore':
self.batch_info = batch_info
self.dst_curs = dst_curs
- if self.key is not None:
+ if self.hash_key is not None:
super(Dispatcher, self).prepare_batch(batch_info, dst_curs)
def filter_data(self, data):
@@ -920,7 +920,7 @@ class Dispatcher (PartHandler):
def get_copy_condition(self, src_curs, dst_curs):
""" Prepare where condition for copy and replay filtering.
"""
- if self.key is not None:
+ if self.hash_key is not None:
return super(Dispatcher, self).get_copy_condition(src_curs, dst_curs)
return ''
diff --git a/python/londiste/handlers/part.py b/python/londiste/handlers/part.py
index 366675a3..e3d7daee 100644
--- a/python/londiste/handlers/part.py
+++ b/python/londiste/handlers/part.py
@@ -2,6 +2,7 @@
Parameters:
key=COLUMN: column name to use for hashing
+ hash_key=COLUMN: column name to use for hashing (overrides 'key' parameter)
hashfunc=NAME: function to use for hashing (default: partconf.get_hash_raw)
hashexpr=EXPR: full expression to use for hashing (deprecated)
encoding=ENC: validate and fix incoming data (only utf8 supported atm)
@@ -38,18 +39,18 @@ class PartHandler(TableHandler):
self.local_part = None # part number of local node
# primary key columns
- self.key = args.get('key')
- self._validate_key()
+ self.hash_key = args.get('hash_key', args.get('key'))
+ self._validate_hash_key()
# hash function & full expression
hashfunc = args.get('hashfunc', self.DEFAULT_HASHFUNC)
self.hashexpr = self.DEFAULT_HASHEXPR % (
skytools.quote_fqident(hashfunc),
- skytools.quote_ident(self.key))
+ skytools.quote_ident(self.hash_key))
self.hashexpr = args.get('hashexpr', self.hashexpr)
- def _validate_key(self):
- if self.key is None:
+ def _validate_hash_key(self):
+ if self.hash_key is None:
raise Exception('Specify key field as key argument')
def reset(self):
@@ -72,7 +73,7 @@ class PartHandler(TableHandler):
def process_event(self, ev, sql_queue_func, arg):
"""Filter event by hash in extra3, apply only local part."""
- if ev.extra3 and self.key is not None:
+ if ev.extra3 and self.hash_key is not None:
meta = skytools.db_urldecode(ev.extra3)
self.log.debug('part.process_event: hash=%d, max_part=%s, local_part=%d',
int(meta['hash']), self.max_part, self.local_part)