summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/londiste/handlers/dispatch.py36
1 files changed, 24 insertions, 12 deletions
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index c5c1fd73..02105512 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -164,6 +164,7 @@ METH_INSERT = 3
# BulkLoader hacks
AVOID_BIZGRES_BUG = 0
USE_LONGLIVED_TEMP_TABLES = True
+USE_REAL_TABLE = True
# mode variables (first in list is default value)
TABLE_MODES = ['part', 'direct', 'ignore']
@@ -278,7 +279,12 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
def __init__(self, table, pkeys, log, conf):
BaseBulkCollectingLoader.__init__(self, table, pkeys, log, conf)
# temp table name
- self.temp = self.table.replace('.', '_') + "_loadertmp"
+ if USE_REAL_TABLE:
+ self.temp = self.table + "_loadertmpx"
+ self.qtemp = quote_ident(self.temp)
+ else:
+ self.temp = self.table.replace('.', '_') + "_loadertmp"
+ self.qtemp = quote_fqident(self.temp)
# quoted table name
self.qtable = quote_fqident(self.table)
# all fields
@@ -319,31 +325,34 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
def insert(self, curs):
sql = "insert into %s (%s) select %s from %s" % (
- self.qtable, self._cols(), self._cols(), self.temp)
+ self.qtable, self._cols(), self._cols(), self.qtemp)
return self.logexec(curs, sql)
def update(self, curs):
sql = "update only %s set %s from %s as t where %s" % (
- self.qtable, self._set(), self.temp, self._where())
+ self.qtable, self._set(), self.qtemp, self._where())
return self.logexec(curs, sql)
def delete(self, curs):
sql = "delete from only %s using %s as t where %s" % (
- self.qtable, self.temp, self._where())
+ self.qtable, self.qtemp, self._where())
return self.logexec(curs, sql)
def truncate(self, curs):
- return self.logexec(curs, "truncate %s" % self.temp)
+ return self.logexec(curs, "truncate %s" % self.qtemp)
def drop(self, curs):
- return self.logexec(curs, "drop table %s" % self.temp)
+ return self.logexec(curs, "drop table %s" % self.qtemp)
def create(self, curs):
- tmpl = "create temp table %s (like %s) on commit preserve rows"
- return self.logexec(curs, tmpl % (self.temp, self.qtable))
+ if self.USE_REAL_TABLE:
+ tmpl = "create table %s (like %s)"
+ else:
+ tmpl = "create temp table %s (like %s) on commit preserve rows"
+ return self.logexec(curs, tmpl % (self.qtemp, self.qtable))
def analyze(self, curs):
- return self.logexec(curs, "analyze %s" % self.temp)
+ return self.logexec(curs, "analyze %s" % self.qtemp)
def process(self, op, row):
BaseBulkCollectingLoader.process(self, op, row)
@@ -450,12 +459,15 @@ class BulkLoader(BaseBulkTempLoader):
self.clean_temp(curs)
def check_temp(self, curs):
- self.temp_present = skytools.exists_temp_table(curs, self.temp)
+ if USE_REAL_TABLE:
+ self.temp_present = skytools.exists_table(curs, self.temp)
+ else:
+ self.temp_present = skytools.exists_temp_table(curs, self.temp)
def clean_temp(self, curs):
# delete remaining rows
if self.temp_present:
- if USE_LONGLIVED_TEMP_TABLES:
+ if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE:
self.truncate(curs)
else:
# fscking problems with long-lived temp tables
@@ -465,7 +477,7 @@ class BulkLoader(BaseBulkTempLoader):
""" check if temp table exists. Returns False if using existing temp
table and True if creating new
"""
- if USE_LONGLIVED_TEMP_TABLES:
+ if USE_LONGLIVED_TEMP_TABLES or USE_REAL_TABLE:
if self.temp_present:
self.log.debug("bulk: Using existing temp table %s" % self.temp)
return False