From 7f12b14145f1f3adf5b99c422f83fe708dd7111e Mon Sep 17 00:00:00 2001 From: Jens Langhammer Date: Mon, 6 Jun 2022 23:42:07 +0200 Subject: [PATCH] start user tests Signed-off-by: Jens Langhammer --- authentik/sources/scim/tests/test_users.py | 83 +++++++++++++++++++ .../scim/views/v2/service_provider_config.py | 2 +- authentik/sources/scim/views/v2/users.py | 3 + 3 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 authentik/sources/scim/tests/test_users.py diff --git a/authentik/sources/scim/tests/test_users.py b/authentik/sources/scim/tests/test_users.py new file mode 100644 index 000000000..5dbe0ee27 --- /dev/null +++ b/authentik/sources/scim/tests/test_users.py @@ -0,0 +1,83 @@ +"""Test SCIM User""" +from json import dumps + +from django.urls import reverse +from rest_framework.test import APITestCase + +from authentik.core.models import Token, TokenIntents, User +from authentik.core.tests.utils import create_test_admin_user +from authentik.lib.generators import generate_id +from authentik.sources.scim.models import USER_ATTRIBUTE_SCIM_ID, SCIMSource +from authentik.sources.scim.views.v2.base import SCIM_CONTENT_TYPE + + +class TestSCIMUsers(APITestCase): + """Test SCIM User view""" + + def setUp(self) -> None: + self.user = create_test_admin_user() + self.token = Token.objects.create( + user=self.user, + identifier=generate_id(), + intent=TokenIntents.INTENT_API, + ) + self.source = SCIMSource.objects.create( + name=generate_id(), slug=generate_id(), token=self.token + ) + + def test_user_list(self): + """Test full user list""" + response = self.client.get( + reverse( + "authentik_sources_scim:v2-users", + kwargs={ + "source_slug": self.source.slug, + }, + ), + HTTP_AUTHORIZATION=f"Bearer {self.token.key}", + ) + self.assertEqual(response.status_code, 200) + + def test_user_list_single(self): + """Test full user list (single user)""" + response = self.client.get( + reverse( + "authentik_sources_scim:v2-users", + kwargs={ + "source_slug": self.source.slug, + "user_id": str(self.user.pk), + }, + ), + HTTP_AUTHORIZATION=f"Bearer {self.token.key}", + ) + self.assertEqual(response.status_code, 200) + + def test_user_create(self): + """Test user create""" + ext_id = generate_id() + response = self.client.post( + reverse( + "authentik_sources_scim:v2-users", + kwargs={ + "source_slug": self.source.slug, + }, + ), + data=dumps( + { + "userName": generate_id(), + "externalId": ext_id, + "emails": [ + { + "primary": True, + "value": self.user.email, + } + ], + } + ), + content_type=SCIM_CONTENT_TYPE, + HTTP_AUTHORIZATION=f"Bearer {self.token.key}", + ) + self.assertEqual(response.status_code, 201) + self.assertTrue( + User.objects.filter(**{f"attributes__{USER_ATTRIBUTE_SCIM_ID}": ext_id}).exists() + ) diff --git a/authentik/sources/scim/views/v2/service_provider_config.py b/authentik/sources/scim/views/v2/service_provider_config.py index 61fab889b..132cf167d 100644 --- a/authentik/sources/scim/views/v2/service_provider_config.py +++ b/authentik/sources/scim/views/v2/service_provider_config.py @@ -28,7 +28,7 @@ class ServiceProviderConfigView(SCIMView): "patch": {"supported": True}, "bulk": {"supported": False, "maxOperations": 0, "maxPayloadSize": 0}, "filter": {"supported": False, "maxResults": 200}, - "changePassword": {"supported": True}, + "changePassword": {"supported": False}, "sort": {"supported": False}, "etag": {"supported": False}, } diff --git a/authentik/sources/scim/views/v2/users.py b/authentik/sources/scim/views/v2/users.py index 030548575..5af7dee12 100644 --- a/authentik/sources/scim/views/v2/users.py +++ b/authentik/sources/scim/views/v2/users.py @@ -5,6 +5,7 @@ from django.core.paginator import Paginator from django.http import Http404, QueryDict from django.urls import reverse from guardian.shortcuts import get_anonymous_user +from rest_framework.exceptions import ValidationError from rest_framework.request import Request from rest_framework.response import Response from structlog.stdlib import get_logger @@ -106,6 +107,8 @@ class UsersView(SCIMView): user.attributes[USER_ATTRIBUTE_SCIM_ENTERPRISE] = data.get( "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" ) + if user.username == "": + raise ValidationError("Invalid user") return user def post(self, request: Request, **kwargs) -> Response: