diff options
Diffstat (limited to 'python/skytools')
-rw-r--r-- | python/skytools/gzlog.py | 7 | ||||
-rwxr-xr-x | python/skytools/querybuilder.py | 9 | ||||
-rw-r--r-- | python/skytools/scripting.py | 5 | ||||
-rw-r--r-- | python/skytools/skylog.py | 62 | ||||
-rw-r--r-- | python/skytools/sockutil.py | 11 | ||||
-rw-r--r-- | python/skytools/timeutil.py | 18 | ||||
-rw-r--r-- | python/skytools/tnetstrings.py | 115 |
7 files changed, 203 insertions, 24 deletions
diff --git a/python/skytools/gzlog.py b/python/skytools/gzlog.py index 558e2813..0db40fc3 100644 --- a/python/skytools/gzlog.py +++ b/python/skytools/gzlog.py @@ -1,8 +1,8 @@ """Atomic append of gzipped data. -The point is - if several gzip streams are concated, they -are read back as one whose stream. +The point is - if several gzip streams are concatenated, +they are read back as one whole stream. """ import gzip @@ -22,7 +22,7 @@ def gzip_append(filename, data, level = 6): g.write(data) g.close() zdata = buf.getvalue() - + # append, safely f = open(filename, "a+", 0) f.seek(0, 2) @@ -36,4 +36,3 @@ def gzip_append(filename, data, level = 6): f.truncate() f.close() raise ex - diff --git a/python/skytools/querybuilder.py b/python/skytools/querybuilder.py index c2eead2d..9930daab 100755 --- a/python/skytools/querybuilder.py +++ b/python/skytools/querybuilder.py @@ -319,8 +319,11 @@ class PLPyQuery: arg_list = [arg_dict.get(k) for k in self.arg_map] return plpy.execute(self.plan, arg_list) except KeyError: - plpy.error("Missing argument: QUERY: %s ARGS: %s VALUES: %s" % ( - repr(self.sql), repr(self.arg_map), repr(arg_dict))) + need = set(self.arg_map) + got = set(arg_dict.keys()) + missing = list(need.difference(got)) + plpy.error("Missing arguments: [%s] QUERY: %s" % ( + ','.join(missing), repr(self.sql))) def __repr__(self): return 'PLPyQuery<%s>' % self.sql @@ -341,7 +344,7 @@ def plpy_exec(gd, sql, args, all_keys_required = True): >>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3', 'arg2': '4'}) DBG: plpy.execute(('PLAN', 'select $1, $2, $3', ['text', 'int4', 'text']), ['3', '4', '3']) >>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3'}) - DBG: plpy.error("Missing argument: QUERY: 'select {arg1}, {arg2:int4}, {arg1}' ARGS: ['arg1', 'arg2', 'arg1'] VALUES: {'arg1': '3'}") + DBG: plpy.error("Missing arguments: [arg2] QUERY: 'select {arg1}, {arg2:int4}, {arg1}'") >>> res = plpy_exec(GD, "select {arg1}, {arg2:int4}, {arg1}", {'arg1': '3'}, False) DBG: plpy.execute(('PLAN', 'select $1, $2, $3', ['text', 'int4', 'text']), ['3', None, '3']) """ diff --git a/python/skytools/scripting.py b/python/skytools/scripting.py index a5e82663..840f3cf4 100644 --- a/python/skytools/scripting.py +++ b/python/skytools/scripting.py @@ -588,7 +588,10 @@ class BaseScript(object): self.reset() sys.exit(1) except Exception, d: - self.send_stats() + try: # this may fail too + self.send_stats() + except: + pass emsg = str(d).rstrip() self.reset() self.exception_hook(d, emsg) diff --git a/python/skytools/skylog.py b/python/skytools/skylog.py index 0279d7d0..da8ccb91 100644 --- a/python/skytools/skylog.py +++ b/python/skytools/skylog.py @@ -1,11 +1,22 @@ """Our log handlers for Python's logging package. """ -import os, time, socket -import logging, logging.handlers +import logging +import logging.handlers +import os +import socket +import time import skytools +# use fast implementation if available, otherwise fall back to reference one +try: + import tnetstring as tnetstrings + tnetstrings.parse = tnetstrings.pop +except ImportError: + import skytools.tnetstrings as tnetstrings + tnetstrings.dumps = tnetstrings.dump + __all__ = ['getLogger'] # add TRACE level @@ -17,10 +28,15 @@ logging.addLevelName(TRACE, 'TRACE') _service_name = 'unknown_svc' _job_name = 'unknown_job' _hostname = socket.gethostname() +try: + _hostaddr = socket.gethostbyname(_hostname) +except: + _hostaddr = "0.0.0.0" _log_extra = { 'job_name': _job_name, 'service_name': _service_name, 'hostname': _hostname, + 'hostaddr': _hostaddr, } def set_service_name(service_name, job_name): """Set info about current script.""" @@ -64,6 +80,7 @@ class EasyRotatingFileHandler(logging.handlers.RotatingFileHandler): fn = os.path.expanduser(filename) logging.handlers.RotatingFileHandler.__init__(self, fn, maxBytes=maxBytes, backupCount=backupCount) + # send JSON message over UDP class UdpLogServerHandler(logging.handlers.DatagramHandler): """Sends log records over UDP to logserver in JSON format.""" @@ -98,10 +115,7 @@ class UdpLogServerHandler(logging.handlers.DatagramHandler): msg = msg[:self.MAXMSG] txt_level = self._level_map.get(record.levelno, "ERROR") hostname = _hostname - try: - hostaddr = socket.gethostbyname(hostname) - except: - hostaddr = "0.0.0.0" + hostaddr = _hostaddr jobname = _job_name svcname = _service_name pkt = self._log_template % (time.time()*1000, txt_level, skytools.quote_json(msg), @@ -114,6 +128,40 @@ class UdpLogServerHandler(logging.handlers.DatagramHandler): sock.sendto(s, (self.host, self.port)) sock.close() + +# send TNetStrings message over UDP +class UdpTNetStringsHandler(logging.handlers.DatagramHandler): + """ Sends log records in TNetStrings format over UDP. """ + + # LogRecord fields to send + send_fields = [ + 'created', 'exc_text', 'levelname', 'levelno', 'message', 'msecs', 'name', + 'hostaddr', 'hostname', 'job_name', 'service_name'] + + _udp_reset = 0 + + def makePickle(self, record): + """ Create message in TNetStrings format. + """ + msg = {} + self.format(record) # render 'message' attribute and others + for k in self.send_fields: + msg[k] = record.__dict__[k] + tnetstr = tnetstrings.dumps(msg) + return tnetstr + + def send(self, s): + """ Cache socket for a moment, then recreate it. + """ + now = time.time() + if now - 1 > self._udp_reset: + if self.sock: + self.sock.close() + self.sock = self.makeSocket() + self._udp_reset = now + self.sock.sendto(s, (self.host, self.port)) + + class LogDBHandler(logging.handlers.SocketHandler): """Sends log records into PostgreSQL server. @@ -234,6 +282,7 @@ class LogDBHandler(logging.handlers.SocketHandler): query = "select * from log.add(%s, %s, %s)" logcur.execute(query, [type, service, msg]) + # fix unicode bug in SysLogHandler class SysLogHandler(logging.handlers.SysLogHandler): """Fixes unicode bug in logging.handlers.SysLogHandler.""" @@ -301,6 +350,7 @@ class SysLogHostnameHandler(SysLogHandler): msg) return msg + try: from logging import LoggerAdapter except ImportError: diff --git a/python/skytools/sockutil.py b/python/skytools/sockutil.py index f950f5a0..dbcd021b 100644 --- a/python/skytools/sockutil.py +++ b/python/skytools/sockutil.py @@ -37,10 +37,13 @@ def set_tcp_keepalive(fd, keepalive = True, if not hasattr(socket, 'SO_KEEPALIVE') or not hasattr(socket, 'fromfd'): return - # get numeric fd and cast to socket - if hasattr(fd, 'fileno'): - fd = fd.fileno() - s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) + # need socket object + if isinstance(fd, socket.SocketType): + s = fd + else: + if hasattr(fd, 'fileno'): + fd = fd.fileno() + s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) # skip if unix socket if type(s.getsockname()) != type(()): diff --git a/python/skytools/timeutil.py b/python/skytools/timeutil.py index a29e050c..2ea63082 100644 --- a/python/skytools/timeutil.py +++ b/python/skytools/timeutil.py @@ -134,18 +134,24 @@ def datetime_to_timestamp(dt, local_time=True): Returns seconds since epoch as float. - >>> datetime_to_timestamp(parse_iso_timestamp("2005-06-01 15:00:59.33 +02")) - 1117630859.33 - >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.33, UTC)) - 1117630859.33 - >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.33)) - 1117630859.33 + >>> datetime_to_timestamp(parse_iso_timestamp("2005-06-01 15:00:59.5 +02")) + 1117630859.5 + >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.5, UTC)) + 1117630859.5 + >>> datetime_to_timestamp(datetime.fromtimestamp(1117630859.5)) + 1117630859.5 >>> now = datetime.utcnow() >>> now2 = datetime.utcfromtimestamp(datetime_to_timestamp(now, False)) + >>> abs(now2.microsecond - now.microsecond) < 100 + True + >>> now2 = now2.replace(microsecond = now.microsecond) >>> now == now2 True >>> now = datetime.now() >>> now2 = datetime.fromtimestamp(datetime_to_timestamp(now)) + >>> abs(now2.microsecond - now.microsecond) < 100 + True + >>> now2 = now2.replace(microsecond = now.microsecond) >>> now == now2 True """ diff --git a/python/skytools/tnetstrings.py b/python/skytools/tnetstrings.py new file mode 100644 index 00000000..afacc09e --- /dev/null +++ b/python/skytools/tnetstrings.py @@ -0,0 +1,115 @@ +# Note this implementation is more strict than necessary to demonstrate +# minimum restrictions on types allowed in dictionaries. + +def dump(data): + if type(data) is long or type(data) is int: + out = str(data) + return '%d:%s#' % (len(out), out) + elif type(data) is float: + out = '%f' % data + return '%d:%s^' % (len(out), out) + elif type(data) is str: + return '%d:' % len(data) + data + ',' + elif type(data) is dict: + return dump_dict(data) + elif type(data) is list: + return dump_list(data) + elif data == None: + return '0:~' + elif type(data) is bool: + out = repr(data).lower() + return '%d:%s!' % (len(out), out) + else: + assert False, "Can't serialize stuff that's %s." % type(data) + + +def parse(data): + payload, payload_type, remain = parse_payload(data) + + if payload_type == '#': + value = int(payload) + elif payload_type == '}': + value = parse_dict(payload) + elif payload_type == ']': + value = parse_list(payload) + elif payload_type == '!': + value = payload == 'true' + elif payload_type == '^': + value = float(payload) + elif payload_type == '~': + assert len(payload) == 0, "Payload must be 0 length for null." + value = None + elif payload_type == ',': + value = payload + else: + assert False, "Invalid payload type: %r" % payload_type + + return value, remain + +def parse_payload(data): + assert data, "Invalid data to parse, it's empty." + length, extra = data.split(':', 1) + length = int(length) + + payload, extra = extra[:length], extra[length:] + assert extra, "No payload type: %r, %r" % (payload, extra) + payload_type, remain = extra[0], extra[1:] + + assert len(payload) == length, "Data is wrong length %d vs %d" % (length, len(payload)) + return payload, payload_type, remain + +def parse_list(data): + if len(data) == 0: return [] + + result = [] + value, extra = parse(data) + result.append(value) + + while extra: + value, extra = parse(extra) + result.append(value) + + return result + +def parse_pair(data): + key, extra = parse(data) + assert extra, "Unbalanced dictionary store." + value, extra = parse(extra) + + return key, value, extra + +def parse_dict(data): + if len(data) == 0: return {} + + key, value, extra = parse_pair(data) + assert type(key) is str, "Keys can only be strings." + + result = {key: value} + + while extra: + key, value, extra = parse_pair(extra) + result[key] = value + + return result + + + +def dump_dict(data): + result = [] + for k,v in data.items(): + result.append(dump(str(k))) + result.append(dump(v)) + + payload = ''.join(result) + return '%d:' % len(payload) + payload + '}' + + +def dump_list(data): + result = [] + for i in data: + result.append(dump(i)) + + payload = ''.join(result) + return '%d:' % len(payload) + payload + ']' + + |