logfile =
pidfile =
- db =
+ db =
- #rename_tables =
+ #rename_tables =
[DEFAULT]
# alter table only %%(part)s add primary key (%%(pkey)s);
#
### Inherited partitions
- #split_part_template =
+ #split_part_template =
# create table %%(part)s () inherits (%%(parent)s);
# alter table only %%(part)s add primary key (%%(pkey)s);
#bulk_mode=correct
[table public.foo]
- mode =
+ mode =
create_sql =
"""
from pgq.cascade.worker import CascadedWorker
from skytools import quote_ident, quote_fqident, UsageError
-# todo: auto table detect
+# TODO: auto table detect
# BulkLoader load method
METH_CORRECT = 0
AVOID_BIZGRES_BUG = 0
USE_LONGLIVED_TEMP_TABLES = True
+
class BasicLoader:
"""Apply events as-is."""
def __init__(self, table_name, parent_name, log):
curs.execute("\n".join(self.sql_list))
self.sql_list = []
+
class KeepLatestLoader(BasicLoader):
"""Keep latest row version.
self.data = data
self.pk_data = pk_data
+
class BulkLoader(BasicLoader):
"""Instead of statement-per event, load all data with one
big COPY, UPDATE or DELETE statement.
# take last event
ev = ev_list[-1]
-
+
# generate needed commands
if exists_before and exists_after:
upd_list.append(ev.data)
real_update_count = len(upd_list)
- #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)" % (
- # tbl, len(ins_list), len(upd_list), len(del_list)))
+ #self.log.debug("process_one_table: %s (I/U/D = %d/%d/%d)",
+ # tbl, len(ins_list), len(upd_list), len(del_list))
# hack to unbroke stuff
if LOAD_METHOD == METH_MERGED:
for fld in self.dist_fields:
if fld not in key_fields:
key_fields.append(fld)
- #self.log.debug("PKey fields: %s Extra fields: %s" % (
- # ",".join(cache.pkey_list), ",".join(extra_fields)))
+ #self.log.debug("PKey fields: %s Extra fields: %s",
+ # ",".join(cache.pkey_list), ",".join(extra_fields))
# create temp table
temp = self.create_temp_table(curs)
tbl = self.table_name
-
+
# where expr must have pkey and dist fields
klist = []
for pk in key_fields:
# process deleted rows
if len(del_list) > 0:
- #self.log.info("Deleting %d rows from %s" % (len(del_list), tbl))
+ #self.log.info("Deleting %d rows from %s", len(del_list), tbl)
# delete old rows
q = "truncate %s" % quote_fqident(temp)
self.log.debug(q)
curs.execute(q)
# copy rows
- self.log.debug("COPY %d rows into %s" % (len(del_list), temp))
+ self.log.debug("COPY %d rows into %s", len(del_list), temp)
skytools.magic_insert(curs, temp, del_list, col_list)
# delete rows
self.log.debug(del_sql)
curs.execute(del_sql)
- self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
+ self.log.debug("%s - %d", curs.statusmessage, curs.rowcount)
if len(del_list) != curs.rowcount:
- self.log.warning("Delete mismatch: expected=%s deleted=%d"
- % (len(del_list), curs.rowcount))
+ self.log.warning("Delete mismatch: expected=%d deleted=%d",
+ len(del_list), curs.rowcount)
temp_used = True
# process updated rows
if len(upd_list) > 0:
- #self.log.info("Updating %d rows in %s" % (len(upd_list), tbl))
+ #self.log.info("Updating %d rows in %s", len(upd_list), tbl)
# delete old rows
q = "truncate %s" % quote_fqident(temp)
self.log.debug(q)
curs.execute(q)
# copy rows
- self.log.debug("COPY %d rows into %s" % (len(upd_list), temp))
+ self.log.debug("COPY %d rows into %s", len(upd_list), temp)
skytools.magic_insert(curs, temp, upd_list, col_list)
temp_used = True
if LOAD_METHOD == METH_CORRECT:
# update main table
self.log.debug(upd_sql)
curs.execute(upd_sql)
- self.log.debug("%s - %d" % (curs.statusmessage, curs.rowcount))
+ self.log.debug("%s - %d", curs.statusmessage, curs.rowcount)
# check count
if len(upd_list) != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s updated=%d"
- % (len(upd_list), curs.rowcount))
+ self.log.warning("Update mismatch: expected=%d updated=%d",
+ len(upd_list), curs.rowcount)
else:
# delete from main table
self.log.debug(del_sql)
self.log.debug(curs.statusmessage)
# check count
if real_update_count != curs.rowcount:
- self.log.warning("Update mismatch: expected=%s deleted=%d"
- % (real_update_count, curs.rowcount))
+ self.log.warning("Update mismatch: expected=%d deleted=%d",
+ real_update_count, curs.rowcount)
# insert into main table
if AVOID_BIZGRES_BUG:
# copy again, into main table
- self.log.debug("COPY %d rows into %s" % (len(upd_list), tbl))
+ self.log.debug("COPY %d rows into %s", len(upd_list), tbl)
skytools.magic_insert(curs, tbl, upd_list, col_list)
else:
# better way, but does not work due bizgres bug
# process new rows
if len(ins_list) > 0:
- self.log.info("Inserting %d rows into %s" % (len(ins_list), tbl))
+ self.log.info("Inserting %d rows into %s", len(ins_list), tbl)
skytools.magic_insert(curs, tbl, ins_list, col_list)
# delete remaining rows
# check if exists
if USE_LONGLIVED_TEMP_TABLES:
if skytools.exists_temp_table(curs, tempname):
- self.log.debug("Using existing temp table %s" % tempname)
+ self.log.debug("Using existing temp table %s", tempname)
return tempname
-
+
# bizgres crashes on delete rows
arg = "on commit delete rows"
arg = "on commit preserve rows"
# create temp table for loading
q = "create temp table %s (like %s) %s" % (
quote_fqident(tempname), quote_fqident(self.table_name), arg)
- self.log.debug("Creating temp table: %s" % q)
+ self.log.debug("Creating temp table: %s", q)
curs.execute(q)
return tempname
self.split_format = self.split_date_from_field
else:
raise UsageError('Bad value for split_mode: '+smode)
- self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s" % (
- self.table_name, smode, self.split_field, self.split_part))
+ self.log.debug("%s: split_mode=%s, split_field=%s, split_part=%s",
+ self.table_name, smode, self.split_field, self.split_part)
elif table_mode == 'ignore':
pass
else:
st.flush(curs)
CascadedWorker.finish_remote_batch(self, src_db, dst_db, tick_id)
+
if __name__ == '__main__':
script = QueueLoader('queue_loader', 'db', sys.argv[1:])
script.start()
got = 1
self.add_job(fn, sect)
if not got:
- self.log.warning('Cannot find service for %s' % fn)
+ self.log.warning('Cannot find service for %s', fn)
def add_job(self, cf_file, service_name):
svc = self.svc_map[service_name]
job = self.get_job_by_name (job_name)
if isinstance (job, int):
return job # ret.code
- self.log.info('Starting %s' % job_name)
+ self.log.info('Starting %s', job_name)
pidfile = job['pidfile']
if not pidfile:
- self.log.warning("No pidfile for %s, cannot launch" % job_name)
+ self.log.warning("No pidfile for %s, cannot launch", job_name)
return 0
if os.path.isfile(pidfile):
if skytools.signal_pidfile(pidfile, 0):
- self.log.warning("Script %s seems running" % job_name)
+ self.log.warning("Script %s seems running", job_name)
return 0
else:
- self.log.info("Ignoring stale pidfile for %s" % job_name)
+ self.log.info("Ignoring stale pidfile for %s", job_name)
os.chdir(job['cwd'])
cmd = "%(script)s %(config)s %(args)s -d" % job
res = launch_cmd(job, cmd)
self.log.debug(res)
if res != 0:
- self.log.error('startup failed: %s' % job_name)
+ self.log.error('startup failed: %s', job_name)
return 1
else:
return 0
job = self.get_job_by_name (job_name)
if isinstance (job, int):
return job # ret.code
- self.log.info('Stopping %s' % job_name)
+ self.log.info('Stopping %s', job_name)
self.signal_job(job, signal.SIGINT)
def cmd_reload(self, job_name):
job = self.get_job_by_name (job_name)
if isinstance (job, int):
return job # ret.code
- self.log.info('Reloading %s' % job_name)
+ self.log.info('Reloading %s', job_name)
self.signal_job(job, signal.SIGHUP)
def get_job_by_name (self, job_name):
if job_name not in self.job_map:
- self.log.error ("Unknown job: %s" % job_name)
+ self.log.error ("Unknown job: %s", job_name)
return 1
job = self.job_map[job_name]
if job['disabled']:
- self.log.info ("Skipping %s" % job_name)
+ self.log.info ("Skipping %s", job_name)
return 0
return job
while True:
if skytools.signal_pidfile (job['pidfile'], 0):
if not msg:
- self.log.info ("Waiting for %s to stop" % job_name)
+ self.log.info ("Waiting for %s to stop", job_name)
msg = True
time.sleep (0.1)
else:
def signal_job(self, job, sig):
pidfile = job['pidfile']
if not pidfile:
- self.log.warning("No pidfile for %s (%s)" % (job['job_name'], job['config']))
+ self.log.warning("No pidfile for %s (%s)", job['job_name'], job['config'])
return
if os.path.isfile(pidfile):
pid = int(open(pidfile).read())
# run sudo + kill to avoid killing unrelated processes
res = os.system("sudo -u %s kill %d" % (job['user'], pid))
if res:
- self.log.warning("Signaling %s failed" % (job['job_name'],))
+ self.log.warning("Signaling %s failed", job['job_name'])
else:
# direct kill
try:
os.kill(pid, sig)
except Exception, det:
- self.log.warning("Signaling %s failed: %s" % (job['job_name'], str(det)))
+ self.log.warning("Signaling %s failed: %s", job['job_name'], det)
else:
- self.log.warning("Job %s not running" % job['job_name'])
+ self.log.warning("Job %s not running", job['job_name'])
def work(self):
self.set_single_loop(1)