diff options
author | Marko Kreen | 2007-08-12 11:36:09 +0000 |
---|---|---|
committer | Marko Kreen | 2007-08-12 11:36:09 +0000 |
commit | 34bbe94a6a57724877dbab080f4296f75c6289bc (patch) | |
tree | 728c5fb22e70cc26f626b4d74239d3b51233bdac /python | |
parent | de53c39d3eea1f22cbd840a070cc228f8f5e7439 (diff) |
Support for automatic handling of fkeys.
By Eric Jones, plus some hacking by me.
Diffstat (limited to 'python')
-rw-r--r-- | python/londiste/playback.py | 16 | ||||
-rw-r--r-- | python/londiste/setup.py | 67 | ||||
-rw-r--r-- | python/londiste/table_copy.py | 14 | ||||
-rw-r--r-- | python/skytools/dbstruct.py | 2 |
4 files changed, 63 insertions, 36 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py index 82c7d0c0..ebc89204 100644 --- a/python/londiste/playback.py +++ b/python/londiste/playback.py @@ -249,6 +249,8 @@ class Replicator(pgq.SerialConsumer): self.load_table_state(dst_curs) self.sync_tables(dst_db) + self.restore_fkeys(dst_db) + # now the actual event processing happens. # they must be done all in one tx in dst side # and the transaction must be kept open so that @@ -346,7 +348,7 @@ class Replicator(pgq.SerialConsumer): "Copy thread sync logic." # - # decide what to do - order is imortant + # decide what to do - order is important # if cnt.do_sync: # main thread is waiting, catch up, then handle over @@ -580,6 +582,18 @@ class Replicator(pgq.SerialConsumer): if src_enc != dst_enc: dst_curs.execute("set client_encoding = %s", [src_enc]) + def restore_fkeys(self, dst_db): + dst_curs = dst_db.cursor() + # restore fkeys -- one at a time + q = "select * from londiste.subscriber_get_queue_valid_pending_fkeys(%s)" + dst_curs.execute(q, [self.pgq_queue_name]) + list = dst_curs.dictfetchall() + for row in list: + self.log.info('Creating fkey: %(fkey_name)s (%(from_table)s --> %(to_table)s)' % row) + q2 = "select londiste.subscriber_restore_table_fkey(%(from_table)s, %(fkey_name)s)" + dst_curs.execute(q2, row) + dst_db.commit() + if __name__ == '__main__': script = Replicator(sys.argv[1:]) script.start() diff --git a/python/londiste/setup.py b/python/londiste/setup.py index 2e20b164..f42291b2 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -293,7 +293,7 @@ class SubscriberSetup(CommonSetup): elif cmd == "check": self.check_tables(self.get_provider_table_list()) elif cmd == "fkeys": - self.collect_fkeys(self.get_provider_table_list()) + self.collect_fkeys(self.get_provider_table_list(), self.args[3:]) elif cmd == "seqs": self.subscriber_list_seqs() elif cmd == "add-seq": @@ -304,40 +304,41 @@ class SubscriberSetup(CommonSetup): self.log.error('bad subcommand: ' + cmd) sys.exit(1) - def collect_fkeys(self, table_list): + def collect_fkeys(self, table_list, args): + if args == []: + args = ['pending', 'active'] + dst_db = self.get_database('subscriber_db') dst_curs = dst_db.cursor() + + subscriber_tables = self.get_subscriber_table_list() - oid_list = [] - for tbl in table_list: - try: - oid = skytools.get_table_oid(dst_curs, tbl) - if oid: - oid_list.append(str(oid)) - except: - pass - if len(oid_list) == 0: - print "No tables" - return - oid_str = ",".join(oid_list) - - q = "SELECT n.nspname || '.' || t.relname as tbl, c.conname as con,"\ - " pg_get_constraintdef(c.oid) as def"\ - " FROM pg_constraint c, pg_class t, pg_namespace n"\ - " WHERE c.contype = 'f' and (c.conrelid in (%s) or c.confrelid in (%s))"\ - " AND t.oid = c.conrelid AND n.oid = t.relnamespace" % (oid_str, oid_str) - dst_curs.execute(q) - res = dst_curs.dictfetchall() - dst_db.commit() - - print "-- dropping" - for row in res: - q = "ALTER TABLE ONLY %(tbl)s DROP CONSTRAINT %(con)s;" - print q % row - print "-- creating" - for row in res: - q = "ALTER TABLE ONLY %(tbl)s ADD CONSTRAINT %(con)s %(def)s;" - print q % row + fkeys = {'active_fkeys': [], 'pending_fkeys': []} + for type in args: + q = "select * from londiste.subscriber_get_table_%s_fkeys(%%s);" % type + for tbl in table_list: + if tbl not in subscriber_tables: + continue + dst_curs.execute(q, [tbl]) + fkeys['%s_fkeys' % type].extend(dst_curs.dictfetchall()) + dst_db.commit() + + for type in args: + widths = [15,15,15] + for row in fkeys['%s_fkeys' % type]: + widths[0] = widths[0] > len(row[0]) and widths[0] or len(row[0]) + widths[1] = widths[1] > len(row[1]) and widths[1] or len(row[1]) + widths[2] = widths[2] > len(row[2]) and widths[2] or len(row[2]) + widths[0] += 2; widths[1] += 2; widths[2] += 2 + + fmt = '%%-%ds%%-%ds%%-%ds%%s' % tuple(widths) + print '%s fkeys:' % type + print fmt % ('from_table', 'to_table', 'fkey_name', 'fkey_def') + print fmt % ('----------', '--------', '---------', '--------') + + for row in fkeys['%s_fkeys' % type]: + print fmt % row.values() + print '\n' def check_tables(self, table_list): src_db = self.get_database('provider_db') @@ -368,7 +369,7 @@ class SubscriberSetup(CommonSetup): if not oid: self.log.error('Table %s not found' % tbl) return 1 - q = "select count(1) from pg_trigger where tgrelid = %s" + q = "select count(1) from pg_trigger where tgrelid = %s and tgisconstraint is not true" dst_curs.execute(q, [oid]) got = dst_curs.fetchone()[0] if got: diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py index edbcbbde..ba359642 100644 --- a/python/londiste/table_copy.py +++ b/python/londiste/table_copy.py @@ -50,6 +50,17 @@ class CopyTable(Replicator): for c in src_struct.get_column_list(): if c not in dlist: raise Exception('Column %s does not exist on dest side' % c) + + # drop all foreign keys to and from this table + # they need to be dropped one at a time to avoid deadlocks with user code + q = "select * from londiste.find_table_fkeys(%s)" + dst_curs.execute(q, [dst_struct.table_name]) + list = dst_curs.dictfetchall() + for row in list: + self.log.info('Dropping fkey: %s' % row['fkey_name']) + q2 = "select londiste.subscriber_drop_table_fkey(%(from_table)s, %(fkey_name)s)" + dst_curs.execute(q2, row) + dst_db.commit() # drop unnecessary stuff objs = T_CONSTRAINT | T_INDEX | T_TRIGGER | T_RULE @@ -69,13 +80,14 @@ class CopyTable(Replicator): # create previously dropped objects dst_struct.create(dst_curs, objs, log = self.log) + dst_db.commit() # set state - tbl_stat.change_snapshot(snapshot) if self.copy_thread: tbl_stat.change_state(TABLE_CATCHING_UP) else: tbl_stat.change_state(TABLE_OK) + tbl_stat.change_snapshot(snapshot) self.save_table_state(dst_curs) dst_db.commit() diff --git a/python/skytools/dbstruct.py b/python/skytools/dbstruct.py index ece0ea54..39e3b838 100644 --- a/python/skytools/dbstruct.py +++ b/python/skytools/dbstruct.py @@ -74,7 +74,7 @@ class TConstraint(TElem): type = T_CONSTRAINT SQL = """ SELECT conname as name, pg_get_constraintdef(oid) as def, contype - FROM pg_constraint WHERE conrelid = %(oid)s + FROM pg_constraint WHERE conrelid = %(oid)s AND contype != 'f' """ def __init__(self, table_name, row): self.table_name = table_name |