summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorEgon Valdmees2011-03-22 13:56:50 +0000
committerEgon Valdmees2011-03-22 13:56:50 +0000
commitac8785753fefb2aed5975a3fc70d4de7268f33ec (patch)
tree41796b0d6ba21f517a13110a8be6b25de4f43253 /python
parent88a1f049fd2a9013be85d1dd4c8aec5852c26292 (diff)
fix: temp table schema & quoting, update & delete sql, bulk load count checks
Diffstat (limited to 'python')
-rw-r--r--python/londiste/handlers/dispatch.py101
1 files changed, 55 insertions, 46 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 91c2c8b2..b93651e2 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -268,9 +268,8 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
BaseBulkCollectingLoader.__init__(self, table, pkeys, log, conf)
# temp table name
self.temp = self.table.replace('.', '_') + "_loadertmp"
- # quoted table names
+ # quoted table name
self.qtable = quote_fqident(self.table)
- self.qtemp = quote_fqident(self.temp)
# all fields
self.fields = None
# key fields used in where part, possible to add non pk fields
@@ -291,17 +290,16 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
# create sql parts
def _where(self):
- tmpl = "%(tbl)s.%(col)s = %(tmp)s.%(col)s"
+ tmpl = "%(tbl)s.%(col)s = t.%(col)s"
stmt = (tmpl % {'col': quote_ident(f),
'tbl': self.qtable,
- 'tmp': self.qtemp}
+ }
for f in self.keys)
return ' and '.join(stmt)
def _set(self):
- tmpl = "%(col)s = %(tmp)s.%(col)s"
- stmt = (tmpl % {'col': quote_ident(f),
- 'tmp': self.qtemp}
+ tmpl = "%(col)s = t.%(col)s"
+ stmt = (tmpl % {'col': quote_ident(f)}
for f in self.nonkeys())
return ", ".join(stmt)
@@ -310,31 +308,31 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
def insert(self, curs):
sql = "insert into %s (%s) select %s from %s" % (
- self.qtable, self._cols(), self._cols(), self.qtemp)
+ self.qtable, self._cols(), self._cols(), self.temp)
return self.logexec(curs, sql)
def update(self, curs):
- sql = "update only %s set %s from %s where %s" % (
- self.qtable, self._set(), self.qtemp, self._where())
+ sql = "update only %s set %s from %s as t where %s" % (
+ self.qtable, self._set(), self.temp, self._where())
return self.logexec(curs, sql)
def delete(self, curs):
sql = "delete from only %s using %s as t where %s" % (
- self.qtable, self.qtemp, self._where())
+ self.qtable, self.temp, self._where())
return self.logexec(curs, sql)
def truncate(self, curs):
- return self.logexec(curs, "truncate %s" % self.qtemp)
+ return self.logexec(curs, "truncate %s" % self.temp)
def drop(self, curs):
- return self.logexec(curs, "drop table %s" % self.qtemp)
+ return self.logexec(curs, "drop table %s" % self.temp)
def create(self, curs):
tmpl = "create temp table %s (like %s) on commit preserve rows"
- return self.logexec(curs, tmpl % (self.qtemp, self.qtable))
+ return self.logexec(curs, tmpl % (self.temp, self.qtable))
def analyze(self, curs):
- return self.logexec(curs, "analyze %s" % self.qtemp)
+ return self.logexec(curs, "analyze %s" % self.temp)
def process(self, op, row):
BaseBulkCollectingLoader.process(self, op, row)
@@ -348,8 +346,8 @@ class BulkLoader(BaseBulkTempLoader):
def __init__(self, table, pkeys, log, conf):
BaseBulkTempLoader.__init__(self, table, pkeys, log, conf)
self.method = self.conf['method']
- # is temp table used
- self.temp_used = False
+ # is temp table created
+ self.temp_present = False
def process(self, op, row):
if self.method == METH_INSERT and op != 'I':
@@ -363,14 +361,12 @@ class BulkLoader(BaseBulkTempLoader):
if (cnt == 0):
return
self.log.debug("bulk: Deleting %d rows from %s" % (cnt, self.table))
- # delete old rows from temp
- self.truncate(curs)
# copy rows to temp
self.bulk_insert(curs, data)
# delete rows using temp
self.delete(curs)
- # check if right amount of rows deleted
- if cnt != curs.rowcount:
+ # check if right amount of rows deleted (only in direct mode)
+ if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
self.log.warning("Delete mismatch: expected=%s deleted=%d"
% (cnt, curs.rowcount))
@@ -386,28 +382,26 @@ class BulkLoader(BaseBulkTempLoader):
if (cnt == 0):
return
self.log.debug("bulk: Updating %d rows in %s" % (cnt, self.table))
- # delete old rows from temp
- self.truncate(curs)
# copy rows to temp
self.bulk_insert(curs, data)
if self.method == METH_CORRECT:
# update main table from temp
self.update(curs)
- # check count
- if cnt != curs.rowcount:
+ # check count (only in direct mode)
+ if self.conf.table_mode == 'direct' and cnt != curs.rowcount:
self.log.warning("Update mismatch: expected=%s updated=%d"
% (cnt, curs.rowcount))
else:
# delete from main table using temp
self.delete(curs)
- # check count
- if real_cnt != curs.rowcount:
+ # check count (only in direct mode)
+ if self.conf.table_mode == 'direct' and real_cnt != curs.rowcount:
self.log.warning("bulk: Update mismatch: expected=%s deleted=%d"
% (real_cnt, curs.rowcount))
# insert into main table
if AVOID_BIZGRES_BUG:
# copy again, into main table
- self.bulk_insert(curs, data, table = self.table)
+ self.bulk_insert(curs, data, table = self.qtable)
else:
# insert from temp - better way, but does not work
# due bizgres bug
@@ -422,7 +416,7 @@ class BulkLoader(BaseBulkTempLoader):
return
self.log.debug("bulk: Inserting %d rows into %s" % (cnt, self.table))
# copy into target table (no temp used)
- self.bulk_insert(curs, data, table = self.table)
+ self.bulk_insert(curs, data, table = self.qtable)
def bulk_flush(self, curs, op_map):
self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)" % (
@@ -435,6 +429,8 @@ class BulkLoader(BaseBulkTempLoader):
for key in dist_keys:
if key not in self.keys:
self.keys.append(key)
+ # check if temp table present
+ self.check_temp(curs)
# process I,U,D
self.process_delete(curs, op_map)
self.process_update(curs, op_map)
@@ -442,37 +438,50 @@ class BulkLoader(BaseBulkTempLoader):
# truncate or drop temp table
self.clean_temp(curs)
+ def check_temp(self, curs):
+ self.temp_present = skytools.exists_temp_table(curs, self.temp)
+
def clean_temp(self, curs):
# delete remaining rows
- if self.temp_used:
+ if self.temp_present:
if USE_LONGLIVED_TEMP_TABLES:
self.truncate(curs)
else:
# fscking problems with long-lived temp tables
self.drop(curs)
+
+ def create_temp(self, curs):
+ """ check if temp table exists. Returns False if using existing temp
+ table and True if creating new
+ """
+ if USE_LONGLIVED_TEMP_TABLES:
+ if self.temp_present:
+ self.log.debug("bulk: Using existing temp table %s" % self.temp)
+ return False
+ self.create(curs)
+ self.temp_present = True
+ return True
def bulk_insert(self, curs, data, table = None):
- """Copy data to table. If table not provided, use temp table
+ """Copy data to table. If table not provided, use temp table.
+ When re-using existing temp table, it is always truncated first and
+ analyzed after copy.
"""
if not data:
return
- _table = table or self.temp
- self.log.debug("bulk: COPY %d rows into %s" % (len(data), _table))
- if not table:
- self.create_temp(curs)
- skytools.magic_insert(curs, _table, data, self.fields)
- if not table:
+ _use_temp = table is None
+ # if table not specified use temp
+ if _use_temp:
+ table = self.temp
+ # truncate when re-using existing table
+ if not self.create_temp(curs):
+ self.truncate(curs)
+ self.log.debug("bulk: COPY %d rows into %s" % (len(data), table))
+ skytools.magic_insert(curs, table, data, self.fields,
+ quoted_table = True)
+ if _use_temp:
self.analyze(curs)
- def create_temp(self, curs):
- self.temp_used = True
- # check if exists
- if USE_LONGLIVED_TEMP_TABLES:
- if skytools.exists_temp_table(curs, self.temp):
- self.log.debug("bulk: Using existing temp table %s" % self.temp)
- return
- self.create(curs)
-
def find_dist_fields(self, curs):
"""Find GP distribution keys"""
if not skytools.exists_table(curs, "pg_catalog.gp_distribution_policy"):