summaryrefslogtreecommitdiff
path: root/python/skytools
diff options
context:
space:
mode:
authorAsko Oja2009-05-04 12:22:25 +0000
committerAsko Oja2009-05-04 12:22:25 +0000
commit7d2aff70af0e9743d5e54f4c4c5b11af0c1df41f (patch)
tree15c4fe756065ec93e8e6a890cb54df51f655ab48 /python/skytools
parent7b538859c90a902cf9b2872feb912d0a6e2f7447 (diff)
Add querybuilder into sql tools
Querybulder is used to manage parametrisized queries both in plpython/dbservice stored procedures and python scripts.
Diffstat (limited to 'python/skytools')
-rw-r--r--python/skytools/sqltools.py249
1 files changed, 249 insertions, 0 deletions
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py
index 2dee2cc1..c0e7e554 100644
--- a/python/skytools/sqltools.py
+++ b/python/skytools/sqltools.py
@@ -5,6 +5,10 @@ import os
from cStringIO import StringIO
from skytools.quoting import quote_copy, quote_literal, quote_ident, quote_fqident
import skytools.installer_config
+try:
+ import plpy
+except ImportError:
+ pass
__all__ = [
"fq_name_parts", "fq_name", "get_table_oid", "get_table_pkeys",
@@ -14,8 +18,13 @@ __all__ = [
"CopyPipe", "full_copy", "DBObject", "DBSchema", "DBTable", "DBFunction",
"DBLanguage", "db_install", "installer_find_file", "installer_apply_file",
"dbdict", "mk_insert_sql", "mk_update_sql", "mk_delete_sql",
+ 'QueryBuilder', 'PLPyQueryBuilder',
]
+PARAM_INLINE = 0 # quote_literal()
+PARAM_DBAPI = 1 # %()s
+PARAM_PLPY = 2 # $n
+
class dbdict(dict):
"""Wrapper on actual dict that allows
accessing dict keys as attributes."""
@@ -538,3 +547,243 @@ def mk_delete_sql(row, tbl, pkey_list, field_map = None):
whe_str = " and ".join(whe_list)
return "delete from %s where %s;" % (quote_fqident(tbl), whe_str)
+class QArgConf:
+ """Per-query arg-type config object."""
+ param_type = None
+
+class QArg:
+ """Place-holder for a query parameter."""
+ def __init__(self, name, value, pos, conf):
+ self.name = name
+ self.value = value
+ self.pos = pos
+ self.conf = conf
+ def __str__(self):
+ if self.conf.param_type == PARAM_INLINE:
+ return skytools.quote_literal(self.value)
+ elif self.conf.param_type == PARAM_DBAPI:
+ return "%s"
+ elif self.conf.param_type == PARAM_PLPY:
+ return "$%d" % self.pos
+ else:
+ raise Exception("bad QArgConf.param_type")
+
+
+# need an structure with fast remove-from-middle
+# and append operations.
+class DList:
+ """Simple double-linked list."""
+ def __init__(self):
+ self.next = self
+ self.prev = self
+
+ def append(self, obj):
+ obj.next = self
+ obj.prev = self.prev
+ self.prev.next = obj
+ self.prev = obj
+
+ def remove(self, obj):
+ obj.next.prev = obj.prev
+ obj.prev.next = obj.next
+ obj.next = obj.prev = None
+
+ def empty(self):
+ return self.next == self
+
+ def pop(self):
+ """Remove and return first element."""
+ obj = None
+ if not self.empty():
+ obj = self.next
+ self.remove(obj)
+ return obj
+
+
+class CachedPlan:
+ """Wrapper around prepared plan."""
+ def __init__(self, key, plan):
+ self.key = key # (sql, (types))
+ self.plan = plan
+
+
+class PlanCache:
+ """Cache for limited amount of plans."""
+
+ def __init__(self, maxplans = 100):
+ self.maxplans = maxplans
+ self.plan_map = {}
+ self.plan_list = DList()
+
+ def get_plan(self, sql, types):
+ """Prepare the plan and cache it."""
+
+ t = (sql, tuple(types))
+ if t in self.plan_map:
+ pc = self.plan_map[t]
+ # put to the end
+ self.plan_list.remove(pc)
+ self.plan_list.append(pc)
+ return pc.plan
+
+ # prepare new plan
+ plan = plpy.prepare(sql, types)
+
+ # add to cache
+ pc = CachedPlan(t, plan)
+ self.plan_list.append(pc)
+ self.plan_map[t] = plan
+
+ # remove plans if too much
+ while len(self.plan_map) > self.maxplans:
+ pc = self.plan_list.pop()
+ del self.plan_map[pc.key]
+
+ return plan
+
+
+class QueryBuilder:
+ """Helper for query building."""
+
+ def __init__(self, sqlexpr, params):
+ """Init the object.
+
+ @param sqlexpr: Partial sql fragment.
+ @param params: Dict of parameter values.
+ """
+ self._params = params
+ self._arg_type_list = []
+ self._arg_value_list = []
+ self._sql_parts = []
+ self._arg_conf = QArgConf()
+ self._nargs = 0
+
+ if sqlexpr:
+ self.add(sqlexpr, required = True)
+
+ def add(self, expr, type = "text", required = False):
+ """Add SQL fragment to query.
+ """
+ self._add_expr('', expr, self._params, type, required)
+
+ def get_sql(self, param_type = PARAM_INLINE):
+ """Return generated SQL (thus far) as string.
+
+ Possible values for param_type:
+ - 0: Insert values quoted with quote_literal()
+ - 1: Insert %()s in place of parameters.
+ - 2: Insert $n in place of parameters.
+ """
+ self._arg_conf.param_type = param_type
+ tmp = map(str, self._sql_parts)
+ return "".join(tmp)
+
+ def _add_expr(self, pfx, expr, params, type, required):
+ parts = []
+ types = []
+ values = []
+ nargs = self._nargs
+ if pfx:
+ parts.append(pfx)
+ pos = 0
+ while 1:
+ # find start of next argument
+ a1 = expr.find('{', pos)
+ if a1 < 0:
+ parts.append(expr[pos:])
+ break
+
+ # find end end of argument name
+ a2 = expr.find('}', a1)
+ if a2 < 0:
+ raise Exception("missing argument terminator: "+expr)
+
+ # add plain sql
+ if a1 > pos:
+ parts.append(expr[pos:a1])
+ pos = a2 + 1
+
+ # get arg name, check if exists
+ k = expr[a1 + 1 : a2]
+ if k not in params:
+ if required:
+ raise Exception("required parameter missing: "+k)
+ return
+
+ # got arg
+ nargs += 1
+ val = params[k]
+ values.append(val)
+ types.append(type)
+ arg = QArg(k, val, nargs, self._arg_conf)
+ parts.append(arg)
+
+ # add to the main sql only if all args exist
+ self._sql_parts.extend(parts)
+ if types:
+ self._arg_type_list.extend(types)
+ if values:
+ self._arg_value_list.extend(values)
+ self._nargs = nargs
+
+ def execute(self, curs):
+ """Client-side query execution on DB-API 2.0 cursor.
+
+ Calls C{curs.execute()} with proper arguments.
+
+ Returns result of curs.execute(), although that does not
+ return anything interesting. Later curs.fetch* methods
+ must be called to get result.
+ """
+ q = self.get_sql(PARAM_DBAPI)
+ args = self._params
+ return curs.execute(q, args)
+
+class PLPyQueryBuilder(QueryBuilder):
+
+ def __init__(self, sqlexpr, params, plan_cache = None, sqls = None):
+ """Init the object.
+
+ @param sqlexpr: Partial sql fragment.
+ @param params: Dict of parameter values.
+ @param plan_cache: (PL/Python) A dict object where to store the plan cache, under the key C{"plan_cache"}.
+ If not given, plan will not be cached and values will be inserted directly
+ to query. Usually either C{GD} or C{SD} should be given here.
+ @param sqls: list object where to append executed sqls (used for debugging)
+ """
+ QueryBuilder.__init__(self, sqlexpr, params)
+ self._sqls = sqls
+
+ if plan_cache:
+ if 'plan_cache' not in plan_cache:
+ plan_cache['plan_cache'] = PlanCache()
+ self._plan_cache = plan_cache['plan_cache']
+ else:
+ self._plan_cache = None
+
+ def execute(self):
+ """Server-size query execution via plpy.
+
+ Query can be run either cached or uncached, depending
+ on C{plan_cache} setting given to L{__init__}.
+
+ Returns result of plpy.execute().
+ """
+
+ args = self._arg_value_list
+ types = self._arg_type_list
+
+ if self._sqls is not None:
+ self._sqls.append( { "sql": self.get_sql(PARAM_INLINE) } )
+
+ if self._plan_cache:
+ sql = self.get_sql(PARAM_PLPY)
+ plan = self._plan_cache.get_plan(sql, types)
+ res = plpy.execute(plan, args)
+ else:
+ sql = self.get_sql(PARAM_INLINE)
+ res = plpy.execute(sql)
+ if res:
+ res = [dbdict(r) for r in res]
+ return res
+