summaryrefslogtreecommitdiff
path: root/pgweb/util/middleware.py
blob: 288684597108dedc533d98bdb44e938fa22861de (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from django.conf import settings
from django.http import QueryDict, HttpResponse

from pgweb.util.templateloader import initialize_template_collection, get_all_templates

from collections import OrderedDict
import hashlib

# Use thread local storage to pass the username down.
# http://code.djangoproject.com/wiki/CookBookThreadlocalsAndUser
try:
    from threading import local
except ImportError:
    from django.utils._threading_local import local

_thread_locals = local()


def get_current_user():
    return getattr(_thread_locals, 'user', None)


# General middleware for all middleware functionality specific to the pgweb
# project.
class PgMiddleware(object):
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        # Thread local store for username, see comment at the top of this file
        _thread_locals.user = getattr(request, 'user', None)
        initialize_template_collection()

        # Call the view
        response = self.get_response(request)

        # Set xkey representing the templates that are in use so we can do efficient
        # varnish purging on commits.
        tlist = get_all_templates()
        if 'base/esi.html' in tlist:
            response['x-do-esi'] = "1"
            tlist.remove('base/esi.html')
        if tlist:
            response['xkey'] = ' '.join(["pgwt_{0}".format(hashlib.md5(t.encode('ascii', errors='replace')).hexdigest()) for t in tlist] + [response.get('xkey', '')])

        # Set security headers
        sources = OrderedDict([
            ('default', ["'self'", ]),
            ('img', ['*', 'data:', ]),
            ('script', ["'unsafe-eval'", "'self'", "www.google-analytics.com", "ssl.google-analytics.com", "www.googletagmanager.com", "tagmanager.google.com", "data:"]),
            ('connect', ["'self'", "www.google-analytics.com", "ssl.google-analytics.com"]),
            ('media', ["'self'", ]),
            ('style', ["'self'", "fonts.googleapis.com", "tagmanager.google.com"]),
            ('font', ["'self'", "fonts.gstatic.com", "data:", ]),
        ])
        if hasattr(response, 'x_allow_extra_sources'):
            for k, v in list(response.x_allow_extra_sources.items()):
                if k in sources:
                    sources[k].extend(v)
                else:
                    sources[k] = v

        security_policies = ["{0}-src {1}".format(k, " ".join(v)) for k, v in list(sources.items())]

        if not getattr(response, 'x_allow_frames', False):
            response['X-Frame-Options'] = 'DENY'
            security_policies.append("frame-ancestors 'none'")

        if hasattr(settings, 'SECURITY_POLICY_REPORT_URI'):
            security_policies.append("report-uri " + settings.SECURITY_POLICY_REPORT_URI)

        if security_policies:
            if getattr(settings, 'SECURITY_POLICY_REPORT_ONLY', False):
                response['Content-Security-Policy-Report-Only'] = " ; ".join(security_policies)
            else:
                response['Content-Security-Policy'] = " ; ".join(security_policies)

        response['X-XSS-Protection'] = "1; mode=block"
        return response

    def process_view(self, request, view_func, view_args, view_kwargs):
        # Filter out any query parameters that are not explicitly allowed. We do the same thing in Varnish,
        # and it's better to also do it in django if they show up here, so issues because of it are caught
        # in local testing where there is no Varnish.
        if not request.GET:
            # If there are no parameters, just skip this whole process
            return None

        if request.path.startswith('/admin/'):
            # django-admin uses it a lot and it's not for us to change
            return None

        if settings.DEBUG_TOOLBAR and request.path.startswith('/__debug__/'):
            # The debug toolbar also uses a lot
            return None

        allowed = getattr(view_func, 'queryparams', None)

        if allowed:
            # Filter the QueryDict for only the allowed parameters
            result = request.GET.copy()
            for k in request.GET.keys():
                if k not in allowed:
                    del result[k]
                if "\0" in request.GET[k]:
                    return HttpResponse(
                        "NUL escapes not allowed in query parameters",
                        content_type='text/plain',
                        status=400
                    )
            result.mutable = False
            request.GET = result
        else:
            request.GET = QueryDict()
        return None