-rw-r--r--  pgweb/core/views.py                 |  11
-rw-r--r--  pgweb/settings.py                   |   4
-rw-r--r--  sql/varnish.sql                     |  20
-rw-r--r--  templates/core/admin_purge.html     |  17
-rwxr-xr-x  tools/pgq/varnish_consumer.py       | 173
-rw-r--r--  tools/pgq/varnish_pgq.ini           |  20
-rwxr-xr-x  tools/varnishqueue/nagios_check.py  |  39
-rwxr-xr-x  tools/varnishqueue/varnish_queue.py | 145
8 files changed, 214 insertions(+), 215 deletions(-)
diff --git a/pgweb/core/views.py b/pgweb/core/views.py
index 6b400bcd..2317a206 100644
--- a/pgweb/core/views.py
+++ b/pgweb/core/views.py
@@ -3,6 +3,7 @@ from django.http import HttpResponse, Http404, HttpResponseRedirect
from django.http import HttpResponseNotModified
from django.template import TemplateDoesNotExist, loader
from django.contrib.auth.decorators import login_required, user_passes_test
+from django.contrib import messages
from django.views.decorators.csrf import csrf_exempt
from django.db.models import Count
from django.db import connection, transaction
@@ -266,17 +267,15 @@ def admin_purge(request):
return HttpResponseRedirect('.')
varnish_purge(url)
transaction.commit_unless_managed()
- completed = '^%s' % url
- else:
- completed = None
+ messages.info(request, "Purge completed: '^%s'" % url)
+ return HttpResponseRedirect('.')
# Fetch list of latest purges
curs = connection.cursor()
- curs.execute("SELECT ev_time, ev_type, ev_data FROM pgq.event_%s WHERE ev_type IN ('P', 'X') ORDER BY ev_time DESC LIMIT 20" % settings.VARNISH_QUEUE_ID)
- latest = [{'t': r[0], 'ty': r[1], 'u': r[2]} for r in curs.fetchall()]
+ curs.execute("SELECT added, completed, consumer, mode, expr FROM varnishqueue.queue q LEFT JOIN varnishqueue.consumers c ON c.consumerid=q.consumerid ORDER BY added DESC")
+ latest = curs.fetchall()
return render_to_response('core/admin_purge.html', {
- 'purge_completed': completed,
'latest_purges': latest,
}, RequestContext(request))
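The view above relies on a varnish_purge() Python helper that is not part of this diff. Given the SQL function defined in sql/varnish.sql below, it is presumably a thin wrapper along these lines (a minimal sketch, not the actual pgweb helper):

    # Sketch only: assumes the helper just calls the varnish_purge(text)
    # SQL function from sql/varnish.sql on the default connection.
    from django.db import connection

    def varnish_purge(url):
        curs = connection.cursor()
        curs.execute("SELECT varnish_purge(%s)", (url, ))
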
diff --git a/pgweb/settings.py b/pgweb/settings.py
index 624c920d..67da6e70 100644
--- a/pgweb/settings.py
+++ b/pgweb/settings.py
@@ -135,6 +135,9 @@ PASSWORD_HASHERS = (
'django.contrib.auth.hashers.CryptPasswordHasher',
)
+# Default format for date/time (as it changes between machines)
+DATETIME_FORMAT="Y-m-d H:i:s"
+
# Configure recaptcha. Most details contain keys and are thus handled
# in settings_local.py. Override NOCAPTCHA to actually use them.
NOCAPTCHA=True
@@ -162,7 +165,6 @@ FRONTEND_SERVERS=() # A tuple containing the
FTP_MASTERS=() # A tuple containing the *IP addresses* of all machines
# trusted to upload ftp structure data
VARNISH_PURGERS=() # Extra servers that can do varnish purges through our queue
-VARNISH_QUEUE_ID=1 # pgq queue id used for varnish purging
ARCHIVES_SEARCH_SERVER="archives.postgresql.org" # Where to post REST request for archives search
FRONTEND_SMTP_RELAY="magus.postgresql.org" # Where to relay user generated email
SITE_UPDATE_TRIGGER_FILE='/tmp/pgweb.update_trigger' # Where to drop update trigger file
diff --git a/sql/varnish.sql b/sql/varnish.sql
index bfbdb743..10792088 100644
--- a/sql/varnish.sql
+++ b/sql/varnish.sql
@@ -2,20 +2,28 @@ BEGIN;
--
-- Create a function to purge from varnish cache
--- By default this adds the object to a pgq queue,
+-- By default this adds the object to a local queue,
-- but this function can be replaced with a void one
-- when running a development version.
--
+CREATE SCHEMA IF NOT EXISTS varnishqueue;
+CREATE TABLE IF NOT EXISTS varnishqueue.queue (id bigserial primary key, mode char NOT NULL, consumerid int NOT NULL, expr text NOT NULL, added timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, completed timestamptz NULL);
+CREATE TABLE IF NOT EXISTS varnishqueue.consumers (consumerid serial PRIMARY KEY, consumer text NOT NULL);
+
+DROP FUNCTION IF EXISTS varnish_purge(url text);
CREATE OR REPLACE FUNCTION varnish_purge(url text)
-RETURNS bigint
+RETURNS void
AS $$
- SELECT pgq.insert_event('varnish', 'P', $1);
+ INSERT INTO varnishqueue.queue (mode, consumerid, expr) SELECT 'P', consumerid, $1 FROM varnishqueue.consumers;
+ NOTIFY varnishqueue;
$$ LANGUAGE 'sql';
+DROP FUNCTION IF EXISTS varnish_purge_expr(expr text);
CREATE OR REPLACE FUNCTION varnish_purge_expr(expr text)
-RETURNS bigint
+RETURNS void
AS $$
- SELECT pgq.insert_event('varnish', 'X', $1);
+ INSERT INTO varnishqueue.queue (mode, consumerid, expr) SELECT 'X', consumerid, $1 FROM varnishqueue.consumers;
+ NOTIFY varnishqueue;
$$ LANGUAGE 'sql';
-COMMIT;
\ No newline at end of file
+COMMIT;
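With these functions in place, any database session can enqueue a purge: one queue row is created per registered consumer, and the NOTIFY wakes the listening workers immediately. A hedged example of driving it from a maintenance script (the DSN is illustrative):

    import psycopg2

    conn = psycopg2.connect("dbname=pgweb")  # illustrative DSN
    curs = conn.cursor()
    curs.execute("SELECT varnish_purge(%s)", ('/about/', ))      # purge by URL regex
    curs.execute("SELECT varnish_purge_expr(%s)", (r'\.css$', )) # purge by ban expression
    conn.commit()
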
diff --git a/templates/core/admin_purge.html b/templates/core/admin_purge.html
index 16df1ef0..27bc077e 100644
--- a/templates/core/admin_purge.html
+++ b/templates/core/admin_purge.html
@@ -11,12 +11,6 @@
<h1>Purge URL from Varnish</h1>
<div id="content-main">
-{%if purge_completed %}
- <div class="module">
- Purge completed: {{purge_completed}}
- </div>
-{%endif%}
-
<form method="POST" action=".">{% csrf_token %}
URL (regex, ^ is auto-added): <input type="text" name="url">
<input type="submit" value="Purge" />
@@ -26,11 +20,16 @@ URL (regex, ^ is auto-added): <input type="text" name="url">
<div class="module">
<table summary="Latest purges" width="100%">
<caption><a class="section">Latest purges</a></caption>
+ <tr class="row2">
+ <th width="150">Queued</th>
+ <th width="150">Delivered</th>
+ <th width="150">Frontend</th>
+ <th width="30">Type</th>
+ <th>Expression</th>
+ </tr>
{%for p in latest_purges%}
<tr class="row{%cycle '1' '2'%}">
- <td>{{p.t|date:"c"}}</td>
- <td>{{p.ty}}</td>
- <td>{{p.u}}</td>
+ {%for c in p%}<td>{{c|default:""}}</td>{%endfor%}
</tr>
{%endfor%}
</table>
diff --git a/tools/pgq/varnish_consumer.py b/tools/pgq/varnish_consumer.py
deleted file mode 100755
index 1ef16c7d..00000000
--- a/tools/pgq/varnish_consumer.py
+++ /dev/null
@@ -1,173 +0,0 @@
-#!/usr/bin/env python
-#
-# A pgq consumer that generates varnish purge requests to expire things from
-# the frontend caches.
-#
-# Reads the file varnish_pgq.ini, which is a normal pgq configuration file.
-# Will look for any section starting with varnish_purger_<name>, and start one
-# purger for each such section purging from the frontend <name>.
-#
-# Each purger will run in a process of it's own, because pgq doesn't support
-# running different consumers in different threads.
-#
-# Purging is done by sending a regular GET request to /varnish-purge-url, with
-# the regular expression to purge in the http header X-Purge-URL.
-#
-# Retrying is handled automatically by pgq. In case a subprocess dies, it will
-# be restarted regularly by the remaining watchdog process.
-#
-#
-
-import httplib
-import signal
-import sys
-import time
-import datetime
-from ConfigParser import ConfigParser
-from multiprocessing import Process
-
-import pgq
-
-def print_t(s):
- print "%s: %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), s)
-
-class VarnishPurger(pgq.Consumer):
- """
- pgq consumer that purges URLs from varnish as they appear in the queue.
- """
-
- def __init__(self, frontend):
- self.frontend = frontend
- super(VarnishPurger, self).__init__('varnish_purger_%s' % frontend, 'db', ['varnish_pgq.ini'])
-
- def process_batch(self, db, batch_id, ev_list):
- "Called by pgq for each batch of events to run."
-
- for ev in ev_list:
- if ev.type == 'P':
- # 'P' events means purge.
- print_t("Purging '%s' on %s" % (ev.data, self.frontend))
- try:
- if self.do_purge(ev.data):
- ev.tag_done()
- except Exception, e:
- print_t("Failed to purge '%s' on '%s': %s" % (ev.data, self.frontend, e))
- elif ev.type == 'X':
- # 'X' events means ban expression (rather than just urls)
- print_t("Purging expression '%s' on %s" % (ev.data, self.frontend))
- try:
- if self.do_purge_expr(ev.data):
- ev.tag_done()
- except Exception, e:
- print_t("Failed to purge expression '%s' on '%s': %s" % (ev.data, self.frontend, e))
- else:
- print_t("Unknown event type '%s'" % ev.type)
-
-
- def internal_purge(self, headers):
- """
- Send the actual purge request, by contacting the frontend this
- purger is running for and sending a GET request to the special URL
- with our regexp in a special header.
- """
- conn = httplib.HTTPConnection('%s.postgresql.org' % self.frontend)
- conn.request("GET", "/varnish-purge-url", '', headers)
- resp = conn.getresponse()
- conn.close()
- if resp.status == 200:
- return True
-
- print_t("Varnish purge returned status %s (%s)" % (resp.status, resp.reason))
- return False
-
- def do_purge(self, url):
- return self.internal_purge({'X-Purge-URL': url})
-
- def do_purge_expr(self, expr):
- return self.internal_purge({'X-Purge-Expr': expr})
-
-class PurgerProcess(object):
- """
- Wrapper class that represents a subprocess that runs a varnish purger.
- """
- def __init__(self, frontend):
- self.frontend = frontend
- self.start()
-
- def start(self):
- self.process = Process(target=self._run, name=frontend)
- self.process.start()
-
- def _run(self):
- # NOTE! This runs in the client! Must *NOT* be called from the
- # parent process!
-
- # Start by turning off signals so we don't try to restart ourselves
- # and others, entering into possible infinite recursion.
- signal.signal(signal.SIGTERM, signal.SIG_DFL)
- signal.signal(signal.SIGQUIT, signal.SIG_DFL)
- signal.signal(signal.SIGHUP, signal.SIG_DFL)
-
- # Start and run the consumer
- print_t("Initiating run of %s" % self.frontend)
- self.purger = VarnishPurger(frontend)
- self.purger.start()
-
- def validate(self):
- """
- Validate that the process is running. If it's no longer running,
- try starting a new one.
- """
- if not self.process.is_alive():
- # Ugh!
- print_t("Process for '%s' died!" % self.frontend)
- self.process.join()
- print_t("Attempting restart of '%s'!" % self.frontend)
- self.start()
-
- def terminate(self):
- """
- Terminate the process running this purger.
- """
- print_t("Terminating process for '%s'" % self.frontend)
- self.process.terminate()
- self.process.join()
-
-
-# We need to keep the list of purgers in a global variable, so we can kill
-# them off from the signal handler.
-global purgers
-purgers = []
-
-def sighandler(signal, frame):
- print_t("Received terminating signal, shutting down subprocesses")
- for p in purgers:
- p.terminate()
- sys.exit(0)
-
-
-if __name__=="__main__":
- cp = ConfigParser()
- cp.read('varnish_pgq.ini')
-
- if len(sys.argv) > 2 and sys.argv[1] == "-logfile":
- # Redirect to whatever is in sys.argv[2]
- # (yes, this is a really ugly way of doing things..)
- f = open(sys.argv[2], "a", 0)
- sys.stdout = f
-
- # Trap signals that shut us down, so we can kill off our subprocesses
- # before we die.
- signal.signal(signal.SIGTERM, sighandler)
- signal.signal(signal.SIGQUIT, sighandler)
- signal.signal(signal.SIGHUP, sighandler)
-
- # Start one process for each of the configured purgers
- for frontend in [section[15:] for section in cp.sections() if section[:15] == 'varnish_purger_']:
- purgers.append(PurgerProcess(frontend))
-
- # Loop forever, restarting any worker process that has potentially died
- while True:
- for p in purgers:
- p.validate()
- time.sleep(10)
diff --git a/tools/pgq/varnish_pgq.ini b/tools/pgq/varnish_pgq.ini
deleted file mode 100644
index 16fc7040..00000000
--- a/tools/pgq/varnish_pgq.ini
+++ /dev/null
@@ -1,20 +0,0 @@
-# Global settings for pgq
-[pgqadm]
-job_name = varnish_ticker
-db = dbname=pgweb user=pgq
-logfile = /var/log/pgweb-pgq.log
-pidfile = /var/run/pgweb-pgq.pid
-
-# We have one consumer for each frontend server,
-# all talking to the same queue and database.
-[varnish_purger_zakdorn]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
-
-[varnish_purger_zalkon]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
-
-[varnish_purger_zetar]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
diff --git a/tools/varnishqueue/nagios_check.py b/tools/varnishqueue/nagios_check.py
new file mode 100755
index 00000000..642996d5
--- /dev/null
+++ b/tools/varnishqueue/nagios_check.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+import sys
+import psycopg2
+from datetime import timedelta
+
+# Up to 5 minutes of delay is OK
+WARNING_THRESHOLD=timedelta(minutes=5)
+# More than 15 minutes means something is definitely wrong
+CRITICAL_THRESHOLD=timedelta(minutes=15)
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print "Usage: nagios_check.py <dsn>"
+ sys.exit(1)
+
+ conn = psycopg2.connect(sys.argv[1])
+ curs = conn.cursor()
+
+ # Get the oldest entry that has not been completed, if any
+ curs.execute("SELECT max(now()-added) FROM varnishqueue.queue WHERE completed IS NULL")
+ rows = curs.fetchall()
+ conn.close()
+
+    if rows[0][0] is None:
+        print "OK, queue is empty"
+        sys.exit(0)
+
+ age = rows[0][0]
+
+ if age < WARNING_THRESHOLD:
+ print "OK, queue age is %s" % age
+ sys.exit(0)
+ elif age < CRITICAL_THRESHOLD:
+ print "WARNING, queue age is %s" % age
+ sys.exit(1)
+ else:
+ print "CRITICAL, queue age is %s" % age
+ sys.exit(2)
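The exit codes follow the Nagios plugin convention (0=OK, 1=WARNING, 2=CRITICAL), so the script can be wired in as an ordinary check command. A hypothetical invocation, with an illustrative DSN:

    ./nagios_check.py "dbname=pgweb host=db.example.org user=nagios"
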
diff --git a/tools/varnishqueue/varnish_queue.py b/tools/varnishqueue/varnish_queue.py
new file mode 100755
index 00000000..2f6921e4
--- /dev/null
+++ b/tools/varnishqueue/varnish_queue.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+#
+# varnish_queue.py - handle varnish purging queues
+#
+# Spawns a worker for each of the varnish servers; each will drain
+# its own queue as quickly as it can when told to do so by a NOTIFY.
+#
+
+import time
+import sys
+import select
+import httplib
+import multiprocessing
+import logging
+import psycopg2
+from setproctitle import setproctitle
+
+def do_purge(consumername, headers):
+ try:
+ conn = httplib.HTTPConnection('%s.postgresql.org' % consumername)
+ conn.request("GET", "/varnish-purge-url", '', headers)
+ resp = conn.getresponse()
+ conn.close()
+ if resp.status == 200:
+ return True
+ logging.warning("Varnish purge on %s returned status %s (%s)" % (consumername, resp.status, resp.reason))
+ return False
+ except Exception, ex:
+ logging.error("Exception purging on %s: %s" % (consumername, ex))
+ return False
+ return True
+
+def worker(consumerid, consumername, dsn):
+ logging.info("Starting worker for %s" % consumername)
+ setproctitle("varnish_queue - worker for %s" % consumername)
+
+ conn = psycopg2.connect(dsn)
+ curs = conn.cursor()
+ curs.execute("LISTEN varnishqueue")
+ conn.commit()
+
+ while True:
+ # See if there is something to pick up off the queue
+ curs.execute("SELECT id, mode, expr FROM varnishqueue.queue WHERE consumerid=%(consumerid)s AND completed IS NULL FOR UPDATE", {
+ 'consumerid': consumerid,
+ })
+ res = curs.fetchall()
+
+ failed = False
+
+ if len(res):
+ idlist = []
+ for r in res:
+ # Do something with this entry...
+ if r[1] == 'P':
+ logging.info("Purging url %s on %s" % (r[2], consumername))
+ if not do_purge(consumername, {'X-Purge-URL': r[2]}):
+ # Failed, but we will try again, so don't add to list of removals
+ failed = True
+ continue
+ elif r[1] == 'X':
+ logging.info("Purging expression %s on %s" % (r[2], consumername))
+ if not do_purge(consumername, {'X-Purge-Expr': r[2]}):
+ failed = True
+ continue
+ else:
+ logging.warning("Unknown purge type %s on %s, ignoring." % (r[1], consumername))
+
+ # Schedule for removal
+ idlist.append(r[0])
+
+ # Then remove from queue
+ curs.execute("UPDATE varnishqueue.queue SET completed=CURRENT_TIMESTAMP WHERE id=ANY(%(idlist)s)", {
+ 'idlist': idlist
+ })
+ conn.commit()
+ if failed:
+ time.sleep(5)
+ else:
+ # Nothing, so roll back the transaction and wait
+ conn.rollback()
+
+ select.select([conn],[],[],5*60)
+ conn.poll()
+ while conn.notifies:
+ conn.notifies.pop()
+ # Loop back up and process the full queue
+
+
+def housekeeper(dsn):
+ logging.info("Starting housekeeper")
+ setproctitle("varnish_queue - housekeeper")
+ conn = psycopg2.connect(dsn)
+ curs = conn.cursor()
+
+ while True:
+ curs.execute("DELETE FROM varnishqueue.queue WHERE completed IS NOT NULL")
+ if curs.rowcount > 0:
+ conn.commit()
+ else:
+ conn.rollback()
+ time.sleep(5*60)
+
+
+if __name__ == "__main__":
+ if len(sys.argv) != 2:
+ print "Usage: varnish_queue.py <dsn>"
+ sys.exit(1)
+
+ logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', level=logging.INFO)
+
+ conn = psycopg2.connect(sys.argv[1])
+
+ curs = conn.cursor()
+ curs.execute("SELECT consumerid, consumer FROM varnishqueue.consumers")
+ consumers = curs.fetchall()
+ conn.close()
+
+ # Now spawn a worker for each
+ processes = []
+ for consumerid, consumername in consumers:
+ p = multiprocessing.Process(target=worker, args=(consumerid, consumername, sys.argv[1]))
+ p.start()
+ processes.append(p)
+
+ # Start a housekeeping process as well
+ p = multiprocessing.Process(target=housekeeper, args=(sys.argv[1],))
+ p.start()
+ processes.append(p)
+
+ # They should never die, but if one does, kill off the rest and
+ # exit so the whole thing can be restarted.
+ while True:
+ processes[0].join(timeout=120)
+ for p in processes:
+ if not p.is_alive():
+ logging.warning("Child process died, killing all and exiting")
+ for p2 in processes:
+ try:
+ p2.terminate()
+ except:
+ pass
+ logging.error("Children killed, exiting")
+ sys.exit(1)
+ # If all processes are alive, loop back up and try again
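varnish_queue.py spawns one worker per row in varnishqueue.consumers and purges against <consumer>.postgresql.org, so the frontends must be registered before the daemon starts. A hedged example that seeds the table with the frontend names from the retired varnish_pgq.ini:

    import psycopg2

    conn = psycopg2.connect("dbname=pgweb")  # illustrative DSN
    curs = conn.cursor()
    for name in ('zakdorn', 'zalkon', 'zetar'):
        curs.execute("INSERT INTO varnishqueue.consumers (consumer) VALUES (%s)", (name, ))
    conn.commit()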