saml_idp: Rewrite to CBV

This commit is contained in:
Jens Langhammer 2018-12-16 17:09:26 +01:00
parent 7a62bf9829
commit 764282ea9e
No known key found for this signature in database
GPG Key ID: BEBC05297D92821B
4 changed files with 150 additions and 133 deletions

View File

@ -47,10 +47,6 @@ class SignUpForm(forms.Form):
widget=forms.PasswordInput(attrs={ widget=forms.PasswordInput(attrs={
'placeholder': _('Repeat Password') 'placeholder': _('Repeat Password')
})) }))
# captcha = ReCaptchaField(
# required=(not settings.DEBUG and not settings.TEST),
# private_key=Setting.get('recaptcha:private'),
# public_key=Setting.get('recaptcha:public'))
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -58,9 +54,6 @@ class SignUpForm(forms.Form):
if 'initial' in kwargs: if 'initial' in kwargs:
for field in kwargs.get('initial').keys(): for field in kwargs.get('initial').keys():
self.fields[field].widget.attrs['readonly'] = 'readonly' self.fields[field].widget.attrs['readonly'] = 'readonly'
# TODO: Dynamically add captcha here
# if not Setting.get_bool('recaptcha:enabled'):
# self.fields.pop('captcha')
def clean_username(self): def clean_username(self):
"""Check if username is used already""" """Check if username is used already"""

12
passbook/lib/mixins.py Normal file
View File

@ -0,0 +1,12 @@
"""passbook util mixins"""
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
class CSRFExemptMixin:
"""wrapper to apply @csrf_exempt to CBV"""
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
"""wrapper to apply @csrf_exempt to CBV"""
return super().dispatch(*args, **kwargs)

View File

@ -4,9 +4,8 @@ from django.conf.urls import url
from passbook.saml_idp import views from passbook.saml_idp import views
urlpatterns = [ urlpatterns = [
url(r'^login/$', views.login_begin, name="saml_login_begin"), url(r'^login/$', views.LoginBeginView.as_view(), name="saml_login_begin"),
url(r'^login/process/$', views.login_process, name='saml_login_process'), url(r'^login/process/$', views.LoginProcessView.as_view(), name='saml_login_process'),
url(r'^logout/$', views.logout, name="saml_logout"), url(r'^logout/$', views.LogoutView.as_view(), name="saml_logout"),
url(r'^metadata/xml/$', views.descriptor, name='metadata_xml'), url(r'^metadata/xml/$', views.DescriptorView.as_view(), name='metadata_xml'),
# url(r'^settings/$', views.IDPSettingsView.as_view(), name='admin_settings'),
] ]

View File

@ -1,25 +1,25 @@
"""passbook SAML IDP Views""" """passbook SAML IDP Views"""
from logging import getLogger from logging import getLogger
from django.contrib import auth from django.contrib.auth import logout
from django.contrib.auth.decorators import login_required from django.contrib.auth.mixins import LoginRequiredMixin
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError
from django.core.validators import URLValidator from django.core.validators import URLValidator
from django.http import (HttpResponse, HttpResponseBadRequest, from django.http import HttpResponse, HttpResponseBadRequest
HttpResponseRedirect) from django.shortcuts import get_object_or_404, redirect, render, reverse
from django.shortcuts import redirect, render
from django.urls import reverse
from django.utils.datastructures import MultiValueDictKeyError from django.utils.datastructures import MultiValueDictKeyError
# from django.utils.html import escape from django.views import View
# from django.utils.translation import ugettext as _
from django.views.decorators.csrf import csrf_exempt
from passbook.lib.config import CONFIG from passbook.lib.config import CONFIG
# from django.utils.html import escape
# from django.utils.translation import ugettext as _
from passbook.lib.mixins import CSRFExemptMixin
# from passbook.core.models import Event, Setting, UserAcquirableRelationship # from passbook.core.models import Event, Setting, UserAcquirableRelationship
from passbook.lib.utils.template import render_to_string from passbook.lib.utils.template import render_to_string
# from passbook.core.views.common import ErrorResponseView # from passbook.core.views.common import ErrorResponseView
# from passbook.core.views.settings import GenericSettingView # from passbook.core.views.settings import GenericSettingView
from passbook.saml_idp import exceptions, registry from passbook.saml_idp import exceptions, registry
from passbook.saml_idp.models import SAMLProvider
# from OpenSSL.crypto import FILETYPE_PEM # from OpenSSL.crypto import FILETYPE_PEM
# from OpenSSL.crypto import Error as CryptoError # from OpenSSL.crypto import Error as CryptoError
@ -47,144 +47,157 @@ def render_xml(request, template, ctx):
return render(request, template, context=ctx, content_type="application/xml") return render(request, template, context=ctx, content_type="application/xml")
@csrf_exempt class LoginBeginView(CSRFExemptMixin, View):
def login_begin(request):
"""Receives a SAML 2.0 AuthnRequest from a Service Provider and """Receives a SAML 2.0 AuthnRequest from a Service Provider and
stores it in the session prior to enforcing login.""" stores it in the session prior to enforcing login."""
if request.method == 'POST':
source = request.POST
else:
source = request.GET
# Store these values now, because Django's login cycle won't preserve them.
try: def dispatch(self, request):
request.session['SAMLRequest'] = source['SAMLRequest'] if request.method == 'POST':
except (KeyError, MultiValueDictKeyError): source = request.POST
return HttpResponseBadRequest('the SAML request payload is missing') else:
source = request.GET
# Store these values now, because Django's login cycle won't preserve them.
request.session['RelayState'] = source.get('RelayState', '') try:
return redirect(reverse('passbook_saml_idp:saml_login_process')) request.session['SAMLRequest'] = source['SAMLRequest']
except (KeyError, MultiValueDictKeyError):
return HttpResponseBadRequest('the SAML request payload is missing')
request.session['RelayState'] = source.get('RelayState', '')
return redirect(reverse('passbook_saml_idp:saml_login_process'))
def redirect_to_sp(request, acs_url, saml_response, relay_state): class RedirectToSPView(View):
"""Return autosubmit form""" """Return autosubmit form"""
return render(request, 'core/autosubmit_form.html', {
'url': acs_url, def get(self, request, acs_url, saml_response, relay_state):
'attrs': { """Return autosubmit form"""
'SAMLResponse': saml_response, return render(request, 'core/autosubmit_form.html', {
'RelayState': relay_state 'url': acs_url,
} 'attrs': {
}) 'SAMLResponse': saml_response,
'RelayState': relay_state
}
})
@login_required class LoginProcessView(View):
def login_process(request):
"""Processor-based login continuation. """Processor-based login continuation.
Presents a SAML 2.0 Assertion for POSTing back to the Service Provider.""" Presents a SAML 2.0 Assertion for POSTing back to the Service Provider."""
LOGGER.debug("Request: %s", request)
proc, remote = registry.find_processor(request) def dispatch(self, request):
# Check if user has access LOGGER.debug("Request: %s", request)
access = True proc, provider = registry.find_processor(request)
# if remote.productextensionsaml2_set.exists() and \ # Check if user has access
# remote.productextensionsaml2_set.first().product_set.exists(): access = True
# # Only check if there is a connection from OAuth2 Application to product # if provider.productextensionsaml2_set.exists() and \
# product = remote.productextensionsaml2_set.first().product_set.first() # provider.productextensionsaml2_set.first().product_set.exists():
# relationship = UserAcquirableRelationship.objects.filter(user=request.user, model=product) # # Only check if there is a connection from OAuth2 Application to product
# # Product is invitation_only = True and no relation with user exists # product = provider.productextensionsaml2_set.first().product_set.first()
# if product.invitation_only and not relationship.exists(): # relationship = UserAcquirableRelationship.objects.
# access = False # filter(user=request.user, model=product)
# Check if we should just autosubmit # # Product is invitation_only = True and no relation with user exists
if remote.skip_authorization and access: # if product.invitation_only and not relationship.exists():
# full_res = _generate_response(request, proc, remote) # access = False
ctx = proc.generate_response() # Check if we should just autosubmit
# User accepted request if provider.skip_authorization and access:
# Event.create( # full_res = _generate_response(request, proc, provider)
# user=request.user, ctx = proc.generate_response()
# message=_('You authenticated %s (via SAML) (skipped Authz)' % remote.name), # User accepted request
# request=request, # Event.create(
# current=False, # user=request.user,
# hidden=True) # message=_('You authenticated %s (via SAML) (skipped Authz)' % provider.name),
return redirect_to_sp( # request=request,
request=request, # current=False,
acs_url=ctx['acs_url'], # hidden=True)
saml_response=ctx['saml_response'], return RedirectToSPView.as_view()(
relay_state=ctx['relay_state']) request=request,
if request.method == 'POST' and request.POST.get('ACSUrl', None) and access: acs_url=ctx['acs_url'],
# User accepted request saml_response=ctx['saml_response'],
# Event.create( relay_state=ctx['relay_state'])
# user=request.user, if request.method == 'POST' and request.POST.get('ACSUrl', None) and access:
# message=_('You authenticated %s (via SAML)' % remote.name), # User accepted request
# request=request, # Event.create(
# current=False, # user=request.user,
# hidden=True) # message=_('You authenticated %s (via SAML)' % provider.name),
return redirect_to_sp( # request=request,
request=request, # current=False,
acs_url=request.POST.get('ACSUrl'), # hidden=True)
saml_response=request.POST.get('SAMLResponse'), return RedirectToSPView.as_view()(
relay_state=request.POST.get('RelayState')) request=request,
try: acs_url=request.POST.get('ACSUrl'),
full_res = _generate_response(request, proc, remote) saml_response=request.POST.get('SAMLResponse'),
# if not access: relay_state=request.POST.get('RelayState'))
# LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product) try:
# messages.error(request, "You have no access to '%s'" % product.name) full_res = _generate_response(request, proc, provider)
# raise Http404 # if not access:
return full_res # LOGGER.warning("User '%s' has no invitation to '%s'", request.user, product)
except exceptions.CannotHandleAssertion as exc: # messages.error(request, "You have no access to '%s'" % product.name)
LOGGER.debug(exc) # raise Http404
# return ErrorResponseView.as_view()(request, str(exc)) return full_res
except exceptions.CannotHandleAssertion as exc:
LOGGER.debug(exc)
# return ErrorResponseView.as_view()(request, str(exc))
@csrf_exempt class LogoutView(CSRFExemptMixin, View):
def logout(request):
"""Allows a non-SAML 2.0 URL to log out the user and """Allows a non-SAML 2.0 URL to log out the user and
returns a standard logged-out page. (SalesForce and others use this method, returns a standard logged-out page. (SalesForce and others use this method,
though it's technically not SAML 2.0).""" though it's technically not SAML 2.0)."""
auth.logout(request)
redirect_url = request.GET.get('redirect_to', '') def get(self, request):
"""Perform logout"""
logout(request)
try: redirect_url = request.GET.get('redirect_to', '')
URL_VALIDATOR(redirect_url)
except ValidationError:
pass
else:
return HttpResponseRedirect(redirect_url)
return render(request, 'saml/idp/logged_out.html') try:
URL_VALIDATOR(redirect_url)
except ValidationError:
pass
else:
return redirect(redirect_url)
return render(request, 'saml/idp/logged_out.html')
@login_required class SLOLogout(CSRFExemptMixin, LoginRequiredMixin, View):
@csrf_exempt
def slo_logout(request):
"""Receives a SAML 2.0 LogoutRequest from a Service Provider, """Receives a SAML 2.0 LogoutRequest from a Service Provider,
logs out the user and returns a standard logged-out page.""" logs out the user and returns a standard logged-out page."""
request.session['SAMLRequest'] = request.POST['SAMLRequest']
# TODO: Parse SAML LogoutRequest from POST data, similar to login_process(). def post(self, request):
# TODO: Add a URL dispatch for this view. """Perform logout"""
# TODO: Modify the base processor to handle logouts? request.session['SAMLRequest'] = request.POST['SAMLRequest']
# TODO: Combine this with login_process(), since they are so very similar? # TODO: Parse SAML LogoutRequest from POST data, similar to login_process().
# TODO: Format a LogoutResponse and return it to the browser. # TODO: Add a URL dispatch for this view.
# XXX: For now, simply log out without validating the request. # TODO: Modify the base processor to handle logouts?
auth.logout(request) # TODO: Combine this with login_process(), since they are so very similar?
return render(request, 'saml/idp/logged_out.html') # TODO: Format a LogoutResponse and return it to the browser.
# XXX: For now, simply log out without validating the request.
logout(request)
return render(request, 'saml/idp/logged_out.html')
def descriptor(request): class DescriptorView(View):
"""Replies with the XML Metadata IDSSODescriptor.""" """Replies with the XML Metadata IDSSODescriptor."""
entity_id = CONFIG.y('saml_idp.issuer')
slo_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_logout')) def get(self, request, application_id):
sso_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_login_begin')) """Replies with the XML Metadata IDSSODescriptor."""
pubkey = '' # TODO: Extract application/provider for pubkey application = get_object_or_404(SAMLProvider, pk=application_id)
ctx = { entity_id = CONFIG.y('saml_idp.issuer')
'entity_id': entity_id, slo_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_logout'))
'cert_public_key': pubkey, sso_url = request.build_absolute_uri(reverse('passbook_saml_idp:saml_login_begin'))
'slo_url': slo_url, pubkey = application.signing_cert
'sso_url': sso_url ctx = {
} 'entity_id': entity_id,
metadata = render_to_string('saml/xml/metadata.xml', ctx) 'cert_public_key': pubkey,
response = HttpResponse(metadata, content_type='application/xml') 'slo_url': slo_url,
response['Content-Disposition'] = 'attachment; filename="passbook_metadata.xml' 'sso_url': sso_url
return response }
metadata = render_to_string('saml/xml/metadata.xml', ctx)
response = HttpResponse(metadata, content_type='application/xml')
response['Content-Disposition'] = 'attachment; filename="passbook_metadata.xml'
return response
# class IDPSettingsView(GenericSettingView): # class IDPSettingsView(GenericSettingView):