This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
authentik/tests/e2e/test_provider_oauth2_oidc.py
Jens L 1cfe1aff13
wip: rename to authentik (#361)
* root: initial rename

* web: rename custom element prefix

* root: rename external functions with pb_ prefix

* root: fix formatting

* root: replace domain with goauthentik.io

* proxy: update path

* root: rename remaining prefixes

* flows: rename file extension

* root: pbadmin -> akadmin

* docs: fix image filenames

* lifecycle: ignore migration files

* ci: copy default config from current source before loading last tagged

* *: new sentry dsn

* tests: fix missing python3.9-dev package

* root: add additional migrations for service accounts created by outposts

* core: mark system-created service accounts with attribute

* policies/expression: fix pb_ replacement not working

* web: fix last linting errors, add lit-analyse

* policies/expressions: fix lint errors

* web: fix sidebar display on screens where not all items fit

* proxy: attempt to fix proxy pipeline

* proxy: use go env GOPATH to get gopath

* lib: fix user_default naming inconsistency

* docs: add upgrade docs

* docs: update screenshots to use authentik

* admin: fix create button on empty-state of outpost

* web: fix modal submit not refreshing SiteShell and Table

* web: fix height of app-card and height of generic icon

* web: fix rendering of subtext

* admin: fix version check error not being caught

* web: fix worker count not being shown

* docs: update screenshots

* root: new icon

* web: fix lint error

* admin: fix linting error

* root: migrate coverage config to pyproject
2020-12-05 22:08:42 +01:00

286 lines
11 KiB
Python

"""test OAuth2 OpenID Provider flow"""
from json import loads
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker import DockerClient, from_env
from docker.models.containers import Container
from docker.types import Healthcheck
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as ec
from structlog import get_logger
from authentik.core.models import Application
from authentik.crypto.models import CertificateKeyPair
from authentik.flows.models import Flow
from authentik.policies.expression.models import ExpressionPolicy
from authentik.policies.models import PolicyBinding
from authentik.providers.oauth2.constants import (
SCOPE_OPENID,
SCOPE_OPENID_EMAIL,
SCOPE_OPENID_PROFILE,
)
from authentik.providers.oauth2.generators import (
generate_client_id,
generate_client_secret,
)
from authentik.providers.oauth2.models import (
ClientTypes,
OAuth2Provider,
ResponseTypes,
ScopeMapping,
)
from tests.e2e.utils import USER, SeleniumTestCase, retry
LOGGER = get_logger()
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderOAuth2OIDC(SeleniumTestCase):
"""test OAuth with OpenID Provider flow"""
def setUp(self):
self.client_id = generate_client_id()
self.client_secret = generate_client_secret()
self.application_slug = "test"
super().setUp()
def setup_client(self) -> Container:
"""Setup client saml-sp container which we test SAML against"""
sleep(1)
client: DockerClient = from_env()
client.images.pull("beryju/oidc-test-client")
container = client.containers.run(
image="docker.beryju.org/proxy/beryju/oidc-test-client",
detach=True,
network_mode="host",
auto_remove=True,
healthcheck=Healthcheck(
test=["CMD", "wget", "--spider", "http://localhost:9009/health"],
interval=5 * 100 * 1000000,
start_period=1 * 100 * 1000000,
),
environment={
"OIDC_CLIENT_ID": self.client_id,
"OIDC_CLIENT_SECRET": self.client_secret,
"OIDC_PROVIDER": f"{self.live_server_url}/application/o/{self.application_slug}/",
},
)
while True:
container.reload()
status = container.attrs.get("State", {}).get("Health", {}).get("Status")
if status == "healthy":
return container
LOGGER.info("Container failed healthcheck")
sleep(1)
@retry()
def test_redirect_uri_error(self):
"""test OpenID Provider flow (invalid redirect URI, check error message)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
)
provider = OAuth2Provider.objects.create(
name=self.application_slug,
client_type=ClientTypes.CONFIDENTIAL,
client_id=self.client_id,
client_secret=self.client_secret,
rsa_key=CertificateKeyPair.objects.first(),
redirect_uris="http://localhost:9009/",
authorization_flow=authorization_flow,
response_type=ResponseTypes.CODE,
)
provider.property_mappings.set(
ScopeMapping.objects.filter(
scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE]
)
)
provider.save()
Application.objects.create(
name=self.application_slug,
slug=self.application_slug,
provider=provider,
)
self.container = self.setup_client()
self.driver.get("http://localhost:9009")
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
sleep(2)
self.assertEqual(
self.driver.find_element(By.CLASS_NAME, "pf-c-title").text,
"Redirect URI Error",
)
@retry()
def test_authorization_consent_implied(self):
"""test OpenID Provider flow (default authorization flow with implied consent)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
)
provider = OAuth2Provider.objects.create(
name=self.application_slug,
client_type=ClientTypes.CONFIDENTIAL,
client_id=self.client_id,
client_secret=self.client_secret,
rsa_key=CertificateKeyPair.objects.first(),
redirect_uris="http://localhost:9009/auth/callback",
authorization_flow=authorization_flow,
response_type=ResponseTypes.CODE,
)
provider.property_mappings.set(
ScopeMapping.objects.filter(
scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE]
)
)
provider.save()
Application.objects.create(
name=self.application_slug,
slug=self.application_slug,
provider=provider,
)
self.container = self.setup_client()
self.driver.get("http://localhost:9009")
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))
body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text)
self.assertEqual(body["IDTokenClaims"]["nickname"], USER().username)
self.assertEqual(body["UserInfo"]["nickname"], USER().username)
self.assertEqual(body["IDTokenClaims"]["name"], USER().name)
self.assertEqual(body["UserInfo"]["name"], USER().name)
self.assertEqual(body["IDTokenClaims"]["email"], USER().email)
self.assertEqual(body["UserInfo"]["email"], USER().email)
@retry()
def test_authorization_consent_explicit(self):
"""test OpenID Provider flow (default authorization flow with explicit consent)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-explicit-consent"
)
provider = OAuth2Provider.objects.create(
name=self.application_slug,
authorization_flow=authorization_flow,
response_type=ResponseTypes.CODE,
client_type=ClientTypes.CONFIDENTIAL,
client_id=self.client_id,
client_secret=self.client_secret,
rsa_key=CertificateKeyPair.objects.first(),
redirect_uris="http://localhost:9009/auth/callback",
)
provider.property_mappings.set(
ScopeMapping.objects.filter(
scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE]
)
)
provider.save()
app = Application.objects.create(
name=self.application_slug,
slug=self.application_slug,
provider=provider,
)
self.container = self.setup_client()
self.driver.get("http://localhost:9009")
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
self.assertEqual(
app.name,
self.driver.find_element(By.ID, "application-name").text,
)
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "[type=submit]"))
)
sleep(1)
self.driver.find_element(By.CSS_SELECTOR, "[type=submit]").click()
self.wait.until(ec.presence_of_element_located((By.CSS_SELECTOR, "pre")))
body = loads(self.driver.find_element(By.CSS_SELECTOR, "pre").text)
self.assertEqual(body["IDTokenClaims"]["nickname"], USER().username)
self.assertEqual(body["UserInfo"]["nickname"], USER().username)
self.assertEqual(body["IDTokenClaims"]["name"], USER().name)
self.assertEqual(body["UserInfo"]["name"], USER().name)
self.assertEqual(body["IDTokenClaims"]["email"], USER().email)
self.assertEqual(body["UserInfo"]["email"], USER().email)
@retry()
def test_authorization_denied(self):
"""test OpenID Provider flow (default authorization with access deny)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-explicit-consent"
)
provider = OAuth2Provider.objects.create(
name=self.application_slug,
authorization_flow=authorization_flow,
response_type=ResponseTypes.CODE,
client_type=ClientTypes.CONFIDENTIAL,
client_id=self.client_id,
client_secret=self.client_secret,
rsa_key=CertificateKeyPair.objects.first(),
redirect_uris="http://localhost:9009/auth/callback",
)
provider.property_mappings.set(
ScopeMapping.objects.filter(
scope_name__in=[SCOPE_OPENID, SCOPE_OPENID_EMAIL, SCOPE_OPENID_PROFILE]
)
)
provider.save()
app = Application.objects.create(
name=self.application_slug,
slug=self.application_slug,
provider=provider,
)
negative_policy = ExpressionPolicy.objects.create(
name="negative-static", expression="return False"
)
PolicyBinding.objects.create(target=app, policy=negative_policy, order=0)
self.container = self.setup_client()
self.driver.get("http://localhost:9009")
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
self.wait.until(
ec.presence_of_element_located((By.CSS_SELECTOR, "header > h1"))
)
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "header > h1").text,
"Permission denied",
)