diff options
author | Marko Kreen | 2009-10-14 13:37:17 +0000 |
---|---|---|
committer | Marko Kreen | 2009-10-14 13:37:17 +0000 |
commit | 66c72793edd322c55dd02744abd8fab822e211b4 (patch) | |
tree | 5339b7070d5fa9cf19d6de1117a940ea4a48f8c7 /python/skytools/sqltools.py | |
parent | fc41ef5f83372e766ba31165d01d7ed2d74b034c (diff) |
python/skytools: add doctest-based regtests to few non-sql functions
Seems to be better testing method than ad-hoc scripts. They will
serve as examples too.
Also fix few minor problems found in the process:
- parse_pgarray: check if str ends with }
- parse_pgarray: support NULL
- quote_fqident: add 'public.' schema to idents without schema
- fq_name_parts: return always list
Diffstat (limited to 'python/skytools/sqltools.py')
-rw-r--r-- | python/skytools/sqltools.py | 60 |
1 files changed, 52 insertions, 8 deletions
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py index 3037df69..902bf9a6 100644 --- a/python/skytools/sqltools.py +++ b/python/skytools/sqltools.py @@ -44,18 +44,34 @@ class dbdict(dict): # def fq_name_parts(tbl): - "Return fully qualified name parts." + """Return fully qualified name parts. + + >>> fq_name_parts('tbl') + ['public', 'tbl'] + >>> fq_name_parts('foo.tbl') + ['foo', 'tbl'] + >>> fq_name_parts('foo.tbl.baz') + ['foo', 'tbl.baz'] + """ tmp = tbl.split('.', 1) if len(tmp) == 1: - return ('public', tbl) + return ['public', tbl] elif len(tmp) == 2: return tmp else: raise Exception('Syntax error in table name:'+tbl) def fq_name(tbl): - "Return fully qualified name." + """Return fully qualified name. + + >>> fq_name('tbl') + 'public.tbl' + >>> fq_name('foo.tbl') + 'foo.tbl' + >>> fq_name('foo.tbl.baz') + 'foo.tbl.baz' + """ return '.'.join(fq_name_parts(tbl)) # @@ -171,7 +187,19 @@ def exists_temp_table(curs, tbl): # class Snapshot(object): - "Represents a PostgreSQL snapshot." + """Represents a PostgreSQL snapshot. + + Example: + >>> sn = Snapshot('11:20:11,12,15') + >>> sn.contains(9) + True + >>> sn.contains(11) + False + >>> sn.contains(17) + True + >>> sn.contains(20) + False + """ def __init__(self, str): "Create snapshot from string." @@ -235,11 +263,15 @@ def _gen_list_insert(tbl, row, fields, qfields): return fmt % (tbl, ",".join(qfields), ",".join(tmp)) def magic_insert(curs, tablename, data, fields = None, use_insert = 0): - """Copy/insert a list of dict/list data to database. - + r"""Copy/insert a list of dict/list data to database. + If curs == None, then the copy or insert statements are returned as string. For list of dict the field list is optional, as its possible to guess them from dict keys. + + Example: + >>> magic_insert(None, 'tbl', [[1, '1'], [2, '2']], ['col1', 'col2']) + 'COPY public.tbl (col1,col2) FROM STDIN;\n1\t1\n2\t2\n\\.\n' """ if len(data) == 0: return @@ -486,7 +518,11 @@ def installer_apply_file(db, filename, log): # def mk_insert_sql(row, tbl, pkey_list = None, field_map = None): - """Generate INSERT statement from dict data.""" + """Generate INSERT statement from dict data. + + >>> mk_insert_sql({'id': '1', 'data': None}, 'tbl') + "insert into public.tbl (data, id) values (null, '1');" + """ col_list = [] val_list = [] @@ -504,7 +540,11 @@ def mk_insert_sql(row, tbl, pkey_list = None, field_map = None): quote_fqident(tbl), col_str, val_str) def mk_update_sql(row, tbl, pkey_list, field_map = None): - """Generate UPDATE statement from dict data.""" + r"""Generate UPDATE statement from dict data. + + >>> mk_update_sql({'id': 0, 'id2': '2', 'data': 'str\\'}, 'Table', ['id', 'id2']) + 'update only public."Table" set data = E\'str\\\\\' where id = \'0\' and id2 = \'2\';' + """ if len(pkey_list) < 1: raise Exception("update needs pkeys") @@ -787,3 +827,7 @@ class PLPyQueryBuilder(QueryBuilder): res = [dbdict(r) for r in res] return res +if __name__ == '__main__': + import doctest + doctest.testmod() + |