summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorMarko Kreen2007-08-12 11:36:09 +0000
committerMarko Kreen2007-08-12 11:36:09 +0000
commit34bbe94a6a57724877dbab080f4296f75c6289bc (patch)
tree728c5fb22e70cc26f626b4d74239d3b51233bdac /python
parentde53c39d3eea1f22cbd840a070cc228f8f5e7439 (diff)
Support for automatic handling of fkeys.
By Eric Jones, plus some hacking by me.
Diffstat (limited to 'python')
-rw-r--r--python/londiste/playback.py16
-rw-r--r--python/londiste/setup.py67
-rw-r--r--python/londiste/table_copy.py14
-rw-r--r--python/skytools/dbstruct.py2
4 files changed, 63 insertions, 36 deletions
diff --git a/python/londiste/playback.py b/python/londiste/playback.py
index 82c7d0c0..ebc89204 100644
--- a/python/londiste/playback.py
+++ b/python/londiste/playback.py
@@ -249,6 +249,8 @@ class Replicator(pgq.SerialConsumer):
self.load_table_state(dst_curs)
self.sync_tables(dst_db)
+ self.restore_fkeys(dst_db)
+
# now the actual event processing happens.
# they must be done all in one tx in dst side
# and the transaction must be kept open so that
@@ -346,7 +348,7 @@ class Replicator(pgq.SerialConsumer):
"Copy thread sync logic."
#
- # decide what to do - order is imortant
+ # decide what to do - order is important
#
if cnt.do_sync:
# main thread is waiting, catch up, then handle over
@@ -580,6 +582,18 @@ class Replicator(pgq.SerialConsumer):
if src_enc != dst_enc:
dst_curs.execute("set client_encoding = %s", [src_enc])
+ def restore_fkeys(self, dst_db):
+ dst_curs = dst_db.cursor()
+ # restore fkeys -- one at a time
+ q = "select * from londiste.subscriber_get_queue_valid_pending_fkeys(%s)"
+ dst_curs.execute(q, [self.pgq_queue_name])
+ list = dst_curs.dictfetchall()
+ for row in list:
+ self.log.info('Creating fkey: %(fkey_name)s (%(from_table)s --> %(to_table)s)' % row)
+ q2 = "select londiste.subscriber_restore_table_fkey(%(from_table)s, %(fkey_name)s)"
+ dst_curs.execute(q2, row)
+ dst_db.commit()
+
if __name__ == '__main__':
script = Replicator(sys.argv[1:])
script.start()
diff --git a/python/londiste/setup.py b/python/londiste/setup.py
index 2e20b164..f42291b2 100644
--- a/python/londiste/setup.py
+++ b/python/londiste/setup.py
@@ -293,7 +293,7 @@ class SubscriberSetup(CommonSetup):
elif cmd == "check":
self.check_tables(self.get_provider_table_list())
elif cmd == "fkeys":
- self.collect_fkeys(self.get_provider_table_list())
+ self.collect_fkeys(self.get_provider_table_list(), self.args[3:])
elif cmd == "seqs":
self.subscriber_list_seqs()
elif cmd == "add-seq":
@@ -304,40 +304,41 @@ class SubscriberSetup(CommonSetup):
self.log.error('bad subcommand: ' + cmd)
sys.exit(1)
- def collect_fkeys(self, table_list):
+ def collect_fkeys(self, table_list, args):
+ if args == []:
+ args = ['pending', 'active']
+
dst_db = self.get_database('subscriber_db')
dst_curs = dst_db.cursor()
+
+ subscriber_tables = self.get_subscriber_table_list()
- oid_list = []
- for tbl in table_list:
- try:
- oid = skytools.get_table_oid(dst_curs, tbl)
- if oid:
- oid_list.append(str(oid))
- except:
- pass
- if len(oid_list) == 0:
- print "No tables"
- return
- oid_str = ",".join(oid_list)
-
- q = "SELECT n.nspname || '.' || t.relname as tbl, c.conname as con,"\
- " pg_get_constraintdef(c.oid) as def"\
- " FROM pg_constraint c, pg_class t, pg_namespace n"\
- " WHERE c.contype = 'f' and (c.conrelid in (%s) or c.confrelid in (%s))"\
- " AND t.oid = c.conrelid AND n.oid = t.relnamespace" % (oid_str, oid_str)
- dst_curs.execute(q)
- res = dst_curs.dictfetchall()
- dst_db.commit()
-
- print "-- dropping"
- for row in res:
- q = "ALTER TABLE ONLY %(tbl)s DROP CONSTRAINT %(con)s;"
- print q % row
- print "-- creating"
- for row in res:
- q = "ALTER TABLE ONLY %(tbl)s ADD CONSTRAINT %(con)s %(def)s;"
- print q % row
+ fkeys = {'active_fkeys': [], 'pending_fkeys': []}
+ for type in args:
+ q = "select * from londiste.subscriber_get_table_%s_fkeys(%%s);" % type
+ for tbl in table_list:
+ if tbl not in subscriber_tables:
+ continue
+ dst_curs.execute(q, [tbl])
+ fkeys['%s_fkeys' % type].extend(dst_curs.dictfetchall())
+ dst_db.commit()
+
+ for type in args:
+ widths = [15,15,15]
+ for row in fkeys['%s_fkeys' % type]:
+ widths[0] = widths[0] > len(row[0]) and widths[0] or len(row[0])
+ widths[1] = widths[1] > len(row[1]) and widths[1] or len(row[1])
+ widths[2] = widths[2] > len(row[2]) and widths[2] or len(row[2])
+ widths[0] += 2; widths[1] += 2; widths[2] += 2
+
+ fmt = '%%-%ds%%-%ds%%-%ds%%s' % tuple(widths)
+ print '%s fkeys:' % type
+ print fmt % ('from_table', 'to_table', 'fkey_name', 'fkey_def')
+ print fmt % ('----------', '--------', '---------', '--------')
+
+ for row in fkeys['%s_fkeys' % type]:
+ print fmt % row.values()
+ print '\n'
def check_tables(self, table_list):
src_db = self.get_database('provider_db')
@@ -368,7 +369,7 @@ class SubscriberSetup(CommonSetup):
if not oid:
self.log.error('Table %s not found' % tbl)
return 1
- q = "select count(1) from pg_trigger where tgrelid = %s"
+ q = "select count(1) from pg_trigger where tgrelid = %s and tgisconstraint is not true"
dst_curs.execute(q, [oid])
got = dst_curs.fetchone()[0]
if got:
diff --git a/python/londiste/table_copy.py b/python/londiste/table_copy.py
index edbcbbde..ba359642 100644
--- a/python/londiste/table_copy.py
+++ b/python/londiste/table_copy.py
@@ -50,6 +50,17 @@ class CopyTable(Replicator):
for c in src_struct.get_column_list():
if c not in dlist:
raise Exception('Column %s does not exist on dest side' % c)
+
+ # drop all foreign keys to and from this table
+ # they need to be dropped one at a time to avoid deadlocks with user code
+ q = "select * from londiste.find_table_fkeys(%s)"
+ dst_curs.execute(q, [dst_struct.table_name])
+ list = dst_curs.dictfetchall()
+ for row in list:
+ self.log.info('Dropping fkey: %s' % row['fkey_name'])
+ q2 = "select londiste.subscriber_drop_table_fkey(%(from_table)s, %(fkey_name)s)"
+ dst_curs.execute(q2, row)
+ dst_db.commit()
# drop unnecessary stuff
objs = T_CONSTRAINT | T_INDEX | T_TRIGGER | T_RULE
@@ -69,13 +80,14 @@ class CopyTable(Replicator):
# create previously dropped objects
dst_struct.create(dst_curs, objs, log = self.log)
+ dst_db.commit()
# set state
- tbl_stat.change_snapshot(snapshot)
if self.copy_thread:
tbl_stat.change_state(TABLE_CATCHING_UP)
else:
tbl_stat.change_state(TABLE_OK)
+ tbl_stat.change_snapshot(snapshot)
self.save_table_state(dst_curs)
dst_db.commit()
diff --git a/python/skytools/dbstruct.py b/python/skytools/dbstruct.py
index ece0ea54..39e3b838 100644
--- a/python/skytools/dbstruct.py
+++ b/python/skytools/dbstruct.py
@@ -74,7 +74,7 @@ class TConstraint(TElem):
type = T_CONSTRAINT
SQL = """
SELECT conname as name, pg_get_constraintdef(oid) as def, contype
- FROM pg_constraint WHERE conrelid = %(oid)s
+ FROM pg_constraint WHERE conrelid = %(oid)s AND contype != 'f'
"""
def __init__(self, table_name, row):
self.table_name = table_name