"""test SAML Source""" from sys import platform from time import sleep from typing import Any, Optional from unittest.case import skipUnless from docker.types import Healthcheck from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support import expected_conditions as ec from selenium.webdriver.support.wait import WebDriverWait from structlog.stdlib import get_logger from authentik.crypto.models import CertificateKeyPair from authentik.flows.models import Flow from authentik.sources.saml.models import SAMLBindingTypes, SAMLSource from tests.e2e.utils import SeleniumTestCase, apply_migration, object_manager, retry LOGGER = get_logger() IDP_CERT = """-----BEGIN CERTIFICATE----- MIIDXTCCAkWgAwIBAgIJALmVVuDWu4NYMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX aWRnaXRzIFB0eSBMdGQwHhcNMTYxMjMxMTQzNDQ3WhcNNDgwNjI1MTQzNDQ3WjBF MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50 ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB CgKCAQEAzUCFozgNb1h1M0jzNRSCjhOBnR+uVbVpaWfXYIR+AhWDdEe5ryY+Cgav Og8bfLybyzFdehlYdDRgkedEB/GjG8aJw06l0qF4jDOAw0kEygWCu2mcH7XOxRt+ YAH3TVHa/Hu1W3WjzkobqqqLQ8gkKWWM27fOgAZ6GieaJBN6VBSMMcPey3HWLBmc +TYJmv1dbaO2jHhKh8pfKw0W12VM8P1PIO8gv4Phu/uuJYieBWKixBEyy0lHjyix YFCR12xdh4CA47q958ZRGnnDUGFVE1QhgRacJCOZ9bd5t9mr8KLaVBYTCJo5ERE8 jymab5dPqe5qKfJsCZiqWglbjUo9twIDAQABo1AwTjAdBgNVHQ4EFgQUxpuwcs/C YQOyui+r1G+3KxBNhxkwHwYDVR0jBBgwFoAUxpuwcs/CYQOyui+r1G+3KxBNhxkw DAYDVR0TBAUwAwEB/zANBgkqhkiG9w0BAQsFAAOCAQEAAiWUKs/2x/viNCKi3Y6b lEuCtAGhzOOZ9EjrvJ8+COH3Rag3tVBWrcBZ3/uhhPq5gy9lqw4OkvEws99/5jFs X1FJ6MKBgqfuy7yh5s1YfM0ANHYczMmYpZeAcQf2CGAaVfwTTfSlzNLsF2lW/ly7 yapFzlYSJLGoVE+OHEu8g5SlNACUEfkXw+5Eghh+KzlIN7R6Q7r2ixWNFBC/jWf7 NKUfJyX8qIG5md1YUeT6GBW9Bm2/1/RiO24JTaYlfLdKK9TYb8sG5B+OLab2DImG 99CJ25RkAcSobWNF5zD0O6lgOo3cEdB/ksCq3hmtlC/DlLZ/D8CJ+7VuZnS1rR2n aQ== -----END CERTIFICATE-----""" IDP_KEY = """-----BEGIN PRIVATE KEY----- MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDNQIWjOA1vWHUz SPM1FIKOE4GdH65VtWlpZ9dghH4CFYN0R7mvJj4KBq86Dxt8vJvLMV16GVh0NGCR 50QH8aMbxonDTqXSoXiMM4DDSQTKBYK7aZwftc7FG35gAfdNUdr8e7VbdaPOShuq qotDyCQpZYzbt86ABnoaJ5okE3pUFIwxw97LcdYsGZz5Ngma/V1to7aMeEqHyl8r DRbXZUzw/U8g7yC/g+G7+64liJ4FYqLEETLLSUePKLFgUJHXbF2HgIDjur3nxlEa ecNQYVUTVCGBFpwkI5n1t3m32avwotpUFhMImjkRETyPKZpvl0+p7mop8mwJmKpa CVuNSj23AgMBAAECggEABn4I/B20xxXcNzASiVZJvua9DdRHtmxTlkLznBj0x2oY y1/Nbs3d3oFRn5uEuhBZOTcphsgwdRSHDXZsP3gUObew+d2N/zieUIj8hLDVlvJP rU/s4U/l53Q0LiNByE9ThvL+zJLPCKJtd5uHZjB5fFm69+Q7gu8xg4xHIub+0pP5 PHanmHCDrbgNN/oqlar4FZ2MXTgekW6Amyc/koE9hIn4Baa2Ke/B/AUGY4pMRLqp TArt+GTVeWeoFY9QACUpaHpJhGb/Piou6tlU57e42cLoki1f0+SARsBBKyXA7BB1 1fMH10KQYFA68dTYWlKzQau/K4xaqg4FKmtwF66GQQKBgQD9OpNUS7oRxMHVJaBR TNWW+V1FXycqojekFpDijPb2X5CWV16oeWgaXp0nOHFdy9EWs3GtGpfZasaRVHsX SHtPh4Nb8JqHdGE0/CD6t0+4Dns8Bn9cSqtdQB7R3Jn7IMXi9X/U8LDKo+A18/Jq V8VgUngMny9YjMkQIbK8TRWkYQKBgQDPf4nxO6ju+tOHHORQty3bYDD0+OV3I0+L 0yz0uPreryBVi9nY43KakH52D7UZEwwsBjjGXD+WH8xEsmBWsGNXJu025PvzIJoz lAEiXvMp/NmYp+tY4rDmO8RhyVocBqWHzh38m0IFOd4ByFD5nLEDrA3pDVo0aNgY n0GwRysZFwKBgQDkCj3m6ZMUsUWEty+aR0EJhmKyODBDOnY09IVhH2S/FexVFzUN LtfK9206hp/Awez3Ln2uT4Zzqq5K7fMzUniJdBWdVB004l8voeXpIe9OZuwfcBJ9 gFi1zypx/uFDv421BzQpBN+QfOdKbvbdQVFjnqCxbSDr80yVlGMrI5fbwQKBgG09 oRrepO7EIO8GN/GCruLK/ptKGkyhy3Q6xnVEmdb47hX7ncJA5IoZPmrblCVSUNsw n11XHabksL8OBgg9rt8oQEThQv/aDzTOW9aDlJNragejiBTwq99aYeZ1gjo1CZq4 2jKubpCfyZC4rGDtrIfZYi1q+S2UcQhtd8DdhwQbAoGAAM4EpDA4yHB5yiek1p/o CbqRCta/Dx6Eyo0KlNAyPuFPAshupG4NBx7mT2ASfL+2VBHoi6mHSri+BDX5ryYF fMYvp7URYoq7w7qivRlvvEg5yoYrK13F2+Gj6xJ4jEN9m0KdM/g3mJGq0HBTIQrp Sm75WXsflOxuTn08LbgGc4s= -----END PRIVATE KEY-----""" @skipUnless(platform.startswith("linux"), "requires local docker") class TestSourceSAML(SeleniumTestCase): """test SAML Source flow""" def get_container_specs(self) -> Optional[dict[str, Any]]: return { "image": "kristophjunge/test-saml-idp:1.15", "detach": True, "network_mode": "host", "auto_remove": True, "healthcheck": Healthcheck( test=["CMD", "curl", "http://localhost:8080"], interval=5 * 100 * 1000000, start_period=1 * 100 * 1000000, ), "environment": { "SIMPLESAMLPHP_SP_ENTITY_ID": "entity-id", "SIMPLESAMLPHP_SP_ASSERTION_CONSUMER_SERVICE": ( f"{self.live_server_url}/source/saml/saml-idp-test/acs/" ), }, } @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @apply_migration( "authentik_sources_saml", "0010_samlsource_pre_authentication_flow" ) @object_manager def test_idp_redirect(self): """test SAML Source With redirect binding""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") pre_authentication_flow = Flow.objects.get( slug="default-source-pre-authentication" ) keypair = CertificateKeyPair.objects.create( name="test-idp-cert", certificate_data=IDP_CERT, key_data=IDP_KEY, ) SAMLSource.objects.create( name="saml-idp-test", slug="saml-idp-test", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, pre_authentication_flow=pre_authentication_flow, issuer="entity-id", sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php", binding_type=SAMLBindingTypes.REDIRECT, signing_kp=keypair, ) self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root( "ak-stage-identification", flow_executor ) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CLASS_NAME, "pf-c-login__main-footer-links-item-link") ) ) identification_stage.find_element( By.CLASS_NAME, "pf-c-login__main-footer-links-item-link" ).click() # Now we should be at the IDP, wait for the username field self.wait.until(ec.presence_of_element_located((By.ID, "username"))) self.driver.find_element(By.ID, "username").send_keys("user1") self.driver.find_element(By.ID, "password").send_keys("user1pass") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.url("authentik_core:user-details")) # Wait until we've loaded the user info page self.assertNotEqual( self.driver.find_element(By.ID, "id_username").get_attribute("value"), "" ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @apply_migration( "authentik_sources_saml", "0010_samlsource_pre_authentication_flow" ) @object_manager def test_idp_post(self): """test SAML Source With post binding""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") pre_authentication_flow = Flow.objects.get( slug="default-source-pre-authentication" ) keypair = CertificateKeyPair.objects.create( name="test-idp-cert", certificate_data=IDP_CERT, key_data=IDP_KEY, ) source = SAMLSource.objects.create( name="saml-idp-test", slug="saml-idp-test", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, pre_authentication_flow=pre_authentication_flow, issuer="entity-id", sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php", binding_type=SAMLBindingTypes.POST, signing_kp=keypair, ) self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root( "ak-stage-identification", flow_executor ) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CLASS_NAME, "pf-c-login__main-footer-links-item-link") ) ) identification_stage.find_element( By.CLASS_NAME, "pf-c-login__main-footer-links-item-link" ).click() sleep(1) flow_executor = self.get_shadow_root("ak-flow-executor") consent_stage = self.get_shadow_root("ak-stage-consent", flow_executor) self.assertIn( source.name, consent_stage.find_element(By.CSS_SELECTOR, "#header-text").text, ) consent_stage.find_element( By.CSS_SELECTOR, ("[type=submit]"), ).click() # Now we should be at the IDP, wait for the username field self.wait.until(ec.presence_of_element_located((By.ID, "username"))) self.driver.find_element(By.ID, "username").send_keys("user1") self.driver.find_element(By.ID, "password").send_keys("user1pass") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.url("authentik_core:user-details")) # Wait until we've loaded the user info page self.assertNotEqual( self.driver.find_element(By.ID, "id_username").get_attribute("value"), "" ) @retry() @apply_migration("authentik_core", "0003_default_user") @apply_migration("authentik_flows", "0008_default_flows") @apply_migration("authentik_flows", "0009_source_flows") @apply_migration("authentik_crypto", "0002_create_self_signed_kp") @apply_migration( "authentik_sources_saml", "0010_samlsource_pre_authentication_flow" ) @object_manager def test_idp_post_auto(self): """test SAML Source With post binding (auto redirect)""" # Bootstrap all needed objects authentication_flow = Flow.objects.get(slug="default-source-authentication") enrollment_flow = Flow.objects.get(slug="default-source-enrollment") pre_authentication_flow = Flow.objects.get( slug="default-source-pre-authentication" ) keypair = CertificateKeyPair.objects.create( name="test-idp-cert", certificate_data=IDP_CERT, key_data=IDP_KEY, ) SAMLSource.objects.create( name="saml-idp-test", slug="saml-idp-test", authentication_flow=authentication_flow, enrollment_flow=enrollment_flow, pre_authentication_flow=pre_authentication_flow, issuer="entity-id", sso_url="http://localhost:8080/simplesaml/saml2/idp/SSOService.php", binding_type=SAMLBindingTypes.POST_AUTO, signing_kp=keypair, ) self.driver.get(self.live_server_url) flow_executor = self.get_shadow_root("ak-flow-executor") identification_stage = self.get_shadow_root( "ak-stage-identification", flow_executor ) wait = WebDriverWait(identification_stage, self.wait_timeout) wait.until( ec.presence_of_element_located( (By.CLASS_NAME, "pf-c-login__main-footer-links-item-link") ) ) identification_stage.find_element( By.CLASS_NAME, "pf-c-login__main-footer-links-item-link" ).click() # Now we should be at the IDP, wait for the username field self.wait.until(ec.presence_of_element_located((By.ID, "username"))) self.driver.find_element(By.ID, "username").send_keys("user1") self.driver.find_element(By.ID, "password").send_keys("user1pass") self.driver.find_element(By.ID, "password").send_keys(Keys.ENTER) # Wait until we're logged in self.wait_for_url(self.if_admin_url("/library")) self.driver.get(self.url("authentik_core:user-details")) # Wait until we've loaded the user info page self.assertNotEqual( self.driver.find_element(By.ID, "id_username").get_attribute("value"), "" )