diff options
author | Egon Valdmees | 2011-05-05 12:37:54 +0000 |
---|---|---|
committer | Marko Kreen | 2011-05-11 09:39:40 +0000 |
commit | ea4ee34522e75908f11f513d8ed040c6580b97fb (patch) | |
tree | 528ce7d743ff588b0151cee48b10ed59d56ab66f /python/skytools/sqltools.py | |
parent | 8abaf61c48f72e573d076a585d334d059f71fa14 (diff) |
OCM-2279: londiste3 should translate everything to utf8, in a lossy way
where appropriate
* added argument 'encoding' to dispatcher handler
* tests for invalid utf8 sequences
* support for renamed table copy in dispatcher handler
Diffstat (limited to 'python/skytools/sqltools.py')
-rw-r--r-- | python/skytools/sqltools.py | 28 |
1 files changed, 22 insertions, 6 deletions
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py index b8a179be..97043231 100644 --- a/python/skytools/sqltools.py +++ b/python/skytools/sqltools.py @@ -331,18 +331,28 @@ def magic_insert(curs, tablename, data, fields = None, use_insert = 0, quoted_ta class CopyPipe(object): "Splits one big COPY to chunks." - def __init__(self, dstcurs, tablename = None, limit = 512*1024, cancel_func=None, sql_from = None): + def __init__(self, dstcurs, tablename = None, limit = 512*1024, + sql_from = None): self.tablename = tablename self.sql_from = sql_from self.dstcurs = dstcurs self.buf = StringIO() self.limit = limit - self.cancel_func = None + #hook for new data, hook func should return new data + #def write_hook(obj, data): + # return data + self.write_hook = None + #hook for flush, hook func result is discarded + # def flush_hook(obj): + # return None + self.flush_hook = None self.total_rows = 0 self.total_bytes = 0 def write(self, data): "New data from psycopg" + if self.write_hook: + data = self.write_hook(self, data) self.total_bytes += len(data) self.total_rows += data.count("\n") @@ -363,8 +373,8 @@ class CopyPipe(object): def flush(self): "Send data out." - if self.cancel_func: - self.cancel_func() + if self.flush_hook: + self.flush_hook(self) if self.buf.tell() <= 0: return @@ -377,8 +387,10 @@ class CopyPipe(object): self.buf.seek(0) self.buf.truncate() + def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None, - dst_tablename = None, dst_column_list = None): + dst_tablename = None, dst_column_list = None, + write_hook = None, flush_hook = None): """COPY table from one db to another.""" # default dst table and dst columns to source ones @@ -413,12 +425,16 @@ def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None, sql_to = "COPY %s TO stdout" % src sql_from = "COPY %s FROM stdin" % dst buf = CopyPipe(dst_curs, sql_from = sql_from) + buf.write_hook = write_hook + buf.flush_hook = flush_hook src_curs.copy_expert(sql_to, buf) else: if condition: # regular psycopg copy_to generates invalid sql for subselect copy raise Exception('copy_expert() is needed for conditional copy') buf = CopyPipe(dst_curs, dst) + buf.write_hook = write_hook + buf.flush_hook = flush_hook src_curs.copy_to(buf, src) buf.flush() @@ -601,7 +617,7 @@ def mk_delete_sql(row, tbl, pkey_list, field_map = None): col = skytools.quote_ident(new_k) val = skytools.quote_literal(row[k]) whe_list.append("%s = %s" % (col, val)) - whe_str = " and ".join(whe_list) + whe_str = " and ".join(whe_list) return "delete from only %s where %s;" % (skytools.quote_fqident(tbl), whe_str) if __name__ == '__main__': |