summaryrefslogtreecommitdiff
path: root/python/londiste
diff options
context:
space:
mode:
authormartinko2013-02-20 16:10:55 +0000
committermartinko2013-02-20 16:10:55 +0000
commit355c254cb43fef1c2d82f0cc71a5a6f0423f4a64 (patch)
tree514b5c2851e0e920707a65dcde910a5f972e30b7 /python/londiste
parent8d085578f174e5cbab5ad564d608674db2a22a9f (diff)
londiste: added dispatch handler arg retention_period
It controls how long to keep partitions around. Examples: '3 months', '1 year'
Diffstat (limited to 'python/londiste')
-rw-r--r--python/londiste/handlers/dispatch.py16
1 files changed, 15 insertions, 1 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 17e88a24..ff367bd9 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -135,6 +135,9 @@ post_part:
sql statement(s) to execute after creating partition table. Usable
variables are the same as in part_template
+retention_period:
+ how long to keep partitions around. examples: '3 months', '1 year'
+
encoding:
name of destination encoding. handler replaces all invalid encoding symbols
and logs them as warnings
@@ -684,6 +687,7 @@ class Dispatcher(BaseHandler):
conf.pre_part = self.args.get('pre_part')
conf.post_part = self.args.get('post_part')
conf.part_func = self.args.get('part_func', PART_FUNC_NEW)
+ conf.retention_period = self.args.get('retention_period')
# set row mode and event types to process
conf.row_mode = self.get_arg('row_mode', ROW_MODES)
event_types = self.args.get('event_types', '*')
@@ -787,6 +791,8 @@ class Dispatcher(BaseHandler):
if self.conf.table_mode == 'part':
dst, part_time = self.split_format(ev, data)
if dst not in self.row_handler.table_map:
+ if self.conf.retention_period:
+ self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period)
self.check_part(dst, part_time)
else:
dst = self.dest_table
@@ -880,7 +886,7 @@ class Dispatcher(BaseHandler):
have_func = skytools.exists_function(curs, PART_FUNC_OLD, len(PART_FUNC_ARGS))
if have_func:
- self.log.debug('check_part.exec: func:%s, args: %s' % (pfcall, vals))
+ self.log.debug('check_part.exec: func: %s, args: %s' % (pfcall, vals))
curs.execute(pfcall, vals)
else:
#
@@ -897,6 +903,14 @@ class Dispatcher(BaseHandler):
exec_with_vals(self.conf.post_part)
self.log.info("Created table: %s" % dst)
+ def drop_obsolete_partitions (self, parent_table, retention_period, partition_period):
+ """ Drop obsolete partitions of partition-by-date parent table. """
+ curs = self.dst_curs
+ func = "londiste.drop_obsolete_partitions"
+ args = [parent_table, retention_period, partition_period]
+ self.log.debug("func: %s, args: %s" % (func, args))
+ curs.callproc (func, args)
+
def real_copy(self, tablename, src_curs, dst_curs, column_list):
"""do actual table copy and return tuple with number of bytes and rows
copied