summaryrefslogtreecommitdiff
path: root/python/skytools/skylog.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/skytools/skylog.py')
-rw-r--r--python/skytools/skylog.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/python/skytools/skylog.py b/python/skytools/skylog.py
new file mode 100644
index 00000000..2f6344ae
--- /dev/null
+++ b/python/skytools/skylog.py
@@ -0,0 +1,173 @@
+"""Our log handlers for Python's logging package.
+"""
+
+import sys, os, time, socket, psycopg
+import logging, logging.handlers
+
+from quoting import quote_json
+
+# configurable file logger
+class EasyRotatingFileHandler(logging.handlers.RotatingFileHandler):
+ """Easier setup for RotatingFileHandler."""
+ def __init__(self, filename, maxBytes = 10*1024*1024, backupCount = 3):
+ """Args same as for RotatingFileHandler, but in filename '~' is expanded."""
+ fn = os.path.expanduser(filename)
+ logging.handlers.RotatingFileHandler.__init__(self, fn, maxBytes=maxBytes, backupCount=backupCount)
+
+# send JSON message over UDP
+class UdpLogServerHandler(logging.handlers.DatagramHandler):
+ """Sends log records over UDP to logserver in JSON format."""
+
+ # map logging levels to logserver levels
+ _level_map = {
+ logging.DEBUG : 'DEBUG',
+ logging.INFO : 'INFO',
+ logging.WARNING : 'WARN',
+ logging.ERROR : 'ERROR',
+ logging.CRITICAL: 'FATAL',
+ }
+
+ # JSON message template
+ _log_template = '{\n\t'\
+ '"logger": "skytools.UdpLogServer",\n\t'\
+ '"timestamp": %.0f,\n\t'\
+ '"level": "%s",\n\t'\
+ '"thread": null,\n\t'\
+ '"message": %s,\n\t'\
+ '"properties": {"application":"%s", "hostname":"%s"}\n'\
+ '}'
+
+ # cut longer msgs
+ MAXMSG = 1024
+
+ def makePickle(self, record):
+ """Create message in JSON format."""
+ # get & cut msg
+ msg = self.format(record)
+ if len(msg) > self.MAXMSG:
+ msg = msg[:self.MAXMSG]
+ txt_level = self._level_map.get(record.levelno, "ERROR")
+ pkt = self._log_template % (time.time()*1000, txt_level,
+ quote_json(msg), record.name, socket.gethostname())
+ return pkt
+
+class LogDBHandler(logging.handlers.SocketHandler):
+ """Sends log records into PostgreSQL server.
+
+ Additionally, does some statistics aggregating,
+ to avoid overloading log server.
+
+ It subclasses SocketHandler to get throtthling for
+ failed connections.
+ """
+
+ # map codes to string
+ _level_map = {
+ logging.DEBUG : 'DEBUG',
+ logging.INFO : 'INFO',
+ logging.WARNING : 'WARNING',
+ logging.ERROR : 'ERROR',
+ logging.CRITICAL: 'FATAL',
+ }
+
+ def __init__(self, connect_string):
+ """
+ Initializes the handler with a specific connection string.
+ """
+
+ logging.handlers.SocketHandler.__init__(self, None, None)
+ self.closeOnError = 1
+
+ self.connect_string = connect_string
+
+ self.stat_cache = {}
+ self.stat_flush_period = 60
+ # send first stat line immidiately
+ self.last_stat_flush = 0
+
+ def createSocket(self):
+ try:
+ logging.handlers.SocketHandler.createSocket(self)
+ except:
+ self.sock = self.makeSocket()
+
+ def makeSocket(self):
+ """Create server connection.
+ In this case its not socket but psycopg conection."""
+
+ db = psycopg.connect(self.connect_string)
+ db.autocommit(1)
+ return db
+
+ def emit(self, record):
+ """Process log record."""
+
+ # we do not want log debug messages
+ if record.levelno < logging.INFO:
+ return
+
+ try:
+ self.process_rec(record)
+ except (SystemExit, KeyboardInterrupt):
+ raise
+ except:
+ self.handleError(record)
+
+ def process_rec(self, record):
+ """Aggregate stats if needed, and send to logdb."""
+ # render msg
+ msg = self.format(record)
+
+ # dont want to send stats too ofter
+ if record.levelno == logging.INFO and msg and msg[0] == "{":
+ self.aggregate_stats(msg)
+ if time.time() - self.last_stat_flush >= self.stat_flush_period:
+ self.flush_stats(record.name)
+ return
+
+ if record.levelno < logging.INFO:
+ self.flush_stats(record.name)
+
+ # dont send more than one line
+ ln = msg.find('\n')
+ if ln > 0:
+ msg = msg[:ln]
+
+ txt_level = self._level_map.get(record.levelno, "ERROR")
+ self.send_to_logdb(record.name, txt_level, msg)
+
+ def aggregate_stats(self, msg):
+ """Sum stats together, to lessen load on logdb."""
+
+ msg = msg[1:-1]
+ for rec in msg.split(", "):
+ k, v = rec.split(": ")
+ agg = self.stat_cache.get(k, 0)
+ if v.find('.') >= 0:
+ agg += float(v)
+ else:
+ agg += int(v)
+ self.stat_cache[k] = agg
+
+ def flush_stats(self, service):
+ """Send awuired stats to logdb."""
+ res = []
+ for k, v in self.stat_cache.items():
+ res.append("%s: %s" % (k, str(v)))
+ if len(res) > 0:
+ logmsg = "{%s}" % ", ".join(res)
+ self.send_to_logdb(service, "INFO", logmsg)
+ self.stat_cache = {}
+ self.last_stat_flush = time.time()
+
+ def send_to_logdb(self, service, type, msg):
+ """Actual sending is done here."""
+
+ if self.sock is None:
+ self.createSocket()
+
+ if self.sock:
+ logcur = self.sock.cursor()
+ query = "select * from log.add(%s, %s, %s)"
+ logcur.execute(query, [type, service, msg])
+