"""authentik e2e testing utilities"""
import json
import os
from contextlib import ExitStack
from functools import lru_cache, wraps
from os import environ, makedirs
from time import sleep, time
from typing import Any, Callable, Optional

from channels.testing import ChannelsLiveServerTestCase
from django.apps import apps
from django.db import connection
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.operations.special import RunPython
from django.test.testcases import TransactionTestCase
from django.urls import reverse
from docker import DockerClient, from_env
from docker.errors import DockerException
from docker.models.containers import Container
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.ui import WebDriverWait
from structlog.stdlib import get_logger

from authentik.core.api.users import UserSerializer
from authentik.core.models import User
from authentik.managed.manager import ObjectManager

# Number of attempts used both for connecting to the webdriver and as the
# default for the `retry` decorator; overridable via the RETRIES env variable.
RETRIES = int(environ.get("RETRIES", "5"))


# pylint: disable=invalid-name
def USER() -> User:  # noqa
    """Return the `akadmin` user, looked up from the database on every call"""
    return User.objects.get(username="akadmin")


def get_docker_tag() -> str:
    """Get docker-tag based off of CI variables

    A non-empty `GITHUB_HEAD_REF` takes precedence over `GITHUB_REF`; when
    neither is set, `master` is used. A `refs/heads/` prefix is stripped and
    slashes are replaced with dashes, and the result is prefixed with `gh-`.
    """
    branch_name = os.environ.get("GITHUB_HEAD_REF") or os.environ.get("GITHUB_REF", "master")
    branch_name = branch_name.replace("refs/heads/", "").replace("/", "-")
    return f"gh-{branch_name}"


class SeleniumTestCase(ChannelsLiveServerTestCase):
    """StaticLiveServerTestCase which automatically creates a Webdriver instance"""

    # Container started in setUp when get_container_specs() returns specs
    container: Optional[Container] = None
    # Timeout (in seconds) handed to WebDriverWait
    wait_timeout: int

    def setUp(self):
        """Create the webdriver, the shared wait helper and, optionally, a container"""
        super().setUp()
        self.maxDiff = None
        self.wait_timeout = 60
        self.driver = self._get_driver()
        self.driver.maximize_window()
        self.driver.implicitly_wait(30)
        self.wait = WebDriverWait(self.driver, self.wait_timeout)
        self.logger = get_logger()
        if specs := self.get_container_specs():
            self.container = self._start_container(specs)

    def get_container_image(self, base: str) -> str:
        """Try to pull docker image based on git branch, fallback to master if not found."""
        client: DockerClient = from_env()
        image = f"{base}:gh-master"
        try:
            branch_image = f"{base}:{get_docker_tag()}"
            client.images.pull(branch_image)
            return branch_image
        except DockerException:
            client.images.pull(image)
            return image

    def _start_container(self, specs: dict[str, Any]) -> Container:
        """Run a container from `specs`; if the specs define a healthcheck,
        block until the container reports itself as healthy"""
        client: DockerClient = from_env()
        container = client.containers.run(**specs)
        if "healthcheck" not in specs:
            return container
        # NOTE: this polls once per second with no upper bound, so a container
        # that never becomes healthy blocks the test indefinitely.
        while True:
            container.reload()
            status = container.attrs.get("State", {}).get("Health", {}).get("Status")
            if status == "healthy":
                return container
            self.logger.info("Container failed healthcheck")
            sleep(1)

    def output_container_logs(self, container: Optional[Container] = None):
        """Output the container logs to our STDOUT

        Uses `container` when given, otherwise `self.container`.
        """
        ct = container or self.container
        # Resolve the tag once instead of re-evaluating it for every log line
        image_tag = ct.image.tags[0]
        self.logger.debug("--------container logs", container=image_tag)
        for log in ct.logs().decode().split("\n"):
            self.logger.debug(log, container=image_tag)
        self.logger.debug("--------end container logs", container=image_tag)

    def get_container_specs(self) -> Optional[dict[str, Any]]:
        """Optionally return container specs. When specs are returned, the container
        is launched in setUp (waiting for it to be healthy if a healthcheck is
        defined) and killed again in tearDown"""
        return None

    def _get_driver(self) -> WebDriver:
        """Connect to the remote Chrome webdriver, trying up to `RETRIES` times

        Raises:
            ValueError: if every attempt failed; the last WebDriverException
                is attached as the cause.
        """
        last_error = None
        for _ in range(RETRIES):
            try:
                return webdriver.Remote(
                    command_executor="http://localhost:4444/wd/hub",
                    options=webdriver.ChromeOptions(),
                )
            except WebDriverException as exc:
                last_error = exc
        raise ValueError(f"Webdriver failed after {RETRIES}.") from last_error

    def tearDown(self):
        """Dump diagnostics (screenshot on CI, browser and container logs), then
        kill the container and quit the webdriver"""
        with ExitStack() as cleanup:
            # Callbacks run in reverse registration order when the block exits,
            # even if collecting diagnostics below raises, so the container and
            # the browser session are always released.
            cleanup.callback(super().tearDown)
            cleanup.callback(self.driver.quit)
            if self.container:
                cleanup.callback(self.container.kill)
                cleanup.callback(self.output_container_logs)
            if "TF_BUILD" in environ:
                makedirs("selenium_screenshots/", exist_ok=True)
                screenshot_file = f"selenium_screenshots/{self.__class__.__name__}_{time()}.png"
                self.driver.save_screenshot(screenshot_file)
                self.logger.warning("Saved screenshot", file=screenshot_file)
            self.logger.debug("--------browser logs")
            for line in self.driver.get_log("browser"):
                self.logger.debug(line["message"], source=line["source"], level=line["level"])
            self.logger.debug("--------end browser logs")

    def wait_for_url(self, desired_url):
        """Wait until URL is `desired_url`.

        Raises:
            TimeoutException: if the URL does not match within the wait timeout.
        """
        try:
            self.wait.until(lambda driver: driver.current_url == desired_url)
        except TimeoutException as exc:
            # Build the message only now, so it reports the URL the browser is
            # on when the wait gives up rather than the one at call time.
            raise TimeoutException(
                f"URL {self.driver.current_url} doesn't match expected URL {desired_url}",
                exc.screen,
                exc.stacktrace,
            ) from exc

    def url(self, view, **kwargs) -> str:
        """reverse `view` with `**kwargs` into full URL using live_server_url"""
        return self.live_server_url + reverse(view, kwargs=kwargs)

    def if_user_url(self, view) -> str:
        """Build a full URL below `/if/user/` with `view` as the URL fragment"""
        return f"{self.live_server_url}/if/user/#{view}"

    def get_shadow_root(self, selector: str, container: Optional[WebElement] = None) -> WebElement:
        """Get shadow root element's inner shadowRoot

        `selector` is looked up inside `container`, or in the whole page when
        no container is given.
        """
        if not container:
            container = self.driver
        shadow_root = container.find_element(By.CSS_SELECTOR, selector)
        element = self.driver.execute_script("return arguments[0].shadowRoot", shadow_root)
        return element

    def login(self):
        """Fill in the identification and password stages of the login flow"""
        flow_executor = self.get_shadow_root("ak-flow-executor")
        identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor)

        identification_stage.find_element(By.CSS_SELECTOR, "input[name=uidField]").click()
        identification_stage.find_element(By.CSS_SELECTOR, "input[name=uidField]").send_keys(
            USER().username
        )
        identification_stage.find_element(By.CSS_SELECTOR, "input[name=uidField]").send_keys(
            Keys.ENTER
        )

        flow_executor = self.get_shadow_root("ak-flow-executor")
        password_stage = self.get_shadow_root("ak-stage-password", flow_executor)
        # NOTE(review): the username is typed as the password; presumably the
        # test user's password equals its username - confirm against the fixtures.
        password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
            USER().username
        )
        password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(Keys.ENTER)
        sleep(1)

    def assert_user(self, expected_user: User):
        """Check users/me API and assert it matches expected_user"""
        self.driver.get(self.url("authentik_api:user-me") + "?format=json")
        user_json = self.driver.find_element(By.CSS_SELECTOR, "pre").text
        user = UserSerializer(data=json.loads(user_json)["user"])
        user.is_valid()
        self.assertEqual(user["username"].value, expected_user.username)
        self.assertEqual(user["name"].value, expected_user.name)
        self.assertEqual(user["email"].value, expected_user.email)


@lru_cache
def get_loader():
    """Thin wrapper to lazily get a Migration Loader, only when it's needed
    and only once"""
    return MigrationLoader(connection)


def apply_migration(app_name: str, migration_name: str):
    """Re-apply migrations that create objects using RunPython before test cases"""

    def wrapper_outter(func: Callable):
        """Wrap `func` so the migration's RunPython operations run before it"""

        @wraps(func)
        def wrapper(self: TransactionTestCase, *args, **kwargs):
            migration = get_loader().get_migration(app_name, migration_name)
            with connection.schema_editor() as schema_editor:
                for operation in migration.operations:
                    # Only the RunPython operations are re-executed
                    if not isinstance(operation, RunPython):
                        continue
                    operation.code(apps, schema_editor)
            return func(self, *args, **kwargs)

        return wrapper

    return wrapper_outter


def object_manager(func: Callable):
    """Run objectmanager before a test function"""

    @wraps(func)
    def wrapper(*args, **kwargs):
        """Run objectmanager before a test function"""
        ObjectManager().run()
        return func(*args, **kwargs)

    return wrapper


def retry(max_retires=RETRIES, exceptions=None):
    """Retry test multiple times. Default to catching Selenium Timeout Exception

    `max_retires` is the total number of attempts per test run (at least one
    attempt is always made); `exceptions` is an iterable of exception classes
    that trigger another attempt.
    """

    if not exceptions:
        exceptions = [WebDriverException, TimeoutException, NoSuchElementException]
    retryable = tuple(exceptions)

    logger = get_logger()

    def retry_actual(func: Callable):
        """Retry test multiple times"""

        @wraps(func)
        def wrapper(self: TransactionTestCase, *args, **kwargs):
            """Run test again if we're below max_retries, including tearDown and
            setUp. Otherwise raise the error"""
            # The counter is local to each call, so retries used up by one run
            # of the test do not reduce the budget of later runs.
            attempt = 1
            while True:
                try:
                    return func(self, *args, **kwargs)
                # pylint: disable=catching-non-exception
                except retryable as exc:
                    attempt += 1
                    if attempt > max_retires:
                        logger.debug("Exceeded retry count", exc=exc, test=self)
                        # pylint: disable=raising-non-exception
                        raise exc
                    logger.debug("Retrying on error", exc=exc, test=self)
                    self.tearDown()
                    self._post_teardown()  # noqa
                    self.setUp()

        return wrapper

    return retry_actual