summaryrefslogtreecommitdiff
path: root/python/skytools
diff options
context:
space:
mode:
Diffstat (limited to 'python/skytools')
-rw-r--r--python/skytools/gzlog.py7
-rwxr-xr-xpython/skytools/querybuilder.py9
-rw-r--r--python/skytools/scripting.py5
-rw-r--r--python/skytools/skylog.py62
-rw-r--r--python/skytools/sockutil.py11
-rw-r--r--python/skytools/timeutil.py18
-rw-r--r--python/skytools/tnetstrings.py115
7 files changed, 203 insertions, 24 deletions
diff --git a/python/skytools/gzlog.py b/python/skytools/gzlog.py
index 558e2813..0db40fc3 100644
--- a/python/skytools/gzlog.py
+++ b/python/skytools/gzlog.py
@@ -1,8 +1,8 @@
"""Atomic append of gzipped data.
-The point is - if several gzip streams are concated, they
-are read back as one whose stream.
+The point is - if several gzip streams are concatenated,
+they are read back as one whole stream.
"""
import gzip
@@ -22,7 +22,7 @@ def gzip_append(filename, data, level = 6):
g.write(data)
g.close()
zdata = buf.getvalue()
-
+
# append, safely
f = open(filename, "a+", 0)
f.seek(0, 2)
@@ -36,4 +36,3 @@ def gzip_append(filename, data, level = 6):
f.truncate()
f.close()
raise ex
-
diff --git a/python/skytools/querybuilder.py b/python/skytools/querybuilder.py
index c2eead2d..9930daab 100755
--- a/python/skytools/querybuilder.py
+++ b/python/skytools/querybuilder.py
@@ -319,8 +319,11 @@ class PLPyQuery:
arg_list = [arg_dict.get(k) for k in self.arg_map]
return plpy.execute(self.plan, arg_list)
except KeyError:
- plpy.error("Missing argument: QUERY: %s ARGS: %s VALUES: %s" % (
- repr(self.sql), repr(self.arg_map), repr(arg_dict)))
+ need = set(self.arg_map)
+ got = set(arg_dict.keys())
+ missing = list(need.difference(got))
+ plpy.error("Missing arguments: [%s] QUERY: %s" % (
+ ','.join(missing), repr(self.sql)))
def __repr__(self):
return 'PLPyQuery<%s>' % self.sql
@@ -341,7 +344,7 @@ def plpy_exec(gd, sql, args, all_keys_required = True):
>>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3', 'arg2': '4'})
DBG: plpy.execute(('PLAN', 'select $1, $2, $3', ['text', 'int4', 'text']), ['3', '4', '3'])
>>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3'})
- DBG: plpy.error("Missing argument: QUERY: 'select {arg1}, {arg2:int4}, {arg1}' ARGS: ['arg1', 'arg2', 'arg1'] VALUES: {'arg1': '3'}")
+ DBG: plpy.error("Missing arguments: [arg2] QUERY: 'select {arg1}, {arg2:int4}, {arg1}'")
>>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3'}, False)
DBG: plpy.execute(('PLAN', 'select $1, $2, $3', ['text', 'int4', 'text']), ['3', None, '3'])
"""
diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py
index a5e82663..840f3cf4 100644
--- a/python/skytools/scripting.py
+++ b/python/skytools/scripting.py
@@ -588,7 +588,10 @@ class BaseScript(object):
self.reset()
sys.exit(1)
except Exception, d:
- self.send_stats()
+ try: # this may fail too
+ self.send_stats()
+ except:
+ pass
emsg = str(d).rstrip()
self.reset()
self.exception_hook(d, emsg)
diff --git a/python/skytools/skylog.py b/python/skytools/skylog.py
index 0279d7d0..da8ccb91 100644
--- a/python/skytools/skylog.py
+++ b/python/skytools/skylog.py
@@ -1,11 +1,22 @@
"""Our log handlers for Python's logging package.
"""
-import os, time, socket
-import logging, logging.handlers
+import logging
+import logging.handlers
+import os
+import socket
+import time
import skytools
+# use fast implementation if available, otherwise fall back to reference one
+try:
+ import tnetstring as tnetstrings
+ tnetstrings.parse = tnetstrings.pop
+except ImportError:
+ import skytools.tnetstrings as tnetstrings
+ tnetstrings.dumps = tnetstrings.dump
+
__all__ = ['getLogger']
# add TRACE level
@@ -17,10 +28,15 @@ logging.addLevelName(TRACE, 'TRACE')
_service_name = 'unknown_svc'
_job_name = 'unknown_job'
_hostname = socket.gethostname()
+try:
+ _hostaddr = socket.gethostbyname(_hostname)
+except:
+ _hostaddr = "0.0.0.0"
_log_extra = {
'job_name': _job_name,
'service_name': _service_name,
'hostname': _hostname,
+ 'hostaddr': _hostaddr,
}
def set_service_name(service_name, job_name):
"""Set info about current script."""
@@ -64,6 +80,7 @@ class EasyRotatingFileHandler(logging.handlers.RotatingFileHandler):
fn = os.path.expanduser(filename)
logging.handlers.RotatingFileHandler.__init__(self, fn, maxBytes=maxBytes, backupCount=backupCount)
+
# send JSON message over UDP
class UdpLogServerHandler(logging.handlers.DatagramHandler):
"""Sends log records over UDP to logserver in JSON format."""
@@ -98,10 +115,7 @@ class UdpLogServerHandler(logging.handlers.DatagramHandler):
msg = msg[:self.MAXMSG]
txt_level = self._level_map.get(record.levelno, "ERROR")
hostname = _hostname
- try:
- hostaddr = socket.gethostbyname(hostname)
- except:
- hostaddr = "0.0.0.0"
+ hostaddr = _hostaddr
jobname = _job_name
svcname = _service_name
pkt = self._log_template % (time.time()*1000, txt_level, skytools.quote_json(msg),
@@ -114,6 +128,40 @@ class UdpLogServerHandler(logging.handlers.DatagramHandler):
sock.sendto(s, (self.host, self.port))
sock.close()
+
+# send TNetStrings message over UDP
+class UdpTNetStringsHandler(logging.handlers.DatagramHandler):
+ """ Sends log records in TNetStrings format over UDP. """
+
+ # LogRecord fields to send
+ send_fields = [
+ 'created', 'exc_text', 'levelname', 'levelno', 'message', 'msecs', 'name',
+ 'hostaddr', 'hostname', 'job_name', 'service_name']
+
+ _udp_reset = 0
+
+ def makePickle(self, record):
+ """ Create message in TNetStrings format.
+ """
+ msg = {}
+ self.format(record) # render 'message' attribute and others
+ for k in self.send_fields:
+ msg[k] = record.__dict__[k]
+ tnetstr = tnetstrings.dumps(msg)
+ return tnetstr
+
+ def send(self, s):
+ """ Cache socket for a moment, then recreate it.
+ """
+ now = time.time()
+ if now - 1 > self._udp_reset:
+ if self.sock:
+ self.sock.close()
+ self.sock = self.makeSocket()
+ self._udp_reset = now
+ self.sock.sendto(s, (self.host, self.port))
+
+
class LogDBHandler(logging.handlers.SocketHandler):
"""Sends log records into PostgreSQL server.
@@ -234,6 +282,7 @@ class LogDBHandler(logging.handlers.SocketHandler):
query = "select * from log.add(%s, %s, %s)"
logcur.execute(query, [type, service, msg])
+
# fix unicode bug in SysLogHandler
class SysLogHandler(logging.handlers.SysLogHandler):
"""Fixes unicode bug in logging.handlers.SysLogHandler."""
@@ -301,6 +350,7 @@ class SysLogHostnameHandler(SysLogHandler):
msg)
return msg
+
try:
from logging import LoggerAdapter
except ImportError:
diff --git a/python/skytools/sockutil.py b/python/skytools/sockutil.py
index f950f5a0..dbcd021b 100644
--- a/python/skytools/sockutil.py
+++ b/python/skytools/sockutil.py
@@ -37,10 +37,13 @@ def set_tcp_keepalive(fd, keepalive = True,
if not hasattr(socket, 'SO_KEEPALIVE') or not hasattr(socket, 'fromfd'):
return
- # get numeric fd and cast to socket
- if hasattr(fd, 'fileno'):
- fd = fd.fileno()
- s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ # need socket object
+ if isinstance(fd, socket.SocketType):
+ s = fd
+ else:
+ if hasattr(fd, 'fileno'):
+ fd = fd.fileno()
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
# skip if unix socket
if type(s.getsockname()) != type(()):
diff --git a/python/skytools/timeutil.py b/python/skytools/timeutil.py
index a29e050c..2ea63082 100644
--- a/python/skytools/timeutil.py
+++ b/python/skytools/timeutil.py
@@ -134,18 +134,24 @@ def datetime_to_timestamp(dt, local_time=True):
Returns seconds since epoch as float.
- >>> datetime_to_timestamp(parse_iso_timestamp("2005-06-01 15:00:59.33 +02"))
- 1117630859.33
- >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.33, UTC))
- 1117630859.33
- >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.33))
- 1117630859.33
+ >>> datetime_to_timestamp(parse_iso_timestamp("2005-06-01 15:00:59.5 +02"))
+ 1117630859.5
+ >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.5, UTC))
+ 1117630859.5
+ >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.5))
+ 1117630859.5
>>> now = datetime.utcnow()
>>> now2 = datetime.utcfromtimestamp(datetime_to_timestamp(now, False))
+ >>> abs(now2.microsecond - now.microsecond) < 100
+ True
+ >>> now2 = now2.replace(microsecond = now.microsecond)
>>> now == now2
True
>>> now = datetime.now()
>>> now2 = datetime.fromtimestamp(datetime_to_timestamp(now))
+ >>> abs(now2.microsecond - now.microsecond) < 100
+ True
+ >>> now2 = now2.replace(microsecond = now.microsecond)
>>> now == now2
True
"""
diff --git a/python/skytools/tnetstrings.py b/python/skytools/tnetstrings.py
new file mode 100644
index 00000000..afacc09e
--- /dev/null
+++ b/python/skytools/tnetstrings.py
@@ -0,0 +1,115 @@
+# Note this implementation is more strict than necessary to demonstrate
+# minimum restrictions on types allowed in dictionaries.
+
+def dump(data):
+ if type(data) is long or type(data) is int:
+ out = str(data)
+ return '%d:%s#' % (len(out), out)
+ elif type(data) is float:
+ out = '%f' % data
+ return '%d:%s^' % (len(out), out)
+ elif type(data) is str:
+ return '%d:' % len(data) + data + ','
+ elif type(data) is dict:
+ return dump_dict(data)
+ elif type(data) is list:
+ return dump_list(data)
+ elif data == None:
+ return '0:~'
+ elif type(data) is bool:
+ out = repr(data).lower()
+ return '%d:%s!' % (len(out), out)
+ else:
+ assert False, "Can't serialize stuff that's %s." % type(data)
+
+
+def parse(data):
+ payload, payload_type, remain = parse_payload(data)
+
+ if payload_type == '#':
+ value = int(payload)
+ elif payload_type == '}':
+ value = parse_dict(payload)
+ elif payload_type == ']':
+ value = parse_list(payload)
+ elif payload_type == '!':
+ value = payload == 'true'
+ elif payload_type == '^':
+ value = float(payload)
+ elif payload_type == '~':
+ assert len(payload) == 0, "Payload must be 0 length for null."
+ value = None
+ elif payload_type == ',':
+ value = payload
+ else:
+ assert False, "Invalid payload type: %r" % payload_type
+
+ return value, remain
+
+def parse_payload(data):
+ assert data, "Invalid data to parse, it's empty."
+ length, extra = data.split(':', 1)
+ length = int(length)
+
+ payload, extra = extra[:length], extra[length:]
+ assert extra, "No payload type: %r, %r" % (payload, extra)
+ payload_type, remain = extra[0], extra[1:]
+
+ assert len(payload) == length, "Data is wrong length %d vs %d" % (length, len(payload))
+ return payload, payload_type, remain
+
+def parse_list(data):
+ if len(data) == 0: return []
+
+ result = []
+ value, extra = parse(data)
+ result.append(value)
+
+ while extra:
+ value, extra = parse(extra)
+ result.append(value)
+
+ return result
+
+def parse_pair(data):
+ key, extra = parse(data)
+ assert extra, "Unbalanced dictionary store."
+ value, extra = parse(extra)
+
+ return key, value, extra
+
+def parse_dict(data):
+ if len(data) == 0: return {}
+
+ key, value, extra = parse_pair(data)
+ assert type(key) is str, "Keys can only be strings."
+
+ result = {key: value}
+
+ while extra:
+ key, value, extra = parse_pair(extra)
+ result[key] = value
+
+ return result
+
+
+
+def dump_dict(data):
+ result = []
+ for k,v in data.items():
+ result.append(dump(str(k)))
+ result.append(dump(v))
+
+ payload = ''.join(result)
+ return '%d:' % len(payload) + payload + '}'
+
+
+def dump_list(data):
+ result = []
+ for i in data:
+ result.append(dump(i))
+
+ payload = ''.join(result)
+ return '%d:' % len(payload) + payload + ']'
+
+