from django import forms from django.forms import ValidationError from django.db import connection from django.contrib.auth.password_validation import validate_password, MinimumLengthValidator, UserAttributeSimilarityValidator from django.contrib.auth.password_validation import CommonPasswordValidator, NumericPasswordValidator from pgmailmgr.auth import user_search, user_import from .models import * class AccountEmailWidget(forms.TextInput): def __init__(self, account): self.account = account super().__init__() def get_context(self, name, value, attrs): c = super().get_context(name, value, attrs) c['widget']['value'] = self.account and self.account.email or '' c['widget']['attrs']['placeholder'] = 'Enter email address of a community account to connect to' return c class password_validator: def __init__(self, mailbox): self.mailbox = mailbox def __call__(self, password): return validate_password(password, self.mailbox, password_validators=[ MinimumLengthValidator(10), CommonPasswordValidator(), NumericPasswordValidator(), UserAttributeSimilarityValidator(user_attributes=['local_part', 'full_name']), ]) class PasswordChangeForm(forms.Form): password = forms.CharField(widget=forms.PasswordInput) verify_password = forms.CharField(widget=forms.PasswordInput) def __init__(self, *args, **kwargs): mailbox = kwargs.pop('mailbox') super().__init__(*args, **kwargs) self.fields['password'].validators = [password_validator(mailbox)] def clean(self): d = super().clean() if 'password' in d and d.get('password') != d.get('verify_password', None): self.add_error('verify_password', "Passwords don't match") return d class VirtualUserForm(forms.ModelForm): class Meta: model = VirtualUser fields = ('local_domain', 'local_part', 'full_name', 'mail_quota', 'account', 'passwd') def __init__(self, data=None, instance=None, user=None): super(VirtualUserForm, self).__init__(data=data, instance=instance) self.user = user self.fields['passwd'].widget = forms.PasswordInput() self.fields['passwd'].widget.attrs['autocomplete'] = 'new-password' self.fields['account'] = forms.CharField(max_length=200, widget=AccountEmailWidget(instance.account), required=False) if not user.is_superuser: self.fields['local_domain'].queryset = LocalDomain.objects.only('domain_name').filter(userpermissions__user=user).distinct() if self.instance.pk: self.fields['local_domain'].disabled = True if not user.is_superuser: del self.fields['mail_quota'] def clean_local_domain(self): if not self.instance.pk: return self.cleaned_data['local_domain'] if self.cleaned_data['local_domain'] != self.instance.local_domain: raise ValidationError("Can't change local domain!") return self.cleaned_data['local_domain'] def clean_local_part(self): if not self.instance.pk: return self.cleaned_data['local_part'] if self.cleaned_data['local_part'] != self.instance.local_part: raise ValidationError("Renaming accounts is not possible - you have to delete and add!") return self.cleaned_data['local_part'] def clean_mail_quota(self): if self.cleaned_data['mail_quota'] <= 1: raise ValidationError("Mail quota must be set") return self.cleaned_data['mail_quota'] def clean_passwd(self): if self.cleaned_data['passwd'] != self.instance.passwd: # Changing password requires calling pgcrypto. So let's do that... curs = connection.cursor() curs.execute("SELECT public.crypt(%(pwd)s, public.gen_salt('md5'))", { 'pwd': self.cleaned_data['passwd'] }) return curs.fetchall()[0][0] return self.cleaned_data['passwd'] def clean_account(self): a = self.cleaned_data['account'] if a == '': return None try: return User.objects.get(email=a) except User.DoesNotExist: # Import the user if we can users = user_search(searchterm=a) if len(users) == 1 and users[0]['e'] == a: return user_import(users[0]['u']) raise ValidationError("User not found") def clean(self): if 'local_part' not in self.cleaned_data: return {} if 'local_domain' not in self.cleaned_data: return {} if not self.user.is_superuser: # Validate that the pattern is allowed curs = connection.cursor() curs.execute("SELECT 1 FROM mailmgr_userpermissions WHERE user_id=%(uid)s AND domain_id=%(domain)s AND %(lp)s ~* ('^'||pattern||'$')", { 'uid': self.user.pk, 'domain': self.cleaned_data['local_domain'].pk, 'lp': self.cleaned_data['local_part'], }) perms = curs.fetchall() if len(perms) < 1: raise ValidationError("Permission denied to create that user for that domain!") # If it's a new user, also check against if it already exists if not self.instance.pk: old = VirtualUser.objects.filter(local_part=self.cleaned_data['local_part'], local_domain=self.cleaned_data['local_domain']) if len(old): raise ValidationError("A user with that name already exists in that domain!") # Make sure we can't get a collision with a forwarding forwarders = Forwarder.objects.filter(local_part=self.cleaned_data['local_part'], local_domain=self.cleaned_data['local_domain']) if len(forwarders): raise ValidationError("A forwarder with that name already exists in that domain!") return self.cleaned_data class ForwarderForm(forms.ModelForm): class Meta: model = Forwarder fields = ('local_domain', 'local_part', 'remote_name') def __init__(self, data=None, instance=None, user=None): super(ForwarderForm, self).__init__(data=data, instance=instance) self.user = user if not user.is_superuser: self.fields['local_domain'].queryset = LocalDomain.objects.only('domain_name').filter(userpermissions__user=user).distinct() def clean_local_domain(self): if not self.instance.pk: return self.cleaned_data['local_domain'] if self.cleaned_data['local_domain'] != self.instance.local_domain: raise ValidationError("Can't change local domain!") return self.cleaned_data['local_domain'] def clean_local_part(self): if not self.instance.pk: return self.cleaned_data['local_part'] if self.cleaned_data['local_part'] != self.instance.local_part: raise ValidationError("Renaming forwarders is not possible - you have to delete and add!") return self.cleaned_data['local_part'] def clean(self): if 'local_part' not in self.cleaned_data: return {} if 'local_domain' not in self.cleaned_data: return {} if not self.user.is_superuser: # Validate that the pattern is allowed curs = connection.cursor() curs.execute("SELECT 1 FROM mailmgr_userpermissions WHERE user_id=%(uid)s AND domain_id=%(domain)s AND %(lp)s ~* ('^'||pattern||'$')", { 'uid': self.user.pk, 'domain': self.cleaned_data['local_domain'].pk, 'lp': self.cleaned_data['local_part'], }) perms = curs.fetchall() if len(perms) < 1: raise ValidationError("Permission denied to create that forwarder for that domain!") # If it's a new user, also check against if it already exists if not self.instance.pk: old = Forwarder.objects.filter(local_part=self.cleaned_data['local_part'], local_domain=self.cleaned_data['local_domain']) if len(old): raise ValidationError("A forwarder with that name already exists in that domain!") # Make sure we can't get a collision with a user users = VirtualUser.objects.filter(local_part=self.cleaned_data['local_part'], local_domain=self.cleaned_data['local_domain']) if len(users): raise ValidationError("A user with that name already exists in that domain!") return self.cleaned_data class ConfirmForm(forms.Form): confirm = forms.CharField(max_length=100, required=True) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.fields['confirm'].widget.attrs['placeholder'] = 'Please type "delete" to confirm the delete operation' self.fields['confirm'].widget.attrs['autocomplete'] = 'off' def clean_confirm(self): if self.cleaned_data['confirm'] != 'delete': raise ValidationError('Please type "delete" to confirm the delete operation')