-rw-r--r--  pgweb/core/views.py                  |  11
-rw-r--r--  pgweb/settings.py                    |   4
-rw-r--r--  sql/varnish.sql                      |  20
-rw-r--r--  templates/core/admin_purge.html      |  17
-rwxr-xr-x  tools/pgq/varnish_consumer.py        | 173
-rw-r--r--  tools/pgq/varnish_pgq.ini            |  20
-rwxr-xr-x  tools/varnishqueue/nagios_check.py   |  39
-rwxr-xr-x  tools/varnishqueue/varnish_queue.py  | 145
8 files changed, 214 insertions(+), 215 deletions(-)
diff --git a/pgweb/core/views.py b/pgweb/core/views.py
index 6b400bcd..2317a206 100644
--- a/pgweb/core/views.py
+++ b/pgweb/core/views.py
@@ -3,6 +3,7 @@ from django.http import HttpResponse, Http404, HttpResponseRedirect
 from django.http import HttpResponseNotModified
 from django.template import TemplateDoesNotExist, loader
 from django.contrib.auth.decorators import login_required, user_passes_test
+from django.contrib import messages
 from django.views.decorators.csrf import csrf_exempt
 from django.db.models import Count
 from django.db import connection, transaction
@@ -266,17 +267,15 @@ def admin_purge(request):
             return HttpResponseRedirect('.')
         varnish_purge(url)
         transaction.commit_unless_managed()
-        completed = '^%s' % url
-    else:
-        completed = None
+        messages.info(request, "Purge completed: '^%s'" % url)
+        return HttpResponseRedirect('.')
 
     # Fetch list of latest purges
     curs = connection.cursor()
-    curs.execute("SELECT ev_time, ev_type, ev_data FROM pgq.event_%s WHERE ev_type IN ('P', 'X') ORDER BY ev_time DESC LIMIT 20" % settings.VARNISH_QUEUE_ID)
-    latest = [{'t': r[0], 'ty': r[1], 'u': r[2]} for r in curs.fetchall()]
+    curs.execute("SELECT added, completed, consumer, mode, expr FROM varnishqueue.queue q LEFT JOIN varnishqueue.consumers c ON c.consumerid=q.consumerid ORDER BY added DESC")
+    latest = curs.fetchall()
 
     return render_to_response('core/admin_purge.html', {
-        'purge_completed': completed,
         'latest_purges': latest,
     }, RequestContext(request))
 
diff --git a/pgweb/settings.py b/pgweb/settings.py
index 624c920d..67da6e70 100644
--- a/pgweb/settings.py
+++ b/pgweb/settings.py
@@ -135,6 +135,9 @@ PASSWORD_HASHERS = (
     'django.contrib.auth.hashers.CryptPasswordHasher',
 )
 
+# Default format for date/time (as it changes between machines)
+DATETIME_FORMAT="Y-m-d H:i:s"
+
 # Configure recaptcha. Most details contain keys and are thus handled
 # in settings_local.py. Override NOCAPTCHA to actually use them.
 NOCAPTCHA=True
@@ -162,7 +165,6 @@ FRONTEND_SERVERS=()                 # A tuple containing the
 FTP_MASTERS=()                      # A tuple containing the *IP addresses* of all machines
                                     # trusted to upload ftp structure data
 VARNISH_PURGERS=()                  # Extra servers that can do varnish purges through our queue
-VARNISH_QUEUE_ID=1                  # pgq queue id used for varnish purging
 ARCHIVES_SEARCH_SERVER="archives.postgresql.org"        # Where to post REST request for archives search
 FRONTEND_SMTP_RELAY="magus.postgresql.org"              # Where to relay user generated email
 SITE_UPDATE_TRIGGER_FILE='/tmp/pgweb.update_trigger'    # Where to drop update trigger file
diff --git a/sql/varnish.sql b/sql/varnish.sql
index bfbdb743..10792088 100644
--- a/sql/varnish.sql
+++ b/sql/varnish.sql
@@ -2,20 +2,28 @@ BEGIN;
 
 --
 -- Create a function to purge from varnish cache
--- By default this adds the object to a pgq queue,
+-- By default this adds the object to a local queue,
 -- but this function can be replaced with a void one
 -- when running a development version.
 --
+CREATE SCHEMA IF NOT EXISTS varnishqueue;
+CREATE TABLE IF NOT EXISTS varnishqueue.queue (id bigserial primary key, mode char NOT NULL, consumerid int NOT NULL, expr text NOT NULL, added timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, completed timestamptz NULL);
+CREATE TABLE IF NOT EXISTS varnishqueue.consumers (consumerid serial PRIMARY KEY, consumer text NOT NULL);
+
+DROP FUNCTION IF EXISTS varnish_purge(url text);
 CREATE OR REPLACE FUNCTION varnish_purge(url text)
-RETURNS bigint
+RETURNS void
 AS $$
-    SELECT pgq.insert_event('varnish', 'P', $1);
+    INSERT INTO varnishqueue.queue (mode, consumerid, expr) SELECT 'P', consumerid, $1 FROM varnishqueue.consumers;
+    NOTIFY varnishqueue;
 $$ LANGUAGE 'sql';
 
+DROP FUNCTION IF EXISTS varnish_purge_expr(expr text);
 CREATE OR REPLACE FUNCTION varnish_purge_expr(expr text)
-RETURNS bigint
+RETURNS void
 AS $$
-    SELECT pgq.insert_event('varnish', 'X', $1);
+    INSERT INTO varnishqueue.queue (mode, consumerid, expr) SELECT 'X', consumerid, $1 FROM varnishqueue.consumers;
+    NOTIFY varnishqueue;
 $$ LANGUAGE 'sql';
 
-COMMIT;
\ No newline at end of file
+COMMIT;
diff --git a/templates/core/admin_purge.html b/templates/core/admin_purge.html
index 16df1ef0..27bc077e 100644
--- a/templates/core/admin_purge.html
+++ b/templates/core/admin_purge.html
@@ -11,12 +11,6 @@
 <h1>Purge URL from Varnish</h1>
 
 <div id="content-main">
-{%if purge_completed %}
- <div class="module">
-  Purge completed: {{purge_completed}}
- </div>
-{%endif%}
-
 <form method="POST" action=".">{% csrf_token %}
 URL (regex, ^ is auto-added): <input type="text" name="url">
 <input type="submit" value="Purge" />
@@ -26,11 +20,16 @@ URL (regex, ^ is auto-added): <input type="text" name="url">
 <div class="module">
  <table summary="Latest purges" width="100%">
   <caption><a class="section">Latest purges</a></caption>
+  <tr class="row2">
+   <th width="150">Queued</th>
+   <th width="150">Delivered</th>
+   <th width="150">Frontend</th>
+   <th width="30">Type</th>
+   <th>Expression</th>
+  </tr>
 {%for p in latest_purges%}
  <tr class="row{%cycle '1' '2'%}">
-  <td>{{p.t|date:"c"}}</td>
-  <td>{{p.ty}}</td>
-  <td>{{p.u}}</td>
+  {%for c in p%}<td>{{c|default:""}}</td>{%endfor%}
  </tr>
 {%endfor%}
 </table>
diff --git a/tools/pgq/varnish_consumer.py b/tools/pgq/varnish_consumer.py
deleted file mode 100755
index 1ef16c7d..00000000
--- a/tools/pgq/varnish_consumer.py
+++ /dev/null
@@ -1,173 +0,0 @@
-#!/usr/bin/env python
-#
-# A pgq consumer that generates varnish purge requests to expire things from
-# the frontend caches.
-#
-# Reads the file varnish_pgq.ini, which is a normal pgq configuration file.
-# Will look for any section starting with varnish_purger_<name>, and start one
-# purger for each such section purging from the frontend <name>.
-#
-# Each purger will run in a process of it's own, because pgq doesn't support
-# running different consumers in different threads.
-#
-# Purging is done by sending a regular GET request to /varnish-purge-url, with
-# the regular expression to purge in the http header X-Purge-URL.
-#
-# Retrying is handled automatically by pgq. In case a subprocess dies, it will
-# be restarted regularly by the remaining watchdog process.
-#
-#
-
-import httplib
-import signal
-import sys
-import time
-import datetime
-from ConfigParser import ConfigParser
-from multiprocessing import Process
-
-import pgq
-
-def print_t(s):
-    print "%s: %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), s)
-
-class VarnishPurger(pgq.Consumer):
-    """
-    pgq consumer that purges URLs from varnish as they appear in the queue.
-    """
-
-    def __init__(self, frontend):
-        self.frontend = frontend
-        super(VarnishPurger, self).__init__('varnish_purger_%s' % frontend, 'db', ['varnish_pgq.ini'])
-
-    def process_batch(self, db, batch_id, ev_list):
-        "Called by pgq for each batch of events to run."
-
-        for ev in ev_list:
-            if ev.type == 'P':
-                # 'P' events means purge.
- print_t("Purging '%s' on %s" % (ev.data, self.frontend)) - try: - if self.do_purge(ev.data): - ev.tag_done() - except Exception, e: - print_t("Failed to purge '%s' on '%s': %s" % (ev.data, self.frontend, e)) - elif ev.type == 'X': - # 'X' events means ban expression (rather than just urls) - print_t("Purging expression '%s' on %s" % (ev.data, self.frontend)) - try: - if self.do_purge_expr(ev.data): - ev.tag_done() - except Exception, e: - print_t("Failed to purge expression '%s' on '%s': %s" % (ev.data, self.frontend, e)) - else: - print_t("Unknown event type '%s'" % ev.type) - - - def internal_purge(self, headers): - """ - Send the actual purge request, by contacting the frontend this - purger is running for and sending a GET request to the special URL - with our regexp in a special header. - """ - conn = httplib.HTTPConnection('%s.postgresql.org' % self.frontend) - conn.request("GET", "/varnish-purge-url", '', headers) - resp = conn.getresponse() - conn.close() - if resp.status == 200: - return True - - print_t("Varnish purge returned status %s (%s)" % (resp.status, resp.reason)) - return False - - def do_purge(self, url): - return self.internal_purge({'X-Purge-URL': url}) - - def do_purge_expr(self, expr): - return self.internal_purge({'X-Purge-Expr': expr}) - -class PurgerProcess(object): - """ - Wrapper class that represents a subprocess that runs a varnish purger. - """ - def __init__(self, frontend): - self.frontend = frontend - self.start() - - def start(self): - self.process = Process(target=self._run, name=frontend) - self.process.start() - - def _run(self): - # NOTE! This runs in the client! Must *NOT* be called from the - # parent process! - - # Start by turning off signals so we don't try to restart ourselves - # and others, entering into possible infinite recursion. - signal.signal(signal.SIGTERM, signal.SIG_DFL) - signal.signal(signal.SIGQUIT, signal.SIG_DFL) - signal.signal(signal.SIGHUP, signal.SIG_DFL) - - # Start and run the consumer - print_t("Initiating run of %s" % self.frontend) - self.purger = VarnishPurger(frontend) - self.purger.start() - - def validate(self): - """ - Validate that the process is running. If it's no longer running, - try starting a new one. - """ - if not self.process.is_alive(): - # Ugh! - print_t("Process for '%s' died!" % self.frontend) - self.process.join() - print_t("Attempting restart of '%s'!" % self.frontend) - self.start() - - def terminate(self): - """ - Terminate the process running this purger. - """ - print_t("Terminating process for '%s'" % self.frontend) - self.process.terminate() - self.process.join() - - -# We need to keep the list of purgers in a global variable, so we can kill -# them off from the signal handler. -global purgers -purgers = [] - -def sighandler(signal, frame): - print_t("Received terminating signal, shutting down subprocesses") - for p in purgers: - p.terminate() - sys.exit(0) - - -if __name__=="__main__": - cp = ConfigParser() - cp.read('varnish_pgq.ini') - - if len(sys.argv) > 2 and sys.argv[1] == "-logfile": - # Redirect to whatever is in sys.argv[2] - # (yes, this is a really ugly way of doing things..) - f = open(sys.argv[2], "a", 0) - sys.stdout = f - - # Trap signals that shut us down, so we can kill off our subprocesses - # before we die. 
-    signal.signal(signal.SIGTERM, sighandler)
-    signal.signal(signal.SIGQUIT, sighandler)
-    signal.signal(signal.SIGHUP, sighandler)
-
-    # Start one process for each of the configured purgers
-    for frontend in [section[15:] for section in cp.sections() if section[:15] == 'varnish_purger_']:
-        purgers.append(PurgerProcess(frontend))
-
-    # Loop forever, restarting any worker process that has potentially died
-    while True:
-        for p in purgers:
-            p.validate()
-        time.sleep(10)
diff --git a/tools/pgq/varnish_pgq.ini b/tools/pgq/varnish_pgq.ini
deleted file mode 100644
index 16fc7040..00000000
--- a/tools/pgq/varnish_pgq.ini
+++ /dev/null
@@ -1,20 +0,0 @@
-# Global settings for pgq
-[pgqadm]
-job_name = varnish_ticker
-db = dbname=pgweb user=pgq
-logfile = /var/log/pgweb-pgq.log
-pidfile = /var/run/pgweb-pgq.pid
-
-# We have one consumer for each frontend server,
-# all talking to the same queue and database.
-[varnish_purger_zakdorn]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
-
-[varnish_purger_zalkon]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
-
-[varnish_purger_zetar]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
diff --git a/tools/varnishqueue/nagios_check.py b/tools/varnishqueue/nagios_check.py
new file mode 100755
index 00000000..642996d5
--- /dev/null
+++ b/tools/varnishqueue/nagios_check.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+import sys
+import psycopg2
+from datetime import timedelta
+
+# Up to 5 minutes delay is ok
+WARNING_THRESHOLD=timedelta(minutes=5)
+# More than 15 minutes something is definitely wrong
+CRITICAL_THRESHOLD=timedelta(minutes=15)
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print "Usage: nagios_check.py <dsn>"
+        sys.exit(1)
+
+    conn = psycopg2.connect(sys.argv[1])
+    curs = conn.cursor()
+
+    # Get the oldest entry that has not been completed, if any
+    curs.execute("SELECT max(now()-added) FROM varnishqueue.queue WHERE completed IS NULL")
+    rows = curs.fetchall()
+    conn.close()
+
+    if len(rows) == 0:
+        print "OK, queue is empty"
+        sys.exit(0)
+
+    age = rows[0][0]
+
+    if age < WARNING_THRESHOLD:
+        print "OK, queue age is %s" % age
+        sys.exit(0)
+    elif age < CRITICAL_THRESHOLD:
+        print "WARNING, queue age is %s" % age
+        sys.exit(1)
+    else:
+        print "CRITICAL, queue age is %s" % age
+        sys.exit(2)
diff --git a/tools/varnishqueue/varnish_queue.py b/tools/varnishqueue/varnish_queue.py
new file mode 100755
index 00000000..2f6921e4
--- /dev/null
+++ b/tools/varnishqueue/varnish_queue.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+#
+# varnish_queue.py - handle varnish purging queues
+#
+# Spawns a worker for each of the varnish servers, each will drain
+# its own queue as quickly as it can when told to do so by a notify.
+#
+
+import time
+import sys
+import select
+import httplib
+import multiprocessing
+import logging
+import psycopg2
+from setproctitle import setproctitle
+
+def do_purge(consumername, headers):
+    try:
+        conn = httplib.HTTPConnection('%s.postgresql.org' % consumername)
+        conn.request("GET", "/varnish-purge-url", '', headers)
+        resp = conn.getresponse()
+        conn.close()
+        if resp.status == 200:
+            return True
+        logging.warning("Varnish purge on %s returned status %s (%s)" % (consumername, resp.status, resp.reason))
+        return False
+    except Exception, ex:
+        logging.error("Exception purging on %s: %s" % (consumername, ex))
+        return False
+    return True
+
+def worker(consumerid, consumername, dsn):
+    logging.info("Starting worker for %s" % consumername)
+    setproctitle("varnish_queue - worker for %s" % consumername)
+
+    conn = psycopg2.connect(dsn)
+    curs = conn.cursor()
+    curs.execute("LISTEN varnishqueue")
+    conn.commit()
+
+    while True:
+        # See if there is something to pick up off the queue
+        curs.execute("SELECT id, mode, expr FROM varnishqueue.queue WHERE consumerid=%(consumerid)s AND completed IS NULL FOR UPDATE", {
+            'consumerid': consumerid,
+        })
+        res = curs.fetchall()
+
+        failed = False
+
+        if len(res):
+            idlist = []
+            for r in res:
+                # Do something with this entry...
+                if r[1] == 'P':
+                    logging.info("Purging url %s on %s" % (r[2], consumername))
+                    if not do_purge(consumername, {'X-Purge-URL': r[2]}):
+                        # Failed, but we will try again, so don't add to list of removals
+                        failed = True
+                        continue
+                elif r[1] == 'X':
+                    logging.info("Purging expression %s on %s" % (r[2], consumername))
+                    if not do_purge(consumername, {'X-Purge-Expr': r[2]}):
+                        failed = True
+                        continue
+                else:
+                    logging.warning("Unknown purge type %s on %s, ignoring." % (r[1], consumername))
+
+                # Schedule for removal
+                idlist.append(r[0])
+
+            # Then remove from queue
+            curs.execute("UPDATE varnishqueue.queue SET completed=CURRENT_TIMESTAMP WHERE id=ANY(%(idlist)s)", {
+                'idlist': idlist
+            })
+            conn.commit()
+            if failed:
+                time.sleep(5)
+        else:
+            # Nothing, so roll back the transaction and wait
+            conn.rollback()
+
+            select.select([conn],[],[],5*60)
+            conn.poll()
+            while conn.notifies:
+                conn.notifies.pop()
+            # Loop back up and process the full queue
+
+
+def housekeeper(dsn):
+    logging.info("Starting housekeeper")
+    setproctitle("varnish_queue - housekeeper")
+    conn = psycopg2.connect(dsn)
+    curs = conn.cursor()
+
+    while True:
+        curs.execute("DELETE FROM varnishqueue.queue WHERE completed IS NOT NULL")
+        if curs.rowcount > 0:
+            conn.commit()
+        else:
+            conn.rollback()
+        time.sleep(5*60)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print "Usage: varnish_queue.py <dsn>"
+        sys.exit(1)
+
+    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', level=logging.INFO)
+
+    conn = psycopg2.connect(sys.argv[1])
+
+    curs = conn.cursor()
+    curs.execute("SELECT consumerid, consumer FROM varnishqueue.consumers")
+    consumers = curs.fetchall()
+    conn.close()
+
+    # Now spawn a worker for each
+    processes = []
+    for consumerid, consumername in consumers:
+        p = multiprocessing.Process(target=worker, args=(consumerid, consumername, sys.argv[1]))
+        p.start()
+        processes.append(p)
+
+    # Start a housekeeping process as well
+    p = multiprocessing.Process(target=housekeeper, args=(sys.argv[1],))
+    p.start()
+    processes.append(p)
+
+    # They should never die, but if they do, commit suicide and
+    # restart everything.
+    while True:
+        processes[0].join(timeout=120)
+        for p in processes:
+            if not p.is_alive():
+                logging.warning("Child process died, killing all and exiting")
+                for p2 in processes:
+                    try:
+                        p2.terminate()
+                    except:
+                        pass
+                logging.error("Children killed, exiting")
+                sys.exit(1)
+        # If all processes are alive, loop back up and try again
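
For reference, a minimal sketch (not part of the commit) of how the new varnishqueue schema introduced in sql/varnish.sql is exercised; registering the frontend 'zalkon' and the purged URL are illustrative assumptions, any consumer name and expression work the same way:

    -- Register a frontend; varnish_queue.py spawns one worker per row in this table,
    -- and the worker connects to '<consumer>.postgresql.org' when purging
    INSERT INTO varnishqueue.consumers (consumer) VALUES ('zalkon');

    -- Enqueue a purge: varnish_purge() inserts one queue row per registered consumer
    -- and sends NOTIFY varnishqueue to wake any listening worker
    SELECT varnish_purge('/about/news/');

    -- Pending work, as seen by the workers and aged by nagios_check.py
    SELECT id, mode, expr, added FROM varnishqueue.queue WHERE completed IS NULL;

Rows are stamped with completed=CURRENT_TIMESTAMP once a worker has delivered the purge, and the housekeeper process deletes completed rows later.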