*/saml: fix creation and validation of detached signatures

This commit is contained in:
Jens Langhammer 2020-11-12 00:12:59 +01:00
parent c304b40e1b
commit 9877ef99c4
12 changed files with 240 additions and 127 deletions

View File

@ -4,12 +4,13 @@ disable=arguments-differ,no-self-use,fixme,locally-disabled,too-many-ancestors,t
load-plugins=pylint_django,pylint.extensions.bad_builtin load-plugins=pylint_django,pylint.extensions.bad_builtin
extension-pkg-whitelist=lxml extension-pkg-whitelist=lxml,xmlsec
# Allow constants to be shorter than normal (and lowercase, for settings.py) # Allow constants to be shorter than normal (and lowercase, for settings.py)
const-rgx=[a-zA-Z0-9_]{1,40}$ const-rgx=[a-zA-Z0-9_]{1,40}$
ignored-modules=django-otp ignored-modules=django-otp
generated-members=xmlsec.constants.*,xmlsec.tree.*
ignore=migrations ignore=migrations
max-attributes=12 max-attributes=12

View File

@ -44,6 +44,7 @@ channels = "*"
channels-redis = "*" channels-redis = "*"
kubernetes = "*" kubernetes = "*"
docker = "*" docker = "*"
xmlsec = "*"
[requires] [requires]
python_version = "3.8" python_version = "3.8"

View File

@ -24,7 +24,6 @@ class SAMLProviderSerializer(ModelSerializer):
"digest_algorithm", "digest_algorithm",
"signature_algorithm", "signature_algorithm",
"signing_kp", "signing_kp",
"require_signing",
"verification_kp", "verification_kp",
] ]

View File

@ -39,7 +39,6 @@ class SAMLProviderForm(forms.ModelForm):
"assertion_valid_not_on_or_after", "assertion_valid_not_on_or_after",
"session_valid_not_on_or_after", "session_valid_not_on_or_after",
"digest_algorithm", "digest_algorithm",
"require_signing",
"signature_algorithm", "signature_algorithm",
"signing_kp", "signing_kp",
"verification_kp", "verification_kp",

View File

@ -1,25 +0,0 @@
# Generated by Django 3.1.3 on 2020-11-11 21:35
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
('passbook_crypto', '0002_create_self_signed_kp'),
('passbook_providers_saml', '0007_samlprovider_verification_kp'),
]
operations = [
migrations.AlterField(
model_name='samlprovider',
name='signing_kp',
field=models.ForeignKey(blank=True, default=None, help_text='Singing is enabled upon selection of a Key Pair.', null=True, on_delete=django.db.models.deletion.SET_NULL, to='passbook_crypto.certificatekeypair', verbose_name='Signing Keypair'),
),
migrations.AlterField(
model_name='samlprovider',
name='verification_kp',
field=models.ForeignKey(blank=True, default=None, help_text="If selected, incoming assertion's Signatures will be validated.", null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='+', to='passbook_crypto.certificatekeypair', verbose_name='Verification Keypair'),
),
]

View File

@ -0,0 +1,71 @@
# Generated by Django 3.1.3 on 2020-11-12 10:36
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("passbook_crypto", "0002_create_self_signed_kp"),
("passbook_providers_saml", "0007_samlprovider_verification_kp"),
]
operations = [
migrations.RemoveField(
model_name="samlprovider",
name="require_signing",
),
migrations.AlterField(
model_name="samlprovider",
name="audience",
field=models.TextField(
default="",
help_text="Value of the audience restriction field of the asseration.",
),
),
migrations.AlterField(
model_name="samlprovider",
name="issuer",
field=models.TextField(
default="passbook", help_text="Also known as EntityID"
),
),
migrations.AlterField(
model_name="samlprovider",
name="signing_kp",
field=models.ForeignKey(
blank=True,
default=None,
help_text="Keypair used to sign outgoing Responses going to the Service Provider.",
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="passbook_crypto.certificatekeypair",
verbose_name="Signing Keypair",
),
),
migrations.AlterField(
model_name="samlprovider",
name="sp_binding",
field=models.TextField(
choices=[("redirect", "Redirect"), ("post", "Post")],
default="redirect",
help_text="This determines how passbook sends the response back to the Service Provider.",
verbose_name="Service Provider Binding",
),
),
migrations.AlterField(
model_name="samlprovider",
name="verification_kp",
field=models.ForeignKey(
blank=True,
default=None,
help_text="When selected, incoming assertion's Signatures will be validated against this certificate. To allow unsigned Requests, leave on default.",
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="+",
to="passbook_crypto.certificatekeypair",
verbose_name="Verification Certificate",
),
),
]

View File

@ -28,12 +28,21 @@ class SAMLProvider(Provider):
"""SAML 2.0 Endpoint for applications which support SAML.""" """SAML 2.0 Endpoint for applications which support SAML."""
acs_url = models.URLField(verbose_name=_("ACS URL")) acs_url = models.URLField(verbose_name=_("ACS URL"))
audience = models.TextField(default="") audience = models.TextField(
issuer = models.TextField(help_text=_("Also known as EntityID")) default="",
help_text=_("Value of the audience restriction field of the asseration."),
)
issuer = models.TextField(help_text=_("Also known as EntityID"), default="passbook")
sp_binding = models.TextField( sp_binding = models.TextField(
choices=SAMLBindings.choices, choices=SAMLBindings.choices,
default=SAMLBindings.REDIRECT, default=SAMLBindings.REDIRECT,
verbose_name=_("Service Prodier Binding"), verbose_name=_("Service Provider Binding"),
help_text=_(
(
"This determines how passbook sends the "
"response back to the Service Provider."
)
),
) )
assertion_valid_not_before = models.TextField( assertion_valid_not_before = models.TextField(
@ -92,9 +101,14 @@ class SAMLProvider(Provider):
default=None, default=None,
null=True, null=True,
blank=True, blank=True,
help_text=_("If selected, incoming assertion's Signatures will be validated."), help_text=_(
(
"When selected, incoming assertion's Signatures will be validated against this "
"certificate. To allow unsigned Requests, leave on default."
)
),
on_delete=models.SET_NULL, on_delete=models.SET_NULL,
verbose_name=_("Verification Keypair"), verbose_name=_("Verification Certificate"),
related_name="+", related_name="+",
) )
signing_kp = models.ForeignKey( signing_kp = models.ForeignKey(
@ -102,19 +116,13 @@ class SAMLProvider(Provider):
default=None, default=None,
null=True, null=True,
blank=True, blank=True,
help_text=_("Singing is enabled upon selection of a Key Pair."), help_text=_(
"Keypair used to sign outgoing Responses going to the Service Provider."
),
on_delete=models.SET_NULL, on_delete=models.SET_NULL,
verbose_name=_("Signing Keypair"), verbose_name=_("Signing Keypair"),
) )
require_signing = models.BooleanField(
default=False,
help_text=_(
"Require Requests to be signed by an X509 Certificate. "
"Must match the Certificate selected in `Singing Keypair`."
),
)
@property @property
def launch_url(self) -> Optional[str]: def launch_url(self) -> Optional[str]:
"""Guess launch_url based on acs URL""" """Guess launch_url based on acs URL"""

View File

@ -4,22 +4,32 @@ from dataclasses import dataclass
from typing import Optional from typing import Optional
from urllib.parse import quote_plus from urllib.parse import quote_plus
from cryptography.exceptions import InvalidSignature import xmlsec
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from defusedxml import ElementTree from defusedxml import ElementTree
from signxml import XMLVerifier from lxml import etree # nosec
from structlog import get_logger from structlog import get_logger
from passbook.providers.saml.exceptions import CannotHandleAssertion from passbook.providers.saml.exceptions import CannotHandleAssertion
from passbook.providers.saml.models import SAMLProvider from passbook.providers.saml.models import SAMLProvider
from passbook.providers.saml.utils.encoding import decode_base64_and_inflate from passbook.providers.saml.utils.encoding import decode_base64_and_inflate
from passbook.sources.saml.processors.constants import ( from passbook.sources.saml.processors.constants import (
DSA_SHA1,
NS_SAML_PROTOCOL, NS_SAML_PROTOCOL,
RSA_SHA1,
RSA_SHA256,
RSA_SHA384,
RSA_SHA512,
SAML_NAME_ID_FORMAT_EMAIL, SAML_NAME_ID_FORMAT_EMAIL,
) )
LOGGER = get_logger() LOGGER = get_logger()
ERROR_SIGNATURE_REQUIRED_BUT_ABSENT = (
"Verification Certificate configured, but request is not signed."
)
ERROR_SIGNATURE_EXISTS_BUT_NO_VERIFIER = (
"Provider does not have a Validation Certificate configured."
)
ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
@dataclass @dataclass
@ -69,14 +79,30 @@ class AuthNRequestParser:
"""Validate and parse raw request with enveloped signautre.""" """Validate and parse raw request with enveloped signautre."""
decoded_xml = decode_base64_and_inflate(saml_request) decoded_xml = decode_base64_and_inflate(saml_request)
if self.provider.verification_kp: verifier = self.provider.verification_kp
root = etree.fromstring(decoded_xml) # nosec
xmlsec.tree.add_ids(root, ["ID"])
signature_node = xmlsec.tree.find_node(root, xmlsec.constants.NodeSignature)
if verifier and not signature_node:
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
if signature_node:
if not verifier:
raise CannotHandleAssertion(ERROR_SIGNATURE_EXISTS_BUT_NO_VERIFIER)
try: try:
XMLVerifier().verify( ctx = xmlsec.SignatureContext()
decoded_xml, key = xmlsec.Key.from_memory(
x509_cert=self.provider.verification_kp.certificate_data, verifier.certificate_data,
xmlsec.constants.KeyDataFormatCertPem,
None,
) )
except InvalidSignature as exc: ctx.key = key
raise CannotHandleAssertion("Failed to verify signature") from exc ctx.verify(signature_node)
except xmlsec.VerificationError as exc:
raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
return self._parse_xml(decoded_xml, relay_state) return self._parse_xml(decoded_xml, relay_state)
@ -90,31 +116,45 @@ class AuthNRequestParser:
"""Validate and parse raw request with detached signature""" """Validate and parse raw request with detached signature"""
decoded_xml = decode_base64_and_inflate(saml_request) decoded_xml = decode_base64_and_inflate(saml_request)
verifier = self.provider.verification_kp
if verifier and not (signature and sig_alg):
raise CannotHandleAssertion(ERROR_SIGNATURE_REQUIRED_BUT_ABSENT)
if signature and sig_alg: if signature and sig_alg:
# if sig_alg == "http://www.w3.org/2000/09/xmldsig#rsa-sha1": if not verifier:
sig_hash = hashes.SHA1() # nosec raise CannotHandleAssertion(ERROR_SIGNATURE_EXISTS_BUT_NO_VERIFIER)
querystring = f"SAMLRequest={quote_plus(saml_request)}&" querystring = f"SAMLRequest={quote_plus(saml_request)}&"
if relay_state is not None: if relay_state is not None:
querystring += f"RelayState={quote_plus(relay_state)}&" querystring += f"RelayState={quote_plus(relay_state)}&"
querystring += f"SigAlg={sig_alg}" querystring += f"SigAlg={quote_plus(sig_alg)}"
dsig_ctx = xmlsec.SignatureContext()
key = xmlsec.Key.from_memory(
verifier.certificate_data, xmlsec.constants.KeyDataFormatCertPem, None
)
dsig_ctx.key = key
sign_algorithm_transform_map = {
DSA_SHA1: xmlsec.constants.TransformDsaSha1,
RSA_SHA1: xmlsec.constants.TransformRsaSha1,
RSA_SHA256: xmlsec.constants.TransformRsaSha256,
RSA_SHA384: xmlsec.constants.TransformRsaSha384,
RSA_SHA512: xmlsec.constants.TransformRsaSha512,
}
sign_algorithm_transform = sign_algorithm_transform_map.get(
sig_alg, xmlsec.constants.TransformRsaSha1
)
if not self.provider.verification_kp:
raise CannotHandleAssertion(
"Provider does not have a Validation Certificate configured."
)
public_key = self.provider.verification_kp.private_key.public_key()
try: try:
public_key.verify( dsig_ctx.verify_binary(
querystring.encode("utf-8"),
sign_algorithm_transform,
b64decode(signature), b64decode(signature),
querystring.encode(),
padding.PSS(
mgf=padding.MGF1(sig_hash), salt_length=padding.PSS.MAX_LENGTH
),
sig_hash,
) )
except InvalidSignature as exc: except xmlsec.VerificationError as exc:
raise CannotHandleAssertion("Failed to verify signature") from exc raise CannotHandleAssertion(ERROR_FAILED_TO_VERIFY) from exc
return self._parse_xml(decoded_xml, relay_state) return self._parse_xml(decoded_xml, relay_state)
def idp_initiated(self) -> AuthNRequest: def idp_initiated(self) -> AuthNRequest:

View File

@ -1,6 +1,4 @@
"""Test AuthN Request generator and parser""" """Test AuthN Request generator and parser"""
from base64 import b64encode
from passbook.sources.saml.processors.constants import SAML_NAME_ID_FORMAT_EMAIL
from django.contrib.sessions.middleware import SessionMiddleware from django.contrib.sessions.middleware import SessionMiddleware
from django.http.request import HttpRequest, QueryDict from django.http.request import HttpRequest, QueryDict
from django.test import RequestFactory, TestCase from django.test import RequestFactory, TestCase
@ -14,16 +12,30 @@ from passbook.providers.saml.processors.request_parser import AuthNRequestParser
from passbook.providers.saml.utils.encoding import deflate_and_base64_encode from passbook.providers.saml.utils.encoding import deflate_and_base64_encode
from passbook.sources.saml.exceptions import MismatchedRequestID from passbook.sources.saml.exceptions import MismatchedRequestID
from passbook.sources.saml.models import SAMLSource from passbook.sources.saml.models import SAMLSource
from passbook.sources.saml.processors.constants import SAML_NAME_ID_FORMAT_EMAIL
from passbook.sources.saml.processors.request import ( from passbook.sources.saml.processors.request import (
SESSION_REQUEST_ID, SESSION_REQUEST_ID,
RequestProcessor, RequestProcessor,
) )
from passbook.sources.saml.processors.response import ResponseProcessor from passbook.sources.saml.processors.response import ResponseProcessor
REDIRECT_REQUEST = """fZLNbsIwEIRfJfIdbKeFgEUipXAoEm0jSHvopTLJplhK7NTr9Oft6yRUKhekPdk73+yOdoWyqVuRdu6k9/DRAbrgu6k1iuEjJp3VwkhUKLRsAIUrxCF92IlwykRrjTOFqUmQIoJ1yui10dg1YA9gP1UBz/tdTE7OtSgo5WzKQzYditGeP8GW9rSQZk+HnAQbb6+07EGj7EI1j8SCeaVs21oVQ9dAoRqcf6OIhh6VLpV+pxZKZaFwlATbTUzeyqKazaqiDCO5WEQwZzKCagkwr8obWcqjb0PsYKvRSe1iErKQTTj3lYdc3HLBl68kyM4L340u19M5j4LiPs+zybjgC1gclvMNJFn104vB2P5L/TpW/kVNkqvBrug/+mjVikeP224y4/P7CdK6Nl9rC9JBTDihySi5vIbkFw==""" REDIRECT_REQUEST = (
REDIRECT_SIGNATURE = "UlOe1BItHVHM+io6rUZAenIqfibm7hM6wr9I1rcP5kPJ4N8cbkyqmAMh5LD2lUq3PDERJfjdO/oOKnvJmbD2y9MOObyR2d7Udv62KERrA0qM917Q+w8wrLX7w2nHY96EDvkXD4iAomR5EE9dHRuubDy7uRv2syEevc0gfoLi7W/5vp96vJgsaSqxnTp+QiYq49KyWyMtxRULF2yd+vYDnHCDME73mNSULEHfwCU71dvbKpnFaej78q7wS20gUk6ysOOXXtvDHbiVcpUb/9oyDgNAxUjVvPdh96AhBFj2HCuGZhP0CGotafTciu6YlsiwUpuBkIYgZmNWYa3FR9LS4Q==" "fZLNbsIwEIRfJfIdbKeFgEUipXAoEm0jSHvopTLJplhK7NTr9Oft6yRUKhekPdk73+yOdoWyqVuRdu6k9/DRAbrgu6k1iu"
"EjJp3VwkhUKLRsAIUrxCF92IlwykRrjTOFqUmQIoJ1yui10dg1YA9gP1UBz/tdTE7OtSgo5WzKQzYditGeP8GW9rSQZk+H"
"nAQbb6+07EGj7EI1j8SCeaVs21oVQ9dAoRqcf6OIhh6VLpV+pxZKZaFwlATbTUzeyqKazaqiDCO5WEQwZzKCagkwr8obWc"
"qjb0PsYKvRSe1iErKQTTj3lYdc3HLBl68kyM4L340u19M5j4LiPs+zybjgC1gclvMNJFn104vB2P5L/TpW/kVNkqvBrug/"
"+mjVikeP224y4/P7CdK6Nl9rC9JBTDihySi5vIbkFw=="
)
REDIRECT_SIGNATURE = (
"UlOe1BItHVHM+io6rUZAenIqfibm7hM6wr9I1rcP5kPJ4N8cbkyqmAMh5LD2lUq3PDERJfjdO/oOKnvJmbD2y9MOObyR2d"
"7Udv62KERrA0qM917Q+w8wrLX7w2nHY96EDvkXD4iAomR5EE9dHRuubDy7uRv2syEevc0gfoLi7W/5vp96vJgsaSqxnTp+"
"QiYq49KyWyMtxRULF2yd+vYDnHCDME73mNSULEHfwCU71dvbKpnFaej78q7wS20gUk6ysOOXXtvDHbiVcpUb/9oyDgNAxU"
"jVvPdh96AhBFj2HCuGZhP0CGotafTciu6YlsiwUpuBkIYgZmNWYa3FR9LS4Q=="
)
REDIRECT_SIG_ALG = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256" REDIRECT_SIG_ALG = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
REDIRECT_RELAY_STATE = "ss:mem:7a054b4af44f34f89dd2d973f383c250b6b076e7f06cfa8276008a6504eaf3c7" REDIRECT_RELAY_STATE = (
"ss:mem:7a054b4af44f34f89dd2d973f383c250b6b076e7f06cfa8276008a6504eaf3c7"
)
REDIRECT_CERT = """-----BEGIN CERTIFICATE----- REDIRECT_CERT = """-----BEGIN CERTIFICATE-----
MIIDCDCCAfCgAwIBAgIRAM5s+bhOHk4ChSpPkGSh0NswDQYJKoZIhvcNAQELBQAw MIIDCDCCAfCgAwIBAgIRAM5s+bhOHk4ChSpPkGSh0NswDQYJKoZIhvcNAQELBQAw
KzEpMCcGA1UEAwwgcGFzc2Jvb2sgU2VsZi1zaWduZWQgQ2VydGlmaWNhdGUwHhcN KzEpMCcGA1UEAwwgcGFzc2Jvb2sgU2VsZi1zaWduZWQgQ2VydGlmaWNhdGUwHhcN
@ -54,18 +66,19 @@ class TestAuthNRequest(TestCase):
"""Test AuthN Request generator and parser""" """Test AuthN Request generator and parser"""
def setUp(self): def setUp(self):
cert = CertificateKeyPair.objects.first()
self.provider = SAMLProvider.objects.create( self.provider = SAMLProvider.objects.create(
authorization_flow=Flow.objects.get( authorization_flow=Flow.objects.get(
slug="default-provider-authorization-implicit-consent" slug="default-provider-authorization-implicit-consent"
), ),
acs_url="http://testserver/source/saml/provider/acs/", acs_url="http://testserver/source/saml/provider/acs/",
signing_kp=CertificateKeyPair.objects.first(), signing_kp=cert,
verification_kp=CertificateKeyPair.objects.first(), verification_kp=cert,
) )
self.source = SAMLSource.objects.create( self.source = SAMLSource.objects.create(
slug="provider", slug="provider",
issuer="passbook", issuer="passbook",
signing_kp=CertificateKeyPair.objects.first(), signing_kp=cert,
) )
self.factory = RequestFactory() self.factory = RequestFactory()
@ -87,24 +100,6 @@ class TestAuthNRequest(TestCase):
self.assertEqual(parsed_request.id, request_proc.request_id) self.assertEqual(parsed_request.id, request_proc.request_id)
self.assertEqual(parsed_request.relay_state, "test_state") self.assertEqual(parsed_request.relay_state, "test_state")
def test_signed_valid_detached(self):
"""Test generated AuthNRequest with valid signature (detached)"""
http_request = self.factory.get("/")
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(http_request)
http_request.session.save()
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
params = request_proc.build_auth_n_detached()
# Now we check the ID and signature
parsed_request = AuthNRequestParser(self.provider).parse_detached(
params["SAMLRequest"], "test_state", params["Signature"], params["SigAlg"]
)
self.assertEqual(parsed_request.id, request_proc.request_id)
self.assertEqual(parsed_request.relay_state, "test_state")
def test_request_id_invalid(self): def test_request_id_invalid(self):
"""Test generated AuthNRequest with invalid request ID""" """Test generated AuthNRequest with invalid request ID"""
http_request = self.factory.get("/") http_request = self.factory.get("/")
@ -139,12 +134,32 @@ class TestAuthNRequest(TestCase):
with self.assertRaises(MismatchedRequestID): with self.assertRaises(MismatchedRequestID):
response_parser.parse(http_request) response_parser.parse(http_request)
def test_signed_valid_detached(self):
"""Test generated AuthNRequest with valid signature (detached)"""
http_request = self.factory.get("/")
middleware = SessionMiddleware(dummy_get_response)
middleware.process_request(http_request)
http_request.session.save()
# First create an AuthNRequest
request_proc = RequestProcessor(self.source, http_request, "test_state")
params = request_proc.build_auth_n_detached()
# Now we check the ID and signature
parsed_request = AuthNRequestParser(self.provider).parse_detached(
params["SAMLRequest"],
params["RelayState"],
params["Signature"],
params["SigAlg"],
)
self.assertEqual(parsed_request.id, request_proc.request_id)
self.assertEqual(parsed_request.relay_state, "test_state")
def test_signed_detached_static(self): def test_signed_detached_static(self):
"""Test request with detached signature, """Test request with detached signature,
taken from https://www.samltool.com/generic_sso_req.php""" taken from https://www.samltool.com/generic_sso_req.php"""
static_keypair = CertificateKeyPair.objects.create( static_keypair = CertificateKeyPair.objects.create(
name="samltool", name="samltool", certificate_data=REDIRECT_CERT
certificate_data=REDIRECT_CERT
) )
provider = SAMLProvider( provider = SAMLProvider(
name="samltool", name="samltool",
@ -152,6 +167,8 @@ class TestAuthNRequest(TestCase):
slug="default-provider-authorization-implicit-consent" slug="default-provider-authorization-implicit-consent"
), ),
acs_url="https://10.120.20.200/saml-sp/SAML2/POST", acs_url="https://10.120.20.200/saml-sp/SAML2/POST",
audience="https://10.120.20.200/saml-sp/SAML2/POST",
issuer="https://10.120.20.200/saml-sp/SAML2/POST",
signing_kp=static_keypair, signing_kp=static_keypair,
verification_kp=static_keypair, verification_kp=static_keypair,
) )

View File

@ -22,8 +22,8 @@ SAML_NAME_ID_FORMAT_TRANSIENT = "urn:oasis:names:tc:SAML:2.0:nameid-format:trans
SAML_BINDING_POST = "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST" SAML_BINDING_POST = "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-POST"
SAML_BINDING_REDIRECT = "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect" SAML_BINDING_REDIRECT = "urn:oasis:names:tc:SAML:2.0:bindings:HTTP-Redirect"
DSA_SHA1 = 'http://www.w3.org/2000/09/xmldsig#dsa-sha1' DSA_SHA1 = "http://www.w3.org/2000/09/xmldsig#dsa-sha1"
RSA_SHA1 = 'http://www.w3.org/2000/09/xmldsig#rsa-sha1' RSA_SHA1 = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
RSA_SHA256 = 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha256' RSA_SHA256 = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
RSA_SHA384 = 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha384' RSA_SHA384 = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha384"
RSA_SHA512 = 'http://www.w3.org/2001/04/xmldsig-more#rsa-sha512' RSA_SHA512 = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha512"

View File

@ -3,8 +3,7 @@ from base64 import b64encode
from typing import Dict from typing import Dict
from urllib.parse import quote_plus from urllib.parse import quote_plus
from cryptography.hazmat.primitives import hashes import xmlsec
from cryptography.hazmat.primitives.asymmetric import padding
from django.http import HttpRequest from django.http import HttpRequest
from lxml import etree # nosec from lxml import etree # nosec
from lxml.etree import Element # nosec from lxml.etree import Element # nosec
@ -18,6 +17,7 @@ from passbook.sources.saml.processors.constants import (
NS_MAP, NS_MAP,
NS_SAML_ASSERTION, NS_SAML_ASSERTION,
NS_SAML_PROTOCOL, NS_SAML_PROTOCOL,
RSA_SHA256,
) )
SESSION_REQUEST_ID = "passbook_source_saml_request_id" SESSION_REQUEST_ID = "passbook_source_saml_request_id"
@ -51,7 +51,7 @@ class RequestProcessor:
def get_name_id_policy(self) -> Element: def get_name_id_policy(self) -> Element:
"""Get NameID Policy Element""" """Get NameID Policy Element"""
name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy") name_id_policy = Element(f"{{{NS_SAML_PROTOCOL}}}NameIDPolicy")
name_id_policy.text = self.source.name_id_policy name_id_policy.attrib["Format"] = self.source.name_id_policy
return name_id_policy return name_id_policy
def get_auth_n(self) -> Element: def get_auth_n(self) -> Element:
@ -103,22 +103,28 @@ class RequestProcessor:
response_dict["RelayState"] = self.relay_state response_dict["RelayState"] = self.relay_state
if self.source.signing_kp: if self.source.signing_kp:
sig_alg = "http://www.w3.org/2000/09/xmldsig#rsa-sha1" sig_alg = RSA_SHA256
sig_hash = hashes.SHA1() # nosec
# Create the full querystring in the correct order to be signed # Create the full querystring in the correct order to be signed
querystring = f"SAMLRequest={quote_plus(saml_request)}&" querystring = f"SAMLRequest={quote_plus(saml_request)}&"
if self.relay_state != "": if "RelayState" in response_dict:
querystring += f"RelayState={quote_plus(self.relay_state)}&" querystring += f"RelayState={quote_plus(response_dict['RelayState'])}&"
querystring += f"SigAlg={sig_alg}" querystring += f"SigAlg={quote_plus(sig_alg)}"
signature = self.source.signing_kp.private_key.sign( ctx = xmlsec.SignatureContext()
querystring.encode(),
padding.PSS( key = xmlsec.Key.from_memory(
mgf=padding.MGF1(sig_hash), salt_length=padding.PSS.MAX_LENGTH self.source.signing_kp.key_data, xmlsec.constants.KeyDataFormatPem, None
), )
sig_hash, key.load_cert_from_memory(
self.source.signing_kp.certificate_data,
xmlsec.constants.KeyDataFormatPem,
)
ctx.key = key
signature = ctx.sign_binary(
querystring.encode("utf-8"), xmlsec.constants.TransformRsaSha256
) )
response_dict["SigAlg"] = sig_alg
response_dict["Signature"] = b64encode(signature).decode() response_dict["Signature"] = b64encode(signature).decode()
response_dict["SigAlg"] = sig_alg
return response_dict return response_dict

View File

@ -7431,7 +7431,6 @@ definitions:
required: required:
- name - name
- acs_url - acs_url
- issuer
type: object type: object
properties: properties:
pk: pk:
@ -7450,6 +7449,7 @@ definitions:
minLength: 1 minLength: 1
audience: audience:
title: Audience title: Audience
description: Value of the audience restriction field of the asseration.
type: string type: string
minLength: 1 minLength: 1
issuer: issuer:
@ -7497,18 +7497,15 @@ definitions:
- dsa-sha1 - dsa-sha1
signing_kp: signing_kp:
title: Signing Keypair title: Signing Keypair
description: Singing is enabled upon selection of a Key Pair. description: Keypair used to sign outgoing Responses going to the Service
Provider.
type: string type: string
format: uuid format: uuid
x-nullable: true x-nullable: true
require_signing:
title: Require signing
description: Require Requests to be signed by an X509 Certificate. Must match
the Certificate selected in `Singing Keypair`.
type: boolean
verification_kp: verification_kp:
title: Verification Keypair title: Verification Certificate
description: If selected, incoming assertion's Signatures will be validated. description: When selected, incoming assertion's Signatures will be validated
against this certificate. To allow unsigned Requests, leave on default.
type: string type: string
format: uuid format: uuid
x-nullable: true x-nullable: true
@ -7863,8 +7860,7 @@ definitions:
minLength: 1 minLength: 1
signing_kp: signing_kp:
title: Singing Keypair title: Singing Keypair
description: Certificate Key Pair of the IdP which Assertion's Signature is description: Keypair which is used to sign outgoing requests.
validated against.
type: string type: string
format: uuid format: uuid
Stage: Stage: