author    Magnus Hagander    2016-04-27 11:17:32 +0000
committer Magnus Hagander    2016-05-14 17:49:12 +0000
commit    2f8bbc40dd7e80b3d9ee2aa1ac7ee9c5bcfff777 (patch)
tree      467e26af3ca2d8b7d57218fede0b388ba066da79
parent    37a24af1e8a7ab7be957b38e81d808b9b186aa22 (diff)
Replace pgq with trivial local queue implementation
The queue used for varnish purges has so few entries that it's really not worth paying the management overhead of skytools/pgq. Instead we can use a very simple local daemon using LISTEN/NOTIFY to fire the purges off. Also include a proper nagios plugin in this package, so we can get rid of the not-very-nice munin plugin currently used in the deployment.
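For context, the heart of the replacement is PostgreSQL's built-in LISTEN/NOTIFY rather than pgq's batching machinery. A minimal sketch of that pattern with psycopg2 (the DSN is a placeholder; the real loop is in tools/varnishqueue/varnish_queue.py below):

    import select
    import psycopg2

    conn = psycopg2.connect("dbname=pgweb")  # placeholder DSN
    curs = conn.cursor()
    curs.execute("LISTEN varnishqueue")      # channel that varnish_purge() NOTIFYs
    conn.commit()

    while True:
        # Sleep until the connection becomes readable (a notification arrived)
        # or the timeout expires, then drain the pending notifications.
        select.select([conn], [], [], 5*60)
        conn.poll()
        while conn.notifies:
            conn.notifies.pop()
        # ...process any uncompleted rows in varnishqueue.queue here...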
-rw-r--r--   pgweb/core/views.py                   11
-rw-r--r--   pgweb/settings.py                      4
-rw-r--r--   sql/varnish.sql                       20
-rw-r--r--   templates/core/admin_purge.html       17
-rwxr-xr-x   tools/pgq/varnish_consumer.py        173
-rw-r--r--   tools/pgq/varnish_pgq.ini             20
-rwxr-xr-x   tools/varnishqueue/nagios_check.py    39
-rwxr-xr-x   tools/varnishqueue/varnish_queue.py  145
8 files changed, 214 insertions, 215 deletions
diff --git a/pgweb/core/views.py b/pgweb/core/views.py
index 6b400bcd..2317a206 100644
--- a/pgweb/core/views.py
+++ b/pgweb/core/views.py
@@ -3,6 +3,7 @@ from django.http import HttpResponse, Http404, HttpResponseRedirect
from django.http import HttpResponseNotModified
from django.template import TemplateDoesNotExist, loader
from django.contrib.auth.decorators import login_required, user_passes_test
+from django.contrib import messages
from django.views.decorators.csrf import csrf_exempt
from django.db.models import Count
from django.db import connection, transaction
@@ -266,17 +267,15 @@ def admin_purge(request):
             return HttpResponseRedirect('.')
         varnish_purge(url)
         transaction.commit_unless_managed()
-        completed = '^%s' % url
-    else:
-        completed = None
+        messages.info(request, "Purge completed: '^%s'" % url)
+        return HttpResponseRedirect('.')
 
     # Fetch list of latest purges
     curs = connection.cursor()
-    curs.execute("SELECT ev_time, ev_type, ev_data FROM pgq.event_%s WHERE ev_type IN ('P', 'X') ORDER BY ev_time DESC LIMIT 20" % settings.VARNISH_QUEUE_ID)
-    latest = [{'t': r[0], 'ty': r[1], 'u': r[2]} for r in curs.fetchall()]
+    curs.execute("SELECT added, completed, consumer, mode, expr FROM varnishqueue.queue q LEFT JOIN varnishqueue.consumers c ON c.consumerid=q.consumerid ORDER BY added DESC")
+    latest = curs.fetchall()
 
     return render_to_response('core/admin_purge.html', {
-        'purge_completed': completed,
         'latest_purges': latest,
     }, RequestContext(request))
 
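One ordering detail in the view above: NOTIFY is transactional, so the queue workers only see the notification (and the queued rows) once the inserting transaction commits, which is what the explicit commit guarantees before the redirect. In sketch form, same calls as the diff, comments added:

    varnish_purge(url)                   # INSERT into varnishqueue.queue plus NOTIFY, inside the open transaction
    transaction.commit_unless_managed()  # commit: rows and notification become visible to the workers
    messages.info(request, "Purge completed: '^%s'" % url)
    return HttpResponseRedirect('.')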
diff --git a/pgweb/settings.py b/pgweb/settings.py
index 624c920d..67da6e70 100644
--- a/pgweb/settings.py
+++ b/pgweb/settings.py
@@ -135,6 +135,9 @@ PASSWORD_HASHERS = (
     'django.contrib.auth.hashers.CryptPasswordHasher',
)
+# Default format for date/time (as it changes between machines)
+DATETIME_FORMAT="Y-m-d H:i:s"
+
# Configure recaptcha. Most details contain keys and are thus handled
# in settings_local.py. Override NOCAPTCHA to actually use them.
NOCAPTCHA=True
@@ -162,7 +165,6 @@ FRONTEND_SERVERS=() # A tuple containing the
FTP_MASTERS=() # A tuple containing the *IP addresses* of all machines
# trusted to upload ftp structure data
VARNISH_PURGERS=() # Extra servers that can do varnish purges through our queue
-VARNISH_QUEUE_ID=1 # pgq queue id used for varnish purging
ARCHIVES_SEARCH_SERVER="archives.postgresql.org" # Where to post REST request for archives search
FRONTEND_SMTP_RELAY="magus.postgresql.org" # Where to relay user generated email
SITE_UPDATE_TRIGGER_FILE='/tmp/pgweb.update_trigger' # Where to drop update trigger file
diff --git a/sql/varnish.sql b/sql/varnish.sql
index bfbdb743..10792088 100644
--- a/sql/varnish.sql
+++ b/sql/varnish.sql
@@ -2,20 +2,28 @@ BEGIN;
--
-- Create a function to purge from varnish cache
--- By default this adds the object to a pgq queue,
+-- By default this adds the object to a local queue,
-- but this function can be replaced with a void one
-- when running a development version.
--
+CREATE SCHEMA IF NOT EXISTS varnishqueue;
+CREATE TABLE IF NOT EXISTS varnishqueue.queue (id bigserial primary key, mode char NOT NULL, consumerid int NOT NULL, expr text NOT NULL, added timestamptz NOT NULL DEFAULT CURRENT_TIMESTAMP, completed timestamptz NULL);
+CREATE TABLE IF NOT EXISTS varnishqueue.consumers (consumerid serial PRIMARY KEY, consumer text NOT NULL);
+
+DROP FUNCTION IF EXISTS varnish_purge(url text);
CREATE OR REPLACE FUNCTION varnish_purge(url text)
-RETURNS bigint
+RETURNS void
AS $$
- SELECT pgq.insert_event('varnish', 'P', $1);
+ INSERT INTO varnishqueue.queue (mode, consumerid, expr) SELECT 'P', consumerid, $1 FROM varnishqueue.consumers;
+ NOTIFY varnishqueue;
$$ LANGUAGE 'sql';
+DROP FUNCTION IF EXISTS varnish_purge_expr(expr text);
CREATE OR REPLACE FUNCTION varnish_purge_expr(expr text)
-RETURNS bigint
+RETURNS void
AS $$
- SELECT pgq.insert_event('varnish', 'X', $1);
+ INSERT INTO varnishqueue.queue (mode, consumerid, expr) SELECT 'X', consumerid, $1 FROM varnishqueue.consumers;
+ NOTIFY varnishqueue;
$$ LANGUAGE 'sql';
-COMMIT;
\ No newline at end of file
+COMMIT;
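On the Django side, the varnish_purge() helper called from views.py presumably wraps this SQL function; a plausible sketch (the actual helper lives elsewhere in pgweb and is not part of this diff):

    from django.db import connection

    def varnish_purge(url):
        # Anchor the expression (the admin form notes that ^ is auto-added),
        # then let the SQL function fan it out to every registered consumer
        # and fire the NOTIFY.
        curs = connection.cursor()
        curs.execute("SELECT varnish_purge(%s)", ('^%s' % url,))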
diff --git a/templates/core/admin_purge.html b/templates/core/admin_purge.html
index 16df1ef0..27bc077e 100644
--- a/templates/core/admin_purge.html
+++ b/templates/core/admin_purge.html
@@ -11,12 +11,6 @@
<h1>Purge URL from Varnish</h1>
<div id="content-main">
-{%if purge_completed %}
- <div class="module">
- Purge completed: {{purge_completed}}
- </div>
-{%endif%}
-
<form method="POST" action=".">{% csrf_token %}
URL (regex, ^ is auto-added): <input type="text" name="url">
<input type="submit" value="Purge" />
@@ -26,11 +20,16 @@ URL (regex, ^ is auto-added): <input type="text" name="url">
<div class="module">
<table summary="Latest purges" width="100%">
<caption><a class="section">Latest purges</a></caption>
+ <tr class="row2">
+ <th width="150">Queued</th>
+ <th width="150">Delivered</th>
+ <th width="150">Frontend</th>
+ <th width="30">Type</th>
+ <th>Expression</th>
+ </tr>
{%for p in latest_purges%}
<tr class="row{%cycle '1' '2'%}">
- <td>{{p.t|date:"c"}}</td>
- <td>{{p.ty}}</td>
- <td>{{p.u}}</td>
+ {%for c in p%}<td>{{c|default:""}}</td>{%endfor%}
</tr>
{%endfor%}
</table>
diff --git a/tools/pgq/varnish_consumer.py b/tools/pgq/varnish_consumer.py
deleted file mode 100755
index 1ef16c7d..00000000
--- a/tools/pgq/varnish_consumer.py
+++ /dev/null
@@ -1,173 +0,0 @@
-#!/usr/bin/env python
-#
-# A pgq consumer that generates varnish purge requests to expire things from
-# the frontend caches.
-#
-# Reads the file varnish_pgq.ini, which is a normal pgq configuration file.
-# Will look for any section starting with varnish_purger_<name>, and start one
-# purger for each such section purging from the frontend <name>.
-#
-# Each purger will run in a process of it's own, because pgq doesn't support
-# running different consumers in different threads.
-#
-# Purging is done by sending a regular GET request to /varnish-purge-url, with
-# the regular expression to purge in the http header X-Purge-URL.
-#
-# Retrying is handled automatically by pgq. In case a subprocess dies, it will
-# be restarted regularly by the remaining watchdog process.
-#
-#
-
-import httplib
-import signal
-import sys
-import time
-import datetime
-from ConfigParser import ConfigParser
-from multiprocessing import Process
-
-import pgq
-
-def print_t(s):
-    print "%s: %s" % (datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), s)
-
-class VarnishPurger(pgq.Consumer):
-    """
-    pgq consumer that purges URLs from varnish as they appear in the queue.
-    """
-
-    def __init__(self, frontend):
-        self.frontend = frontend
-        super(VarnishPurger, self).__init__('varnish_purger_%s' % frontend, 'db', ['varnish_pgq.ini'])
-
-    def process_batch(self, db, batch_id, ev_list):
-        "Called by pgq for each batch of events to run."
-
-        for ev in ev_list:
-            if ev.type == 'P':
-                # 'P' events means purge.
-                print_t("Purging '%s' on %s" % (ev.data, self.frontend))
-                try:
-                    if self.do_purge(ev.data):
-                        ev.tag_done()
-                except Exception, e:
-                    print_t("Failed to purge '%s' on '%s': %s" % (ev.data, self.frontend, e))
-            elif ev.type == 'X':
-                # 'X' events means ban expression (rather than just urls)
-                print_t("Purging expression '%s' on %s" % (ev.data, self.frontend))
-                try:
-                    if self.do_purge_expr(ev.data):
-                        ev.tag_done()
-                except Exception, e:
-                    print_t("Failed to purge expression '%s' on '%s': %s" % (ev.data, self.frontend, e))
-            else:
-                print_t("Unknown event type '%s'" % ev.type)
-
-
-    def internal_purge(self, headers):
-        """
-        Send the actual purge request, by contacting the frontend this
-        purger is running for and sending a GET request to the special URL
-        with our regexp in a special header.
-        """
-        conn = httplib.HTTPConnection('%s.postgresql.org' % self.frontend)
-        conn.request("GET", "/varnish-purge-url", '', headers)
-        resp = conn.getresponse()
-        conn.close()
-        if resp.status == 200:
-            return True
-
-        print_t("Varnish purge returned status %s (%s)" % (resp.status, resp.reason))
-        return False
-
-    def do_purge(self, url):
-        return self.internal_purge({'X-Purge-URL': url})
-
-    def do_purge_expr(self, expr):
-        return self.internal_purge({'X-Purge-Expr': expr})
-
-class PurgerProcess(object):
-    """
-    Wrapper class that represents a subprocess that runs a varnish purger.
-    """
-    def __init__(self, frontend):
-        self.frontend = frontend
-        self.start()
-
-    def start(self):
-        self.process = Process(target=self._run, name=frontend)
-        self.process.start()
-
-    def _run(self):
-        # NOTE! This runs in the client! Must *NOT* be called from the
-        # parent process!
-
-        # Start by turning off signals so we don't try to restart ourselves
-        # and others, entering into possible infinite recursion.
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
-        signal.signal(signal.SIGQUIT, signal.SIG_DFL)
-        signal.signal(signal.SIGHUP, signal.SIG_DFL)
-
-        # Start and run the consumer
-        print_t("Initiating run of %s" % self.frontend)
-        self.purger = VarnishPurger(frontend)
-        self.purger.start()
-
-    def validate(self):
-        """
-        Validate that the process is running. If it's no longer running,
-        try starting a new one.
-        """
-        if not self.process.is_alive():
-            # Ugh!
-            print_t("Process for '%s' died!" % self.frontend)
-            self.process.join()
-            print_t("Attempting restart of '%s'!" % self.frontend)
-            self.start()
-
-    def terminate(self):
-        """
-        Terminate the process running this purger.
-        """
-        print_t("Terminating process for '%s'" % self.frontend)
-        self.process.terminate()
-        self.process.join()
-
-
-# We need to keep the list of purgers in a global variable, so we can kill
-# them off from the signal handler.
-global purgers
-purgers = []
-
-def sighandler(signal, frame):
-    print_t("Received terminating signal, shutting down subprocesses")
-    for p in purgers:
-        p.terminate()
-    sys.exit(0)
-
-
-if __name__=="__main__":
-    cp = ConfigParser()
-    cp.read('varnish_pgq.ini')
-
-    if len(sys.argv) > 2 and sys.argv[1] == "-logfile":
-        # Redirect to whatever is in sys.argv[2]
-        # (yes, this is a really ugly way of doing things..)
-        f = open(sys.argv[2], "a", 0)
-        sys.stdout = f
-
-    # Trap signals that shut us down, so we can kill off our subprocesses
-    # before we die.
-    signal.signal(signal.SIGTERM, sighandler)
-    signal.signal(signal.SIGQUIT, sighandler)
-    signal.signal(signal.SIGHUP, sighandler)
-
-    # Start one process for each of the configured purgers
-    for frontend in [section[15:] for section in cp.sections() if section[:15] == 'varnish_purger_']:
-        purgers.append(PurgerProcess(frontend))
-
-    # Loop forever, restarting any worker process that has potentially died
-    while True:
-        for p in purgers:
-            p.validate()
-        time.sleep(10)
diff --git a/tools/pgq/varnish_pgq.ini b/tools/pgq/varnish_pgq.ini
deleted file mode 100644
index 16fc7040..00000000
--- a/tools/pgq/varnish_pgq.ini
+++ /dev/null
@@ -1,20 +0,0 @@
-# Global settings for pgq
-[pgqadm]
-job_name = varnish_ticker
-db = dbname=pgweb user=pgq
-logfile = /var/log/pgweb-pgq.log
-pidfile = /var/run/pgweb-pgq.pid
-
-# We have one consumer for each frontend server,
-# all talking to the same queue and database.
-[varnish_purger_zakdorn]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
-
-[varnish_purger_zalkon]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
-
-[varnish_purger_zetar]
-pgq_queue_name = varnish
-db = dbname=pgweb user=pgq
diff --git a/tools/varnishqueue/nagios_check.py b/tools/varnishqueue/nagios_check.py
new file mode 100755
index 00000000..642996d5
--- /dev/null
+++ b/tools/varnishqueue/nagios_check.py
@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+
+import sys
+import psycopg2
+from datetime import timedelta
+
+# Up to 5 minutes delay is ok
+WARNING_THRESHOLD=timedelta(minutes=5)
+# More than 15 minutes means something is definitely wrong
+CRITICAL_THRESHOLD=timedelta(minutes=15)
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print "Usage: nagios_check.py <dsn>"
+        sys.exit(1)
+
+    conn = psycopg2.connect(sys.argv[1])
+    curs = conn.cursor()
+
+    # Get the age of the oldest uncompleted entry; max() returns NULL if the queue is empty
+    curs.execute("SELECT max(now()-added) FROM varnishqueue.queue WHERE completed IS NULL")
+    rows = curs.fetchall()
+    conn.close()
+
+    if rows[0][0] is None:
+        print "OK, queue is empty"
+        sys.exit(0)
+
+    age = rows[0][0]
+
+    if age < WARNING_THRESHOLD:
+        print "OK, queue age is %s" % age
+        sys.exit(0)
+    elif age < CRITICAL_THRESHOLD:
+        print "WARNING, queue age is %s" % age
+        sys.exit(1)
+    else:
+        print "CRITICAL, queue age is %s" % age
+        sys.exit(2)
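The exit codes follow the standard Nagios plugin convention: 0 for OK, 1 for WARNING, 2 for CRITICAL, with a human-readable status line on stdout. A hypothetical invocation (the DSN values are placeholders):

    $ ./nagios_check.py "host=localhost dbname=pgweb user=nagios"
    OK, queue age is 0:00:07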
diff --git a/tools/varnishqueue/varnish_queue.py b/tools/varnishqueue/varnish_queue.py
new file mode 100755
index 00000000..2f6921e4
--- /dev/null
+++ b/tools/varnishqueue/varnish_queue.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+#
+# varnish_queue.py - handle varnish purging queues
+#
+# Spawns a worker for each of the varnish servers, each of which drains
+# its own queue as quickly as it can when told to do so by a NOTIFY.
+#
+
+import time
+import sys
+import select
+import httplib
+import multiprocessing
+import logging
+import psycopg2
+from setproctitle import setproctitle
+
+def do_purge(consumername, headers):
+    try:
+        conn = httplib.HTTPConnection('%s.postgresql.org' % consumername)
+        conn.request("GET", "/varnish-purge-url", '', headers)
+        resp = conn.getresponse()
+        conn.close()
+        if resp.status == 200:
+            return True
+        logging.warning("Varnish purge on %s returned status %s (%s)" % (consumername, resp.status, resp.reason))
+        return False
+    except Exception, ex:
+        logging.error("Exception purging on %s: %s" % (consumername, ex))
+        return False
+    return True
+
+def worker(consumerid, consumername, dsn):
+    logging.info("Starting worker for %s" % consumername)
+    setproctitle("varnish_queue - worker for %s" % consumername)
+
+    conn = psycopg2.connect(dsn)
+    curs = conn.cursor()
+    curs.execute("LISTEN varnishqueue")
+    conn.commit()
+
+    while True:
+        # See if there is something to pick up off the queue
+        curs.execute("SELECT id, mode, expr FROM varnishqueue.queue WHERE consumerid=%(consumerid)s AND completed IS NULL FOR UPDATE", {
+            'consumerid': consumerid,
+        })
+        res = curs.fetchall()
+
+        failed = False
+
+        if len(res):
+            idlist = []
+            for r in res:
+                # Do something with this entry...
+                if r[1] == 'P':
+                    logging.info("Purging url %s on %s" % (r[2], consumername))
+                    if not do_purge(consumername, {'X-Purge-URL': r[2]}):
+                        # Failed, but we will try again, so don't add to list of removals
+                        failed = True
+                        continue
+                elif r[1] == 'X':
+                    logging.info("Purging expression %s on %s" % (r[2], consumername))
+                    if not do_purge(consumername, {'X-Purge-Expr': r[2]}):
+                        failed = True
+                        continue
+                else:
+                    logging.warning("Unknown purge type %s on %s, ignoring." % (r[1], consumername))
+
+                # Schedule for removal
+                idlist.append(r[0])
+
+            # Then remove from queue
+            curs.execute("UPDATE varnishqueue.queue SET completed=CURRENT_TIMESTAMP WHERE id=ANY(%(idlist)s)", {
+                'idlist': idlist
+            })
+            conn.commit()
+            if failed:
+                time.sleep(5)
+        else:
+            # Nothing, so roll back the transaction and wait
+            conn.rollback()
+
+            select.select([conn], [], [], 5*60)
+            conn.poll()
+            while conn.notifies:
+                conn.notifies.pop()
+            # Loop back up and process the full queue
+
+
+def housekeeper(dsn):
+    logging.info("Starting housekeeper")
+    setproctitle("varnish_queue - housekeeper")
+    conn = psycopg2.connect(dsn)
+    curs = conn.cursor()
+
+    while True:
+        curs.execute("DELETE FROM varnishqueue.queue WHERE completed IS NOT NULL")
+        if curs.rowcount > 0:
+            conn.commit()
+        else:
+            conn.rollback()
+        time.sleep(5*60)
+
+
+if __name__ == "__main__":
+    if len(sys.argv) != 2:
+        print "Usage: varnish_queue.py <dsn>"
+        sys.exit(1)
+
+    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', level=logging.INFO)
+
+    conn = psycopg2.connect(sys.argv[1])
+
+    curs = conn.cursor()
+    curs.execute("SELECT consumerid, consumer FROM varnishqueue.consumers")
+    consumers = curs.fetchall()
+    conn.close()
+
+    # Now spawn a worker for each
+    processes = []
+    for consumerid, consumername in consumers:
+        p = multiprocessing.Process(target=worker, args=(consumerid, consumername, sys.argv[1]))
+        p.start()
+        processes.append(p)
+
+    # Start a housekeeping process as well
+    p = multiprocessing.Process(target=housekeeper, args=(sys.argv[1],))
+    p.start()
+    processes.append(p)
+
+    # They should never die, but if they do, commit suicide and
+    # restart everything.
+    while True:
+        processes[0].join(timeout=120)
+        for p in processes:
+            if not p.is_alive():
+                logging.warning("Child process died, killing all and exiting")
+                for p2 in processes:
+                    try:
+                        p2.terminate()
+                    except:
+                        pass
+                logging.error("Children killed, exiting")
+                sys.exit(1)
+        # If all processes are alive, loop back up and try again
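The watchdog loop at the end is deliberately fail-fast: if any child dies, the daemon tears down all workers and exits non-zero, on the assumption that whatever supervises it (an init script or similar, not part of this commit) restarts the whole tree. Like the nagios check, the daemon takes a single libpq DSN on the command line; a hypothetical invocation (DSN values are placeholders):

    $ ./varnish_queue.py "host=localhost dbname=pgweb user=varnishqueue"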