diff options
| author | martinko | 2013-02-20 16:10:55 +0000 |
|---|---|---|
| committer | martinko | 2013-02-20 16:10:55 +0000 |
| commit | 355c254cb43fef1c2d82f0cc71a5a6f0423f4a64 (patch) | |
| tree | 514b5c2851e0e920707a65dcde910a5f972e30b7 /python/londiste | |
| parent | 8d085578f174e5cbab5ad564d608674db2a22a9f (diff) | |
londiste: added dispatch handler arg retention_period
It controls how long to keep partitions around. Examples: '3 months', '1 year'
Diffstat (limited to 'python/londiste')
| -rw-r--r-- | python/londiste/handlers/dispatch.py | 16 |
1 files changed, 15 insertions, 1 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 17e88a24..ff367bd9 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -135,6 +135,9 @@ post_part: sql statement(s) to execute after creating partition table. Usable variables are the same as in part_template +retention_period: + how long to keep partitions around. examples: '3 months', '1 year' + encoding: name of destination encoding. handler replaces all invalid encoding symbols and logs them as warnings @@ -684,6 +687,7 @@ class Dispatcher(BaseHandler): conf.pre_part = self.args.get('pre_part') conf.post_part = self.args.get('post_part') conf.part_func = self.args.get('part_func', PART_FUNC_NEW) + conf.retention_period = self.args.get('retention_period') # set row mode and event types to process conf.row_mode = self.get_arg('row_mode', ROW_MODES) event_types = self.args.get('event_types', '*') @@ -787,6 +791,8 @@ class Dispatcher(BaseHandler): if self.conf.table_mode == 'part': dst, part_time = self.split_format(ev, data) if dst not in self.row_handler.table_map: + if self.conf.retention_period: + self.drop_obsolete_partitions (self.dest_table, self.conf.retention_period, self.conf.period) self.check_part(dst, part_time) else: dst = self.dest_table @@ -880,7 +886,7 @@ class Dispatcher(BaseHandler): have_func = skytools.exists_function(curs, PART_FUNC_OLD, len(PART_FUNC_ARGS)) if have_func: - self.log.debug('check_part.exec: func:%s, args: %s' % (pfcall, vals)) + self.log.debug('check_part.exec: func: %s, args: %s' % (pfcall, vals)) curs.execute(pfcall, vals) else: # @@ -897,6 +903,14 @@ class Dispatcher(BaseHandler): exec_with_vals(self.conf.post_part) self.log.info("Created table: %s" % dst) + def drop_obsolete_partitions (self, parent_table, retention_period, partition_period): + """ Drop obsolete partitions of partition-by-date parent table. """ + curs = self.dst_curs + func = "londiste.drop_obsolete_partitions" + args = [parent_table, retention_period, partition_period] + self.log.debug("func: %s, args: %s" % (func, args)) + curs.callproc (func, args) + def real_copy(self, tablename, src_curs, dst_curs, column_list): """do actual table copy and return tuple with number of bytes and rows copied |
