Diffstat (limited to 'python')
-rwxr-xr-x  python/londiste.py                    |  4
-rw-r--r--  python/londiste/handler.py            | 24
-rw-r--r--  python/londiste/handlers/__init__.py  |  6
-rw-r--r--  python/londiste/handlers/bulk.py      |  4
-rw-r--r--  python/londiste/handlers/dispatch.py  | 24
-rw-r--r--  python/londiste/handlers/qtable.py    | 16
-rw-r--r--  python/londiste/handlers/vtable.py    | 25
-rw-r--r--  python/londiste/playback.py           | 77
-rw-r--r--  python/londiste/setup.py              | 27
-rw-r--r--  python/londiste/table_copy.py         | 50
-rw-r--r--  python/pgq/cascade/consumer.py        |  4
-rw-r--r--  python/pgq/consumer.py                | 15
-rwxr-xr-x  python/qadmin.py                      | 20
-rw-r--r--  python/skytools/plpy_applyrow.py      |  2
-rw-r--r--  python/skytools/scripting.py          | 31
15 files changed, 249 insertions, 80 deletions
diff --git a/python/londiste.py b/python/londiste.py
index 778b9f78..542c9c5d 100755
--- a/python/londiste.py
+++ b/python/londiste.py
@@ -37,6 +37,7 @@ Replication Extra:
compare [TBL ...] compare table contents on both sides
repair [TBL ...] repair data on subscriber
execute [FILE ...] execute SQL files on set
+ show-handlers [..] show info about all or specific handlers
Internal Commands:
copy copy table logic
@@ -48,6 +49,7 @@ cmd_handlers = (
'drop-node', 'takeover'), londiste.LondisteSetup),
(('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs',
'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup),
+ (('show-handlers',), londiste.LondisteSetup),
(('worker', 'replay'), londiste.Replicator),
(('compare',), londiste.Comparator),
(('repair',), londiste.Repairer),
@@ -132,6 +134,8 @@ class Londiste(skytools.DBScript):
help="merge tables from all source queues", default=False)
g.add_option("--no-merge", action="store_true",
help="don't merge tables from source queues", default=False)
+ g.add_option("--max-parallel-copy", type = "int",
+ help="max number of parallel copy processes")
p.add_option_group(g)
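Taken together, the two additions surface on the command line roughly like this (ini file and table name are illustrative, not from the commit):

    # list available handlers, or show help for a specific one
    londiste.py replic.ini show-handlers

    # cap this table's initial COPY at 4 parallel copy processes
    londiste.py replic.ini add-table public.big_table --max-parallel-copy=4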
diff --git a/python/londiste/handler.py b/python/londiste/handler.py
index 9e52db5f..aff0e4ce 100644
--- a/python/londiste/handler.py
+++ b/python/londiste/handler.py
@@ -146,12 +146,15 @@ _handler_map = {
'londiste': TableHandler,
}
+_handler_list = _handler_map.keys()
+
def register_handler_module(modname):
"""Import and module and register handlers."""
__import__(modname)
m = sys.modules[modname]
for h in m.__londiste_handlers__:
_handler_map[h.handler_name] = h
+ _handler_list.append(h.handler_name)
def _parse_arglist(arglist):
args = {}
@@ -203,3 +206,24 @@ def load_handler_modules(cf):
for m in lst:
register_handler_module(m)
+def show(mods):
+    if not mods:
+        # no arguments: one-line summary per handler, in registration order
+        names = _handler_list
+        for n in names:
+            kls = _handler_map[n]
+            desc = kls.__doc__ or ''
+            if desc:
+                desc = desc.split('\n', 1)[0]
+            print("%s - %s" % (n, desc))
+    else:
+        # explicit names: print the full docstring
+        for n in mods:
+            kls = _handler_map[n]
+            desc = kls.__doc__ or ''
+            if desc:
+                desc = desc.rstrip()
+            print("%s - %s" % (n, desc))
+
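A rough interactive sketch of what show() prints, assuming the handler modules have already been loaded (descriptions come from each handler class's docstring):

    >>> import londiste.handler
    >>> londiste.handler.show([])          # no names: one line per handler
    londiste - ...
    qtable - ...
    >>> londiste.handler.show(['vtable'])  # explicit name: full docstring
    vtable - Virtual Table.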
diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py
index 66853d75..764518ef 100644
--- a/python/londiste/handlers/__init__.py
+++ b/python/londiste/handlers/__init__.py
@@ -3,12 +3,14 @@ import new
import sys
DEFAULT_HANDLERS = [
- 'londiste.handlers.bulk',
'londiste.handlers.qtable',
- 'londiste.handlers.dispatch',
'londiste.handlers.applyfn',
'londiste.handlers.part',
'londiste.handlers.multimaster',
+ 'londiste.handlers.vtable',
+
+ 'londiste.handlers.bulk',
+ 'londiste.handlers.dispatch',
]
def handler_args(name, cls):
diff --git a/python/londiste/handlers/bulk.py b/python/londiste/handlers/bulk.py
index 691acba2..d22f6ab8 100644
--- a/python/londiste/handlers/bulk.py
+++ b/python/londiste/handlers/bulk.py
@@ -217,6 +217,10 @@ class BulkLoader(BaseHandler):
upd_sql = "update only %s set %s from %s where %s" % (
qtbl, ", ".join(slist), qtemp, whe_expr)
+ # avoid updates on pk-only table
+ if not slist:
+ upd_list = []
+
# insert sql
colstr = ",".join([quote_ident(c) for c in col_list])
ins_sql = "insert into %s (%s) select %s from %s" % (
diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py
index 02105512..5d08f225 100644
--- a/python/londiste/handlers/dispatch.py
+++ b/python/londiste/handlers/dispatch.py
@@ -252,6 +252,10 @@ class BaseBulkCollectingLoader(BaseLoader):
# when no edge defined for old -> new op, keep old
_op = self.OP_GRAPH[_op].get(op, _op)
self.pkey_ev_map[pk_data] = (_op, row)
+
+ # skip update to pk-only table
+ if len(pk_data) == len(row) and _op == 'U':
+ del self.pkey_ev_map[pk_data]
except KeyError:
raise Exception('unknown event type: %s' % op)
@@ -314,12 +318,6 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
for f in self.keys)
return ' and '.join(stmt)
- def _set(self):
- tmpl = "%(col)s = t.%(col)s"
- stmt = (tmpl % {'col': quote_ident(f)}
- for f in self.nonkeys())
- return ", ".join(stmt)
-
def _cols(self):
return ','.join(quote_ident(f) for f in self.fields)
@@ -329,8 +327,18 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
return self.logexec(curs, sql)
def update(self, curs):
+ qcols = [quote_ident(c) for c in self.nonkeys()]
+
+ # no point to update pk-only table
+ if not qcols:
+ return
+
+ tmpl = "%s = t.%s"
+ eqlist = [tmpl % (c,c) for c in qcols]
+ _set = ", ".join(eqlist)
+
sql = "update only %s set %s from %s as t where %s" % (
- self.qtable, self._set(), self.qtemp, self._where())
+ self.qtable, _set, self.qtemp, self._where())
return self.logexec(curs, sql)
def delete(self, curs):
@@ -345,7 +353,7 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader):
return self.logexec(curs, "drop table %s" % self.qtemp)
def create(self, curs):
- if self.USE_REAL_TABLE:
+ if USE_REAL_TABLE:
tmpl = "create table %s (like %s)"
else:
tmpl = "create temp table %s (like %s) on commit preserve rows"
diff --git a/python/londiste/handlers/qtable.py b/python/londiste/handlers/qtable.py
index 443d2496..b904e0e1 100644
--- a/python/londiste/handlers/qtable.py
+++ b/python/londiste/handlers/qtable.py
@@ -16,7 +16,7 @@ from londiste.handler import BaseHandler
import pgq
-__all__ = ['QueueTableHandler', 'FakeLocalHandler', 'QueueSplitterHandler']
+__all__ = ['QueueTableHandler', 'QueueSplitterHandler']
class QueueTableHandler(BaseHandler):
@@ -36,17 +36,6 @@ class QueueTableHandler(BaseHandler):
def needs_table(self):
return False
-class FakeLocalHandler(BaseHandler):
- handler_name = 'fake_local'
-
- def add(self, trigger_arg_list):
- trigger_arg_list.append('virtual_table')
-
- def needs_table(self):
- return False
-
-
-
class QueueSplitterHandler(BaseHandler):
handler_name = 'qsplitter'
@@ -89,5 +78,4 @@ class QueueSplitterHandler(BaseHandler):
return False
-__londiste_handlers__ = [QueueTableHandler, FakeLocalHandler,
- QueueSplitterHandler]
+__londiste_handlers__ = [QueueTableHandler, QueueSplitterHandler]
diff --git a/python/londiste/handlers/vtable.py b/python/londiste/handlers/vtable.py
new file mode 100644
index 00000000..2f5fd551
--- /dev/null
+++ b/python/londiste/handlers/vtable.py
@@ -0,0 +1,25 @@
+"""Virtual Table.
+"""
+
+from londiste.handler import BaseHandler
+
+__all__ = ['VirtualTableHandler', 'FakeLocalHandler']
+
+class VirtualTableHandler(BaseHandler):
+ """Virtual Table.
+
+ """
+ handler_name = 'vtable'
+
+ def add(self, trigger_arg_list):
+ trigger_arg_list.append('virtual_table')
+
+ def needs_table(self):
+ return False
+
+class FakeLocalHandler(VirtualTableHandler):
+ """Deprecated compat name for vtable."""
+ handler_name = 'fake_local'
+
+__londiste_handlers__ = [VirtualTableHandler, FakeLocalHandler]
+
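Usage is unchanged from the old fake_local handler, which survives as a deprecated alias; for example (ini file and table name illustrative):

    londiste.py replic.ini add-table public.events --handler=vtable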
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index c5fca72f..b7a3ac95 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -25,6 +25,8 @@ SYNC_OK = 0 # continue with batch
SYNC_LOOP = 1 # sleep, try again
SYNC_EXIT = 2 # nothing to do, exit script
+MAX_PARALLEL_COPY = 8 # default max number of parallel copy processes
+
class Counter(object):
"""Counts table statuses."""
@@ -51,6 +53,7 @@ class Counter(object):
elif t.state == TABLE_OK:
self.ok += 1
+
def get_copy_count(self):
return self.copy + self.catching_up + self.wanna_sync + self.do_sync
@@ -74,6 +77,10 @@ class TableState(object):
self.plugin = None
# except this
self.changed = 0
+ # position in parallel copy work order
+ self.copy_pos = 0
+ # max number of parallel copy processes allowed
+ self.max_parallel_copy = MAX_PARALLEL_COPY
def forget(self):
"""Reset all info."""
@@ -87,6 +94,8 @@ class TableState(object):
self.table_attrs = {}
self.changed = 1
self.plugin = None
+ self.copy_pos = 0
+ self.max_parallel_copy = MAX_PARALLEL_COPY
def change_snapshot(self, str_snapshot, tag_changed = 1):
"""Set snapshot."""
@@ -175,10 +184,18 @@ class TableState(object):
if row['merge_state'] == "?":
self.changed = 1
+ self.copy_pos = int(row.get('copy_pos','0'))
+ self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy',
+ self.max_parallel_copy))
+
hstr = self.table_attrs.get('handlers', '') # compat
hstr = self.table_attrs.get('handler', hstr)
self.plugin = build_handler(self.name, hstr, self.log)
+ def max_parallel_copies_reached(self):
+ return self.max_parallel_copy and\
+ self.copy_pos >= self.max_parallel_copy
+
def interesting(self, ev, tick_id, copy_thread):
"""Check if table wants this event."""
@@ -210,7 +227,7 @@ class TableState(object):
def gc_snapshot(self, copy_thread, prev_tick, cur_tick, no_lag):
"""Remove attached snapshot if possible.
-
+
If the event processing is in the current moment, the snapshot
is not needed beyond the next batch.
@@ -318,7 +335,7 @@ class Replicator(CascadedWorker):
self.code_check_done = 1
self.sync_database_encodings(src_db, dst_db)
-
+
self.cur_tick = self.batch_info['tick_id']
self.prev_tick = self.batch_info['prev_tick_id']
@@ -354,20 +371,25 @@ class Replicator(CascadedWorker):
# store event filter
if self.cf.getboolean('local_only', False):
+ # create list of tables
if self.copy_thread:
_filterlist = skytools.quote_literal(self.copy_table_name)
else:
_filterlist = ','.join(map(skytools.quote_literal, self.table_map.keys()))
- self.consumer_filter = """
-((ev_type like 'pgq%%' or ev_type like 'londiste%%')
-or (ev_extra1 in (%s)))
-""" % _filterlist
+
+ # build filter
+ meta = "(ev_type like 'pgq.%' or ev_type like 'londiste.%')"
+ if _filterlist:
+ self.consumer_filter = "(%s or (ev_extra1 in (%s)))" % (meta, _filterlist)
+ else:
+ self.consumer_filter = meta
else:
+ # no filter
self.consumer_filter = None
def sync_tables(self, src_db, dst_db):
"""Table sync loop.
-
+
Calls appropriate handles, which is expected to
return one of SYNC_* constants."""
@@ -395,7 +417,7 @@ or (ev_extra1 in (%s)))
dst_db.commit()
self.load_table_state(dst_db.cursor())
dst_db.commit()
-
+
dsync_backup = None
def sync_from_main_thread(self, cnt, src_db, dst_db):
"Main thread sync logic."
@@ -403,7 +425,7 @@ or (ev_extra1 in (%s)))
# This operates on all table, any amount can be in any state
ret = SYNC_OK
-
+
if cnt.do_sync:
# wait for copy thread to catch up
ret = SYNC_LOOP
@@ -470,7 +492,7 @@ or (ev_extra1 in (%s)))
# there cannot be interesting events in current batch
# but maybe there's several tables, lets do them in one go
ret = SYNC_LOOP
-
+
return ret
@@ -506,9 +528,14 @@ or (ev_extra1 in (%s)))
elif t.state == TABLE_CATCHING_UP:
# partition merging
- if t.copy_role == 'wait-replay':
+ if t.copy_role in ('wait-replay', 'lead'):
return SYNC_LOOP
+ # copy just finished
+ if t.dropped_ddl:
+ self.restore_copy_ddl(t, dst_db)
+ return SYNC_OK
+
# is there more work?
if self.work_state:
return SYNC_OK
@@ -528,6 +555,23 @@ or (ev_extra1 in (%s)))
# nothing to do
return SYNC_EXIT
+ def restore_copy_ddl(self, ts, dst_db):
+ self.log.info("%s: restoring DDL", ts.name)
+ dst_curs = dst_db.cursor()
+ for ddl in skytools.parse_statements(ts.dropped_ddl):
+ self.log.info(ddl)
+ dst_curs.execute(ddl)
+ q = "select * from londiste.local_set_table_struct(%s, %s, NULL)"
+ self.exec_cmd(dst_curs, q, [self.queue_name, ts.name])
+ ts.dropped_ddl = None
+ dst_db.commit()
+
+ # analyze
+ self.log.info("%s: analyze", ts.name)
+ dst_curs.execute("analyze " + skytools.quote_fqident(ts.name))
+ dst_db.commit()
+
+
def do_copy(self, tbl, src_db, dst_db):
"""Callback for actual copy implementation."""
raise Exception('do_copy not implemented')
@@ -566,14 +610,14 @@ or (ev_extra1 in (%s)))
if not t or not t.interesting(ev, self.cur_tick, self.copy_thread):
self.stat_increase('ignored_events')
return
-
+
try:
p = self.used_plugins[ev.extra1]
except KeyError:
p = t.get_plugin()
self.used_plugins[ev.extra1] = p
p.prepare_batch(self.batch_info, dst_curs)
-
+
p.process_event(ev, self.apply_sql, dst_curs)
def handle_truncate_event(self, ev, dst_curs):
@@ -588,7 +632,6 @@ or (ev_extra1 in (%s)))
sql = "TRUNCATE ONLY %s;" % fqname
else:
sql = "TRUNCATE %s;" % fqname
- sql = "TRUNCATE %s;" % fqname
self.flush_sql(dst_curs)
dst_curs.execute(sql)
@@ -676,7 +719,7 @@ or (ev_extra1 in (%s)))
def load_table_state(self, curs):
"""Load table state from database.
-
+
Todo: if all tables are OK, there is no need
to load state on every batch.
"""
@@ -824,7 +867,7 @@ or (ev_extra1 in (%s)))
q2 = "select londiste.restore_table_fkey(%(from_table)s, %(fkey_name)s)"
dst_curs.execute(q2, row)
dst_db.commit()
-
+
def drop_fkeys(self, dst_db, table_name):
"""Drop all foreign keys to and from this table.
@@ -840,7 +883,7 @@ or (ev_extra1 in (%s)))
q2 = "select londiste.drop_table_fkey(%(from_table)s, %(fkey_name)s)"
dst_curs.execute(q2, row)
dst_db.commit()
-
+
def process_root_node(self, dst_db):
"""On root node send seq changes to queue."""
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 1bfd7039..b59fd56f 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -69,6 +69,9 @@ class LondisteSetup(CascadeAdmin):
help="merge tables from all source queues", default=False)
p.add_option("--no-merge", action="store_true",
help="don't merge tables from source queues", default=False)
+ p.add_option("--max-parallel-copy", type = "int",
+ help="max number of parallel copy processes")
+
return p
def extra_init(self, node_type, node_db, provider_db):
@@ -183,19 +186,25 @@ class LondisteSetup(CascadeAdmin):
if self.options.expect_sync:
tgargs.append('expect_sync')
- # actual table registration
- q = "select * from londiste.local_add_table(%s, %s, %s)"
- self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs])
-
if not self.options.expect_sync:
if self.options.skip_truncate:
attrs['skip_truncate'] = 1
if self.options.copy_condition:
attrs['copy_condition'] = self.options.copy_condition
+
+ if self.options.max_parallel_copy:
+ attrs['max_parallel_copy'] = self.options.max_parallel_copy
+
+ args = [self.set_name, tbl, tgargs]
+
if attrs:
- enc_attrs = skytools.db_urlencode(attrs)
- q = "select * from londiste.local_set_table_attrs(%s, %s, %s)"
- self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs])
+ args.append(skytools.db_urlencode(attrs))
+
+ q = "select * from londiste.local_add_table(%s)" %\
+ ','.join(['%s']*len(args))
+
+ # actual table registration
+ self.exec_cmd(dst_curs, q, args)
dst_db.commit()
def handler_needs_table(self):
@@ -359,6 +368,10 @@ class LondisteSetup(CascadeAdmin):
"""TODO: show removed triggers."""
pass
+ def cmd_show_handlers(self, *args):
+ """Show help about handlers."""
+ londiste.handler.show(args)
+
def cmd_execute(self, *files):
db = self.get_database('db')
curs = db.cursor()
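For an add-table call that carries attributes, registration now happens in one round trip; sketched (attribute value illustrative):

    args = [self.set_name, tbl, tgargs, 'max_parallel_copy=4']   # attrs url-encoded
    q = "select * from londiste.local_add_table(%s,%s,%s,%s)"

while a call without attributes still produces the old three-argument form.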
diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py
index 7422b533..d7d900c4 100644
--- a/python/londiste/table_copy.py
+++ b/python/londiste/table_copy.py
@@ -56,8 +56,14 @@ class CopyTable(Replicator):
src_curs = src_db.cursor()
dst_curs = dst_db.cursor()
- while tbl_stat.copy_role == 'wait-copy':
- self.log.info('waiting for first partition to initialize copy')
+ while 1:
+ if tbl_stat.copy_role == 'wait-copy':
+ self.log.info('waiting for first partition to initialize copy')
+ elif tbl_stat.max_parallel_copies_reached():
+ self.log.info('max number of parallel copies (%s) reached' %\
+ tbl_stat.max_parallel_copy)
+ else:
+ break
time.sleep(10)
tbl_stat = self.reload_table_stat(dst_curs, tbl_stat.name)
dst_db.commit()
@@ -70,7 +76,7 @@ class CopyTable(Replicator):
pt = pmap[tbl_stat.name]
if pt.state == TABLE_OK:
break
-
+
self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name)
time.sleep(10)
@@ -84,6 +90,7 @@ class CopyTable(Replicator):
cmode = 0
# change to SERIALIZABLE isolation level
+ oldiso = src_db.isolation_level
src_db.set_isolation_level(skytools.I_SERIALIZABLE)
src_db.commit()
@@ -95,10 +102,14 @@ class CopyTable(Replicator):
# !! this may commit, so must be done before anything else !!
self.drop_fkeys(dst_db, tbl_stat.name)
+ # now start ddl-dropping tx
+ q = "lock table " + skytools.quote_fqident(tbl_stat.name)
+ dst_curs.execute(q)
+
# find dst struct
src_struct = TableStruct(src_curs, tbl_stat.name)
dst_struct = TableStruct(dst_curs, tbl_stat.name)
-
+
# take common columns, warn on missing ones
dlist = dst_struct.get_column_list()
slist = src_struct.get_column_list()
@@ -132,8 +143,11 @@ class CopyTable(Replicator):
if cmode == 2 and tbl_stat.dropped_ddl is None:
ddl = dst_struct.get_create_sql(objs)
- q = "select * from londiste.local_set_table_struct(%s, %s, %s)"
- self.exec_cmd(dst_curs, q, [self.queue_name, tbl_stat.name, ddl])
+ if ddl:
+ q = "select * from londiste.local_set_table_struct(%s, %s, %s)"
+ self.exec_cmd(dst_curs, q, [self.queue_name, tbl_stat.name, ddl])
+ else:
+ ddl = None
dst_db.commit()
tbl_stat.dropped_ddl = ddl
@@ -145,15 +159,21 @@ class CopyTable(Replicator):
snapshot = src_curs.fetchone()[0]
src_db.commit()
- # restore READ COMMITTED behaviour
- src_db.set_isolation_level(1)
+ # restore old behaviour
+ src_db.set_isolation_level(oldiso)
src_db.commit()
+ tbl_stat.change_state(TABLE_CATCHING_UP)
+ tbl_stat.change_snapshot(snapshot)
+ self.save_table_state(dst_curs)
+
# create previously dropped objects
if cmode == 1:
dst_struct.create(dst_curs, objs, log = self.log)
elif cmode == 2:
dst_db.commit()
+
+ # start waiting for other copy processes to finish
while tbl_stat.copy_role == 'lead':
self.log.info('waiting for other partitions to finish copy')
time.sleep(10)
@@ -171,24 +191,16 @@ class CopyTable(Replicator):
self.looping = 1
dst_db.commit()
- # set state
- if self.copy_thread:
- tbl_stat.change_state(TABLE_CATCHING_UP)
- else:
+ # hack for copy-in-playback
+ if not self.copy_thread:
tbl_stat.change_state(TABLE_OK)
- tbl_stat.change_snapshot(snapshot)
- self.save_table_state(dst_curs)
+ self.save_table_state(dst_curs)
dst_db.commit()
# copy finished
if tbl_stat.copy_role == 'wait-replay':
return
- # analyze
- self.log.info("%s: analyze" % tbl_stat.name)
- dst_curs.execute("analyze " + skytools.quote_fqident(tbl_stat.name))
- dst_db.commit()
-
# if copy is done, request an immediate tick from pgqadm,
# to make state juggling faster. On mostly idle db-s
# each step may take the ticker's idle_timeout secs, which is painful.
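The ceiling this loop consults travels as a table attribute; after an add-table run with --max-parallel-copy=4, the stored attrs decode back on the playback side roughly as (sketch using skytools' url-dict helpers):

    >>> skytools.db_urldecode('max_parallel_copy=4')
    {'max_parallel_copy': '4'}

which is what TableState reads via table_attrs.get('max_parallel_copy', ...) above.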
diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py
index 39bd906e..cebba94a 100644
--- a/python/pgq/cascade/consumer.py
+++ b/python/pgq/cascade/consumer.py
@@ -194,7 +194,7 @@ class CascadedConsumer(Consumer):
if not self.provider_connstr:
raise Exception('provider_connstr not set')
- src_db = self.get_database('_provider_db', connstr = self.provider_connstr)
+ src_db = self.get_provider_db(self._consumer_state)
return Consumer.work(self)
@@ -226,7 +226,7 @@ class CascadedConsumer(Consumer):
# update connection
loc = state['provider_location']
if self.provider_connstr != loc:
- self.close_database('_provider_db')
+ self.close_database(PDB)
self.provider_connstr = loc
return state
diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py
index 777d1f8b..a21f14d1 100644
--- a/python/pgq/consumer.py
+++ b/python/pgq/consumer.py
@@ -113,6 +113,11 @@ class Consumer(skytools.DBScript):
# whether to use cursor to fetch events (0 disables)
#pgq_lazy_fetch = 300
+ # whether to read from the source side in autocommit mode
+ # not compatible with pgq_lazy_fetch
+ # the actual user script on top of pgq.Consumer must also support it
+ #pgq_autocommit = 0
+
# whether to wait for specified number of events, before
# assigning a batch (0 disables)
#pgq_batch_collect_events = 0
@@ -128,6 +133,9 @@ class Consumer(skytools.DBScript):
# by default, use cursor-based fetch
default_lazy_fetch = 300
+ # should reader connection be used in autocommit mode
+ pgq_autocommit = 0
+
# proper variables
consumer_name = None
queue_name = None
@@ -173,10 +181,17 @@ class Consumer(skytools.DBScript):
self.pgq_queue_name = self.queue_name
self.consumer_id = self.consumer_name
+ # set default just once
+ self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit)
+ if self.pgq_autocommit and self.pgq_lazy_fetch:
+ raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch")
+ self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit)
+
def reload(self):
skytools.DBScript.reload(self)
self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch)
+
# set following ones to None if not set
self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None
self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None
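In a consumer's ini file the new knob would look like this (job name and connect string illustrative); lazy fetch must be disabled explicitly since it defaults to 300:

    [my_consumer]
    db = dbname=targetdb
    # read events from the source in autocommit mode
    pgq_autocommit = 1
    # cursor-based fetch is incompatible with autocommit
    pgq_lazy_fetch = 0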
diff --git a/python/qadmin.py b/python/qadmin.py
index 7d35da0c..608de5cd 100755
--- a/python/qadmin.py
+++ b/python/qadmin.py
@@ -118,10 +118,26 @@ IGNORE_HOSTS = {
'ip6-mcastprefix': 1,
}
+_ident_rx = ''' ( " ( "" | [^"]+ )* " ) | ( [a-z_][a-z0-9_]* ) | [.] | (?P<err> .) '''
+_ident_rc = re.compile(_ident_rx, re.X | re.I)
+
def unquote_any(typ, s):
+ global _ident_rc
if typ == 'ident':
- ps = [skytools.unquote_ident(p) for p in s.split('.')]
- s = '.'.join(ps)
+ res = []
+ pos = 0
+ while 1:
+ m = _ident_rc.match(s, pos)
+ if not m:
+ break
+ if m.group('err'):
+ raise Exception('invalid syntax for ident')
+ s1 = m.group()
+ if s1[0] == '"':
+ s1 = s1[1:-1].replace('""', '"')
+ res.append(s1)
+ pos = m.end()
+ s = ''.join(res)
elif typ == 'str' or typ == 'dolq':
s = skytools.unquote_literal(s, True)
return s
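Tracing the tokenizer: quoted sections keep their dots and collapse doubled quotes, unquoted sections must look like plain identifiers, and anything else trips the err group. Derived from the regex above:

    >>> unquote_any('ident', '"My.Schema".tbl')
    'My.Schema.tbl'

The old split('.') version would have cut the quoted part in two before unquoting.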
diff --git a/python/skytools/plpy_applyrow.py b/python/skytools/plpy_applyrow.py
index bd2ef924..93ab8c11 100644
--- a/python/skytools/plpy_applyrow.py
+++ b/python/skytools/plpy_applyrow.py
@@ -122,7 +122,7 @@ def applyrow(tblname, ev_type, new_row,
# fetch old row
if alt_pkey_expr:
- q = "select * from only %s where %s for update" % (qtbl, alt_pkey_expr)
+ q = "select * from only %s where %s for update" % (qtblname, alt_pkey_expr)
res = skytools.plpy_exec(gd, q, fields)
if res:
oldrow = res[0]
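The one-line fix swaps in qtblname, which is presumably the quote_fqident()-ed form of tblname defined earlier in the module (not visible in this hunk); proper quoting matters once names need escaping:

    >>> skytools.quote_fqident('public.User Data')
    'public."User Data"'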
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index 9834e588..a7eb7915 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -644,12 +644,16 @@ class DBScript(BaseScript):
@param args: cmdline args (sys.argv[1:]), but can be overrided
"""
self.db_cache = {}
+ self._db_defaults = {}
self._listen_map = {} # dbname: channel_list
BaseScript.__init__(self, service_name, args)
def connection_hook(self, dbname, conn):
pass
+ def set_database_defaults(self, dbname, **kwargs):
+ self._db_defaults[dbname] = kwargs
+
def get_database(self, dbname, autocommit = 0, isolation_level = -1,
cache = None, connstr = None):
"""Load cached database connection.
@@ -662,6 +666,22 @@ class DBScript(BaseScript):
if not cache:
cache = dbname
+
+ params = {}
+ defs = self._db_defaults.get(cache, {})
+ params.update(defs)
+ if isolation_level >= 0:
+ params['isolation_level'] = isolation_level
+ elif autocommit:
+ params['isolation_level'] = 0
+ elif params.get('autocommit', 0):
+ params['isolation_level'] = 0
+ elif not 'isolation_level' in params:
+ params['isolation_level'] = I_READ_COMMITTED
+
+ if not 'max_age' in params:
+ params['max_age'] = max_age
+
if cache in self.db_cache:
if connstr is None:
connstr = self.cf.get(dbname, '')
@@ -672,14 +692,14 @@ class DBScript(BaseScript):
if not connstr:
connstr = self.cf.get(dbname)
self.log.debug("Connect '%s' to '%s'" % (cache, connstr))
- dbc = DBCachedConn(cache, connstr, max_age, setup_func = self.connection_hook)
+ dbc = DBCachedConn(cache, connstr, params['max_age'], setup_func = self.connection_hook)
self.db_cache[cache] = dbc
clist = []
if cache in self._listen_map:
clist = self._listen_map[cache]
- return dbc.get_connection(autocommit, isolation_level, clist)
+ return dbc.get_connection(params['isolation_level'], clist)
def close_database(self, dbname):
"""Explicitly close a cached connection.
@@ -885,12 +905,7 @@ class DBCachedConn(object):
return None
return self.conn.cursor().fileno()
- def get_connection(self, autocommit = 0, isolation_level = I_DEFAULT, listen_channel_list = []):
- # autocommit overrider isolation_level
- if autocommit:
- if isolation_level == I_SERIALIZABLE:
- raise Exception('autocommit is not compatible with I_SERIALIZABLE')
- isolation_level = I_AUTOCOMMIT
+ def get_connection(self, isolation_level = I_DEFAULT, listen_channel_list = []):
# default isolation_level is READ COMMITTED
if isolation_level < 0:
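The net effect for script authors: per-connection defaults are declared once and every later get_database() call picks them up. A minimal sketch (service and db names illustrative):

    import skytools

    class MyConsumer(skytools.DBScript):
        def __init__(self, args):
            skytools.DBScript.__init__(self, 'my_job', args)
            # all later get_database('db') calls default to autocommit
            self.set_database_defaults('db', autocommit = 1)

which mirrors how pgq.Consumer wires up pgq_autocommit above.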