diff options
author | Egon Valdmees | 2011-03-22 13:56:50 +0000 |
---|---|---|
committer | Egon Valdmees | 2011-03-22 13:56:50 +0000 |
commit | ac8785753fefb2aed5975a3fc70d4de7268f33ec (patch) | |
tree | 41796b0d6ba21f517a13110a8be6b25de4f43253 /python | |
parent | 88a1f049fd2a9013be85d1dd4c8aec5852c26292 (diff) |
fix: temp table schema & quoting, update & delete sql, bulk load count checks
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/handlers/dispatch.py | 101 |
1 files changed, 55 insertions, 46 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 91c2c8b2..b93651e2 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -268,9 +268,8 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): BaseBulkCollectingLoader.__init__(self, table, pkeys, log, conf) # temp table name self.temp = self.table.replace('.', '_') + "_loadertmp" - # quoted table names + # quoted table name self.qtable = quote_fqident(self.table) - self.qtemp = quote_fqident(self.temp) # all fields self.fields = None # key fields used in where part, possible to add non pk fields @@ -291,17 +290,16 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): # create sql parts def _where(self): - tmpl = "%(tbl)s.%(col)s = %(tmp)s.%(col)s" + tmpl = "%(tbl)s.%(col)s = t.%(col)s" stmt = (tmpl % {'col': quote_ident(f), 'tbl': self.qtable, - 'tmp': self.qtemp} + } for f in self.keys) return ' and '.join(stmt) def _set(self): - tmpl = "%(col)s = %(tmp)s.%(col)s" - stmt = (tmpl % {'col': quote_ident(f), - 'tmp': self.qtemp} + tmpl = "%(col)s = t.%(col)s" + stmt = (tmpl % {'col': quote_ident(f)} for f in self.nonkeys()) return ", ".join(stmt) @@ -310,31 +308,31 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): def insert(self, curs): sql = "insert into %s (%s) select %s from %s" % ( - self.qtable, self._cols(), self._cols(), self.qtemp) + self.qtable, self._cols(), self._cols(), self.temp) return self.logexec(curs, sql) def update(self, curs): - sql = "update only %s set %s from %s where %s" % ( - self.qtable, self._set(), self.qtemp, self._where()) + sql = "update only %s set %s from %s as t where %s" % ( + self.qtable, self._set(), self.temp, self._where()) return self.logexec(curs, sql) def delete(self, curs): sql = "delete from only %s using %s as t where %s" % ( - self.qtable, self.qtemp, self._where()) + self.qtable, self.temp, self._where()) return self.logexec(curs, sql) def truncate(self, curs): - return self.logexec(curs, "truncate %s" % self.qtemp) + return self.logexec(curs, "truncate %s" % self.temp) def drop(self, curs): - return self.logexec(curs, "drop table %s" % self.qtemp) + return self.logexec(curs, "drop table %s" % self.temp) def create(self, curs): tmpl = "create temp table %s (like %s) on commit preserve rows" - return self.logexec(curs, tmpl % (self.qtemp, self.qtable)) + return self.logexec(curs, tmpl % (self.temp, self.qtable)) def analyze(self, curs): - return self.logexec(curs, "analyze %s" % self.qtemp) + return self.logexec(curs, "analyze %s" % self.temp) def process(self, op, row): BaseBulkCollectingLoader.process(self, op, row) @@ -348,8 +346,8 @@ class BulkLoader(BaseBulkTempLoader): def __init__(self, table, pkeys, log, conf): BaseBulkTempLoader.__init__(self, table, pkeys, log, conf) self.method = self.conf['method'] - # is temp table used - self.temp_used = False + # is temp table created + self.temp_present = False def process(self, op, row): if self.method == METH_INSERT and op != 'I': @@ -363,14 +361,12 @@ class BulkLoader(BaseBulkTempLoader): if (cnt == 0): return self.log.debug("bulk: Deleting %d rows from %s" % (cnt, self.table)) - # delete old rows from temp - self.truncate(curs) # copy rows to temp self.bulk_insert(curs, data) # delete rows using temp self.delete(curs) - # check if right amount of rows deleted - if cnt != curs.rowcount: + # check if right amount of rows deleted (only in direct mode) + if self.conf.table_mode == 'direct' and cnt != curs.rowcount: self.log.warning("Delete mismatch: expected=%s deleted=%d" % (cnt, curs.rowcount)) @@ -386,28 +382,26 @@ class BulkLoader(BaseBulkTempLoader): if (cnt == 0): return self.log.debug("bulk: Updating %d rows in %s" % (cnt, self.table)) - # delete old rows from temp - self.truncate(curs) # copy rows to temp self.bulk_insert(curs, data) if self.method == METH_CORRECT: # update main table from temp self.update(curs) - # check count - if cnt != curs.rowcount: + # check count (only in direct mode) + if self.conf.table_mode == 'direct' and cnt != curs.rowcount: self.log.warning("Update mismatch: expected=%s updated=%d" % (cnt, curs.rowcount)) else: # delete from main table using temp self.delete(curs) - # check count - if real_cnt != curs.rowcount: + # check count (only in direct mode) + if self.conf.table_mode == 'direct' and real_cnt != curs.rowcount: self.log.warning("bulk: Update mismatch: expected=%s deleted=%d" % (real_cnt, curs.rowcount)) # insert into main table if AVOID_BIZGRES_BUG: # copy again, into main table - self.bulk_insert(curs, data, table = self.table) + self.bulk_insert(curs, data, table = self.qtable) else: # insert from temp - better way, but does not work # due bizgres bug @@ -422,7 +416,7 @@ class BulkLoader(BaseBulkTempLoader): return self.log.debug("bulk: Inserting %d rows into %s" % (cnt, self.table)) # copy into target table (no temp used) - self.bulk_insert(curs, data, table = self.table) + self.bulk_insert(curs, data, table = self.qtable) def bulk_flush(self, curs, op_map): self.log.debug("bulk_flush: %s (I/U/D = %d/%d/%d)" % ( @@ -435,6 +429,8 @@ class BulkLoader(BaseBulkTempLoader): for key in dist_keys: if key not in self.keys: self.keys.append(key) + # check if temp table present + self.check_temp(curs) # process I,U,D self.process_delete(curs, op_map) self.process_update(curs, op_map) @@ -442,37 +438,50 @@ class BulkLoader(BaseBulkTempLoader): # truncate or drop temp table self.clean_temp(curs) + def check_temp(self, curs): + self.temp_present = skytools.exists_temp_table(curs, self.temp) + def clean_temp(self, curs): # delete remaining rows - if self.temp_used: + if self.temp_present: if USE_LONGLIVED_TEMP_TABLES: self.truncate(curs) else: # fscking problems with long-lived temp tables self.drop(curs) + + def create_temp(self, curs): + """ check if temp table exists. Returns False if using existing temp + table and True if creating new + """ + if USE_LONGLIVED_TEMP_TABLES: + if self.temp_present: + self.log.debug("bulk: Using existing temp table %s" % self.temp) + return False + self.create(curs) + self.temp_present = True + return True def bulk_insert(self, curs, data, table = None): - """Copy data to table. If table not provided, use temp table + """Copy data to table. If table not provided, use temp table. + When re-using existing temp table, it is always truncated first and + analyzed after copy. """ if not data: return - _table = table or self.temp - self.log.debug("bulk: COPY %d rows into %s" % (len(data), _table)) - if not table: - self.create_temp(curs) - skytools.magic_insert(curs, _table, data, self.fields) - if not table: + _use_temp = table is None + # if table not specified use temp + if _use_temp: + table = self.temp + # truncate when re-using existing table + if not self.create_temp(curs): + self.truncate(curs) + self.log.debug("bulk: COPY %d rows into %s" % (len(data), table)) + skytools.magic_insert(curs, table, data, self.fields, + quoted_table = True) + if _use_temp: self.analyze(curs) - def create_temp(self, curs): - self.temp_used = True - # check if exists - if USE_LONGLIVED_TEMP_TABLES: - if skytools.exists_temp_table(curs, self.temp): - self.log.debug("bulk: Using existing temp table %s" % self.temp) - return - self.create(curs) - def find_dist_fields(self, curs): """Find GP distribution keys""" if not skytools.exists_table(curs, "pg_catalog.gp_distribution_policy"): |