summaryrefslogtreecommitdiff
path: root/python/skytools/sqltools.py
diff options
context:
space:
mode:
authorMarko Kreen2009-10-14 13:37:17 +0000
committerMarko Kreen2009-10-14 13:37:17 +0000
commit66c72793edd322c55dd02744abd8fab822e211b4 (patch)
tree5339b7070d5fa9cf19d6de1117a940ea4a48f8c7 /python/skytools/sqltools.py
parentfc41ef5f83372e766ba31165d01d7ed2d74b034c (diff)
python/skytools: add doctest-based regtests to few non-sql functions
Seems to be better testing method than ad-hoc scripts. They will serve as examples too. Also fix few minor problems found in the process: - parse_pgarray: check if str ends with } - parse_pgarray: support NULL - quote_fqident: add 'public.' schema to idents without schema - fq_name_parts: return always list
Diffstat (limited to 'python/skytools/sqltools.py')
-rw-r--r--python/skytools/sqltools.py60
1 files changed, 52 insertions, 8 deletions
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py
index 3037df69..902bf9a6 100644
--- a/python/skytools/sqltools.py
+++ b/python/skytools/sqltools.py
@@ -44,18 +44,34 @@ class dbdict(dict):
#
def fq_name_parts(tbl):
- "Return fully qualified name parts."
+ """Return fully qualified name parts.
+
+ >>> fq_name_parts('tbl')
+ ['public', 'tbl']
+ >>> fq_name_parts('foo.tbl')
+ ['foo', 'tbl']
+ >>> fq_name_parts('foo.tbl.baz')
+ ['foo', 'tbl.baz']
+ """
tmp = tbl.split('.', 1)
if len(tmp) == 1:
- return ('public', tbl)
+ return ['public', tbl]
elif len(tmp) == 2:
return tmp
else:
raise Exception('Syntax error in table name:'+tbl)
def fq_name(tbl):
- "Return fully qualified name."
+ """Return fully qualified name.
+
+ >>> fq_name('tbl')
+ 'public.tbl'
+ >>> fq_name('foo.tbl')
+ 'foo.tbl'
+ >>> fq_name('foo.tbl.baz')
+ 'foo.tbl.baz'
+ """
return '.'.join(fq_name_parts(tbl))
#
@@ -171,7 +187,19 @@ def exists_temp_table(curs, tbl):
#
class Snapshot(object):
- "Represents a PostgreSQL snapshot."
+ """Represents a PostgreSQL snapshot.
+
+ Example:
+ >>> sn = Snapshot('11:20:11,12,15')
+ >>> sn.contains(9)
+ True
+ >>> sn.contains(11)
+ False
+ >>> sn.contains(17)
+ True
+ >>> sn.contains(20)
+ False
+ """
def __init__(self, str):
"Create snapshot from string."
@@ -235,11 +263,15 @@ def _gen_list_insert(tbl, row, fields, qfields):
return fmt % (tbl, ",".join(qfields), ",".join(tmp))
def magic_insert(curs, tablename, data, fields = None, use_insert = 0):
- """Copy/insert a list of dict/list data to database.
-
+ r"""Copy/insert a list of dict/list data to database.
+
If curs == None, then the copy or insert statements are returned
as string. For list of dict the field list is optional, as its
possible to guess them from dict keys.
+
+ Example:
+ >>> magic_insert(None, 'tbl', [[1, '1'], [2, '2']], ['col1', 'col2'])
+ 'COPY public.tbl (col1,col2) FROM STDIN;\n1\t1\n2\t2\n\\.\n'
"""
if len(data) == 0:
return
@@ -486,7 +518,11 @@ def installer_apply_file(db, filename, log):
#
def mk_insert_sql(row, tbl, pkey_list = None, field_map = None):
- """Generate INSERT statement from dict data."""
+ """Generate INSERT statement from dict data.
+
+ >>> mk_insert_sql({'id': '1', 'data': None}, 'tbl')
+ "insert into public.tbl (data, id) values (null, '1');"
+ """
col_list = []
val_list = []
@@ -504,7 +540,11 @@ def mk_insert_sql(row, tbl, pkey_list = None, field_map = None):
quote_fqident(tbl), col_str, val_str)
def mk_update_sql(row, tbl, pkey_list, field_map = None):
- """Generate UPDATE statement from dict data."""
+ r"""Generate UPDATE statement from dict data.
+
+ >>> mk_update_sql({'id': 0, 'id2': '2', 'data': 'str\\'}, 'Table', ['id', 'id2'])
+ 'update only public."Table" set data = E\'str\\\\\' where id = \'0\' and id2 = \'2\';'
+ """
if len(pkey_list) < 1:
raise Exception("update needs pkeys")
@@ -787,3 +827,7 @@ class PLPyQueryBuilder(QueryBuilder):
res = [dbdict(r) for r in res]
return res
+if __name__ == '__main__':
+ import doctest
+ doctest.testmod()
+