summaryrefslogtreecommitdiff
path: root/postgresqleu/util/db.py
blob: 93093938c510685f41d644c25d50ced014bde59a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from django.db import connection
import collections

def exec_no_result(query, params=None):
	curs = connection.cursor()
	curs.execute(query, params)

def exec_to_list(query, params=None):
	curs = connection.cursor()
	curs.execute(query, params)
	return curs.fetchall()

def exec_to_dict(query, params=None):
	curs = connection.cursor()
	curs.execute(query, params)
	columns = [col[0] for col in curs.description]
	return [dict(zip(columns, row))for row in curs.fetchall()]

def exec_to_scalar(query, params=None):
	curs = connection.cursor()
	curs.execute(query, params)
	r = curs.fetchone()
	if r:
		return r[0]
	# If the query returns no rows at all, then just return None
	return None

def conditional_exec_to_scalar(condition, query, params=None):
	if condition:
		return exec_to_scalar(query, params)
	else:
		return False

def exec_to_keyed_dict(query, params=None):
	curs = connection.cursor()
	curs.execute(query, params)
	columns = [col[0] for col in curs.description]
	return {r[columns[0]]:r for r in (dict(zip(columns, row))for row in curs.fetchall())}

def exec_to_grouped_dict(query, params=None):
	curs = connection.cursor()
	curs.execute(query, params)
	columns = [col[0] for col in curs.description[1:]]
	full = collections.OrderedDict()
	last = None
	curr = []
	for row in curs.fetchall():
		if last != row[0]:
			if curr:
				full[last] = curr
			curr = []
			last = row[0]
		curr.append(dict(zip(columns, row[1:])))
	if last:
		full[last] = curr
	return full