stages/authentiactor_validate: cookies (#2978)
* stages/authenticator_validate: rewrite to use signed jwt cookie + expiry as MFA threshold Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add more tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add more tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
fb25b28976
commit
9f2529c886
|
@ -9,7 +9,7 @@ from typing import Any, Optional
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
from django.utils.timezone import datetime, now
|
from django.utils.timezone import datetime, now
|
||||||
from django.views import View
|
from django.views import View
|
||||||
from jwt import InvalidTokenError, PyJWK, decode
|
from jwt import PyJWK, PyJWTError, decode
|
||||||
from sentry_sdk.hub import Hub
|
from sentry_sdk.hub import Hub
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
|
||||||
|
@ -302,8 +302,8 @@ class TokenParams:
|
||||||
"verify_aud": False,
|
"verify_aud": False,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
except (InvalidTokenError, ValueError, TypeError) as last_exc:
|
except (PyJWTError, ValueError, TypeError) as exc:
|
||||||
LOGGER.warning("failed to validate jwt", last_exc=last_exc)
|
LOGGER.warning("failed to validate jwt", exc=exc)
|
||||||
# TODO: End remove block
|
# TODO: End remove block
|
||||||
|
|
||||||
source: Optional[OAuthSource] = None
|
source: Optional[OAuthSource] = None
|
||||||
|
@ -325,7 +325,7 @@ class TokenParams:
|
||||||
)
|
)
|
||||||
# AttributeError is raised when the configured JWK is a private key
|
# AttributeError is raised when the configured JWK is a private key
|
||||||
# and not a public key
|
# and not a public key
|
||||||
except (InvalidTokenError, ValueError, TypeError, AttributeError) as exc:
|
except (PyJWTError, ValueError, TypeError, AttributeError) as exc:
|
||||||
LOGGER.warning("failed to validate jwt", exc=exc)
|
LOGGER.warning("failed to validate jwt", exc=exc)
|
||||||
|
|
||||||
if not token:
|
if not token:
|
||||||
|
|
|
@ -91,13 +91,13 @@ def select_challenge_sms(request: HttpRequest, device: SMSDevice):
|
||||||
device.stage.send(device.token, device)
|
device.stage.send(device.token, device)
|
||||||
|
|
||||||
|
|
||||||
def validate_challenge_code(code: str, request: HttpRequest, user: User) -> str:
|
def validate_challenge_code(code: str, request: HttpRequest, user: User) -> Device:
|
||||||
"""Validate code-based challenges. We test against every device, on purpose, as
|
"""Validate code-based challenges. We test against every device, on purpose, as
|
||||||
the user mustn't choose between totp and static devices."""
|
the user mustn't choose between totp and static devices."""
|
||||||
device = match_token(user, code)
|
device = match_token(user, code)
|
||||||
if not device:
|
if not device:
|
||||||
raise ValidationError(_("Invalid Token"))
|
raise ValidationError(_("Invalid Token"))
|
||||||
return code
|
return device
|
||||||
|
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
|
@ -129,7 +129,7 @@ def validate_challenge_webauthn(data: dict, request: HttpRequest, user: User) ->
|
||||||
return device
|
return device
|
||||||
|
|
||||||
|
|
||||||
def validate_challenge_duo(device_pk: int, request: HttpRequest, user: User) -> int:
|
def validate_challenge_duo(device_pk: int, request: HttpRequest, user: User) -> Device:
|
||||||
"""Duo authentication"""
|
"""Duo authentication"""
|
||||||
device = get_object_or_404(DuoDevice, pk=device_pk)
|
device = get_object_or_404(DuoDevice, pk=device_pk)
|
||||||
if device.user != user:
|
if device.user != user:
|
||||||
|
@ -148,4 +148,4 @@ def validate_challenge_duo(device_pk: int, request: HttpRequest, user: User) ->
|
||||||
if response["result"] == "deny":
|
if response["result"] == "deny":
|
||||||
raise ValidationError("Duo denied access")
|
raise ValidationError("Duo denied access")
|
||||||
device.save()
|
device.save()
|
||||||
return device_pk
|
return device
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
"""Authenticator Validation"""
|
"""Authenticator Validation"""
|
||||||
from datetime import timezone
|
from datetime import datetime
|
||||||
|
from hashlib import sha256
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
from django.http import HttpRequest, HttpResponse
|
from django.http import HttpRequest, HttpResponse
|
||||||
from django.utils.timezone import datetime, now
|
|
||||||
from django_otp import devices_for_user
|
from django_otp import devices_for_user
|
||||||
from django_otp.models import Device
|
from django_otp.models import Device
|
||||||
|
from jwt import PyJWTError, decode, encode
|
||||||
from rest_framework.fields import CharField, IntegerField, JSONField, ListField, UUIDField
|
from rest_framework.fields import CharField, IntegerField, JSONField, ListField, UUIDField
|
||||||
from rest_framework.serializers import ValidationError
|
from rest_framework.serializers import ValidationError
|
||||||
from structlog.stdlib import get_logger
|
from structlog.stdlib import get_logger
|
||||||
|
@ -34,6 +37,7 @@ from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
|
||||||
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
|
from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
|
||||||
|
|
||||||
LOGGER = get_logger()
|
LOGGER = get_logger()
|
||||||
|
COOKIE_NAME_MFA = "authentik_mfa"
|
||||||
SESSION_STAGES = "goauthentik.io/stages/authenticator_validate/stages"
|
SESSION_STAGES = "goauthentik.io/stages/authenticator_validate/stages"
|
||||||
SESSION_SELECTED_STAGE = "goauthentik.io/stages/authenticator_validate/selected_stage"
|
SESSION_SELECTED_STAGE = "goauthentik.io/stages/authenticator_validate/selected_stage"
|
||||||
SESSION_DEVICE_CHALLENGES = "goauthentik.io/stages/authenticator_validate/device_challenges"
|
SESSION_DEVICE_CHALLENGES = "goauthentik.io/stages/authenticator_validate/device_challenges"
|
||||||
|
@ -59,6 +63,8 @@ class AuthenticatorValidationChallenge(WithUserInfoChallenge):
|
||||||
class AuthenticatorValidationChallengeResponse(ChallengeResponse):
|
class AuthenticatorValidationChallengeResponse(ChallengeResponse):
|
||||||
"""Challenge used for Code-based and WebAuthn authenticators"""
|
"""Challenge used for Code-based and WebAuthn authenticators"""
|
||||||
|
|
||||||
|
device: Optional[Device]
|
||||||
|
|
||||||
selected_challenge = DeviceChallenge(required=False)
|
selected_challenge = DeviceChallenge(required=False)
|
||||||
selected_stage = CharField(required=False)
|
selected_stage = CharField(required=False)
|
||||||
|
|
||||||
|
@ -68,33 +74,40 @@ class AuthenticatorValidationChallengeResponse(ChallengeResponse):
|
||||||
component = CharField(default="ak-stage-authenticator-validate")
|
component = CharField(default="ak-stage-authenticator-validate")
|
||||||
|
|
||||||
def _challenge_allowed(self, classes: list):
|
def _challenge_allowed(self, classes: list):
|
||||||
device_challenges: list[dict] = self.stage.request.session.get(SESSION_DEVICE_CHALLENGES)
|
device_challenges: list[dict] = self.stage.request.session.get(
|
||||||
|
SESSION_DEVICE_CHALLENGES, []
|
||||||
|
)
|
||||||
if not any(x["device_class"] in classes for x in device_challenges):
|
if not any(x["device_class"] in classes for x in device_challenges):
|
||||||
raise ValidationError("No compatible device class allowed")
|
raise ValidationError("No compatible device class allowed")
|
||||||
|
|
||||||
def validate_code(self, code: str) -> str:
|
def validate_code(self, code: str) -> str:
|
||||||
"""Validate code-based response, raise error if code isn't allowed"""
|
"""Validate code-based response, raise error if code isn't allowed"""
|
||||||
self._challenge_allowed([DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS])
|
self._challenge_allowed([DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS])
|
||||||
return validate_challenge_code(code, self.stage.request, self.stage.get_pending_user())
|
self.device = validate_challenge_code(
|
||||||
|
code, self.stage.request, self.stage.get_pending_user()
|
||||||
|
)
|
||||||
|
return code
|
||||||
|
|
||||||
def validate_webauthn(self, webauthn: dict) -> dict:
|
def validate_webauthn(self, webauthn: dict) -> dict:
|
||||||
"""Validate webauthn response, raise error if webauthn wasn't allowed
|
"""Validate webauthn response, raise error if webauthn wasn't allowed
|
||||||
or response is invalid"""
|
or response is invalid"""
|
||||||
self._challenge_allowed([DeviceClasses.WEBAUTHN])
|
self._challenge_allowed([DeviceClasses.WEBAUTHN])
|
||||||
return validate_challenge_webauthn(
|
self.device = validate_challenge_webauthn(
|
||||||
webauthn, self.stage.request, self.stage.get_pending_user()
|
webauthn, self.stage.request, self.stage.get_pending_user()
|
||||||
)
|
)
|
||||||
|
return webauthn
|
||||||
|
|
||||||
def validate_duo(self, duo: int) -> int:
|
def validate_duo(self, duo: int) -> int:
|
||||||
"""Initiate Duo authentication"""
|
"""Initiate Duo authentication"""
|
||||||
self._challenge_allowed([DeviceClasses.DUO])
|
self._challenge_allowed([DeviceClasses.DUO])
|
||||||
return validate_challenge_duo(duo, self.stage.request, self.stage.get_pending_user())
|
self.device = validate_challenge_duo(duo, self.stage.request, self.stage.get_pending_user())
|
||||||
|
return duo
|
||||||
|
|
||||||
def validate_selected_challenge(self, challenge: dict) -> dict:
|
def validate_selected_challenge(self, challenge: dict) -> dict:
|
||||||
"""Check which challenge the user has selected. Actual logic only used for SMS stage."""
|
"""Check which challenge the user has selected. Actual logic only used for SMS stage."""
|
||||||
# First check if the challenge is valid
|
# First check if the challenge is valid
|
||||||
allowed = False
|
allowed = False
|
||||||
for device_challenge in self.stage.request.session.get(SESSION_DEVICE_CHALLENGES):
|
for device_challenge in self.stage.request.session.get(SESSION_DEVICE_CHALLENGES, []):
|
||||||
if device_challenge.get("device_class", "") == challenge.get(
|
if device_challenge.get("device_class", "") == challenge.get(
|
||||||
"device_class", ""
|
"device_class", ""
|
||||||
) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""):
|
) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""):
|
||||||
|
@ -127,15 +140,6 @@ class AuthenticatorValidationChallengeResponse(ChallengeResponse):
|
||||||
return attrs
|
return attrs
|
||||||
|
|
||||||
|
|
||||||
def get_device_last_usage(device: Device) -> datetime:
|
|
||||||
"""Get a datetime object from last_t"""
|
|
||||||
if not hasattr(device, "last_t"):
|
|
||||||
return datetime.fromtimestamp(0, tz=timezone.utc)
|
|
||||||
if isinstance(device.last_t, datetime):
|
|
||||||
return device.last_t
|
|
||||||
return datetime.fromtimestamp(device.last_t * device.step, tz=timezone.utc)
|
|
||||||
|
|
||||||
|
|
||||||
class AuthenticatorValidateStageView(ChallengeStageView):
|
class AuthenticatorValidateStageView(ChallengeStageView):
|
||||||
"""Authenticator Validation"""
|
"""Authenticator Validation"""
|
||||||
|
|
||||||
|
@ -154,23 +158,19 @@ class AuthenticatorValidateStageView(ChallengeStageView):
|
||||||
|
|
||||||
stage: AuthenticatorValidateStage = self.executor.current_stage
|
stage: AuthenticatorValidateStage = self.executor.current_stage
|
||||||
|
|
||||||
_now = now()
|
|
||||||
threshold = timedelta_from_string(stage.last_auth_threshold)
|
threshold = timedelta_from_string(stage.last_auth_threshold)
|
||||||
|
allowed_devices = []
|
||||||
|
|
||||||
for device in user_devices:
|
for device in user_devices:
|
||||||
device_class = device.__class__.__name__.lower().replace("device", "")
|
device_class = device.__class__.__name__.lower().replace("device", "")
|
||||||
if device_class not in stage.device_classes:
|
if device_class not in stage.device_classes:
|
||||||
LOGGER.debug("device class not allowed", device_class=device_class)
|
LOGGER.debug("device class not allowed", device_class=device_class)
|
||||||
continue
|
continue
|
||||||
|
allowed_devices.append(device)
|
||||||
# Ensure only one challenge per device class
|
# Ensure only one challenge per device class
|
||||||
# WebAuthn does another device loop to find all webuahtn devices
|
# WebAuthn does another device loop to find all webuahtn devices
|
||||||
if device_class in seen_classes:
|
if device_class in seen_classes:
|
||||||
continue
|
continue
|
||||||
# check if device has been used within threshold and skip this stage if so
|
|
||||||
if threshold.total_seconds() > 0:
|
|
||||||
if _now - get_device_last_usage(device) <= threshold:
|
|
||||||
LOGGER.info("Device has been used within threshold", device=device)
|
|
||||||
raise FlowSkipStageException()
|
|
||||||
if device_class not in seen_classes:
|
if device_class not in seen_classes:
|
||||||
seen_classes.append(device_class)
|
seen_classes.append(device_class)
|
||||||
challenge = DeviceChallenge(
|
challenge = DeviceChallenge(
|
||||||
|
@ -183,6 +183,9 @@ class AuthenticatorValidateStageView(ChallengeStageView):
|
||||||
challenge.is_valid()
|
challenge.is_valid()
|
||||||
challenges.append(challenge.data)
|
challenges.append(challenge.data)
|
||||||
LOGGER.debug("adding challenge for device", challenge=challenge)
|
LOGGER.debug("adding challenge for device", challenge=challenge)
|
||||||
|
# check if we have an MFA cookie and if it's valid
|
||||||
|
if threshold.total_seconds() > 0:
|
||||||
|
self.check_mfa_cookie(allowed_devices)
|
||||||
return challenges
|
return challenges
|
||||||
|
|
||||||
def get_userless_webauthn_challenge(self) -> list[dict]:
|
def get_userless_webauthn_challenge(self) -> list[dict]:
|
||||||
|
@ -301,6 +304,68 @@ class AuthenticatorValidateStageView(ChallengeStageView):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def cookie_jwt_key(self) -> str:
|
||||||
|
"""Signing key for MFA Cookie for this stage"""
|
||||||
|
return sha256(
|
||||||
|
f"{settings.SECRET_KEY}:{self.executor.current_stage.pk.hex}".encode("ascii")
|
||||||
|
).hexdigest()
|
||||||
|
|
||||||
|
def check_mfa_cookie(self, allowed_devices: list[Device]):
|
||||||
|
"""Check if an MFA cookie has been set, whether it's valid and applies
|
||||||
|
to the current stage and device.
|
||||||
|
|
||||||
|
The list of devices passed to this function must only contain devices for the
|
||||||
|
correct user and with an allowed class"""
|
||||||
|
if COOKIE_NAME_MFA not in self.request.COOKIES:
|
||||||
|
return
|
||||||
|
stage: AuthenticatorValidateStage = self.executor.current_stage
|
||||||
|
threshold = timedelta_from_string(stage.last_auth_threshold)
|
||||||
|
latest_allowed = datetime.now() + threshold
|
||||||
|
try:
|
||||||
|
payload = decode(self.request.COOKIES[COOKIE_NAME_MFA], self.cookie_jwt_key, ["HS256"])
|
||||||
|
if payload["stage"] != stage.pk.hex:
|
||||||
|
LOGGER.warning("Invalid stage PK")
|
||||||
|
return
|
||||||
|
if datetime.fromtimestamp(payload["exp"]) > latest_allowed:
|
||||||
|
LOGGER.warning("Expired MFA cookie")
|
||||||
|
return
|
||||||
|
if not any(device.pk == payload["device"] for device in allowed_devices):
|
||||||
|
LOGGER.warning("Invalid device PK")
|
||||||
|
return
|
||||||
|
LOGGER.info("MFA has been used within threshold")
|
||||||
|
raise FlowSkipStageException()
|
||||||
|
except (PyJWTError, ValueError, TypeError) as exc:
|
||||||
|
LOGGER.info("Invalid mfa cookie for device", exc=exc)
|
||||||
|
|
||||||
|
def set_valid_mfa_cookie(self, device: Device) -> HttpResponse:
|
||||||
|
"""Set an MFA cookie to allow users to skip MFA validation in this context (browser)
|
||||||
|
|
||||||
|
The cookie is JWT which is signed with a hash of the secret key and the UID of the stage"""
|
||||||
|
stage: AuthenticatorValidateStage = self.executor.current_stage
|
||||||
|
delta = timedelta_from_string(stage.last_auth_threshold)
|
||||||
|
if delta.total_seconds() < 1:
|
||||||
|
LOGGER.info("Not setting MFA cookie since threshold is not set.")
|
||||||
|
return self.executor.stage_ok()
|
||||||
|
expiry = datetime.now() + delta
|
||||||
|
cookie_payload = {
|
||||||
|
"device": device.pk,
|
||||||
|
"stage": stage.pk.hex,
|
||||||
|
"exp": expiry.timestamp(),
|
||||||
|
}
|
||||||
|
response = self.executor.stage_ok()
|
||||||
|
cookie = encode(cookie_payload, self.cookie_jwt_key)
|
||||||
|
response.set_cookie(
|
||||||
|
COOKIE_NAME_MFA,
|
||||||
|
cookie,
|
||||||
|
expires=expiry,
|
||||||
|
path="/",
|
||||||
|
max_age=delta,
|
||||||
|
domain=settings.SESSION_COOKIE_DOMAIN,
|
||||||
|
samesite="Lax",
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
|
def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
|
||||||
# All validation is done by the serializer
|
# All validation is done by the serializer
|
||||||
|
@ -309,7 +374,7 @@ class AuthenticatorValidateStageView(ChallengeStageView):
|
||||||
webauthn_device: WebAuthnDevice = response.data.get("webauthn", None)
|
webauthn_device: WebAuthnDevice = response.data.get("webauthn", None)
|
||||||
if not webauthn_device:
|
if not webauthn_device:
|
||||||
return self.executor.stage_ok()
|
return self.executor.stage_ok()
|
||||||
LOGGER.debug("Set user from userless flow", user=webauthn_device.user)
|
LOGGER.debug("Set user from user-less flow", user=webauthn_device.user)
|
||||||
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user
|
self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user
|
||||||
self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
|
self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
|
||||||
self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS] = cleanse_dict(
|
self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS] = cleanse_dict(
|
||||||
|
@ -319,4 +384,4 @@ class AuthenticatorValidateStageView(ChallengeStageView):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
return self.executor.stage_ok()
|
return self.set_valid_mfa_cookie(response.device)
|
||||||
|
|
|
@ -45,9 +45,7 @@ class AuthenticatorValidateStageDuoTests(FlowTestCase):
|
||||||
"authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.client",
|
"authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.client",
|
||||||
duo_mock,
|
duo_mock,
|
||||||
):
|
):
|
||||||
self.assertEqual(
|
self.assertEqual(duo_device, validate_challenge_duo(duo_device.pk, request, self.user))
|
||||||
duo_device.pk, validate_challenge_duo(duo_device.pk, request, self.user)
|
|
||||||
)
|
|
||||||
with patch(
|
with patch(
|
||||||
"authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.client",
|
"authentik.stages.authenticator_duo.models.AuthenticatorDuoStage.client",
|
||||||
failed_duo_mock,
|
failed_duo_mock,
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
"""Test validator stage"""
|
"""Test validator stage"""
|
||||||
from time import sleep
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
from django.test.client import RequestFactory
|
from django.test.client import RequestFactory
|
||||||
from django.urls.base import reverse
|
from django.urls.base import reverse
|
||||||
|
@ -9,6 +9,7 @@ from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction
|
||||||
from authentik.flows.tests import FlowTestCase
|
from authentik.flows.tests import FlowTestCase
|
||||||
from authentik.stages.authenticator_sms.models import AuthenticatorSMSStage, SMSDevice, SMSProviders
|
from authentik.stages.authenticator_sms.models import AuthenticatorSMSStage, SMSDevice, SMSProviders
|
||||||
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
|
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
|
||||||
|
from authentik.stages.authenticator_validate.stage import COOKIE_NAME_MFA
|
||||||
from authentik.stages.identification.models import IdentificationStage, UserFields
|
from authentik.stages.identification.models import IdentificationStage, UserFields
|
||||||
|
|
||||||
|
|
||||||
|
@ -26,7 +27,7 @@ class AuthenticatorValidateStageSMSTests(FlowTestCase):
|
||||||
|
|
||||||
def test_last_auth_threshold(self):
|
def test_last_auth_threshold(self):
|
||||||
"""Test last_auth_threshold"""
|
"""Test last_auth_threshold"""
|
||||||
conf_stage = IdentificationStage.objects.create(
|
ident_stage = IdentificationStage.objects.create(
|
||||||
name="conf",
|
name="conf",
|
||||||
user_fields=[
|
user_fields=[
|
||||||
UserFields.USERNAME,
|
UserFields.USERNAME,
|
||||||
|
@ -37,19 +38,16 @@ class AuthenticatorValidateStageSMSTests(FlowTestCase):
|
||||||
confirmed=True,
|
confirmed=True,
|
||||||
stage=self.stage,
|
stage=self.stage,
|
||||||
)
|
)
|
||||||
# Verify token once here to set last_t etc
|
|
||||||
token = device.generate_token()
|
|
||||||
device.verify_token(token)
|
|
||||||
stage = AuthenticatorValidateStage.objects.create(
|
stage = AuthenticatorValidateStage.objects.create(
|
||||||
name="foo",
|
name="foo",
|
||||||
last_auth_threshold="milliseconds=0",
|
last_auth_threshold="milliseconds=0",
|
||||||
not_configured_action=NotConfiguredAction.CONFIGURE,
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
device_classes=[DeviceClasses.SMS],
|
device_classes=[DeviceClasses.SMS],
|
||||||
)
|
)
|
||||||
sleep(1)
|
stage.configuration_stages.set([ident_stage])
|
||||||
stage.configuration_stages.set([conf_stage])
|
|
||||||
flow = Flow.objects.create(name="test", slug="test", title="test")
|
flow = Flow.objects.create(name="test", slug="test", title="test")
|
||||||
FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
|
FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
||||||
|
|
||||||
response = self.client.post(
|
response = self.client.post(
|
||||||
|
@ -57,19 +55,16 @@ class AuthenticatorValidateStageSMSTests(FlowTestCase):
|
||||||
{"uid_field": self.user.username},
|
{"uid_field": self.user.username},
|
||||||
)
|
)
|
||||||
self.assertEqual(response.status_code, 302)
|
self.assertEqual(response.status_code, 302)
|
||||||
response = self.client.get(
|
device.generate_token()
|
||||||
|
response = self.client.post(
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||||
follow=True,
|
{"code": device.token},
|
||||||
)
|
|
||||||
self.assertStageResponse(
|
|
||||||
response,
|
|
||||||
flow,
|
|
||||||
component="ak-stage-authenticator-validate",
|
|
||||||
)
|
)
|
||||||
|
self.assertNotIn(COOKIE_NAME_MFA, response.cookies)
|
||||||
|
|
||||||
def test_last_auth_threshold_valid(self):
|
def test_last_auth_threshold_valid(self):
|
||||||
"""Test last_auth_threshold"""
|
"""Test last_auth_threshold"""
|
||||||
conf_stage = IdentificationStage.objects.create(
|
ident_stage = IdentificationStage.objects.create(
|
||||||
name="conf",
|
name="conf",
|
||||||
user_fields=[
|
user_fields=[
|
||||||
UserFields.USERNAME,
|
UserFields.USERNAME,
|
||||||
|
@ -80,27 +75,44 @@ class AuthenticatorValidateStageSMSTests(FlowTestCase):
|
||||||
confirmed=True,
|
confirmed=True,
|
||||||
stage=self.stage,
|
stage=self.stage,
|
||||||
)
|
)
|
||||||
# Verify token once here to set last_t etc
|
|
||||||
token = device.generate_token()
|
|
||||||
device.verify_token(token)
|
|
||||||
stage = AuthenticatorValidateStage.objects.create(
|
stage = AuthenticatorValidateStage.objects.create(
|
||||||
name="foo",
|
name="foo",
|
||||||
last_auth_threshold="hours=1",
|
last_auth_threshold="hours=1",
|
||||||
not_configured_action=NotConfiguredAction.CONFIGURE,
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
device_classes=[DeviceClasses.SMS],
|
device_classes=[DeviceClasses.SMS],
|
||||||
)
|
)
|
||||||
stage.configuration_stages.set([conf_stage])
|
stage.configuration_stages.set([ident_stage])
|
||||||
flow = Flow.objects.create(name="test", slug="test", title="test")
|
flow = Flow.objects.create(name="test", slug="test", title="test")
|
||||||
FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
|
FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
||||||
|
|
||||||
response = self.client.post(
|
response = self.client.post(
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||||
{"uid_field": self.user.username},
|
{"uid_field": self.user.username},
|
||||||
)
|
|
||||||
self.assertEqual(response.status_code, 302)
|
|
||||||
response = self.client.get(
|
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
|
||||||
follow=True,
|
follow=True,
|
||||||
)
|
)
|
||||||
|
self.assertEqual(response.status_code, 200)
|
||||||
|
send_mock = MagicMock()
|
||||||
|
with patch(
|
||||||
|
"authentik.stages.authenticator_sms.models.AuthenticatorSMSStage.send", send_mock
|
||||||
|
):
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||||
|
{
|
||||||
|
"component": "ak-stage-authenticator-validate",
|
||||||
|
"selected_challenge": {
|
||||||
|
"device_class": "sms",
|
||||||
|
"device_uid": str(device.pk),
|
||||||
|
"challenge": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
self.assertEqual(send_mock.call_count, 1)
|
||||||
|
device.generate_token()
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
||||||
|
{"code": device.token},
|
||||||
|
)
|
||||||
|
self.assertIn(COOKIE_NAME_MFA, response.cookies)
|
||||||
self.assertStageResponse(response, component="xak-flow-redirect", to="/")
|
self.assertStageResponse(response, component="xak-flow-redirect", to="/")
|
||||||
|
|
|
@ -1,20 +1,26 @@
|
||||||
"""Test validator stage"""
|
"""Test validator stage"""
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from hashlib import sha256
|
||||||
|
from http.cookies import SimpleCookie
|
||||||
from time import sleep
|
from time import sleep
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
from django.test.client import RequestFactory
|
from django.test.client import RequestFactory
|
||||||
from django.urls.base import reverse
|
from django.urls.base import reverse
|
||||||
from django_otp.oath import TOTP
|
from django_otp.oath import TOTP
|
||||||
from django_otp.plugins.otp_totp.models import TOTPDevice
|
from django_otp.plugins.otp_totp.models import TOTPDevice
|
||||||
|
from jwt import encode
|
||||||
from rest_framework.exceptions import ValidationError
|
from rest_framework.exceptions import ValidationError
|
||||||
|
|
||||||
from authentik.core.tests.utils import create_test_admin_user
|
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
|
||||||
from authentik.flows.models import Flow, FlowStageBinding, NotConfiguredAction
|
from authentik.flows.models import FlowDesignation, FlowStageBinding, NotConfiguredAction
|
||||||
from authentik.flows.tests import FlowTestCase
|
from authentik.flows.tests import FlowTestCase
|
||||||
from authentik.stages.authenticator_validate.challenge import (
|
from authentik.stages.authenticator_validate.challenge import (
|
||||||
get_challenge_for_device,
|
get_challenge_for_device,
|
||||||
validate_challenge_code,
|
validate_challenge_code,
|
||||||
)
|
)
|
||||||
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
|
from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
|
||||||
|
from authentik.stages.authenticator_validate.stage import COOKIE_NAME_MFA
|
||||||
from authentik.stages.identification.models import IdentificationStage, UserFields
|
from authentik.stages.identification.models import IdentificationStage, UserFields
|
||||||
|
|
||||||
|
|
||||||
|
@ -24,10 +30,11 @@ class AuthenticatorValidateStageTOTPTests(FlowTestCase):
|
||||||
def setUp(self) -> None:
|
def setUp(self) -> None:
|
||||||
self.user = create_test_admin_user()
|
self.user = create_test_admin_user()
|
||||||
self.request_factory = RequestFactory()
|
self.request_factory = RequestFactory()
|
||||||
|
self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
|
||||||
|
|
||||||
def test_last_auth_threshold(self):
|
def test_last_auth_threshold(self):
|
||||||
"""Test last_auth_threshold"""
|
"""Test last_auth_threshold"""
|
||||||
conf_stage = IdentificationStage.objects.create(
|
ident_stage = IdentificationStage.objects.create(
|
||||||
name="conf",
|
name="conf",
|
||||||
user_fields=[
|
user_fields=[
|
||||||
UserFields.USERNAME,
|
UserFields.USERNAME,
|
||||||
|
@ -47,29 +54,28 @@ class AuthenticatorValidateStageTOTPTests(FlowTestCase):
|
||||||
not_configured_action=NotConfiguredAction.CONFIGURE,
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
device_classes=[DeviceClasses.TOTP],
|
device_classes=[DeviceClasses.TOTP],
|
||||||
)
|
)
|
||||||
stage.configuration_stages.set([conf_stage])
|
stage.configuration_stages.set([ident_stage])
|
||||||
flow = Flow.objects.create(name="test", slug="test", title="test")
|
FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
|
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
|
||||||
|
|
||||||
response = self.client.post(
|
response = self.client.post(
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
{"uid_field": self.user.username},
|
{"uid_field": self.user.username},
|
||||||
)
|
)
|
||||||
self.assertEqual(response.status_code, 302)
|
self.assertEqual(response.status_code, 302)
|
||||||
response = self.client.get(
|
response = self.client.get(
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
follow=True,
|
follow=True,
|
||||||
)
|
)
|
||||||
self.assertStageResponse(
|
self.assertStageResponse(
|
||||||
response,
|
response,
|
||||||
flow,
|
self.flow,
|
||||||
component="ak-stage-authenticator-validate",
|
component="ak-stage-authenticator-validate",
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_last_auth_threshold_valid(self):
|
def test_last_auth_threshold_valid(self) -> SimpleCookie:
|
||||||
"""Test last_auth_threshold"""
|
"""Test last_auth_threshold"""
|
||||||
conf_stage = IdentificationStage.objects.create(
|
ident_stage = IdentificationStage.objects.create(
|
||||||
name="conf",
|
name="conf",
|
||||||
user_fields=[
|
user_fields=[
|
||||||
UserFields.USERNAME,
|
UserFields.USERNAME,
|
||||||
|
@ -79,32 +85,168 @@ class AuthenticatorValidateStageTOTPTests(FlowTestCase):
|
||||||
user=self.user,
|
user=self.user,
|
||||||
confirmed=True,
|
confirmed=True,
|
||||||
)
|
)
|
||||||
# Verify token once here to set last_t etc
|
|
||||||
totp = TOTP(device.bin_key)
|
|
||||||
sleep(1)
|
|
||||||
self.assertTrue(device.verify_token(totp.token()))
|
|
||||||
stage = AuthenticatorValidateStage.objects.create(
|
stage = AuthenticatorValidateStage.objects.create(
|
||||||
name="foo",
|
name="foo",
|
||||||
last_auth_threshold="hours=1",
|
last_auth_threshold="hours=1",
|
||||||
not_configured_action=NotConfiguredAction.CONFIGURE,
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
device_classes=[DeviceClasses.TOTP],
|
device_classes=[DeviceClasses.TOTP],
|
||||||
)
|
)
|
||||||
stage.configuration_stages.set([conf_stage])
|
stage.configuration_stages.set([ident_stage])
|
||||||
flow = Flow.objects.create(name="test", slug="test", title="test")
|
FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
|
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
|
||||||
|
|
||||||
response = self.client.post(
|
response = self.client.post(
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
{"uid_field": self.user.username},
|
{"uid_field": self.user.username},
|
||||||
)
|
)
|
||||||
self.assertEqual(response.status_code, 302)
|
self.assertEqual(response.status_code, 302)
|
||||||
response = self.client.get(
|
response = self.client.get(
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
follow=True,
|
)
|
||||||
|
# Verify token once here to set last_t etc
|
||||||
|
totp = TOTP(device.bin_key)
|
||||||
|
sleep(1)
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
{"code": str(totp.token())},
|
||||||
|
)
|
||||||
|
self.assertIn(COOKIE_NAME_MFA, response.cookies)
|
||||||
|
self.assertStageResponse(response, component="xak-flow-redirect", to="/")
|
||||||
|
return response.cookies
|
||||||
|
|
||||||
|
def test_last_auth_skip(self):
|
||||||
|
"""Test valid cookie"""
|
||||||
|
cookies = self.test_last_auth_threshold_valid()
|
||||||
|
mfa_cookie = cookies[COOKIE_NAME_MFA]
|
||||||
|
self.client.logout()
|
||||||
|
self.client.cookies[COOKIE_NAME_MFA] = mfa_cookie
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
{"uid_field": self.user.username},
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
response = self.client.get(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
)
|
)
|
||||||
self.assertStageResponse(response, component="xak-flow-redirect", to="/")
|
self.assertStageResponse(response, component="xak-flow-redirect", to="/")
|
||||||
|
|
||||||
|
def test_last_auth_stage_pk(self):
|
||||||
|
"""Test MFA cookie with wrong stage PK"""
|
||||||
|
ident_stage = IdentificationStage.objects.create(
|
||||||
|
name="conf",
|
||||||
|
user_fields=[
|
||||||
|
UserFields.USERNAME,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
device: TOTPDevice = TOTPDevice.objects.create(
|
||||||
|
user=self.user,
|
||||||
|
confirmed=True,
|
||||||
|
)
|
||||||
|
stage = AuthenticatorValidateStage.objects.create(
|
||||||
|
name="foo",
|
||||||
|
last_auth_threshold="hours=1",
|
||||||
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
|
device_classes=[DeviceClasses.TOTP],
|
||||||
|
)
|
||||||
|
stage.configuration_stages.set([ident_stage])
|
||||||
|
FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
|
||||||
|
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
|
||||||
|
self.client.cookies[COOKIE_NAME_MFA] = encode(
|
||||||
|
payload={
|
||||||
|
"device": device.pk,
|
||||||
|
"stage": stage.pk.hex + "foo",
|
||||||
|
"exp": (datetime.now() + timedelta(days=3)).timestamp(),
|
||||||
|
},
|
||||||
|
key=sha256(f"{settings.SECRET_KEY}:{stage.pk.hex}".encode("ascii")).hexdigest(),
|
||||||
|
)
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
{"uid_field": self.user.username},
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
response = self.client.get(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
)
|
||||||
|
self.assertStageResponse(response, component="ak-stage-authenticator-validate")
|
||||||
|
|
||||||
|
def test_last_auth_stage_device(self):
|
||||||
|
"""Test MFA cookie with wrong device PK"""
|
||||||
|
ident_stage = IdentificationStage.objects.create(
|
||||||
|
name="conf",
|
||||||
|
user_fields=[
|
||||||
|
UserFields.USERNAME,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
device: TOTPDevice = TOTPDevice.objects.create(
|
||||||
|
user=self.user,
|
||||||
|
confirmed=True,
|
||||||
|
)
|
||||||
|
stage = AuthenticatorValidateStage.objects.create(
|
||||||
|
name="foo",
|
||||||
|
last_auth_threshold="hours=1",
|
||||||
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
|
device_classes=[DeviceClasses.TOTP],
|
||||||
|
)
|
||||||
|
stage.configuration_stages.set([ident_stage])
|
||||||
|
FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
|
||||||
|
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
|
||||||
|
self.client.cookies[COOKIE_NAME_MFA] = encode(
|
||||||
|
payload={
|
||||||
|
"device": device.pk + 1,
|
||||||
|
"stage": stage.pk.hex,
|
||||||
|
"exp": (datetime.now() + timedelta(days=3)).timestamp(),
|
||||||
|
},
|
||||||
|
key=sha256(f"{settings.SECRET_KEY}:{stage.pk.hex}".encode("ascii")).hexdigest(),
|
||||||
|
)
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
{"uid_field": self.user.username},
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
response = self.client.get(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
)
|
||||||
|
self.assertStageResponse(response, component="ak-stage-authenticator-validate")
|
||||||
|
|
||||||
|
def test_last_auth_stage_expired(self):
|
||||||
|
"""Test MFA cookie with expired cookie"""
|
||||||
|
ident_stage = IdentificationStage.objects.create(
|
||||||
|
name="conf",
|
||||||
|
user_fields=[
|
||||||
|
UserFields.USERNAME,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
device: TOTPDevice = TOTPDevice.objects.create(
|
||||||
|
user=self.user,
|
||||||
|
confirmed=True,
|
||||||
|
)
|
||||||
|
stage = AuthenticatorValidateStage.objects.create(
|
||||||
|
name="foo",
|
||||||
|
last_auth_threshold="hours=1",
|
||||||
|
not_configured_action=NotConfiguredAction.CONFIGURE,
|
||||||
|
device_classes=[DeviceClasses.TOTP],
|
||||||
|
)
|
||||||
|
stage.configuration_stages.set([ident_stage])
|
||||||
|
FlowStageBinding.objects.create(target=self.flow, stage=ident_stage, order=0)
|
||||||
|
FlowStageBinding.objects.create(target=self.flow, stage=stage, order=1)
|
||||||
|
self.client.cookies[COOKIE_NAME_MFA] = encode(
|
||||||
|
payload={
|
||||||
|
"device": device.pk,
|
||||||
|
"stage": stage.pk.hex,
|
||||||
|
"exp": (datetime.now() - timedelta(days=3)).timestamp(),
|
||||||
|
},
|
||||||
|
key=sha256(f"{settings.SECRET_KEY}:{stage.pk.hex}".encode("ascii")).hexdigest(),
|
||||||
|
)
|
||||||
|
response = self.client.post(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
{"uid_field": self.user.username},
|
||||||
|
)
|
||||||
|
self.assertEqual(response.status_code, 302)
|
||||||
|
response = self.client.get(
|
||||||
|
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
|
||||||
|
)
|
||||||
|
self.assertStageResponse(response, component="ak-stage-authenticator-validate")
|
||||||
|
|
||||||
def test_device_challenge_totp(self):
|
def test_device_challenge_totp(self):
|
||||||
"""Test device challenge"""
|
"""Test device challenge"""
|
||||||
request = self.request_factory.get("/")
|
request = self.request_factory.get("/")
|
||||||
|
|
|
@ -28,7 +28,7 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
|
||||||
|
|
||||||
def test_last_auth_threshold(self):
|
def test_last_auth_threshold(self):
|
||||||
"""Test last_auth_threshold"""
|
"""Test last_auth_threshold"""
|
||||||
conf_stage = IdentificationStage.objects.create(
|
ident_stage = IdentificationStage.objects.create(
|
||||||
name="conf",
|
name="conf",
|
||||||
user_fields=[
|
user_fields=[
|
||||||
UserFields.USERNAME,
|
UserFields.USERNAME,
|
||||||
|
@ -46,9 +46,9 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
|
||||||
device_classes=[DeviceClasses.WEBAUTHN],
|
device_classes=[DeviceClasses.WEBAUTHN],
|
||||||
)
|
)
|
||||||
sleep(1)
|
sleep(1)
|
||||||
stage.configuration_stages.set([conf_stage])
|
stage.configuration_stages.set([ident_stage])
|
||||||
flow = Flow.objects.create(name="test", slug="test", title="test")
|
flow = Flow.objects.create(name="test", slug="test", title="test")
|
||||||
FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
|
FlowStageBinding.objects.create(target=flow, stage=ident_stage, order=0)
|
||||||
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
||||||
|
|
||||||
response = self.client.post(
|
response = self.client.post(
|
||||||
|
@ -66,41 +66,6 @@ class AuthenticatorValidateStageWebAuthnTests(FlowTestCase):
|
||||||
component="ak-stage-authenticator-validate",
|
component="ak-stage-authenticator-validate",
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_last_auth_threshold_valid(self):
|
|
||||||
"""Test last_auth_threshold"""
|
|
||||||
conf_stage = IdentificationStage.objects.create(
|
|
||||||
name="conf",
|
|
||||||
user_fields=[
|
|
||||||
UserFields.USERNAME,
|
|
||||||
],
|
|
||||||
)
|
|
||||||
device: WebAuthnDevice = WebAuthnDevice.objects.create(
|
|
||||||
user=self.user,
|
|
||||||
confirmed=True,
|
|
||||||
)
|
|
||||||
device.set_sign_count(device.sign_count + 1)
|
|
||||||
stage = AuthenticatorValidateStage.objects.create(
|
|
||||||
name="foo",
|
|
||||||
last_auth_threshold="hours=1",
|
|
||||||
not_configured_action=NotConfiguredAction.CONFIGURE,
|
|
||||||
device_classes=[DeviceClasses.WEBAUTHN],
|
|
||||||
)
|
|
||||||
stage.configuration_stages.set([conf_stage])
|
|
||||||
flow = Flow.objects.create(name="test", slug="test", title="test")
|
|
||||||
FlowStageBinding.objects.create(target=flow, stage=conf_stage, order=0)
|
|
||||||
FlowStageBinding.objects.create(target=flow, stage=stage, order=1)
|
|
||||||
|
|
||||||
response = self.client.post(
|
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
|
||||||
{"uid_field": self.user.username},
|
|
||||||
)
|
|
||||||
self.assertEqual(response.status_code, 302)
|
|
||||||
response = self.client.get(
|
|
||||||
reverse("authentik_api:flow-executor", kwargs={"flow_slug": flow.slug}),
|
|
||||||
follow=True,
|
|
||||||
)
|
|
||||||
self.assertStageResponse(response, component="xak-flow-redirect", to="/")
|
|
||||||
|
|
||||||
def test_device_challenge_webauthn(self):
|
def test_device_challenge_webauthn(self):
|
||||||
"""Test webauthn"""
|
"""Test webauthn"""
|
||||||
request = get_request("/")
|
request = get_request("/")
|
||||||
|
|
|
@ -104,6 +104,7 @@ class AuthenticatorWebAuthnStageView(ChallengeStageView):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.request.session["challenge"] = registration_options.challenge
|
self.request.session["challenge"] = registration_options.challenge
|
||||||
|
self.request.session.save()
|
||||||
return AuthenticatorWebAuthnChallenge(
|
return AuthenticatorWebAuthnChallenge(
|
||||||
data={
|
data={
|
||||||
"type": ChallengeTypes.NATIVE.value,
|
"type": ChallengeTypes.NATIVE.value,
|
||||||
|
|
|
@ -31,10 +31,11 @@ export function me(): Promise<SessionUser> {
|
||||||
avatar: "",
|
avatar: "",
|
||||||
uid: "",
|
uid: "",
|
||||||
username: "",
|
username: "",
|
||||||
name: ""
|
name: "",
|
||||||
|
settings: {},
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
if (ex.status === 401 || ex.status === 403) {
|
if (ex.response.status === 401 || ex.response.status === 403) {
|
||||||
window.location.assign("/");
|
window.location.assign("/");
|
||||||
}
|
}
|
||||||
return defaultUser;
|
return defaultUser;
|
||||||
|
|
Reference in New Issue