summaryrefslogtreecommitdiff
path: root/pgmailmgr/mailmgr/forms.py
blob: 0197e15bca9f7c601e7e93cf25ad25c6dfa8068a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
from django import forms
from django.forms import ValidationError
from django.db import connection
from django.contrib.auth.password_validation import validate_password, MinimumLengthValidator, UserAttributeSimilarityValidator
from django.contrib.auth.password_validation import CommonPasswordValidator, NumericPasswordValidator

from pgmailmgr.auth import user_search, user_import

from .models import *


class AccountEmailWidget(forms.TextInput):
    """Text input that shows the email address of an already linked account.

    The widget is constructed with the account object (or a falsy value when
    nothing is linked) and always renders that account's ``email`` attribute;
    the value handed to ``get_context`` is overwritten.
    """

    def __init__(self, account):
        # Remember the account before delegating to the stock TextInput setup.
        self.account = account
        super().__init__()

    def get_context(self, name, value, attrs):
        """Return the template context with value and placeholder overridden."""
        context = super().get_context(name, value, attrs)
        widget = context['widget']

        # No account, or an account with an empty email, both render as ''.
        email = self.account.email if self.account else None
        widget['value'] = email or ''
        widget['attrs']['placeholder'] = 'Enter email address of a community account to connect to'
        return context


class password_validator:
    """Callable field validator that checks a password against a fixed rule set.

    The mailbox given at construction time is handed to ``validate_password``
    as the user object, so the similarity rule can compare the password with
    the mailbox's ``local_part`` and ``full_name`` attributes.
    """

    def __init__(self, mailbox):
        self.mailbox = mailbox

    def __call__(self, password):
        """Validate ``password``; returns whatever ``validate_password`` returns."""
        # The order of the rules is kept as-is.
        rules = [
            MinimumLengthValidator(10),
            CommonPasswordValidator(),
            NumericPasswordValidator(),
            UserAttributeSimilarityValidator(user_attributes=['local_part', 'full_name']),
        ]
        return validate_password(password, user=self.mailbox, password_validators=rules)


class PasswordChangeForm(forms.Form):
    """Form asking for a new password twice.

    Requires a ``mailbox`` keyword argument; it is removed from ``kwargs``
    before the base form is initialised and is used to build the validator
    attached to the ``password`` field.
    """

    password = forms.CharField(widget=forms.PasswordInput)
    verify_password = forms.CharField(widget=forms.PasswordInput)

    def __init__(self, *args, **kwargs):
        # Raises KeyError when the caller does not supply ``mailbox``.
        mailbox = kwargs.pop('mailbox')
        super().__init__(*args, **kwargs)

        # Replace (not extend) the validator list of the password field.
        password_field = self.fields['password']
        password_field.validators = [password_validator(mailbox)]

    def clean(self):
        """Flag ``verify_password`` when it differs from ``password``."""
        cleaned = super().clean()

        # Without a valid password there is nothing to compare against.
        if 'password' not in cleaned:
            return cleaned

        if cleaned['password'] != cleaned.get('verify_password'):
            self.add_error('verify_password', "Passwords don't match")
        return cleaned


class VirtualUserForm(forms.ModelForm):
    """Model form for creating or editing a ``VirtualUser``.

    Parameters accepted by the constructor:
        data:     submitted form data, or None for an unbound form.
        instance: the ``VirtualUser`` being edited, or None.
        user:     the user operating the form. It is dereferenced
                  unconditionally (``user.is_superuser``), so it must be
                  supplied in practice even though it defaults to None.

    Non-superusers only get the domains they hold a permission row for, cannot
    see or set ``mail_quota``, and must match one of their permission patterns.
    On an existing instance, neither the domain nor the local part may change.
    """

    class Meta:
        model = VirtualUser
        fields = ('local_domain', 'local_part', 'full_name', 'mail_quota', 'account', 'passwd')

    def __init__(self, data=None, instance=None, user=None):
        super(VirtualUserForm, self).__init__(data=data, instance=instance)
        self.user = user

        self.fields['passwd'].widget = forms.PasswordInput()
        self.fields['passwd'].widget.attrs['autocomplete'] = 'new-password'

        # ``instance`` defaults to None, so only dereference it when given;
        # otherwise the widget simply renders an empty value.
        linked_account = instance.account if instance is not None else None
        self.fields['account'] = forms.CharField(max_length=200, widget=AccountEmailWidget(linked_account), required=False)

        # The domain of an existing object is shown but cannot be edited.
        if self.instance.pk:
            self.fields['local_domain'].disabled = True

        if not user.is_superuser:
            # Restrict the domain choices to those the user has permissions on,
            # and hide the quota field entirely.
            self.fields['local_domain'].queryset = LocalDomain.objects.only('domain_name').filter(userpermissions__user=user).distinct()
            del self.fields['mail_quota']

    def clean_local_domain(self):
        """Reject a changed domain on an existing object."""
        if not self.instance.pk:
            return self.cleaned_data['local_domain']
        if self.cleaned_data['local_domain'] != self.instance.local_domain:
            raise ValidationError("Can't change local domain!")
        return self.cleaned_data['local_domain']

    def clean_local_part(self):
        """Reject a changed local part on an existing object."""
        if not self.instance.pk:
            return self.cleaned_data['local_part']
        if self.cleaned_data['local_part'] != self.instance.local_part:
            raise ValidationError("Renaming accounts is not possible - you have to delete and add!")
        return self.cleaned_data['local_part']

    def clean_mail_quota(self):
        """Require a quota value greater than 1."""
        if self.cleaned_data['mail_quota'] <= 1:
            raise ValidationError("Mail quota must be set")
        return self.cleaned_data['mail_quota']

    def clean_passwd(self):
        """Return the stored form of the password.

        A value equal to the instance's current ``passwd`` is passed through
        untouched; anything else is hashed in the database.
        """
        if self.cleaned_data['passwd'] != self.instance.passwd:
            # Changing password requires calling pgcrypto. So let's do that...
            # The context manager makes sure the cursor is closed afterwards.
            with connection.cursor() as curs:
                curs.execute("SELECT public.crypt(%(pwd)s, public.gen_salt('md5'))", {
                    'pwd': self.cleaned_data['passwd']
                })
                return curs.fetchone()[0]

        return self.cleaned_data['passwd']

    def clean_account(self):
        """Resolve the entered email address to a ``User``, or None if blank.

        Raises ValidationError when the address matches no local user and
        cannot be imported through ``user_search`` / ``user_import``.
        """
        a = self.cleaned_data['account']
        if a == '':
            return None

        try:
            return User.objects.get(email=a)
        except User.DoesNotExist:
            # Import the user if we can: only on a single, exact email match.
            users = user_search(searchterm=a)
            if len(users) == 1 and users[0]['e'] == a:
                return user_import(users[0]['u'])
        raise ValidationError("User not found")

    def clean(self):
        """Cross-field checks: permission pattern and name collisions.

        NOTE(review): ``super().clean()`` is not invoked here (behaviour kept
        as-is) - confirm whether that is intended.
        """
        # Per-field validation already failed; nothing more to check.
        if 'local_part' not in self.cleaned_data:
            return {}
        if 'local_domain' not in self.cleaned_data:
            return {}

        local_part = self.cleaned_data['local_part']
        local_domain = self.cleaned_data['local_domain']

        if not self.user.is_superuser:
            # Validate that the pattern is allowed: the local part must match
            # one of the user's anchored, case-insensitive patterns.
            with connection.cursor() as curs:
                curs.execute("SELECT 1 FROM mailmgr_userpermissions WHERE user_id=%(uid)s AND domain_id=%(domain)s AND %(lp)s ~* ('^'||pattern||'$')", {
                    'uid': self.user.pk,
                    'domain': local_domain.pk,
                    'lp': local_part,
                })
                permitted = curs.fetchone() is not None

            if not permitted:
                raise ValidationError("Permission denied to create that user for that domain!")

        # If it's a new user, also check against if it already exists
        if not self.instance.pk:
            if VirtualUser.objects.filter(local_part=local_part, local_domain=local_domain).exists():
                raise ValidationError("A user with that name already exists in that domain!")

        # Make sure we can't get a collision with a forwarding
        if Forwarder.objects.filter(local_part=local_part, local_domain=local_domain).exists():
            raise ValidationError("A forwarder with that name already exists in that domain!")

        return self.cleaned_data


class ForwarderForm(forms.ModelForm):
    """Model form for creating or editing a ``Forwarder``.

    Parameters accepted by the constructor:
        data:     submitted form data, or None for an unbound form.
        instance: the ``Forwarder`` being edited, or None.
        user:     the user operating the form. It is dereferenced
                  unconditionally (``user.is_superuser``), so it must be
                  supplied in practice even though it defaults to None.

    Non-superusers only get the domains they hold a permission row for and
    must match one of their permission patterns. On an existing instance,
    neither the domain nor the local part may change.
    """

    class Meta:
        model = Forwarder
        fields = ('local_domain', 'local_part', 'remote_name')

    def __init__(self, data=None, instance=None, user=None):
        super(ForwarderForm, self).__init__(data=data, instance=instance)
        self.user = user

        if not user.is_superuser:
            # Restrict the domain choices to those the user has permissions on.
            self.fields['local_domain'].queryset = LocalDomain.objects.only('domain_name').filter(userpermissions__user=user).distinct()

    def clean_local_domain(self):
        """Reject a changed domain on an existing object."""
        if not self.instance.pk:
            return self.cleaned_data['local_domain']
        if self.cleaned_data['local_domain'] != self.instance.local_domain:
            raise ValidationError("Can't change local domain!")
        return self.cleaned_data['local_domain']

    def clean_local_part(self):
        """Reject a changed local part on an existing object."""
        if not self.instance.pk:
            return self.cleaned_data['local_part']
        if self.cleaned_data['local_part'] != self.instance.local_part:
            raise ValidationError("Renaming forwarders is not possible - you have to delete and add!")
        return self.cleaned_data['local_part']

    def clean(self):
        """Cross-field checks: permission pattern and name collisions.

        NOTE(review): ``super().clean()`` is not invoked here (behaviour kept
        as-is) - confirm whether that is intended.
        """
        # Per-field validation already failed; nothing more to check.
        if 'local_part' not in self.cleaned_data:
            return {}
        if 'local_domain' not in self.cleaned_data:
            return {}

        local_part = self.cleaned_data['local_part']
        local_domain = self.cleaned_data['local_domain']

        if not self.user.is_superuser:
            # Validate that the pattern is allowed: the local part must match
            # one of the user's anchored, case-insensitive patterns.
            with connection.cursor() as curs:
                curs.execute("SELECT 1 FROM mailmgr_userpermissions WHERE user_id=%(uid)s AND domain_id=%(domain)s AND %(lp)s ~* ('^'||pattern||'$')", {
                    'uid': self.user.pk,
                    'domain': local_domain.pk,
                    'lp': local_part,
                })
                permitted = curs.fetchone() is not None

            if not permitted:
                raise ValidationError("Permission denied to create that forwarder for that domain!")

        # If it's a new forwarder, also check against if it already exists
        if not self.instance.pk:
            if Forwarder.objects.filter(local_part=local_part, local_domain=local_domain).exists():
                raise ValidationError("A forwarder with that name already exists in that domain!")

        # Make sure we can't get a collision with a user
        if VirtualUser.objects.filter(local_part=local_part, local_domain=local_domain).exists():
            raise ValidationError("A user with that name already exists in that domain!")

        return self.cleaned_data


class ConfirmForm(forms.Form):
    """Confirmation form that is only valid when the word "delete" is typed."""

    confirm = forms.CharField(max_length=100, required=True)

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        confirm_attrs = self.fields['confirm'].widget.attrs
        confirm_attrs['placeholder'] = 'Please type "delete" to confirm the delete operation'
        confirm_attrs['autocomplete'] = 'off'

    def clean_confirm(self):
        """Validate the confirmation word and return it.

        Raises ValidationError unless the value is exactly ``delete``.
        """
        value = self.cleaned_data['confirm']
        if value != 'delete':
            raise ValidationError('Please type "delete" to confirm the delete operation')
        # A clean_<field> method must return the value; without the return,
        # cleaned_data['confirm'] would be replaced by None.
        return value