diff options
author | Asko Oja | 2009-05-04 12:22:25 +0000 |
---|---|---|
committer | Asko Oja | 2009-05-04 12:22:25 +0000 |
commit | 7d2aff70af0e9743d5e54f4c4c5b11af0c1df41f (patch) | |
tree | 15c4fe756065ec93e8e6a890cb54df51f655ab48 /python/skytools | |
parent | 7b538859c90a902cf9b2872feb912d0a6e2f7447 (diff) |
Add querybuilder into sql tools
Querybulder is used to manage parametrisized queries both in
plpython/dbservice stored procedures and python scripts.
Diffstat (limited to 'python/skytools')
-rw-r--r-- | python/skytools/sqltools.py | 249 |
1 files changed, 249 insertions, 0 deletions
diff --git a/python/skytools/sqltools.py b/python/skytools/sqltools.py index 2dee2cc1..c0e7e554 100644 --- a/python/skytools/sqltools.py +++ b/python/skytools/sqltools.py @@ -5,6 +5,10 @@ import os from cStringIO import StringIO from skytools.quoting import quote_copy, quote_literal, quote_ident, quote_fqident import skytools.installer_config +try: + import plpy +except ImportError: + pass __all__ = [ "fq_name_parts", "fq_name", "get_table_oid", "get_table_pkeys", @@ -14,8 +18,13 @@ __all__ = [ "CopyPipe", "full_copy", "DBObject", "DBSchema", "DBTable", "DBFunction", "DBLanguage", "db_install", "installer_find_file", "installer_apply_file", "dbdict", "mk_insert_sql", "mk_update_sql", "mk_delete_sql", + 'QueryBuilder', 'PLPyQueryBuilder', ] +PARAM_INLINE = 0 # quote_literal() +PARAM_DBAPI = 1 # %()s +PARAM_PLPY = 2 # $n + class dbdict(dict): """Wrapper on actual dict that allows accessing dict keys as attributes.""" @@ -538,3 +547,243 @@ def mk_delete_sql(row, tbl, pkey_list, field_map = None): whe_str = " and ".join(whe_list) return "delete from %s where %s;" % (quote_fqident(tbl), whe_str) +class QArgConf: + """Per-query arg-type config object.""" + param_type = None + +class QArg: + """Place-holder for a query parameter.""" + def __init__(self, name, value, pos, conf): + self.name = name + self.value = value + self.pos = pos + self.conf = conf + def __str__(self): + if self.conf.param_type == PARAM_INLINE: + return skytools.quote_literal(self.value) + elif self.conf.param_type == PARAM_DBAPI: + return "%s" + elif self.conf.param_type == PARAM_PLPY: + return "$%d" % self.pos + else: + raise Exception("bad QArgConf.param_type") + + +# need an structure with fast remove-from-middle +# and append operations. +class DList: + """Simple double-linked list.""" + def __init__(self): + self.next = self + self.prev = self + + def append(self, obj): + obj.next = self + obj.prev = self.prev + self.prev.next = obj + self.prev = obj + + def remove(self, obj): + obj.next.prev = obj.prev + obj.prev.next = obj.next + obj.next = obj.prev = None + + def empty(self): + return self.next == self + + def pop(self): + """Remove and return first element.""" + obj = None + if not self.empty(): + obj = self.next + self.remove(obj) + return obj + + +class CachedPlan: + """Wrapper around prepared plan.""" + def __init__(self, key, plan): + self.key = key # (sql, (types)) + self.plan = plan + + +class PlanCache: + """Cache for limited amount of plans.""" + + def __init__(self, maxplans = 100): + self.maxplans = maxplans + self.plan_map = {} + self.plan_list = DList() + + def get_plan(self, sql, types): + """Prepare the plan and cache it.""" + + t = (sql, tuple(types)) + if t in self.plan_map: + pc = self.plan_map[t] + # put to the end + self.plan_list.remove(pc) + self.plan_list.append(pc) + return pc.plan + + # prepare new plan + plan = plpy.prepare(sql, types) + + # add to cache + pc = CachedPlan(t, plan) + self.plan_list.append(pc) + self.plan_map[t] = plan + + # remove plans if too much + while len(self.plan_map) > self.maxplans: + pc = self.plan_list.pop() + del self.plan_map[pc.key] + + return plan + + +class QueryBuilder: + """Helper for query building.""" + + def __init__(self, sqlexpr, params): + """Init the object. + + @param sqlexpr: Partial sql fragment. + @param params: Dict of parameter values. + """ + self._params = params + self._arg_type_list = [] + self._arg_value_list = [] + self._sql_parts = [] + self._arg_conf = QArgConf() + self._nargs = 0 + + if sqlexpr: + self.add(sqlexpr, required = True) + + def add(self, expr, type = "text", required = False): + """Add SQL fragment to query. + """ + self._add_expr('', expr, self._params, type, required) + + def get_sql(self, param_type = PARAM_INLINE): + """Return generated SQL (thus far) as string. + + Possible values for param_type: + - 0: Insert values quoted with quote_literal() + - 1: Insert %()s in place of parameters. + - 2: Insert $n in place of parameters. + """ + self._arg_conf.param_type = param_type + tmp = map(str, self._sql_parts) + return "".join(tmp) + + def _add_expr(self, pfx, expr, params, type, required): + parts = [] + types = [] + values = [] + nargs = self._nargs + if pfx: + parts.append(pfx) + pos = 0 + while 1: + # find start of next argument + a1 = expr.find('{', pos) + if a1 < 0: + parts.append(expr[pos:]) + break + + # find end end of argument name + a2 = expr.find('}', a1) + if a2 < 0: + raise Exception("missing argument terminator: "+expr) + + # add plain sql + if a1 > pos: + parts.append(expr[pos:a1]) + pos = a2 + 1 + + # get arg name, check if exists + k = expr[a1 + 1 : a2] + if k not in params: + if required: + raise Exception("required parameter missing: "+k) + return + + # got arg + nargs += 1 + val = params[k] + values.append(val) + types.append(type) + arg = QArg(k, val, nargs, self._arg_conf) + parts.append(arg) + + # add to the main sql only if all args exist + self._sql_parts.extend(parts) + if types: + self._arg_type_list.extend(types) + if values: + self._arg_value_list.extend(values) + self._nargs = nargs + + def execute(self, curs): + """Client-side query execution on DB-API 2.0 cursor. + + Calls C{curs.execute()} with proper arguments. + + Returns result of curs.execute(), although that does not + return anything interesting. Later curs.fetch* methods + must be called to get result. + """ + q = self.get_sql(PARAM_DBAPI) + args = self._params + return curs.execute(q, args) + +class PLPyQueryBuilder(QueryBuilder): + + def __init__(self, sqlexpr, params, plan_cache = None, sqls = None): + """Init the object. + + @param sqlexpr: Partial sql fragment. + @param params: Dict of parameter values. + @param plan_cache: (PL/Python) A dict object where to store the plan cache, under the key C{"plan_cache"}. + If not given, plan will not be cached and values will be inserted directly + to query. Usually either C{GD} or C{SD} should be given here. + @param sqls: list object where to append executed sqls (used for debugging) + """ + QueryBuilder.__init__(self, sqlexpr, params) + self._sqls = sqls + + if plan_cache: + if 'plan_cache' not in plan_cache: + plan_cache['plan_cache'] = PlanCache() + self._plan_cache = plan_cache['plan_cache'] + else: + self._plan_cache = None + + def execute(self): + """Server-size query execution via plpy. + + Query can be run either cached or uncached, depending + on C{plan_cache} setting given to L{__init__}. + + Returns result of plpy.execute(). + """ + + args = self._arg_value_list + types = self._arg_type_list + + if self._sqls is not None: + self._sqls.append( { "sql": self.get_sql(PARAM_INLINE) } ) + + if self._plan_cache: + sql = self.get_sql(PARAM_PLPY) + plan = self._plan_cache.get_plan(sql, types) + res = plpy.execute(plan, args) + else: + sql = self.get_sql(PARAM_INLINE) + res = plpy.execute(sql) + if res: + res = [dbdict(r) for r in res] + return res + |