diff options
| author | Marko Kreen | 2009-02-13 11:14:37 +0000 |
|---|---|---|
| committer | Marko Kreen | 2009-02-13 13:20:32 +0000 |
| commit | e9b2296f9c273beac1b8cab548d08398914e4de3 (patch) | |
| tree | e3d0479c7eef73808cab39b938e359266ea5ed88 /python/londiste/setup.py | |
| parent | 411f237ebfdce9532bbf01f51d46cdb91f7fe283 (diff) | |
python/londiste rewrite for cascading
New features:
- Cascading
- 'execute' command for running SQL scripts on nodes
- Parallel COPY
- Partition merge
- Sequences are pushed from root
- Rename 'add' command to 'add-table'
- --create switch to add-seq / add-table
also drop the never-implemented file-based transport classes.
Diffstat (limited to 'python/londiste/setup.py')
| -rw-r--r-- | python/londiste/setup.py | 625 |
1 files changed, 401 insertions, 224 deletions
diff --git a/python/londiste/setup.py b/python/londiste/setup.py index fea3e154..30854de2 100644 --- a/python/londiste/setup.py +++ b/python/londiste/setup.py @@ -5,19 +5,37 @@ import sys, os, skytools -import pgq.setadmin +from pgq.cascade.admin import CascadeAdmin __all__ = ['LondisteSetup'] -class LondisteSetup(pgq.setadmin.SetAdmin): +class LondisteSetup(CascadeAdmin): + """Londiste-specific admin commands.""" initial_db_name = 'node_db' extra_objs = [ skytools.DBSchema("londiste", sql_file="londiste.sql") ] + provider_location = None def __init__(self, args): - pgq.setadmin.SetAdmin.__init__(self, 'londiste', args) - self.set_name = self.cf.get("set_name") + """Londiste setup init.""" + CascadeAdmin.__init__(self, 'londiste', 'db', args, worker_setup = True) + + # compat + self.queue_name = self.cf.get('pgq_queue_name', '') + # real + if not self.queue_name: + self.queue_name = self.cf.get('queue_name') + + self.set_name = self.queue_name + + def connection_setup(self, dbname, db): + if dbname == 'db': + curs = db.cursor() + curs.execute("set session_replication_role = 'replica'") + db.commit() def init_optparse(self, parser=None): - p = pgq.setadmin.SetAdmin.init_optparse(self, parser) + """Add londiste switches to cascadeadmin ones.""" + + p = CascadeAdmin.init_optparse(self, parser) p.add_option("--expect-sync", action="store_true", dest="expect_sync", help = "no copy needed", default=False) p.add_option("--skip-truncate", action="store_true", dest="skip_truncate", @@ -26,266 +44,425 @@ class LondisteSetup(pgq.setadmin.SetAdmin): help="force", default=False) p.add_option("--all", action="store_true", help="include all tables", default=False) + p.add_option("--create", action="store_true", + help="include all tables", default=False) + p.add_option("--create-only", + help="pkey,fkeys,indexes") return p def extra_init(self, node_type, node_db, provider_db): + """Callback from CascadeAdmin init.""" if not provider_db: return pcurs = provider_db.cursor() ncurs = node_db.cursor() - q = "select table_name from londiste.set_get_table_list(%s)" + + # sync tables + q = "select table_name from londiste.get_table_list(%s)" pcurs.execute(q, [self.set_name]) for row in pcurs.fetchall(): tbl = row['table_name'] - q = "select * from londiste.set_add_table(%s, %s)" + q = "select * from londiste.global_add_table(%s, %s)" ncurs.execute(q, [self.set_name, tbl]) + + # sync seqs + q = "select seq_name, last_value from londiste.get_seq_list(%s)" + pcurs.execute(q, [self.set_name]) + for row in pcurs.fetchall(): + seq = row['seq_name'] + val = row['last_value'] + q = "select * from londiste.update_seq(%s, %s, %s)" + ncurs.execute(q, [self.set_name, seq, val]) + + # done node_db.commit() provider_db.commit() - def cmd_add(self, *args): - q = "select * from londiste.node_add_table(%s, %s)" - db = self.get_database('node_db') - self.exec_cmd_many(db, q, [self.set_name], args) + def cmd_add_table(self, *args): + """Attach table(s) to local node.""" - def cmd_remove(self, *args): - q = "select * from londiste.node_remove_table(%s, %s)" - db = self.get_database('node_db') + dst_db = self.get_database('db') + dst_curs = dst_db.cursor() + src_db = self.get_provider_db() + src_curs = src_db.cursor() + + src_tbls = self.fetch_set_tables(src_curs) + dst_tbls = self.fetch_set_tables(dst_curs) + src_db.commit() + self.sync_table_list(dst_curs, src_tbls, dst_tbls) + dst_db.commit() + + # dont check for exist/not here (root handling) + problems = False + for tbl in args: + tbl = skytools.fq_name(tbl) + if (tbl in src_tbls) and not src_tbls[tbl]: + self.log.error("Table %s does not exist on provider, need to switch to different provider" % tbl) + problems = True + if problems: + self.log.error("Problems, canceling operation") + sys.exit(1) + + # pick proper create flags + create = self.options.create_only + if not create and self.options.create: + create = 'full' + + fmap = { + "full": skytools.T_ALL, + "pkey": skytools.T_PKEY, + } + create_flags = 0 + if create: + for f in create.split(','): + if f not in fmap: + raise Exception("bad --create-only flag: " + f) + create_flags += fmap[f] + + # seems ok + for tbl in args: + tbl = skytools.fq_name(tbl) + self.add_table(src_db, dst_db, tbl, create_flags) + + def add_table(self, src_db, dst_db, tbl, create_flags): + src_curs = src_db.cursor() + dst_curs = dst_db.cursor() + if create_flags: + if skytools.exists_table(dst_curs, tbl): + self.log.info('Table %s already exist, not touching' % tbl) + else: + s = skytools.TableStruct(src_curs, tbl) + src_db.commit() + s.create(dst_curs, create_flags, log = self.log) + q = "select * from londiste.local_add_table(%s, %s)" + self.exec_cmd(dst_curs, q, [self.set_name, tbl]) + dst_db.commit() + + def sync_table_list(self, dst_curs, src_tbls, dst_tbls): + for tbl in src_tbls.keys(): + q = "select * from londiste.global_add_table(%s, %s)" + if tbl not in dst_tbls: + self.log.info("Table %s info missing from subscriber, adding") + self.exec_cmd(dst_curs, q, [self.set_name, tbl]) + dst_tbls[tbl] = False + for tbl in dst_tbls.keys(): + q = "select * from londiste.global_remove_table(%s, %s)" + if tbl not in src_tbls: + self.log.info("Table %s gone but exists on subscriber, removing") + self.exec_cmd(dst_curs, q, [self.set_name, tbl]) + del dst_tbls[tbl] + + def fetch_set_tables(self, curs): + q = "select table_name, local from londiste.get_table_list(%s)" + curs.execute(q, [self.set_name]) + res = {} + for row in curs.fetchall(): + res[row[0]] = row[1] + return res + + def cmd_remove_table(self, *args): + """Detach table(s) from local node.""" + q = "select * from londiste.local_remove_table(%s, %s)" + db = self.get_database('db') self.exec_cmd_many(db, q, [self.set_name], args) def cmd_add_seq(self, *args): - q = "select * from londiste.node_add_seq(%s, %s)" - db = self.get_database('node_db') - self.exec_cmd_many(db, q, [self.set_name], args) + """Attach seqs(s) to local node.""" + dst_db = self.get_database('db') + dst_curs = dst_db.cursor() + src_db = self.get_provider_db() + src_curs = src_db.cursor() + + src_seqs = self.fetch_seqs(src_curs) + dst_seqs = self.fetch_seqs(dst_curs) + src_db.commit() + self.sync_seq_list(dst_curs, src_seqs, dst_seqs) + dst_db.commit() + + # pick proper create flags + create = self.options.create_only + if not create and self.options.create: + create = 'full' + + fmap = { + "full": skytools.T_SEQUENCE, + } + create_flags = 0 + if create: + for f in create.split(','): + if f not in fmap: + raise Exception("bad --create-only flag: " + f) + create_flags += fmap[f] + + # seems ok + for seq in args: + seq = skytools.fq_name(seq) + self.add_seq(src_db, dst_db, seq, create_flags) + dst_db.commit() + + def add_seq(self, src_db, dst_db, seq, create_flags): + src_curs = src_db.cursor() + dst_curs = dst_db.cursor() + if create_flags: + if skytools.exists_sequence(dst_curs, seq): + self.log.info('Sequence %s already exist, not creating' % seq) + else: + s = skytools.SeqStruct(src_curs, seq) + src_db.commit() + s.create(dst_curs, create_flags, log = self.log) + q = "select * from londiste.local_add_seq(%s, %s)" + self.exec_cmd(dst_curs, q, [self.set_name, seq]) + + def fetch_seqs(self, curs): + q = "select seq_name, last_value, local from londiste.get_seq_list(%s)" + curs.execute(q, [self.set_name]) + res = {} + for row in curs.fetchall(): + res[row[0]] = row + return res + + def sync_seq_list(self, dst_curs, src_seqs, dst_seqs): + for seq in src_seqs.keys(): + q = "select * from londiste.global_update_seq(%s, %s, %s)" + if seq not in dst_seqs: + self.log.info("Sequence %s info missing from subscriber, adding") + self.exec_cmd(dst_curs, q, [self.set_name, seq, src_seqs[seq]['last_value']]) + tmp = src_seqs[seq].copy() + tmp['local'] = False + dst_seqs[seq] = tmp + for seq in dst_seqs.keys(): + q = "select * from londiste.global_remove_seq(%s, %s)" + if seq not in src_seqs: + self.log.info("Sequence %s gone but exists on subscriber, removing") + self.exec_cmd(dst_curs, q, [self.set_name, seq]) + del dst_seqs[seq] def cmd_remove_seq(self, *args): - q = "select * from londiste.node_remove_seq(%s, %s)" - db = self.get_database('node_db') + """Detach seqs(s) from local node.""" + q = "select * from londiste.local_remove_seq(%s, %s)" + db = self.get_database('db') self.exec_cmd_many(db, q, [self.set_name], args) def cmd_resync(self, *args): + """Reload data from provider node..""" + # fixme q = "select * from londiste.node_resync_table(%s, %s)" - db = self.get_database('node_db') + db = self.get_database('db') self.exec_cmd_many(db, q, [self.set_name], args) def cmd_tables(self): - q = "select table_name, merge_state from londiste.node_get_table_list(%s)" - db = self.get_database('node_db') - self.db_display_table(db, "Tables on node", q, [self.set_name]) + """Show attached tables.""" + q = "select table_name, local, merge_state from londiste.get_table_list(%s)" + db = self.get_database('db') + self.display_table(db, "Tables on node", q, [self.set_name]) def cmd_seqs(self): - q = "select seq_namefrom londiste.node_get_seq_list(%s)" - db = self.get_database('node_db') - self.db_display_table(db, "Sequences on node", q, [self.set_name]) + """Show attached seqs.""" + q = "select seq_name, local, last_value from londiste.get_seq_list(%s)" + db = self.get_database('db') + self.display_table(db, "Sequences on node", q, [self.set_name]) def cmd_missing(self): + """Show missing tables on local node.""" + # fixme q = "select * from londiste.node_show_missing(%s)" - db = self.get_database('node_db') - self.db_display_table(db, "Missing objects on node", q, [self.set_name]) + db = self.get_database('db') + self.display_table(db, "Missing objects on node", q, [self.set_name]) def cmd_check(self): + """TODO: check if structs match""" pass def cmd_fkeys(self): + """TODO: show removed fkeys.""" pass def cmd_triggers(self): + """TODO: show removed triggers.""" pass + def cmd_execute(self, *files): + db = self.get_database('db') + curs = db.cursor() + for fn in files: + fname = os.path.basename(fn) + sql = open(fn, "r").read() + q = "select * from londiste.execute_start(%s, %s, %s, true)" + self.exec_cmd(db, q, [self.queue_name, fname, sql], commit = False) + for stmt in skytools.parse_statements(sql): + curs.execute(stmt) + q = "select * from londiste.execute_finish(%s, %s)" + self.exec_cmd(db, q, [self.queue_name, fname], commit = False) + db.commit() + + def get_provider_db(self): + if not self.provider_location: + db = self.get_database('db') + q = 'select * from pgq_node.get_node_info(%s)' + res = self.exec_cmd(db, q, [self.queue_name], quiet = True) + self.provider_location = res[0]['provider_location'] + return self.get_database('provider_db', connstr = self.provider_location) + # # Old commands # -class LondisteSetup_tmp: - - def find_missing_provider_tables(self, pattern='*'): - src_db = self.get_database('provider_db') - src_curs = src_db.cursor() - q = """select schemaname || '.' || tablename as full_name from pg_tables - where schemaname not in ('pgq', 'londiste', 'pg_catalog', 'information_schema') - and schemaname !~ 'pg_.*' - and (schemaname || '.' || tablename) ~ %s - except select table_name from londiste.provider_get_table_list(%s)""" - src_curs.execute(q, [glob2regex(pattern), self.pgq_queue_name]) - rows = src_curs.fetchall() - src_db.commit() - list = [] - for row in rows: - list.append(row[0]) - return list - - def admin(self): - cmd = self.args[2] - if cmd == "tables": - self.subscriber_show_tables() - elif cmd == "missing": - self.subscriber_missing_tables() - elif cmd == "add": - self.subscriber_add_tables(self.args[3:]) - elif cmd == "remove": - self.subscriber_remove_tables(self.args[3:]) - elif cmd == "resync": - self.subscriber_resync_tables(self.args[3:]) - elif cmd == "register": - self.subscriber_register() - elif cmd == "unregister": - self.subscriber_unregister() - elif cmd == "install": - self.subscriber_install() - elif cmd == "check": - self.check_tables(self.get_provider_table_list()) - elif cmd in ["fkeys", "triggers"]: - self.collect_meta(self.get_provider_table_list(), cmd, self.args[3:]) - elif cmd == "seqs": - self.subscriber_list_seqs() - elif cmd == "add-seq": - self.subscriber_add_seq(self.args[3:]) - elif cmd == "remove-seq": - self.subscriber_remove_seq(self.args[3:]) - elif cmd == "restore-triggers": - self.restore_triggers(self.args[3], self.args[4:]) - else: - self.log.error('bad subcommand: ' + cmd) - sys.exit(1) - - def collect_meta(self, table_list, meta, args): - """Display fkey/trigger info.""" - - if args == []: - args = ['pending', 'active'] - - field_map = {'triggers': ['table_name', 'trigger_name', 'trigger_def'], - 'fkeys': ['from_table', 'to_table', 'fkey_name', 'fkey_def']} - - query_map = {'pending': "select %s from londiste.subscriber_get_table_pending_%s(%%s)", - 'active' : "select %s from londiste.find_table_%s(%%s)"} - - table_list = self.clean_subscriber_tables(table_list) - if len(table_list) == 0: - self.log.info("No tables, no fkeys") - return - - dst_db = self.get_database('subscriber_db') - dst_curs = dst_db.cursor() - - for which in args: - union_list = [] - fields = field_map[meta] - q = query_map[which] % (",".join(fields), meta) - for tbl in table_list: - union_list.append(q % skytools.quote_literal(tbl)) - - # use union as fkey may appear in duplicate - sql = " union ".join(union_list) + " order by 1" - desc = "%s %s" % (which, meta) - self.display_table(desc, dst_curs, fields, sql) - dst_db.commit() - - def check_tables(self, table_list): - src_db = self.get_database('provider_db') - src_curs = src_db.cursor() - dst_db = self.get_database('subscriber_db') - dst_curs = dst_db.cursor() - - failed = 0 - for tbl in table_list: - self.log.info('Checking %s' % tbl) - if not skytools.exists_table(src_curs, tbl): - self.log.error('Table %s missing from provider side' % tbl) - failed += 1 - elif not skytools.exists_table(dst_curs, tbl): - self.log.error('Table %s missing from subscriber side' % tbl) - failed += 1 - else: - failed += self.check_table_columns(src_curs, dst_curs, tbl) - - src_db.commit() - dst_db.commit() - - return failed - - def restore_triggers(self, tbl, triggers=None): - tbl = skytools.fq_name(tbl) - if tbl not in self.get_subscriber_table_list(): - self.log.error("Table %s is not in the subscriber queue." % tbl) - sys.exit(1) - - dst_db = self.get_database('subscriber_db') - dst_curs = dst_db.cursor() - - if not triggers: - q = "select count(1) from londiste.subscriber_get_table_pending_triggers(%s)" - dst_curs.execute(q, [tbl]) - if not dst_curs.fetchone()[0]: - self.log.info("No pending triggers found for %s." % tbl) - else: - q = "select londiste.subscriber_restore_all_table_triggers(%s)" - dst_curs.execute(q, [tbl]) - else: - for trigger in triggers: - q = "select count(1) from londiste.find_table_triggers(%s) where trigger_name=%s" - dst_curs.execute(q, [tbl, trigger]) - if dst_curs.fetchone()[0]: - self.log.info("Trigger %s on %s is already active." % (trigger, tbl)) - continue - - q = "select count(1) from londiste.subscriber_get_table_pending_triggers(%s) where trigger_name=%s" - dst_curs.execute(q, [tbl, trigger]) - if not dst_curs.fetchone()[0]: - self.log.info("Trigger %s not found on %s" % (trigger, tbl)) - continue - - q = "select londiste.subscriber_restore_table_trigger(%s, %s)" - dst_curs.execute(q, [tbl, trigger]) - dst_db.commit() - - def check_table_columns(self, src_curs, dst_curs, tbl): - src_colrows = find_column_types(src_curs, tbl) - dst_colrows = find_column_types(dst_curs, tbl) - - src_cols = make_type_string(src_colrows) - dst_cols = make_type_string(dst_colrows) - if src_cols.find('k') < 0: - self.log.error('provider table %s has no primary key (%s)' % ( - tbl, src_cols)) - return 1 - if dst_cols.find('k') < 0: - self.log.error('subscriber table %s has no primary key (%s)' % ( - tbl, dst_cols)) - return 1 - - if src_cols != dst_cols: - self.log.warning('table %s structure is not same (%s/%s)'\ - ', trying to continue' % (tbl, src_cols, dst_cols)) - - err = 0 - for row in src_colrows: - found = 0 - for row2 in dst_colrows: - if row2['name'] == row['name']: - found = 1 - break - if not found: - err = 1 - self.log.error('%s: column %s on provider not on subscriber' - % (tbl, row['name'])) - elif row['type'] != row2['type']: - err = 1 - self.log.error('%s: pk different on column %s' - % (tbl, row['name'])) - - return err - - def find_missing_subscriber_tables(self, pattern='*'): - src_db = self.get_database('subscriber_db') - src_curs = src_db.cursor() - q = """select schemaname || '.' || tablename as full_name from pg_tables - where schemaname not in ('pgq', 'londiste', 'pg_catalog', 'information_schema') - and schemaname !~ 'pg_.*' - and schemaname || '.' || tablename ~ %s - except select table_name from londiste.provider_get_table_list(%s)""" - src_curs.execute(q, [glob2regex(pattern), self.pgq_queue_name]) - rows = src_curs.fetchall() - src_db.commit() - list = [] - for row in rows: - list.append(row[0]) - return list - +#class LondisteSetup_tmp(LondisteSetup): +# +# def find_missing_provider_tables(self, pattern='*'): +# src_db = self.get_database('provider_db') +# src_curs = src_db.cursor() +# q = """select schemaname || '.' || tablename as full_name from pg_tables +# where schemaname not in ('pgq', 'londiste', 'pg_catalog', 'information_schema') +# and schemaname !~ 'pg_.*' +# and (schemaname || '.' || tablename) ~ %s +# except select table_name from londiste.provider_get_table_list(%s)""" +# src_curs.execute(q, [glob2regex(pattern), self.queue_name]) +# rows = src_curs.fetchall() +# src_db.commit() +# list = [] +# for row in rows: +# list.append(row[0]) +# return list +# +# def admin(self): +# cmd = self.args[2] +# if cmd == "tables": +# self.subscriber_show_tables() +# elif cmd == "missing": +# self.subscriber_missing_tables() +# elif cmd == "add": +# self.subscriber_add_tables(self.args[3:]) +# elif cmd == "remove": +# self.subscriber_remove_tables(self.args[3:]) +# elif cmd == "resync": +# self.subscriber_resync_tables(self.args[3:]) +# elif cmd == "register": +# self.subscriber_register() +# elif cmd == "unregister": +# self.subscriber_unregister() +# elif cmd == "install": +# self.subscriber_install() +# elif cmd == "check": +# self.check_tables(self.get_provider_table_list()) +# elif cmd in ["fkeys", "triggers"]: +# self.collect_meta(self.get_provider_table_list(), cmd, self.args[3:]) +# elif cmd == "seqs": +# self.subscriber_list_seqs() +# elif cmd == "add-seq": +# self.subscriber_add_seq(self.args[3:]) +# elif cmd == "remove-seq": +# self.subscriber_remove_seq(self.args[3:]) +# elif cmd == "restore-triggers": +# self.restore_triggers(self.args[3], self.args[4:]) +# else: +# self.log.error('bad subcommand: ' + cmd) +# sys.exit(1) +# +# def collect_meta(self, table_list, meta, args): +# """Display fkey/trigger info.""" +# +# if args == []: +# args = ['pending', 'active'] +# +# field_map = {'triggers': ['table_name', 'trigger_name', 'trigger_def'], +# 'fkeys': ['from_table', 'to_table', 'fkey_name', 'fkey_def']} +# +# query_map = {'pending': "select %s from londiste.subscriber_get_table_pending_%s(%%s)", +# 'active' : "select %s from londiste.find_table_%s(%%s)"} +# +# table_list = self.clean_subscriber_tables(table_list) +# if len(table_list) == 0: +# self.log.info("No tables, no fkeys") +# return +# +# dst_db = self.get_database('subscriber_db') +# dst_curs = dst_db.cursor() +# +# for which in args: +# union_list = [] +# fields = field_map[meta] +# q = query_map[which] % (",".join(fields), meta) +# for tbl in table_list: +# union_list.append(q % skytools.quote_literal(tbl)) +# +# # use union as fkey may appear in duplicate +# sql = " union ".join(union_list) + " order by 1" +# desc = "%s %s" % (which, meta) +# self.display_table(desc, dst_curs, fields, sql) +# dst_db.commit() +# +# def check_tables(self, table_list): +# src_db = self.get_database('provider_db') +# src_curs = src_db.cursor() +# dst_db = self.get_database('subscriber_db') +# dst_curs = dst_db.cursor() +# +# failed = 0 +# for tbl in table_list: +# self.log.info('Checking %s' % tbl) +# if not skytools.exists_table(src_curs, tbl): +# self.log.error('Table %s missing from provider side' % tbl) +# failed += 1 +# elif not skytools.exists_table(dst_curs, tbl): +# self.log.error('Table %s missing from subscriber side' % tbl) +# failed += 1 +# else: +# failed += self.check_table_columns(src_curs, dst_curs, tbl) +# +# src_db.commit() +# dst_db.commit() +# +# return failed +# +# def check_table_columns(self, src_curs, dst_curs, tbl): +# src_colrows = find_column_types(src_curs, tbl) +# dst_colrows = find_column_types(dst_curs, tbl) +# +# src_cols = make_type_string(src_colrows) +# dst_cols = make_type_string(dst_colrows) +# if src_cols.find('k') < 0: +# self.log.error('provider table %s has no primary key (%s)' % ( +# tbl, src_cols)) +# return 1 +# if dst_cols.find('k') < 0: +# self.log.error('subscriber table %s has no primary key (%s)' % ( +# tbl, dst_cols)) +# return 1 +# +# if src_cols != dst_cols: +# self.log.warning('table %s structure is not same (%s/%s)'\ +# ', trying to continue' % (tbl, src_cols, dst_cols)) +# +# err = 0 +# for row in src_colrows: +# found = 0 +# for row2 in dst_colrows: +# if row2['name'] == row['name']: +# found = 1 +# break +# if not found: +# err = 1 +# self.log.error('%s: column %s on provider not on subscriber' +# % (tbl, row['name'])) +# elif row['type'] != row2['type']: +# err = 1 +# self.log.error('%s: pk different on column %s' +# % (tbl, row['name'])) +# +# return err +# +# def find_missing_subscriber_tables(self, pattern='*'): +# src_db = self.get_database('subscriber_db') +# src_curs = src_db.cursor() +# q = """select schemaname || '.' || tablename as full_name from pg_tables +# where schemaname not in ('pgq', 'londiste', 'pg_catalog', 'information_schema') +# and schemaname !~ 'pg_.*' +# and schemaname || '.' || tablename ~ %s +# except select table_name from londiste.provider_get_table_list(%s)""" +# src_curs.execute(q, [glob2regex(pattern), self.queue_name]) +# rows = src_curs.fetchall() +# src_db.commit() +# list = [] +# for row in rows: +# list.append(row[0]) +# return list +# |
