diff options
-rw-r--r-- | python/londiste/handlers/dispatch.py | 36 |
1 files changed, 24 insertions, 12 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index c5c1fd73..02105512 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -164,6 +164,7 @@ METH_INSERT = 3 # BulkLoader hacks AVOID_BIZGRES_BUG = 0 USE_LONGLIVED_TEMP_TABLES = True +USE_REAL_TABLE = True # mode variables (first in list is default value) TABLE_MODES = ['part', 'direct', 'ignore'] @@ -278,7 +279,12 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): def __init__(self, table, pkeys, log, conf): BaseBulkCollectingLoader.__init__(self, table, pkeys, log, conf) # temp table name - self.temp = self.table.replace('.', '_') + "_loadertmp" + if USE_REAL_TABLE: + self.temp = self.table + "_loadertmpx" + self.qtemp = quote_ident(self.temp) + else: + self.temp = self.table.replace('.', '_') + "_loadertmp" + self.qtemp = quote_fqident(self.temp) # quoted table name self.qtable = quote_fqident(self.table) # all fields @@ -319,31 +325,34 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): def insert(self, curs): sql = "insert into %s (%s) select %s from %s" % ( - self.qtable, self._cols(), self._cols(), self.temp) + self.qtable, self._cols(), self._cols(), self.qtemp) return self.logexec(curs, sql) def update(self, curs): sql = "update only %s set %s from %s as t where %s" % ( - self.qtable, self._set(), self.temp, self._where()) + self.qtable, self._set(), self.qtemp, self._where()) return self.logexec(curs, sql) def delete(self, curs): sql = "delete from only %s using %s as t where %s" % ( - self.qtable, self.temp, self._where()) + self.qtable, self.qtemp, self._where()) return self.logexec(curs, sql) def truncate(self, curs): - return self.logexec(curs, "truncate %s" % self.temp) + return self.logexec(curs, "truncate %s" % self.qtemp) def drop(self, curs): - return self.logexec(curs, "drop table %s" % self.temp) + return self.logexec(curs, "drop table %s" % self.qtemp) def create(self, curs): - tmpl = "create temp table %s (like %s) on commit preserve rows" - return self.logexec(curs, tmpl % (self.temp, self.qtable)) + if self.USE_REAL_TABLE: + tmpl = "create table %s (like %s)" + else: + tmpl = "create temp table %s (like %s) on commit preserve rows" + return self.logexec(curs, tmpl % (self.qtemp, self.qtable)) def analyze(self, curs): - return self.logexec(curs, "analyze %s" % self.temp) + return self.logexec(curs, "analyze %s" % self.qtemp) def process(self, op, row): BaseBulkCollectingLoader.process(self, op, row) @@ -450,12 +459,15 @@ class BulkLoader(BaseBulkTempLoader): self.clean_temp(curs) def check_temp(self, curs): - self.temp_present = skytools.exists_temp_table(curs, self.temp) + if USE_REAL_TABLE: + self.temp_present = skytools.exists_table(curs, self.temp) + else: + self.temp_present = skytools.exists_temp_table(curs, self.temp) def clean_temp(self, curs): # delete remaining rows if self.temp_present: - if USE_LONGLIVED_TEMP_TABLES: + if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE: self.truncate(curs) else: # fscking problems with long-lived temp tables @@ -465,7 +477,7 @@ class BulkLoader(BaseBulkTempLoader): """ check if temp table exists. Returns False if using existing temp table and True if creating new """ - if USE_LONGLIVED_TEMP_TABLES: + if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE: if self.temp_present: self.log.debug("bulk: Using existing temp table %s" % self.temp) return False |