"""test SAML Source""" from sys import platform from time import sleep from typing import Any, Optional from unittest.case import skipUnless from docker.types import Healthcheck from guardian.utils import get_anonymous_user from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait from structlog.stdlib import get_logger from authentik.core.models import User from authentik.crypto.models import CertificateKeyPair from authentik.flows.models import Flow from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource from tests.e2e.utils import SeleniumTestCase, apply_migration, object_manager, retry LOGGER = get_logger() IDP_CERT = """-----BEGIN CERTIFICATE----- MIIDXTCCAkWgAwIBAgIJALmVVuDWu4NYMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX aWRnaXRzIFB0eSBMdGQwHhcNMTYxMjMxMTQzNDQ3WhcNNDgwNjI1MTQzNDQ3WjBF MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB CgKCAQEAzUCFozgNb1h1M0jzNRSCjhOBnR+uVbVpaWfXYIR+AhWDdEe5ryY+Cgav Og8bfLybyzFdehlYdDRgkedEB/GjG8aJw06l0qF4jDOAw0kEygWCu2mcH7XOxRt+ YAH3TVHa/Hu1W3WjzkobqqqLQ8gkKWWM27fOgAZ6GieaJBN6VBSMMcPey3HWLBmc +TYJmv1dbaO2jHhKh8pfKw0W12VM8P1PIO8gv4Phu/uuJYieBWKixBEyy0lHjyix YFCR12xdh4CA47q958ZRGnnDUGFVE1QhgRacJCOZ9bd5t9mr8KLaVBYTCJo5ERE8 jymab5dPqe5qKfJsCZiqWglbjUo9twIDAQABo1AwTjAdBgNVHQ4EFgQUxpuwcs/C YQOyui+r1G+3KxBNhxkwHwYDVR0jBBgwFoAUxpuwcs/CYQOyui+r1G+3KxBNhxkw DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAAiWUKs/2x/viNCKi3Y6b lEuCtAGhzOOZ9EjrvJ8+COH3Rag3tVBWrcBZ3/uhhPq5gy9lqw4OkvEws99/5jFs X1FJ6MKBgqfuy7yh5s1YfM0ANHYczMmYpZeAcQf2CGAaVfwTTfSlzNLsF2lW/ly7 yapFzlYSJLGoVE+OHEu8g5SlNACUEfkXw+5Eghh+KzlIN7R6Q7r2ixWNFBC/jWf7 NKUfJyX8qIG5md1YUeT6GBW9Bm2/1/RiO24JTaYlfLdKK9TYb8sG5B+OLab2DImG 99CJ25RkAcSobWNF5zD0O6lgOo3cEdB/ksCq3hmtlC/DlLZ/D8CJ+7VuZnS1rR2n aQ== -----END CERTIFICATE-----""" IDP_KEY = """-----BEGIN PRIVATE KEY----- MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDNQIWjOA1vWHUz SPM1FIKOE4GdH65VtWlpZ9dghH4CFYN0R7mvJj4KBq86Dxt8vJvLMV16GVh0NGCR 50QH8aMbxonDTqXSoXiMM4DDSQTKBYK7aZwftc7FG35gAfdNUdr8e7VbdaPOShuq qotDyCQpZYzbt86ABnoaJ5okE3pUFIwxw97LcdYsGZz5Ngma/V1to7aMeEqHyl8r DRbXZUzw/U8g7yC/g+G7+64liJ4FYqLEETLLSUePKLFgUJHXbF2HgIDjur3nxlEa ecNQYVUTVCGBFpwkI5n1t3m32avwotpUFhMImjkRETyPKZpvl0+p7mop8mwJmKpa CVuNSj23AgMBAAECggEABn4I/B20xxXcNzASiVZJvua9DdRHtmxTlkLznBj0x2oY y1/Nbs3d3oFRn5uEuhBZOTcphsgwdRSHDXZsP3gUObew+d2N/zieUIj8hLDVlvJP rU/s4U/l53Q0LiNByE9ThvL+zJLPCKJtd5uHZjB5fFm69+Q7gu8xg4xHIub+0pP5 PHanmHCDrbgNN/oqlar4FZ2MXTgekW6Amyc/koE9hIn4Baa2Ke/B/AUGY4pMRLqp TArt+GTVeWeoFY9QACUpaHpJhGb/Piou6tlU57e42cLoki1f0+SARsBBKyXA7BB1 1fMH10KQYFA68dTYWlKzQau/K4xaqg4FKmtwF66GQQKBgQD9OpNUS7oRxMHVJaBR TNWW+V1FXycqojekFpDijPb2X5CWV16oeWgaXp0nOHFdy9EWs3GtGpfZasaRVHsX SHtPh4Nb8JqHdGE0/CD6t0+4Dns8Bn9cSqtdQB7R3Jn7IMXi9X/U8LDKo+A18/Jq V8VgUngMny9YjMkQIbK8TRWkYQKBgQDPf4nxO6ju+tOHHORQty3bYDD0+OV3I0+L 0yz0uPreryBVi9nY43KakH52D7UZEwwsBjjGXD+WH8xEsmBWsGNXJu025PvzIJoz lAEiXvMp/NmYp+tY4rDmO8RhyVocBqWHzh38m0IFOd4ByFD5nLEDrA3pDVo0aNgY n0GwRysZFwKBgQDkCj3m6ZMUsUWEty+aR0EJhmKyODBDOnY09IVhH2S/FexVFzUN LtfK9206hp/Awez3Ln2uT4Zzqq5K7fMzUniJdBWdVB004l8voeXpIe9OZuwfcBJ9 gFi1zypx/uFDv421BzQpBN+QfOdKbvbdQVFjnqCxbSDr80yVlGMrI5fbwQKBgG09 oRrepO7EIO8GN/GCruLK/ptKGkyhy3Q6xnVEmdb47hX7ncJA5IoZPmrblCVSUNsw n11XHabksL8OBgg9rt8oQEThQv/aDzTOW9aDlJNragejiBTwq99aYeZ1gjo1CZq4 2jKubpCfyZC4rGDtrIfZYi1q+S2UcQhtd8DdhwQbAoGAAM4EpDA4yHB5yiek1p/o CbqRCta/Dx6Eyo0KlNAyPuFPAshupG4NBx7mT2ASfL+2VBHoi6mHSri+BDX5ryYF fMYvp7URYoq7w7qivRlvvEg5yoYrK13F2+Gj6xJ4jEN9m0KdM/g3mJGq0HBTIQrp Sm75WXsflOxuTn08LbgGc4s= -----END PRIVATE KEY-----""" @skipUnless(platform.startswith("linux"), "requires local docker") class TestSourceSAML(SeleniumTestCase): """test SAML Source flow""" def get_container_specs(self) -> Optional[dict[str, Any]]: return { "image": "kristophjunge/test-saml-idp:1.15", "detach": True, "network_mode": "host", "auto_remove": True, "healthcheck": Healthcheck( test=["CMD", "curl", "http://localhost:8080"], interval=5 * 100 * 1000000, start_period=1 * 100 * 1000000, ), "environment": { "SIMPLESAMLPHP_SP_ENTITY_ID": "entity-id", "SIMPLESAMLPHP_SP_ASSERTION_CONSUMER_SERVICE": ( f"{self.live_server_url}/source/saml/saml-idp-test/acs/" ), }, } @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @apply_migration( "authentik_sources_saml", "0010_samlsource_pre_authentication_flow" ) @object_manager def test_idp_redirect(self): """test SAML Source With redirect binding""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") pre_authentication_flow = Flow.objects.get( slug="default-source-pre-authentication" ) keypair = CertificateKeyPair.objects.create( name="test-idp-cert", certificate_data=IDP_CERT, key_data=IDP_KEY, ) SAMLSource.objects.create( name="saml-idp-test", slug="saml-idp-test", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, pre_authentication_flow=pre_authentication_flow, issuer="entity-id", sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php", binding_type=SAMLBindingTypes.REDIRECT, signing_kp=keypair, ) self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root( "ak-stage-identification", flow_executor ) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, "pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, "pf-c-login__main-footer-links-item > button" ).click() # Now we should be at the IDP, wait for the username field self.wait.until(ec.presence_of_element_located((By.ID, "username"))) self.driver.find_element(By.ID, "username").send_keys("user1") self.driver.find_element(By.ID, "password").send_keys("user1pass") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.if_admin_url("/user")) self.assert_user( User.objects.exclude(username="akadmin") .exclude(pk=get_anonymous_user().pk) .first() ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @apply_migration( "authentik_sources_saml", "0010_samlsource_pre_authentication_flow" ) @object_manager def test_idp_post(self): """test SAML Source With post binding""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") pre_authentication_flow = Flow.objects.get( slug="default-source-pre-authentication" ) keypair = CertificateKeyPair.objects.create( name="test-idp-cert", certificate_data=IDP_CERT, key_data=IDP_KEY, ) source = SAMLSource.objects.create( name="saml-idp-test", slug="saml-idp-test", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, pre_authentication_flow=pre_authentication_flow, issuer="entity-id", sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php", binding_type=SAMLBindingTypes.POST, signing_kp=keypair, ) self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root( "ak-stage-identification", flow_executor ) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, "pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, "pf-c-login__main-footer-links-item > button" ).click() sleep(1) flow_executor = self.get_shadow_root("ak-flow-executor") consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor) self.assertIn( source.name, consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text, ) consent_stage.find_element( By.CSS_SELECTOR, ("[type=submit]"), ).click() # Now we should be at the IDP, wait for the username field self.wait.until(ec.presence_of_element_located((By.ID, "username"))) self.driver.find_element(By.ID, "username").send_keys("user1") self.driver.find_element(By.ID, "password").send_keys("user1pass") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.if_admin_url("/user")) self.assert_user( User.objects.exclude(username="akadmin") .exclude(pk=get_anonymous_user().pk) .first() ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @apply_migration( "authentik_sources_saml", "0010_samlsource_pre_authentication_flow" ) @object_manager def test_idp_post_auto(self): """test SAML Source With post binding (auto redirect)""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") pre_authentication_flow = Flow.objects.get( slug="default-source-pre-authentication" ) keypair = CertificateKeyPair.objects.create( name="test-idp-cert", certificate_data=IDP_CERT, key_data=IDP_KEY, ) SAMLSource.objects.create( name="saml-idp-test", slug="saml-idp-test", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, pre_authentication_flow=pre_authentication_flow, issuer="entity-id", sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php", binding_type=SAMLBindingTypes.POST_AUTO, signing_kp=keypair, ) self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root( "ak-stage-identification", flow_executor ) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CSS_SELECTOR, "pf-c-login__main-footer-links-item > button") ) ) identification_stage.find_element( By.CSS_SELECTOR, "pf-c-login__main-footer-links-item > button" ).click() # Now we should be at the IDP, wait for the username field self.wait.until(ec.presence_of_element_located((By.ID, "username"))) self.driver.find_element(By.ID, "username").send_keys("user1") self.driver.find_element(By.ID, "password").send_keys("user1pass") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.if_admin_url("/user")) self.assert_user( User.objects.exclude(username="akadmin") .exclude(pk=get_anonymous_user().pk) .first() )