summaryrefslogtreecommitdiff
path: root/python/skytools/sqltools.py
diff options
context:
space:
mode:
authorEgon Valdmees2011-05-05 12:37:54 +0000
committerMarko Kreen2011-05-11 09:39:40 +0000
commitea4ee34522e75908f11f513d8ed040c6580b97fb (patch)
tree528ce7d743ff588b0151cee48b10ed59d56ab66f /python/skytools/sqltools.py
parent8abaf61c48f72e573d076a585d334d059f71fa14 (diff)
OCM-2279: londiste3 should translate everything to utf8, in a lossy way
where appropriate * added argument 'encoding' to dispatcher handler * tests for invalid utf8 sequences * support for renamed table copy in dispatcher handler
Diffstat (limited to 'python/skytools/sqltools.py')
-rw-r--r--python/skytools/sqltools.py28
1 files changed, 22 insertions, 6 deletions
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py
index b8a179be..97043231 100644
--- a/python/skytools/sqltools.py
+++ b/python/skytools/sqltools.py
@@ -331,18 +331,28 @@ def magic_insert(curs, tablename, data, fields = None, use_insert = 0, quoted_ta
class CopyPipe(object):
"Splits one big COPY to chunks."
- def __init__(self, dstcurs, tablename = None, limit = 512*1024, cancel_func=None, sql_from = None):
+ def __init__(self, dstcurs, tablename = None, limit = 512*1024,
+ sql_from = None):
self.tablename = tablename
self.sql_from = sql_from
self.dstcurs = dstcurs
self.buf = StringIO()
self.limit = limit
- self.cancel_func = None
+ #hook for new data, hook func should return new data
+ #def write_hook(obj, data):
+ # return data
+ self.write_hook = None
+ #hook for flush, hook func result is discarded
+ # def flush_hook(obj):
+ # return None
+ self.flush_hook = None
self.total_rows = 0
self.total_bytes = 0
def write(self, data):
"New data from psycopg"
+ if self.write_hook:
+ data = self.write_hook(self, data)
self.total_bytes += len(data)
self.total_rows += data.count("\n")
@@ -363,8 +373,8 @@ class CopyPipe(object):
def flush(self):
"Send data out."
- if self.cancel_func:
- self.cancel_func()
+ if self.flush_hook:
+ self.flush_hook(self)
if self.buf.tell() <= 0:
return
@@ -377,8 +387,10 @@ class CopyPipe(object):
self.buf.seek(0)
self.buf.truncate()
+
def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None,
- dst_tablename = None, dst_column_list = None):
+ dst_tablename = None, dst_column_list = None,
+ write_hook = None, flush_hook = None):
"""COPY table from one db to another."""
# default dst table and dst columns to source ones
@@ -413,12 +425,16 @@ def full_copy(tablename, src_curs, dst_curs, column_list = [], condition = None,
sql_to = "COPY %s TO stdout" % src
sql_from = "COPY %s FROM stdin" % dst
buf = CopyPipe(dst_curs, sql_from = sql_from)
+ buf.write_hook = write_hook
+ buf.flush_hook = flush_hook
src_curs.copy_expert(sql_to, buf)
else:
if condition:
# regular psycopg copy_to generates invalid sql for subselect copy
raise Exception('copy_expert() is needed for conditional copy')
buf = CopyPipe(dst_curs, dst)
+ buf.write_hook = write_hook
+ buf.flush_hook = flush_hook
src_curs.copy_to(buf, src)
buf.flush()
@@ -601,7 +617,7 @@ def mk_delete_sql(row, tbl, pkey_list, field_map = None):
col = skytools.quote_ident(new_k)
val = skytools.quote_literal(row[k])
whe_list.append("%s = %s" % (col, val))
- whe_str = " and ".join(whe_list)
+ whe_str = " and ".join(whe_list)
return "delete from only %s where %s;" % (skytools.quote_fqident(tbl), whe_str)
if __name__ == '__main__':