admin: only add link if function returns not None
This commit is contained in:
parent
4d5f688a44
commit
0c9a00acbe
|
@ -18,9 +18,14 @@ def get_links(model_instance):
|
|||
LOGGER.warning("Model %s is not instance of Model", model_instance)
|
||||
return links
|
||||
|
||||
try:
|
||||
for name, method in inspect.getmembers(model_instance, predicate=inspect.ismethod):
|
||||
if name.startswith(prefix):
|
||||
human_name = name.replace(prefix, '').replace('_', ' ').capitalize()
|
||||
links[human_name] = method()
|
||||
link = method()
|
||||
if link:
|
||||
links[human_name] = link
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
return links
|
||||
|
|
|
@ -2,3 +2,4 @@ beautifulsoup4>=4.6.0
|
|||
lxml>=3.8.0
|
||||
signxml
|
||||
defusedxml
|
||||
PyCryptodome
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
"""passbook SAML IDP Views"""
|
||||
from logging import getLogger
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import logout
|
||||
from django.contrib.auth.mixins import LoginRequiredMixin
|
||||
from django.core.exceptions import ValidationError
|
||||
|
@ -9,33 +10,34 @@ from django.http import HttpResponse, HttpResponseBadRequest
|
|||
from django.shortcuts import get_object_or_404, redirect, render, reverse
|
||||
from django.utils.datastructures import MultiValueDictKeyError
|
||||
from django.views import View
|
||||
from saml2 import BINDING_HTTP_POST
|
||||
from saml2.authn_context import PASSWORD, AuthnBroker, authn_context_class_ref
|
||||
from saml2.config import IdPConfig
|
||||
from saml2.ident import NameID
|
||||
from saml2.metadata import entity_descriptor
|
||||
from saml2.s_utils import UnknownPrincipal, UnsupportedBinding
|
||||
from saml2.saml import NAMEID_FORMAT_EMAILADDRESS, NAMEID_FORMAT_UNSPECIFIED
|
||||
from saml2.server import Server
|
||||
from signxml.util import strip_pem_header
|
||||
|
||||
from passbook.core.models import Application
|
||||
from passbook.lib.config import CONFIG
|
||||
# from django.utils.html import escape
|
||||
# from django.utils.translation import ugettext as _
|
||||
from passbook.lib.mixins import CSRFExemptMixin
|
||||
# from passbook.core.models import Event, Setting, UserAcquirableRelationship
|
||||
from passbook.lib.utils.template import render_to_string
|
||||
# from passbook.core.views.common import ErrorResponseView
|
||||
# from passbook.core.views.settings import GenericSettingView
|
||||
from passbook.saml_idp import exceptions, registry
|
||||
from passbook.saml_idp import exceptions
|
||||
from passbook.saml_idp.models import SAMLProvider
|
||||
|
||||
# from OpenSSL.crypto import FILETYPE_PEM
|
||||
# from OpenSSL.crypto import Error as CryptoError
|
||||
# from OpenSSL.crypto import load_certificate
|
||||
|
||||
|
||||
LOGGER = getLogger(__name__)
|
||||
URL_VALIDATOR = URLValidator(schemes=('http', 'https'))
|
||||
|
||||
|
||||
def _generate_response(request, processor, remote):
|
||||
"""Generate a SAML response using processor and return it in the proper Django
|
||||
def _generate_response(request, provider: SAMLProvider):
|
||||
"""Generate a SAML response using processor_instance and return it in the proper Django
|
||||
response."""
|
||||
try:
|
||||
ctx = processor.generate_response()
|
||||
ctx['remote'] = remote
|
||||
ctx = provider.processor.generate_response()
|
||||
ctx['remote'] = provider
|
||||
ctx['is_login'] = True
|
||||
except exceptions.UserNotAuthorized:
|
||||
return render(request, 'saml/idp/invalid_user.html')
|
||||
|
||||
|
@ -47,11 +49,23 @@ def render_xml(request, template, ctx):
|
|||
return render(request, template, context=ctx, content_type="application/xml")
|
||||
|
||||
|
||||
class ProviderMixin:
|
||||
|
||||
_provider = None
|
||||
|
||||
@property
|
||||
def provider(self):
|
||||
if not self._provider:
|
||||
application = get_object_or_404(Application, slug=self.kwargs['application'])
|
||||
self._provider = get_object_or_404(SAMLProvider, pk=application.provider_id)
|
||||
return self._provider
|
||||
|
||||
|
||||
class LoginBeginView(CSRFExemptMixin, View):
|
||||
"""Receives a SAML 2.0 AuthnRequest from a Service Provider and
|
||||
stores it in the session prior to enforcing login."""
|
||||
|
||||
def dispatch(self, request):
|
||||
def dispatch(self, request, application):
|
||||
if request.method == 'POST':
|
||||
source = request.POST
|
||||
else:
|
||||
|
@ -64,7 +78,9 @@ class LoginBeginView(CSRFExemptMixin, View):
|
|||
return HttpResponseBadRequest('the SAML request payload is missing')
|
||||
|
||||
request.session['RelayState'] = source.get('RelayState', '')
|
||||
return redirect(reverse('passbook_saml_idp:saml_login_process'))
|
||||
return redirect(reverse('passbook_saml_idp:saml_login_process'), kwargs={
|
||||
'application': application
|
||||
})
|
||||
|
||||
|
||||
class RedirectToSPView(View):
|
||||
|
@ -81,35 +97,18 @@ class RedirectToSPView(View):
|
|||
})
|
||||
|
||||
|
||||
class LoginProcessView(View):
|
||||
class LoginProcessView(ProviderMixin, View):
|
||||
"""Processor-based login continuation.
|
||||
Presents a SAML 2.0 Assertion for POSTing back to the Service Provider."""
|
||||
|
||||
def dispatch(self, request):
|
||||
def dispatch(self, request, application):
|
||||
LOGGER.debug("Request: %s", request)
|
||||
proc, provider = registry.find_processor(request)
|
||||
# Check if user has access
|
||||
access = True
|
||||
# if provider.productextensionsaml2_set.exists() and \
|
||||
# provider.productextensionsaml2_set.first().product_set.exists():
|
||||
# # Only check if there is a connection from OAuth2 Application to product
|
||||
# product = provider.productextensionsaml2_set.first().product_set.first()
|
||||
# relationship = UserAcquirableRelationship.objects.
|
||||
# filter(user=request.user, model=product)
|
||||
# # Product is invitation_only = True and no relation with user exists
|
||||
# if product.invitation_only and not relationship.exists():
|
||||
# access = False
|
||||
# Check if we should just autosubmit
|
||||
if provider.skip_authorization and access:
|
||||
# full_res = _generate_response(request, proc, provider)
|
||||
ctx = proc.generate_response()
|
||||
# User accepted request
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You authenticated %s (via SAML) (skipped Authz)' % provider.name),
|
||||
# request=request,
|
||||
# current=False,
|
||||
# hidden=True)
|
||||
# TODO: Check access here
|
||||
if self.provider.skip_authorization and access:
|
||||
ctx = self.provider.processor.generate_response()
|
||||
# TODO: AuditLog Skipped Authz
|
||||
return RedirectToSPView.as_view()(
|
||||
request=request,
|
||||
acs_url=ctx['acs_url'],
|
||||
|
@ -117,27 +116,17 @@ class LoginProcessView(View):
|
|||
relay_state=ctx['relay_state'])
|
||||
if request.method == 'POST' and request.POST.get('ACSUrl', None) and access:
|
||||
# User accepted request
|
||||
# Event.create(
|
||||
# user=request.user,
|
||||
# message=_('You authenticated %s (via SAML)' % provider.name),
|
||||
# request=request,
|
||||
# current=False,
|
||||
# hidden=True)
|
||||
# TODO: AuditLog accepted
|
||||
return RedirectToSPView.as_view()(
|
||||
request=request,
|
||||
acs_url=request.POST.get('ACSUrl'),
|
||||
saml_response=request.POST.get('SAMLResponse'),
|
||||
relay_state=request.POST.get('RelayState'))
|
||||
try:
|
||||
full_res = _generate_response(request, proc, provider)
|
||||
# if not access:
|
||||
# LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product)
|
||||
# messages.error(request, "You have no access to '%s'" % product.name)
|
||||
# raise Http404
|
||||
full_res = _generate_response(request, provider)
|
||||
return full_res
|
||||
except exceptions.CannotHandleAssertion as exc:
|
||||
LOGGER.debug(exc)
|
||||
# return ErrorResponseView.as_view()(request, str(exc))
|
||||
|
||||
|
||||
class LogoutView(CSRFExemptMixin, View):
|
||||
|
@ -177,17 +166,34 @@ class SLOLogout(CSRFExemptMixin, LoginRequiredMixin, View):
|
|||
logout(request)
|
||||
return render(request, 'saml/idp/logged_out.html')
|
||||
|
||||
class IdPMixin(ProviderMixin):
|
||||
|
||||
class DescriptorDownloadView(View):
|
||||
provider = None
|
||||
|
||||
def dispatch(self, request, application):
|
||||
|
||||
def get_identity(self, provider, user):
|
||||
""" Create Identity dict (using SP-specific mapping)
|
||||
"""
|
||||
sp_mapping = {'username': 'username'}
|
||||
# return provider.processor.create_identity(user, sp_mapping)
|
||||
return {
|
||||
out_attr: getattr(user, user_attr)
|
||||
for user_attr, out_attr in sp_mapping.items()
|
||||
if hasattr(user, user_attr)
|
||||
}
|
||||
|
||||
|
||||
class DescriptorDownloadView(ProviderMixin, View):
|
||||
"""Replies with the XML Metadata IDSSODescriptor."""
|
||||
|
||||
def get(self, request, application_id):
|
||||
def get(self, request, application):
|
||||
"""Replies with the XML Metadata IDSSODescriptor."""
|
||||
application = get_object_or_404(SAMLProvider, pk=application_id)
|
||||
super().dispatch(request, application)
|
||||
entity_id = CONFIG.y('saml_idp.issuer')
|
||||
slo_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_logout'))
|
||||
sso_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_login_begin'))
|
||||
pubkey = application.signing_cert
|
||||
pubkey = strip_pem_header(self.provider.signing_cert.replace('\r', '')).replace('\n', '')
|
||||
ctx = {
|
||||
'entity_id': entity_id,
|
||||
'cert_public_key': pubkey,
|
||||
|
@ -196,25 +202,24 @@ class DescriptorDownloadView(View):
|
|||
}
|
||||
metadata = render_to_string('saml/xml/metadata.xml', ctx)
|
||||
response = HttpResponse(metadata, content_type='application/xml')
|
||||
response['Content-Disposition'] = 'attachment; filename="passbook_metadata.xml'
|
||||
response['Content-Disposition'] = ('attachment; filename="'
|
||||
'%s_passbook_meta.xml"' % self.provider.name)
|
||||
return response
|
||||
|
||||
|
||||
# class IDPSettingsView(GenericSettingView):
|
||||
# """IDP Settings"""
|
||||
class LoginInitView(IdPMixin, LoginRequiredMixin, View):
|
||||
|
||||
# form = IDPSettingsForm
|
||||
# template_name = 'saml/idp/settings.html'
|
||||
def dispatch(self, request, application):
|
||||
"""Initiates an IdP-initiated link to a simple SP resource/target URL."""
|
||||
super().dispatch(request, application)
|
||||
|
||||
# def dispatch(self, request, *args, **kwargs):
|
||||
# self.extra_data['metadata'] = escape(descriptor(request).content.decode('utf-8'))
|
||||
|
||||
# # Show the certificate fingerprint
|
||||
# sha1_fingerprint = _('<failed to parse certificate>')
|
||||
# try:
|
||||
# cert = load_certificate(FILETYPE_PEM, CONFIG.y('saml_idp.certificate'))
|
||||
# sha1_fingerprint = cert.digest("sha1")
|
||||
# except CryptoError:
|
||||
# pass
|
||||
# self.extra_data['fingerprint'] = sha1_fingerprint
|
||||
# return super().dispatch(request, *args, **kwargs)
|
||||
# # linkdict = dict(metadata.get_links(sp_config))
|
||||
# # pattern = linkdict[resource]
|
||||
# # is_simple_link = ('/' not in resource)
|
||||
# # if is_simple_link:
|
||||
# # simple_target = kwargs['target']
|
||||
# # url = pattern % simple_target
|
||||
# # else:
|
||||
# # url = pattern % kwargs
|
||||
# provider.processor.init_deep_link(request, 'deep url')
|
||||
# return _generate_response(request, provider)
|
||||
|
|
Reference in New Issue