diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/londiste.py | 4 | ||||
-rw-r--r-- | python/londiste/handler.py | 24 | ||||
-rw-r--r-- | python/londiste/handlers/__init__.py | 6 | ||||
-rw-r--r-- | python/londiste/handlers/bulk.py | 4 | ||||
-rw-r--r-- | python/londiste/handlers/dispatch.py | 24 | ||||
-rw-r--r-- | python/londiste/handlers/qtable.py | 16 | ||||
-rw-r--r-- | python/londiste/handlers/vtable.py | 25 | ||||
-rw-r--r-- | python/londiste/playback.py | 77 | ||||
-rw-r--r-- | python/londiste/setup.py | 27 | ||||
-rw-r--r-- | python/londiste/table_copy.py | 50 | ||||
-rw-r--r-- | python/pgq/cascade/consumer.py | 4 | ||||
-rw-r--r-- | python/pgq/consumer.py | 15 | ||||
-rwxr-xr-x | python/qadmin.py | 20 | ||||
-rw-r--r-- | python/skytools/plpy_applyrow.py | 2 | ||||
-rw-r--r-- | python/skytools/scripting.py | 31 |
15 files changed, 249 insertions, 80 deletions
diff --git a/python/londiste.py b/python/londiste.py index 778b9f78..542c9c5d 100755 --- a/python/londiste.py +++ b/python/londiste.py @@ -37,6 +37,7 @@ Replication Extra: compare [TBL ...] compare table contents on both sides repair [TBL ...] repair data on subscriber execute [FILE ...] execute SQL files on set + show-handlers [..] show info about all or specific handler Internal Commands: copy copy table logic @@ -48,6 +49,7 @@ cmd_handlers = ( 'drop-node', 'takeover'), londiste.LondisteSetup), (('add-table', 'remove-table', 'add-seq', 'remove-seq', 'tables', 'seqs', 'missing', 'resync', 'check', 'fkeys', 'execute'), londiste.LondisteSetup), + (('show-handlers',), londiste.LondisteSetup), (('worker', 'replay'), londiste.Replicator), (('compare',), londiste.Comparator), (('repair',), londiste.Repairer), @@ -132,6 +134,8 @@ class Londiste(skytools.DBScript): help="merge tables from all source queues", default=False) g.add_option("--no-merge", action="store_true", help="don't merge tables from source queues", default=False) + g.add_option("--max-parallel-copy", type = "int", + help="max number of parallel copy processes") p.add_option_group(g) diff --git a/python/londiste/handler.py b/python/londiste/handler.py index 9e52db5f..aff0e4ce 100644 --- a/python/londiste/handler.py +++ b/python/londiste/handler.py @@ -146,12 +146,15 @@ _handler_map = { 'londiste': TableHandler, } +_handler_list = _handler_map.keys() + def register_handler_module(modname): """Import and module and register handlers.""" __import__(modname) m = sys.modules[modname] for h in m.__londiste_handlers__: _handler_map[h.handler_name] = h + _handler_list.append(h.handler_name) def _parse_arglist(arglist): args = {} @@ -203,3 +206,24 @@ def load_handler_modules(cf): for m in lst: register_handler_module(m) +def show(mods): + if not mods: + if 0: + names = _handler_map.keys() + names.sort() + else: + names = _handler_list + for n in names: + kls = _handler_map[n] + desc = kls.__doc__ or '' + if desc: + desc = desc.split('\n', 1)[0] + print("%s - %s" % (n, desc)) + else: + for n in mods: + kls = _handler_map[n] + desc = kls.__doc__ or '' + if desc: + desc = desc.rstrip() + print("%s - %s" % (n, desc)) + diff --git a/python/londiste/handlers/__init__.py b/python/londiste/handlers/__init__.py index 66853d75..764518ef 100644 --- a/python/londiste/handlers/__init__.py +++ b/python/londiste/handlers/__init__.py @@ -3,12 +3,14 @@ import new import sys DEFAULT_HANDLERS = [ - 'londiste.handlers.bulk', 'londiste.handlers.qtable', - 'londiste.handlers.dispatch', 'londiste.handlers.applyfn', 'londiste.handlers.part', 'londiste.handlers.multimaster', + 'londiste.handlers.vtable', + + 'londiste.handlers.bulk', + 'londiste.handlers.dispatch', ] def handler_args(name, cls): diff --git a/python/londiste/handlers/bulk.py b/python/londiste/handlers/bulk.py index 691acba2..d22f6ab8 100644 --- a/python/londiste/handlers/bulk.py +++ b/python/londiste/handlers/bulk.py @@ -217,6 +217,10 @@ class BulkLoader(BaseHandler): upd_sql = "update only %s set %s from %s where %s" % ( qtbl, ", ".join(slist), qtemp, whe_expr) + # avoid updates on pk-only table + if not slist: + upd_list = [] + # insert sql colstr = ",".join([quote_ident(c) for c in col_list]) ins_sql = "insert into %s (%s) select %s from %s" % ( diff --git a/python/londiste/handlers/dispatch.py b/python/londiste/handlers/dispatch.py index 02105512..5d08f225 100644 --- a/python/londiste/handlers/dispatch.py +++ b/python/londiste/handlers/dispatch.py @@ -252,6 +252,10 @@ class BaseBulkCollectingLoader(BaseLoader): # when no edge defined for old -> new op, keep old _op = self.OP_GRAPH[_op].get(op, _op) self.pkey_ev_map[pk_data] = (_op, row) + + # skip update to pk-only table + if len(pk_data) == len(row) and _op == 'U': + del self.pkey_ev_map[pk_data] except KeyError: raise Exception('unknown event type: %s' % op) @@ -314,12 +318,6 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): for f in self.keys) return ' and '.join(stmt) - def _set(self): - tmpl = "%(col)s = t.%(col)s" - stmt = (tmpl % {'col': quote_ident(f)} - for f in self.nonkeys()) - return ", ".join(stmt) - def _cols(self): return ','.join(quote_ident(f) for f in self.fields) @@ -329,8 +327,18 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): return self.logexec(curs, sql) def update(self, curs): + qcols = [quote_ident(c) for c in self.nonkeys()] + + # no point to update pk-only table + if not qcols: + return + + tmpl = "%s = t.%s" + eqlist = [tmpl % (c,c) for c in qcols] + _set = ", ".join(eqlist) + sql = "update only %s set %s from %s as t where %s" % ( - self.qtable, self._set(), self.qtemp, self._where()) + self.qtable, _set, self.qtemp, self._where()) return self.logexec(curs, sql) def delete(self, curs): @@ -345,7 +353,7 @@ class BaseBulkTempLoader(BaseBulkCollectingLoader): return self.logexec(curs, "drop table %s" % self.qtemp) def create(self, curs): - if self.USE_REAL_TABLE: + if USE_REAL_TABLE: tmpl = "create table %s (like %s)" else: tmpl = "create temp table %s (like %s) on commit preserve rows" diff --git a/python/londiste/handlers/qtable.py b/python/londiste/handlers/qtable.py index 443d2496..b904e0e1 100644 --- a/python/londiste/handlers/qtable.py +++ b/python/londiste/handlers/qtable.py @@ -16,7 +16,7 @@ from londiste.handler import BaseHandler import pgq -__all__ = ['QueueTableHandler', 'FakeLocalHandler', 'QueueSplitterHandler'] +__all__ = ['QueueTableHandler', 'QueueSplitterHandler'] class QueueTableHandler(BaseHandler): @@ -36,17 +36,6 @@ class QueueTableHandler(BaseHandler): def needs_table(self): return False -class FakeLocalHandler(BaseHandler): - handler_name = 'fake_local' - - def add(self, trigger_arg_list): - trigger_arg_list.append('virtual_table') - - def needs_table(self): - return False - - - class QueueSplitterHandler(BaseHandler): handler_name = 'qsplitter' @@ -89,5 +78,4 @@ class QueueSplitterHandler(BaseHandler): return False -__londiste_handlers__ = [QueueTableHandler, FakeLocalHandler, - QueueSplitterHandler] +__londiste_handlers__ = [QueueTableHandler, QueueSplitterHandler] diff --git a/python/londiste/handlers/vtable.py b/python/londiste/handlers/vtable.py new file mode 100644 index 00000000..2f5fd551 --- /dev/null +++ b/python/londiste/handlers/vtable.py @@ -0,0 +1,25 @@ +"""Virtual Table. +""" + +from londiste.handler import BaseHandler + +__all__ = ['VirtualTableHandler', 'FakeLocalHandler'] + +class VirtualTableHandler(BaseHandler): + """Virtual Table. + + """ + handler_name = 'vtable' + + def add(self, trigger_arg_list): + trigger_arg_list.append('virtual_table') + + def needs_table(self): + return False + +class FakeLocalHandler(VirtualTableHandler): + """Deprecated compat name for vtable.""" + handler_name = 'fake_local' + +__londiste_handlers__ = [VirtualTableHandler, FakeLocalHandler] + diff --git a/python/londiste/playback.py b/python/londiste/playback.py index c5fca72f..b7a3ac95 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -25,6 +25,8 @@ SYNC_OK = 0 # continue with batch SYNC_LOOP = 1 # sleep, try again SYNC_EXIT = 2 # nothing to do, exit skript +MAX_PARALLEL_COPY = 8 # default number of allowed max parallel copy processes + class Counter(object): """Counts table statuses.""" @@ -51,6 +53,7 @@ class Counter(object): elif t.state == TABLE_OK: self.ok += 1 + def get_copy_count(self): return self.copy + self.catching_up + self.wanna_sync + self.do_sync @@ -74,6 +77,10 @@ class TableState(object): self.plugin = None # except this self.changed = 0 + # position in parallel copy work order + self.copy_pos = 0 + # max number of parallel copy processesses allowed + self.max_parallel_copy = MAX_PARALLEL_COPY def forget(self): """Reset all info.""" @@ -87,6 +94,8 @@ class TableState(object): self.table_attrs = {} self.changed = 1 self.plugin = None + self.copy_pos = 0 + self.max_parallel_copy = MAX_PARALLEL_COPY def change_snapshot(self, str_snapshot, tag_changed = 1): """Set snapshot.""" @@ -175,10 +184,18 @@ class TableState(object): if row['merge_state'] == "?": self.changed = 1 + self.copy_pos = int(row.get('copy_pos','0')) + self.max_parallel_copy = int(self.table_attrs.get('max_parallel_copy', + self.max_parallel_copy)) + hstr = self.table_attrs.get('handlers', '') # compat hstr = self.table_attrs.get('handler', hstr) self.plugin = build_handler(self.name, hstr, self.log) + def max_parallel_copies_reached(self): + return self.max_parallel_copy and\ + self.copy_pos >= self.max_parallel_copy + def interesting(self, ev, tick_id, copy_thread): """Check if table wants this event.""" @@ -210,7 +227,7 @@ class TableState(object): def gc_snapshot(self, copy_thread, prev_tick, cur_tick, no_lag): """Remove attached snapshot if possible. - + If the event processing is in current moment. the snapshot is not needed beyond next batch. @@ -318,7 +335,7 @@ class Replicator(CascadedWorker): self.code_check_done = 1 self.sync_database_encodings(src_db, dst_db) - + self.cur_tick = self.batch_info['tick_id'] self.prev_tick = self.batch_info['prev_tick_id'] @@ -354,20 +371,25 @@ class Replicator(CascadedWorker): # store event filter if self.cf.getboolean('local_only', False): + # create list of tables if self.copy_thread: _filterlist = skytools.quote_literal(self.copy_table_name) else: _filterlist = ','.join(map(skytools.quote_literal, self.table_map.keys())) - self.consumer_filter = """ -((ev_type like 'pgq%%' or ev_type like 'londiste%%') -or (ev_extra1 in (%s))) -""" % _filterlist + + # build filter + meta = "(ev_type like 'pgq.%' or ev_type like 'londiste.%')" + if _filterlist: + self.consumer_filter = "(%s or (ev_extra1 in (%s)))" % (meta, _filterlist) + else: + self.consumer_filter = meta else: + # no filter self.consumer_filter = None def sync_tables(self, src_db, dst_db): """Table sync loop. - + Calls appropriate handles, which is expected to return one of SYNC_* constants.""" @@ -395,7 +417,7 @@ or (ev_extra1 in (%s))) dst_db.commit() self.load_table_state(dst_db.cursor()) dst_db.commit() - + dsync_backup = None def sync_from_main_thread(self, cnt, src_db, dst_db): "Main thread sync logic." @@ -403,7 +425,7 @@ or (ev_extra1 in (%s))) # This operates on all table, any amount can be in any state ret = SYNC_OK - + if cnt.do_sync: # wait for copy thread to catch up ret = SYNC_LOOP @@ -470,7 +492,7 @@ or (ev_extra1 in (%s))) # there cannot be interesting events in current batch # but maybe there's several tables, lets do them in one go ret = SYNC_LOOP - + return ret @@ -506,9 +528,14 @@ or (ev_extra1 in (%s))) elif t.state == TABLE_CATCHING_UP: # partition merging - if t.copy_role == 'wait-replay': + if t.copy_role in ('wait-replay', 'lead'): return SYNC_LOOP + # copy just finished + if t.dropped_ddl: + self.restore_copy_ddl(t, dst_db) + return SYNC_OK + # is there more work? if self.work_state: return SYNC_OK @@ -528,6 +555,23 @@ or (ev_extra1 in (%s))) # nothing to do return SYNC_EXIT + def restore_copy_ddl(self, ts, dst_db): + self.log.info("%s: restoring DDL", ts.name) + dst_curs = dst_db.cursor() + for ddl in skytools.parse_statements(ts.dropped_ddl): + self.log.info(ddl) + dst_curs.execute(ddl) + q = "select * from londiste.local_set_table_struct(%s, %s, NULL)" + self.exec_cmd(dst_curs, q, [self.queue_name, ts.name]) + ts.dropped_ddl = None + dst_db.commit() + + # analyze + self.log.info("%s: analyze", ts.name) + dst_curs.execute("analyze " + skytools.quote_fqident(ts.name)) + dst_db.commit() + + def do_copy(self, tbl, src_db, dst_db): """Callback for actual copy implementation.""" raise Exception('do_copy not implemented') @@ -566,14 +610,14 @@ or (ev_extra1 in (%s))) if not t or not t.interesting(ev, self.cur_tick, self.copy_thread): self.stat_increase('ignored_events') return - + try: p = self.used_plugins[ev.extra1] except KeyError: p = t.get_plugin() self.used_plugins[ev.extra1] = p p.prepare_batch(self.batch_info, dst_curs) - + p.process_event(ev, self.apply_sql, dst_curs) def handle_truncate_event(self, ev, dst_curs): @@ -588,7 +632,6 @@ or (ev_extra1 in (%s))) sql = "TRUNCATE ONLY %s;" % fqname else: sql = "TRUNCATE %s;" % fqname - sql = "TRUNCATE %s;" % fqname self.flush_sql(dst_curs) dst_curs.execute(sql) @@ -676,7 +719,7 @@ or (ev_extra1 in (%s))) def load_table_state(self, curs): """Load table state from database. - + Todo: if all tables are OK, there is no need to load state on every batch. """ @@ -824,7 +867,7 @@ or (ev_extra1 in (%s))) q2 = "select londiste.restore_table_fkey(%(from_table)s, %(fkey_name)s)" dst_curs.execute(q2, row) dst_db.commit() - + def drop_fkeys(self, dst_db, table_name): """Drop all foreign keys to and from this table. @@ -840,7 +883,7 @@ or (ev_extra1 in (%s))) q2 = "select londiste.drop_table_fkey(%(from_table)s, %(fkey_name)s)" dst_curs.execute(q2, row) dst_db.commit() - + def process_root_node(self, dst_db): """On root node send seq changes to queue.""" diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 1bfd7039..b59fd56f 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -69,6 +69,9 @@ class LondisteSetup(CascadeAdmin): help="merge tables from all source queues", default=False) p.add_option("--no-merge", action="store_true", help="don't merge tables from source queues", default=False) + p.add_option("--max-parallel-copy", type = "int", + help="max number of parallel copy processes") + return p def extra_init(self, node_type, node_db, provider_db): @@ -183,19 +186,25 @@ class LondisteSetup(CascadeAdmin): if self.options.expect_sync: tgargs.append('expect_sync') - # actual table registration - q = "select * from londiste.local_add_table(%s, %s, %s)" - self.exec_cmd(dst_curs, q, [self.set_name, tbl, tgargs]) - if not self.options.expect_sync: if self.options.skip_truncate: attrs['skip_truncate'] = 1 if self.options.copy_condition: attrs['copy_condition'] = self.options.copy_condition + + if self.options.max_parallel_copy: + attrs['max_parallel_copy'] = self.options.max_parallel_copy + + args = [self.set_name, tbl, tgargs] + if attrs: - enc_attrs = skytools.db_urlencode(attrs) - q = "select * from londiste.local_set_table_attrs(%s, %s, %s)" - self.exec_cmd(dst_curs, q, [self.set_name, tbl, enc_attrs]) + args.append(skytools.db_urlencode(attrs)) + + q = "select * from londiste.local_add_table(%s)" %\ + ','.join(['%s']*len(args)) + + # actual table registration + self.exec_cmd(dst_curs, q, args) dst_db.commit() def handler_needs_table(self): @@ -359,6 +368,10 @@ class LondisteSetup(CascadeAdmin): """TODO: show removed triggers.""" pass + def cmd_show_handlers(self, *args): + """Show help about handlers.""" + londiste.handler.show(args) + def cmd_execute(self, *files): db = self.get_database('db') curs = db.cursor() diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py index 7422b533..d7d900c4 100644 --- a/python/londiste/table_copy.py +++ b/python/londiste/table_copy.py @@ -56,8 +56,14 @@ class CopyTable(Replicator): src_curs = src_db.cursor() dst_curs = dst_db.cursor() - while tbl_stat.copy_role == 'wait-copy': - self.log.info('waiting for first partition to initialize copy') + while 1: + if tbl_stat.copy_role == 'wait-copy': + self.log.info('waiting for first partition to initialize copy') + elif tbl_stat.max_parallel_copies_reached(): + self.log.info('number of max parallel copies (%s) reached' %\ + tbl_stat.max_parallel_copy) + else: + break time.sleep(10) tbl_stat = self.reload_table_stat(dst_curs, tbl_stat.name) dst_db.commit() @@ -70,7 +76,7 @@ class CopyTable(Replicator): pt = pmap[tbl_stat.name] if pt.state == TABLE_OK: break - + self.log.warning("table %s not in sync yet on provider, waiting" % tbl_stat.name) time.sleep(10) @@ -84,6 +90,7 @@ class CopyTable(Replicator): cmode = 0 # change to SERIALIZABLE isolation level + oldiso = src_db.isolation_level src_db.set_isolation_level(skytools.I_SERIALIZABLE) src_db.commit() @@ -95,10 +102,14 @@ class CopyTable(Replicator): # !! this may commit, so must be done before anything else !! self.drop_fkeys(dst_db, tbl_stat.name) + # now start ddl-dropping tx + q = "lock table " + skytools.quote_fqident(tbl_stat.name) + dst_curs.execute(q) + # find dst struct src_struct = TableStruct(src_curs, tbl_stat.name) dst_struct = TableStruct(dst_curs, tbl_stat.name) - + # take common columns, warn on missing ones dlist = dst_struct.get_column_list() slist = src_struct.get_column_list() @@ -132,8 +143,11 @@ class CopyTable(Replicator): if cmode == 2 and tbl_stat.dropped_ddl is None: ddl = dst_struct.get_create_sql(objs) - q = "select * from londiste.local_set_table_struct(%s, %s, %s)" - self.exec_cmd(dst_curs, q, [self.queue_name, tbl_stat.name, ddl]) + if ddl: + q = "select * from londiste.local_set_table_struct(%s, %s, %s)" + self.exec_cmd(dst_curs, q, [self.queue_name, tbl_stat.name, ddl]) + else: + ddl = None dst_db.commit() tbl_stat.dropped_ddl = ddl @@ -145,15 +159,21 @@ class CopyTable(Replicator): snapshot = src_curs.fetchone()[0] src_db.commit() - # restore READ COMMITTED behaviour - src_db.set_isolation_level(1) + # restore old behaviour + src_db.set_isolation_level(oldiso) src_db.commit() + tbl_stat.change_state(TABLE_CATCHING_UP) + tbl_stat.change_snapshot(snapshot) + self.save_table_state(dst_curs) + # create previously dropped objects if cmode == 1: dst_struct.create(dst_curs, objs, log = self.log) elif cmode == 2: dst_db.commit() + + # start waiting for other copy processes to finish while tbl_stat.copy_role == 'lead': self.log.info('waiting for other partitions to finish copy') time.sleep(10) @@ -171,24 +191,16 @@ class CopyTable(Replicator): self.looping = 1 dst_db.commit() - # set state - if self.copy_thread: - tbl_stat.change_state(TABLE_CATCHING_UP) - else: + # hack for copy-in-playback + if not self.copy_thread: tbl_stat.change_state(TABLE_OK) - tbl_stat.change_snapshot(snapshot) - self.save_table_state(dst_curs) + self.save_table_state(dst_curs) dst_db.commit() # copy finished if tbl_stat.copy_role == 'wait-replay': return - # analyze - self.log.info("%s: analyze" % tbl_stat.name) - dst_curs.execute("analyze " + skytools.quote_fqident(tbl_stat.name)) - dst_db.commit() - # if copy done, request immidiate tick from pgqadm, # to make state juggling faster. on mostly idle db-s # each step may take tickers idle_timeout secs, which is pain. diff --git a/python/pgq/cascade/consumer.py b/python/pgq/cascade/consumer.py index 39bd906e..cebba94a 100644 --- a/python/pgq/cascade/consumer.py +++ b/python/pgq/cascade/consumer.py @@ -194,7 +194,7 @@ class CascadedConsumer(Consumer): if not self.provider_connstr: raise Exception('provider_connstr not set') - src_db = self.get_database('_provider_db', connstr = self.provider_connstr) + src_db = self.get_provider_db(self._consumer_state) return Consumer.work(self) @@ -226,7 +226,7 @@ class CascadedConsumer(Consumer): # update connection loc = state['provider_location'] if self.provider_connstr != loc: - self.close_database('_provider_db') + self.close_database(PDB) self.provider_connstr = loc return state diff --git a/python/pgq/consumer.py b/python/pgq/consumer.py index 777d1f8b..a21f14d1 100644 --- a/python/pgq/consumer.py +++ b/python/pgq/consumer.py @@ -113,6 +113,11 @@ class Consumer(skytools.DBScript): # whether to use cursor to fetch events (0 disables) #pgq_lazy_fetch = 300 + # whether to read from source size in autocommmit mode + # not compatible with pgq_lazy_fetch + # the actual user script on top of pgq.Consumer must also support it + #pgq_autocommit = 0 + # whether to wait for specified number of events, before # assigning a batch (0 disables) #pgq_batch_collect_events = 0 @@ -128,6 +133,9 @@ class Consumer(skytools.DBScript): # by default, use cursor-based fetch default_lazy_fetch = 300 + # should reader connection be used in autocommit mode + pgq_autocommit = 0 + # proper variables consumer_name = None queue_name = None @@ -173,10 +181,17 @@ class Consumer(skytools.DBScript): self.pgq_queue_name = self.queue_name self.consumer_id = self.consumer_name + # set default just once + self.pgq_autocommit = self.cf.getint("pgq_autocommit", self.pgq_autocommit) + if self.pgq_autocommit and self.pgq_lazy_fetch: + raise skytools.UsageError("pgq_autocommit is not compatible with pgq_lazy_fetch") + self.set_database_defaults(self.db_name, autocommit = self.pgq_autocommit) + def reload(self): skytools.DBScript.reload(self) self.pgq_lazy_fetch = self.cf.getint("pgq_lazy_fetch", self.default_lazy_fetch) + # set following ones to None if not set self.pgq_min_count = self.cf.getint("pgq_batch_collect_events", 0) or None self.pgq_min_interval = self.cf.get("pgq_batch_collect_interval", '') or None diff --git a/python/qadmin.py b/python/qadmin.py index 7d35da0c..608de5cd 100755 --- a/python/qadmin.py +++ b/python/qadmin.py @@ -118,10 +118,26 @@ IGNORE_HOSTS = { 'ip6-mcastprefix': 1, } +_ident_rx =''' ( " ( "" | [^"]+ )* " ) | ( [a-z_][a-z0-9_]* ) | [.] | (?P<err> .) ''' +_ident_rc = re.compile(_ident_rx, re.X | re.I) + def unquote_any(typ, s): + global _ident_rc if typ == 'ident': - ps = [skytools.unquote_ident(p) for p in s.split('.')] - s = '.'.join(ps) + res = [] + pos = 0 + while 1: + m = _ident_rc.match(s, pos) + if not m: + break + if m.group('err'): + raise Exception('invalid syntax for ident') + s1 = m.group() + if s1[0] == '"': + s1 = s1[1:-1].replace('""', '"') + res.append(s1) + pos = m.end() + s = ''.join(res) elif typ == 'str' or typ == 'dolq': s = skytools.unquote_literal(s, True) return s diff --git a/python/skytools/plpy_applyrow.py b/python/skytools/plpy_applyrow.py index bd2ef924..93ab8c11 100644 --- a/python/skytools/plpy_applyrow.py +++ b/python/skytools/plpy_applyrow.py @@ -122,7 +122,7 @@ def applyrow(tblname, ev_type, new_row, # fetch old row if alt_pkey_expr: - q = "select * from only %s where %s for update" % (qtbl, alt_pkey_expr) + q = "select * from only %s where %s for update" % (qtblname, alt_pkey_expr) res = skytools.plpy_exec(gd, q, fields) if res: oldrow = res[0] diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index 9834e588..a7eb7915 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -644,12 +644,16 @@ class DBScript(BaseScript): @param args: cmdline args (sys.argv[1:]), but can be overrided """ self.db_cache = {} + self._db_defaults = {} self._listen_map = {} # dbname: channel_list BaseScript.__init__(self, service_name, args) def connection_hook(self, dbname, conn): pass + def set_database_defaults(self, dbname, **kwargs): + self._db_defaults[dbname] = kwargs + def get_database(self, dbname, autocommit = 0, isolation_level = -1, cache = None, connstr = None): """Load cached database connection. @@ -662,6 +666,22 @@ class DBScript(BaseScript): if not cache: cache = dbname + + params = {} + defs = self._db_defaults.get(cache, {}) + params.update(defs) + if isolation_level >= 0: + params['isolation_level'] = isolation_level + elif autocommit: + params['isolation_level'] = 0 + elif params.get('autocommit', 0): + params['isolation_level'] = 0 + elif not 'isolation_level' in params: + params['isolation_level'] = I_READ_COMMITTED + + if not 'max_age' in params: + params['max_age'] = max_age + if cache in self.db_cache: if connstr is None: connstr = self.cf.get(dbname, '') @@ -672,14 +692,14 @@ class DBScript(BaseScript): if not connstr: connstr = self.cf.get(dbname) self.log.debug("Connect '%s' to '%s'" % (cache, connstr)) - dbc = DBCachedConn(cache, connstr, max_age, setup_func = self.connection_hook) + dbc = DBCachedConn(cache, connstr, params['max_age'], setup_func = self.connection_hook) self.db_cache[cache] = dbc clist = [] if cache in self._listen_map: clist = self._listen_map[cache] - return dbc.get_connection(autocommit, isolation_level, clist) + return dbc.get_connection(params['isolation_level'], clist) def close_database(self, dbname): """Explicitly close a cached connection. @@ -885,12 +905,7 @@ class DBCachedConn(object): return None return self.conn.cursor().fileno() - def get_connection(self, autocommit = 0, isolation_level = I_DEFAULT, listen_channel_list = []): - # autocommit overrider isolation_level - if autocommit: - if isolation_level == I_SERIALIZABLE: - raise Exception('autocommit is not compatible with I_SERIALIZABLE') - isolation_level = I_AUTOCOMMIT + def get_connection(self, isolation_level = I_DEFAULT, listen_channel_list = []): # default isolation_level is READ COMMITTED if isolation_level < 0: |