import copy from urllib.parse import parse_qs from django import forms from django.contrib import admin, messages from django.urls import reverse from django.db.models import F, Count, Value as V from django.db.models.functions import Concat from django.utils.html import format_html, format_html_join from django.utils.safestring import mark_safe from django.utils.translation import gettext_lazy as _ from orchestra.admin import ExtendedModelAdmin, ChangePasswordAdminMixin from orchestra.admin.actions import disable, enable from orchestra.admin.utils import admin_link, change_url from orchestra.contrib.accounts.actions import list_accounts from orchestra.contrib.accounts.admin import SelectAccountAdminMixin from orchestra.contrib.accounts.filters import IsActiveListFilter from orchestra.core import caches from . import settings from .actions import SendMailboxEmail, SendAddressEmail from .filters import HasMailboxListFilter, HasForwardListFilter, HasAddressListFilter, HasTipeServerFilter from .forms import MailboxCreationForm, MailboxChangeForm, AddressForm from .models import Mailbox, Address, Autoresponse from .widgets import OpenCustomFilteringOnSelect class AutoresponseInline(admin.StackedInline): model = Autoresponse verbose_name_plural = _("autoresponse") def formfield_for_dbfield(self, db_field, **kwargs): if db_field.name == 'subject': kwargs['widget'] = forms.TextInput(attrs={'size':'118'}) return super(AutoresponseInline, self).formfield_for_dbfield(db_field, **kwargs) class MailboxAdmin(ChangePasswordAdminMixin, SelectAccountAdminMixin, ExtendedModelAdmin): list_display = ( 'name', 'account_link', 'display_filtering', 'display_addresses', 'display_active', ) list_filter = (IsActiveListFilter, HasAddressListFilter, 'filtering', HasTipeServerFilter) search_fields = ( 'account__username', 'account__short_name', 'account__full_name', 'name', 'addresses__name', 'addresses__domain__name', ) add_fieldsets = ( (None, { 'fields': ('account_link', 'name', 'password1', 'password2', 'filtering'), }), (_("Custom filtering"), { 'classes': ('collapse',), 'description': _("Please remember to select custom filtering " "if you want this filter to be applied."), 'fields': ('custom_filtering',), }), (_("Addresses"), { 'fields': ('addresses',) }), ) fieldsets = ( (None, { 'fields': ('name', 'password', 'is_active', 'account_link', 'filtering'), }), (_("Custom filtering"), { 'classes': ('collapse',), 'fields': ('custom_filtering',), }), (_("Addresses"), { 'fields': ('addresses', 'display_forwards') }), ) readonly_fields = ('account_link', 'display_addresses', 'display_forwards') change_readonly_fields = ('name',) add_form = MailboxCreationForm form = MailboxChangeForm list_prefetch_related = ('addresses__domain',) actions = (disable, enable, list_accounts) def __init__(self, *args, **kwargs): super(MailboxAdmin, self).__init__(*args, **kwargs) if settings.MAILBOXES_LOCAL_DOMAIN: type(self).actions = self.actions + (SendMailboxEmail(),) @admin.display( description=_("Addresses") ) @mark_safe def display_addresses(self, mailbox): # Get from forwards cache = caches.get_request_cache() cached_forwards = cache.get('forwards') if cached_forwards is None: cached_forwards = {} qs = Address.objects.filter(forward__regex=r'(^|.*\s)[^@]+(\s.*|$)') qs = qs.annotate(email=Concat('name', V('@'), 'domain__name')) qs = qs.values_list('id', 'email', 'forward') for addr_id, email, mbox in qs: url = reverse('admin:mailboxes_address_change', args=(addr_id,)) link = format_html('{}', url, email) try: cached_forwards[mbox].append(link) except KeyError: cached_forwards[mbox] = [link] cache.set('forwards', cached_forwards) try: forwards = cached_forwards[mailbox.name] except KeyError: forwards = [] # Get from mailboxes addresses = [] for addr in mailbox.addresses.all(): url = change_url(addr) addresses.append(format_html('{}', url, addr.email)) return '
'.join(addresses+forwards) @admin.display( description=_("Forward from") ) def display_forwards(self, mailbox): forwards = mailbox.get_forwards() return format_html_join( '
', '{}', [(change_url(addr), addr.email) for addr in forwards] ) @admin.display( description=_("Filtering"), ordering='filtering', ) @mark_safe def display_filtering(self, mailbox): return mailbox.get_filtering_display() def formfield_for_dbfield(self, db_field, **kwargs): if db_field.name == 'filtering': kwargs['widget'] = OpenCustomFilteringOnSelect() return super(MailboxAdmin, self).formfield_for_dbfield(db_field, **kwargs) def get_fieldsets(self, request, obj=None): fieldsets = super(MailboxAdmin, self).get_fieldsets(request, obj) if obj and obj.filtering == obj.CUSTOM: # not collapsed filtering when exists fieldsets = copy.deepcopy(fieldsets) fieldsets[1][1]['classes'] = fieldsets[0][1]['fields'] + ('collapse', 'open',) elif '_to_field' in parse_qs(request.META['QUERY_STRING']): # remove address from popup fieldsets = list(copy.deepcopy(fieldsets)) fieldsets.pop(-1) return fieldsets def get_form(self, *args, **kwargs): form = super(MailboxAdmin, self).get_form(*args, **kwargs) form.modeladmin = self return form def get_search_results(self, request, queryset, search_term): # Remove local domain from the search term if present (implicit local addreƧ) search_term = search_term.replace('@'+settings.MAILBOXES_LOCAL_DOMAIN, '') # Split address name from domain in order to support address searching search_term = search_term.replace('@', ' ') return super(MailboxAdmin, self).get_search_results(request, queryset, search_term) def render_change_form(self, request, context, add=False, change=False, form_url='', obj=None): if not add: self.check_unrelated_address(request, obj) self.check_matching_address(request, obj) return super(MailboxAdmin, self).render_change_form( request, context, add, change, form_url, obj) def log_addition(self, request, object, *args, **kwargs): self.check_unrelated_address(request, object) self.check_matching_address(request, object) return super(MailboxAdmin, self).log_addition(request, object, *args, **kwargs) def check_matching_address(self, request, obj): local_domain = settings.MAILBOXES_LOCAL_DOMAIN if obj.name and local_domain: try: addr = Address.objects.get( name=obj.name, domain__name=local_domain, account_id=self.account.pk) except Address.DoesNotExist: pass else: if addr not in obj.addresses.all(): msg = _("Mailbox '%s' local address matches '%s', please consider if " "selecting it makes sense.") % (obj, addr) if msg not in (m.message for m in messages.get_messages(request)): self.message_user(request, msg, level=messages.WARNING) def check_unrelated_address(self, request, obj): # Check if there exists an unrelated local Address for this mbox local_domain = settings.MAILBOXES_LOCAL_DOMAIN if local_domain and obj.name: non_mbox_addresses = Address.objects.exclude(mailboxes__name=obj.name).exclude( forward__regex=r'.*(^|\s)+%s($|\s)+.*' % obj.name) try: addr = non_mbox_addresses.get(name=obj.name, domain__name=local_domain) except Address.DoesNotExist: pass else: url = reverse('admin:mailboxes_address_change', args=(addr.pk,)) msg = mark_safe( _("Address {addr} clashes with '{mailbox}' mailbox " "local address. Consider adding this mailbox to the address.").format( mailbox=obj.name, url=url, addr=addr) ) # Prevent duplication (add_view+continue) if msg not in (m.message for m in messages.get_messages(request)): self.message_user(request, msg, level=messages.WARNING) def save_model(self, request, obj, form, change): """ save hacky mailbox.addresses and local domain clashing """ if obj.filtering != obj.CUSTOM: msg = _("You have provided a custom filtering but filtering " "selected option is %s") % obj.get_filtering_display() if change: old = Mailbox.objects.get(pk=obj.pk) if old.custom_filtering != obj.custom_filtering: messages.warning(request, msg) # pass var to the backend so you don't change the password obj.changepass = None elif obj.custom_filtering: messages.warning(request, msg) super(MailboxAdmin, self).save_model(request, obj, form, change) obj.addresses.set(form.cleaned_data['addresses']) @admin.register(Address) class AddressAdmin(SelectAccountAdminMixin, ExtendedModelAdmin): list_display = ( 'display_email', 'account_link', 'domain_link', 'display_mailboxes', 'display_forward', ) list_filter = (HasMailboxListFilter, HasForwardListFilter) fields = ('account_link', 'email_link', 'mailboxes', 'forward', 'display_all_mailboxes') add_fields = ('account_link', ('name', 'domain'), 'mailboxes', 'forward') # inlines = [AutoresponseInline] search_fields = ( 'forward', 'mailboxes__name', 'account__username', 'computed_email', 'domain__name' ) readonly_fields = ('account_link', 'domain_link', 'email_link', 'display_all_mailboxes') actions = (SendAddressEmail(),) filter_by_account_fields = ('domain', 'mailboxes') filter_horizontal = ['mailboxes'] form = AddressForm list_prefetch_related = ('mailboxes', 'domain') domain_link = admin_link('domain', order='domain__name') @admin.display( description=_("Email"), ordering='computed_email', ) def display_email(self, address): return address.computed_email @admin.display( description=_("Email") ) def email_link(self, address): link = self.domain_link(address) return format_html("{}@{}", address.name, link) @admin.display( description=_("Mailboxes"), ordering='mailboxes__count', ) def display_mailboxes(self, address): boxes = address.mailboxes.all() return format_html_join( mark_safe('
'), '{}', [(change_url(mailbox), mailbox.name) for mailbox in boxes] ) @admin.display( description=_("Mailboxes links") ) def display_all_mailboxes(self, address): boxes = address.get_mailboxes() return format_html_join( mark_safe('
'), '{}', [(change_url(mailbox), mailbox.name) for mailbox in boxes] ) @admin.display( description=_("Forward"), ordering='forward', ) @mark_safe def display_forward(self, address): forward_mailboxes = {m.name: m for m in address.get_forward_mailboxes()} values = [] for forward in address.forward.split(): mbox = forward_mailboxes.get(forward) if mbox: values.append(admin_link()(mbox)) else: values.append(forward) return '
'.join(values) def formfield_for_dbfield(self, db_field, **kwargs): if db_field.name == 'forward': kwargs['widget'] = forms.TextInput(attrs={'size':'118'}) return super(AddressAdmin, self).formfield_for_dbfield(db_field, **kwargs) def get_fields(self, request, obj=None): """ Remove mailboxes field when creating address from a popup i.e. from mailbox add form """ fields = super(AddressAdmin, self).get_fields(request, obj) if '_to_field' in parse_qs(request.META['QUERY_STRING']): # Add address popup fields = list(fields) fields.remove('mailboxes') return fields def get_queryset(self, request): qs = super(AddressAdmin, self).get_queryset(request) qs = qs.annotate(computed_email=Concat(F('name'), V('@'), F('domain__name'))) return qs.annotate(Count('mailboxes')) def render_change_form(self, request, context, add=False, change=False, form_url='', obj=None): if not add: self.check_matching_mailbox(request, obj) return super(AddressAdmin, self).render_change_form( request, context, add, change, form_url, obj) def log_addition(self, request, object, *args, **kwargs): self.check_matching_mailbox(request, object) return super(AddressAdmin, self).log_addition(request, object, *args, **kwargs) def check_matching_mailbox(self, request, obj): # Check if new addresse matches with a mbox because of having a local domain if obj.name and obj.domain and obj.domain.name == settings.MAILBOXES_LOCAL_DOMAIN: if obj.name not in obj.forward.split() and Mailbox.objects.filter(name=obj.name).exists(): for mailbox in obj.mailboxes.all(): if mailbox.name == obj.name: return msg = _("Address '%s' matches mailbox '%s' local address, please consider " "if makes sense adding the mailbox on the mailboxes or forward field." ) % (obj, obj.name) if msg not in (m.message for m in messages.get_messages(request)): self.message_user(request, msg, level=messages.WARNING) admin.site.register(Mailbox, MailboxAdmin)